from __future__ import absolute_import
from interface_meta import override
from omniduct.utils.debug import logger
from .base import DatabaseClient
[docs]class DruidClient(DatabaseClient):
"""
This Duct connects to a Druid server using the `pydruid` python library.
"""
PROTOCOLS = ['druid']
DEFAULT_PORT = 80
NAMESPACE_NAMES = ['table']
NAMESPACE_QUOTECHAR = '"'
NAMESPACE_SEPARATOR = '.'
@override
def _init(self):
self.__druid = None
# Connection
@override
def _connect(self):
from pydruid.db import connect
logger.info('Connecting to Druid database ...')
self.__druid = connect(self.host, self.port, path='/druid/v2/sql/', scheme='http')
if self.username or self.password:
logger.warning(
'Duct username and password not passed to pydruid connection. '
'pydruid connection currently does not allow these fields to be passed.'
)
@override
def _is_connected(self):
return self.__druid is not None
@override
def _disconnect(self):
logger.info('Disconnecting from Druid database ...')
try:
self.__druid.close()
except Exception:
pass
self.__druid = None
# Querying
@override
def _execute(self, statement, cursor, wait, session_properties):
cursor = cursor or self.__druid.cursor()
cursor.execute(statement)
return cursor
@override
def _table_list(self, namespace, like=None, **kwargs):
cmd = "SELECT * FROM INFORMATION_SCHEMA.TABLES"
return self.query(cmd, **kwargs)
@override
def _table_exists(self, table, **kwargs):
logger.disabled = True
try:
self.table_desc(table, **kwargs)
return True
except:
return False
finally:
logger.disabled = False
@override
def _table_drop(self, table, **kwargs):
raise NotImplementedError
@override
def _table_desc(self, table, **kwargs):
query = ("""
SELECT
TABLE_SCHEMA
, TABLE_NAME
, COLUMN_NAME
, ORDINAL_POSITION
, COLUMN_DEFAULT
, IS_NULLABLE
, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = '{}'""").format(table)
return self.query(query, **kwargs)
@override
def _table_head(self, table, n=10, **kwargs):
return self.query("SELECT * FROM {} LIMIT {}".format(table, n), **kwargs)
@override
def _table_props(self, table, **kwargs):
raise NotImplementedError