from __future__ import absolute_import
from interface_meta import override
from omniduct.utils.debug import logger
from .base import DatabaseClient
from ._schemas import SchemasMixin
from . import _pandas
class SQLAlchemyClient(DatabaseClient, SchemasMixin):
    """
    This Duct connects to several different databases using one of several
    SQLAlchemy drivers. In general, these are provided for their potential
    utility, but will be less functional than the specially crafted database
    clients.

    When the protocol is one of the named dialects (e.g. 'postgresql'), the
    protocol itself is used as the SQLAlchemy dialect; with the generic
    'sqlalchemy' protocol the dialect must be passed explicitly. A port must
    always be specified.
    """

    PROTOCOLS = ['sqlalchemy', 'firebird', 'mssql', 'mysql', 'oracle', 'postgresql', 'sybase', 'snowflake']
    NAMESPACE_NAMES = ['database', 'table']
    NAMESPACE_QUOTECHAR = '"'  # TODO: Apply overrides depending on protocol?
    NAMESPACE_SEPARATOR = '.'

    @property
    @override
    def NAMESPACE_DEFAULT(self):
        """dict: Default namespace components (the configured database)."""
        return {
            'database': self.database
        }

    @override
    def _init(self, dialect=None, driver=None, database='', engine_opts=None):
        """
        Store connection configuration; no connection is opened here.

        Args:
            dialect (str, None): SQLAlchemy dialect name. Only consulted when
                the protocol is 'sqlalchemy'; otherwise the protocol is used.
            driver (str, None): Optional DBAPI driver, rendered into the URI
                as `dialect+driver`.
            database (str): Database name used in the URI and as the default
                namespace.
            engine_opts (dict, None): Extra keyword arguments forwarded to
                `sqlalchemy.create_engine`.
        """
        # Ports are often forwarded, so a dialect default port cannot be assumed.
        assert self._port is not None, "Omniduct requires SQLAlchemy databases to manually specify a port, as " \
            "it will often be the case that ports are being forwarded."

        if self.protocol != 'sqlalchemy':
            self.dialect = self.protocol
        else:
            self.dialect = dialect
        assert self.dialect is not None, "Dialect not specified."

        self.driver = driver
        self.database = database
        self.connection_fields += ('schema',)
        self.engine_opts = engine_opts or {}

        self.engine = None
        self.connection = None

    @property
    def db_uri(self):
        """
        str: SQLAlchemy connection URI built from the current configuration.

        Username and password are percent-escaped so that characters such as
        '@', ':', '/' or '%' in credentials do not corrupt the URI. When
        neither a username nor a password is configured, the credentials
        section (including the '@') is omitted.
        """
        try:
            from urllib.parse import quote
        except ImportError:  # Python 2
            from urllib import quote

        def _escape(value):
            # safe='' so that '/' is escaped too; SQLAlchemy unquotes these fields.
            return quote('{}'.format(value), safe='')

        login = ''
        if self.username or self.password:
            login = _escape(self.username or '')
            if self.password:
                login += ':' + _escape(self.password)
            login += '@'

        return '{dialect}://{login}{host_port}/{database}'.format(
            dialect=self.dialect + ("+{}".format(self.driver) if self.driver else ''),
            login=login,
            host_port=self.host + (":{}".format(self.port) if self.port else ''),
            database=self.database
        )

    @override
    def _connect(self):
        """Create the SQLAlchemy engine and the metadata object bound to it."""
        import sqlalchemy
        # The table helpers below issue MySQL-style statements (SHOW TABLES,
        # DESCRIBE), so warn for every other backend.
        if self.protocol not in ['mysql']:
            logger.warning("While querying and executing should work as "
                           "expected, some operations on this database client "
                           "(such as listing tables, querying to tables, etc) "
                           "may not function as expected due to the backend "
                           "not supporting ANSI SQL.")
        self.engine = sqlalchemy.create_engine(self.db_uri, **self.engine_opts)
        self._sqlalchemy_metadata = sqlalchemy.MetaData(self.engine)

    @override
    def _is_connected(self):
        """Return True if an engine has been created and not yet disposed."""
        return self.engine is not None

    @override
    def _disconnect(self):
        """
        Drop the engine and cached metadata/schemas, releasing pooled connections.

        The engine is disposed after the references are cleared, so the client
        is marked disconnected even if `dispose()` raises.
        """
        engine, self.engine = self.engine, None
        self._sqlalchemy_metadata = None
        self._schemas = None
        if engine is not None:
            # Without dispose() the pool's DBAPI connections stay open until GC.
            engine.dispose()

    @override
    def _execute(self, statement, cursor, wait, session_properties, query=True, **kwargs):
        """
        Execute `statement` synchronously and return a cursor.

        If `cursor` is given, the statement is executed on it and that cursor
        is returned; otherwise the statement is executed via the engine and
        the `.cursor` of the result is returned. `session_properties` and
        `query` are accepted for interface compatibility but unused here.
        """
        assert wait, "`SQLAlchemyClient` does not support asynchronous operations."
        if cursor:
            cursor.execute(statement)
        else:
            cursor = self.engine.execute(statement).cursor
        return cursor

    @override
    def _query_to_table(self, statement, table, if_exists, **kwargs):
        """
        Materialize the results of `statement` into `table`.

        Args:
            statement (str): SELECT statement whose results populate the table.
            table: Target table (rendered via `str()` into the SQL).
            if_exists (str): 'fail' raises if the table exists, 'replace'
                drops it first, and 'append' is not supported.
            **kwargs: Forwarded to `execute`.

        Returns:
            The cursor returned by executing the CREATE TABLE statement.

        Raises:
            RuntimeError: If `if_exists == 'fail'` and the table exists.
            NotImplementedError: If `if_exists == 'append'`.
        """
        if if_exists == 'fail' and self.table_exists(table):
            raise RuntimeError("Table {} already exists!".format(table))
        elif if_exists == 'replace':
            # Previously this DROP was queued but never executed, so 'replace'
            # failed whenever the table already existed.
            self.execute('DROP TABLE IF EXISTS {}'.format(table), **kwargs)
        elif if_exists == 'append':
            raise NotImplementedError("Append operations have not been implemented for {}.".format(self.__class__.__name__))

        statement = "CREATE TABLE {table} AS ({statement})".format(
            table=table,
            statement=statement
        )
        return self.execute(statement, **kwargs)

    @override
    def _dataframe_to_table(self, df, table, if_exists='fail', **kwargs):
        """
        Upload DataFrame `df` into `table` via `_pandas.to_sql`.

        `table` must expose `.table` and `.database` attributes (the parsed
        namespace); the index is not written.
        """
        return _pandas.to_sql(
            df=df, name=table.table, schema=table.database, con=self.engine,
            index=False, if_exists=if_exists, **kwargs
        )

    @override
    def _table_list(self, namespace, **kwargs):
        """List tables in `namespace` using MySQL-style `SHOW TABLES IN`."""
        return self.query("SHOW TABLES IN {}".format(namespace), **kwargs)

    @override
    def _table_exists(self, table, **kwargs):
        """
        Return True if `table` can be described, False otherwise.

        Logging is suppressed during the probe, since a failure is the
        expected "does not exist" answer; the logger's previous disabled
        state is restored afterwards.
        """
        was_disabled = getattr(logger, 'disabled', False)
        logger.disabled = True
        try:
            self.table_desc(table, **kwargs)
            return True
        except Exception:
            return False
        finally:
            logger.disabled = was_disabled

    @override
    def _table_drop(self, table, **kwargs):
        """Drop `table`; extra keyword arguments are forwarded to `execute`."""
        return self.execute("DROP TABLE {table}".format(table=table), **kwargs)

    @override
    def _table_desc(self, table, **kwargs):
        """Describe the columns of `table` using `DESCRIBE`."""
        return self.query("DESCRIBE {0}".format(table), **kwargs)

    @override
    def _table_head(self, table, n=10, **kwargs):
        """Return the first `n` rows of `table`."""
        return self.query("SELECT * FROM {} LIMIT {}".format(table, n), **kwargs)

    @override
    def _table_props(self, table, **kwargs):
        """Table properties are not supported by this client."""
        raise NotImplementedError