from __future__ import absolute_import
import json
import os
import re
import shutil
import tempfile
import time
import pandas as pd
from interface_meta import override
from jinja2 import Template
from omniduct.utils.debug import logger
from omniduct.utils.processes import Timeout, run_in_subprocess
from .base import DatabaseClient
from ._schemas import SchemasMixin
from . import _pandas
class HiveServer2Client(DatabaseClient, SchemasMixin):
    """
    This Duct connects to an Apache HiveServer2 server instance using the
    `pyhive` or `impyla` libraries.

    Attributes:
        schema (str, None): The default schema to use for queries (will
            default to server-default if not specified).
        driver (str): One of 'pyhive' (default) or 'impyla', which specifies
            how the client communicates with Hive.
        auth_mechanism (str): The authorisation protocol to use for connections.
            Defaults to 'NOSASL'. Authorisation methods differ between drivers.
            Please refer to `pyhive` and `impyla` documentation for more details.
        push_using_hive_cli (bool): Whether the `.push()` operation should
            directly add files using `LOAD DATA LOCAL INPATH` rather than the
            `INSERT` operation via SQLAlchemy. Note that this requires the
            presence of the `hive` executable on the local PATH, or if
            connecting via a `RemoteClient` instance, on the remote's PATH.
            This is mostly useful for older versions of Hive which do not
            support the `INSERT` statement.
        default_table_props (dict): A dictionary of table properties to use by
            default when creating tables.
        connection_options (dict): Additional options to pass through to the
            `.connect()` methods of the drivers.
    """

    PROTOCOLS = ['hiveserver2']
    DEFAULT_PORT = 3623
    SUPPORTS_SESSION_PROPERTIES = True

    # NOTE(review): presumably consumed by the base class to parse and quote
    # table identifiers of the form `schema`.`table` -- confirm against
    # `DatabaseClient`.
    NAMESPACE_NAMES = ['schema', 'table']
    NAMESPACE_QUOTECHAR = '`'
    NAMESPACE_SEPARATOR = '.'
@property
@override
def NAMESPACE_DEFAULT(self):
    """Return the default namespace mapping, i.e. `schema` set to `.schema`."""
    return dict(schema=self.schema)
@property
@override
def NAMESPACE_DEFAULTS_WRITE(self):
    """
    Return the namespace defaults used for writes.

    This is a copy of `NAMESPACE_DEFAULTS_READ` in which `schema` is
    replaced by the connecting user's name.
    """
    write_defaults = self.NAMESPACE_DEFAULTS_READ.copy()
    write_defaults.update(schema=self.username)
    return write_defaults
@override
def _init(self, schema=None, driver='pyhive', auth_mechanism='NOSASL',
push_using_hive_cli=False, default_table_props=None,
thrift_transport=None, **connection_options
):
"""
schema (str, None): The default database/schema to use for queries (will
default to server-default if not specified).
driver (str): One of 'pyhive' (default) or 'impyla', which specifies
how the client communicates with Hive.
auth_mechanism (str): The authorisation protocol to use for connections.
Defaults to 'NOSASL'. Authorisation methods differ between drivers.
Please refer to `pyhive` and `impyla` documentation for more details.
push_using_hive_cli (bool): Whether the `.push()` operation should
directly add files using `LOAD DATA LOCAL INPATH` rather than the
`INSERT` operation via SQLAlchemy. Note that this requires the
presence of the `hive` executable on the local PATH, or if
connecting via a `RemoteClient` instance, on the remote's PATH.
This is mostly useful for older versions of Hive which do not
support the `INSERT` statement. False by default.
default_table_props (dict): A dictionary of table properties to use by
default when creating tables (default is an empty dict).
thrift_transport (TTransportBase): A thrift transport object for custom advanced usage.
Incompatible with host, port, auth_mechanism, and password.
Typically used to enable Thrift http transport to hiveserver2.
**connection_options (dict): Additional options to pass through to the
`.connect()` methods of the drivers.
"""
self.schema = schema
self.driver = driver
self.auth_mechanism = auth_mechanism
self.connection_options = connection_options
self.push_using_hive_cli = push_using_hive_cli
self.default_table_props = default_table_props or {}
self._thrift_transport = thrift_transport
self.__hive = None
self.connection_fields += ('schema',)
assert self.driver in ('pyhive', 'impyla'), "Supported drivers are pyhive and impyla."
@override
def _connect(self):
    """
    Open a connection with the configured driver and build the matching
    SQLAlchemy engine and metadata objects.

    Raises:
        ImportError: If the package backing the selected driver is not
            installed.
    """
    from sqlalchemy import create_engine, MetaData

    if self.driver == 'pyhive':
        try:
            import pyhive.hive
        except ImportError:
            raise ImportError("""
Omniduct is attempting to use the 'pyhive' driver, but it
is not installed. Please either install the pyhive package,
or reconfigure this Duct to use the 'impyla' driver.
""")
        # A custom thrift transport is incompatible with host, port, auth and
        # password, so those are blanked out whenever one is supplied.
        custom_transport = self._thrift_transport
        self.__hive = pyhive.hive.connect(host=None if custom_transport else self.host,
                                          port=None if custom_transport else self.port,
                                          auth=None if custom_transport else self.auth_mechanism,
                                          database=self.schema,
                                          username=self.username,
                                          password=None if custom_transport else self.password,
                                          thrift_transport=custom_transport,
                                          **self.connection_options)
        dialect = 'hive'
    elif self.driver == 'impyla':
        try:
            import impala.dbapi
        except ImportError:
            raise ImportError("""
Omniduct is attempting to use the 'impyla' driver, but it
is not installed. Please either install the impyla package,
or reconfigure this Duct to use the 'pyhive' driver.
""")
        self.__hive = impala.dbapi.connect(host=self.host,
                                           port=self.port,
                                           auth_mechanism=self.auth_mechanism,
                                           database=self.schema,
                                           user=self.username,
                                           password=self.password,
                                           **self.connection_options)
        dialect = 'impala'
    else:
        # Any other driver is left untouched, exactly as before.
        return

    # An unset schema must become an empty database component; formatting
    # `None` directly would yield a URL ending in the literal "/None" and
    # point the engine at a database called "None".
    engine_url = '{}://{}:{}/{}'.format(dialect, self.host, self.port, self.schema or '')
    self._sqlalchemy_engine = create_engine(engine_url)
    self._sqlalchemy_metadata = MetaData(self._sqlalchemy_engine)
def __hive_cursor(self):
if self.driver == 'impyla': # Impyla seems to have all manner of connection issues, attempt to restore connection
try:
with Timeout(1):
return self.__hive.cursor()
except:
self._connect()
return self.__hive.cursor()
@override
def _is_connected(self):
return self.__hive is not None
@override
def _disconnect(self):
    """
    Close the driver connection (best effort) and clear all cached
    connection state: the connection itself, the SQLAlchemy engine and
    metadata, and the cached schemas.
    """
    logger.info('Disconnecting from Hive coordinator...')
    try:
        self.__hive.close()
    except Exception as e:
        # Closing is best-effort (the connection may be missing or already
        # dead), but record why it failed rather than discarding it silently.
        logger.debug('Ignoring error while closing Hive connection: {}'.format(e))
    finally:
        # Always drop the references, even if closing was interrupted.
        self.__hive = None
        self._sqlalchemy_engine = None
        self._sqlalchemy_metadata = None
        self._schemas = None
@override
def _statement_prepare(self, statement, session_properties, **kwargs):
return (
"\n".join(
"SET {key} = {value};".format(key=key, value=value)
for key, value in session_properties.items()
) + statement
)
@override
def _execute(self, statement, cursor, wait, session_properties, poll_interval=1):
"""
Additional Args:
poll_interval (int): Default delay in seconds between consecutive
query status (defaults to 1).
"""
cursor = cursor or self.__hive_cursor()
log_offset = 0
if self.driver == 'pyhive':
from TCLIService.ttypes import TOperationState # noqa: F821
cursor.execute(statement, **{'async': True})
if wait:
status = cursor.poll().operationState
while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
log_offset = self._log_status(cursor, log_offset)
time.sleep(poll_interval)
status = cursor.poll().operationState
elif self.driver == 'impyla':
cursor.execute_async(statement)
if wait:
while cursor.is_executing():
log_offset = self._log_status(cursor, log_offset)
time.sleep(poll_interval)
return cursor
@override
def _cursor_empty(self, cursor):
if self.driver == 'impyla':
return not cursor.has_result_set
elif self.driver == 'pyhive':
return cursor.description is None
return False
def _cursor_wait(self, cursor, poll_interval=1):
    """
    Block until `cursor.poll()` no longer reports an initialised or running
    operation, sleeping `poll_interval` seconds between polls.
    """
    from TCLIService.ttypes import TOperationState  # noqa: F821

    while cursor.poll().operationState in (
        TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE
    ):
        time.sleep(poll_interval)
def _log_status(self, cursor, log_offset=0):
matcher = re.compile('[0-9/]+ [0-9:]+ (INFO )?')
if self.driver == 'pyhive':
log = cursor.fetch_logs()
else:
log = cursor.get_log().strip().split('\n')
for line in log[log_offset:]:
if not line:
continue
m = matcher.match(line)
if m:
line = line[len(m.group(0)):]
logger.info(line)
return len(log)
@override
def _query_to_table(self, statement, table, if_exists, **kwargs):
statements = []
if if_exists == 'fail' and self.table_exists(table):
raise RuntimeError("Table {} already exists!".format(table))
elif if_exists == 'replace':
statements.append('DROP TABLE IF EXISTS {};'.format(table))
elif if_exists == 'append':
raise NotImplementedError("Append operations have not been implemented for {}.".format(self.__class__.__name__))
statements.append("CREATE TABLE {table} AS ({statement})".format(
table=table,
statement=statement
))
return self.execute('\n'.join(statements), **kwargs)
@override
def _dataframe_to_table(
self, df, table, if_exists='fail', use_hive_cli=None,
partition=None, sep=chr(1), table_props=None, dtype_overrides=None, **kwargs
):
"""
If `use_hive_cli` (or if not specified `.push_using_hive_cli`) is
`True`, a `CREATE TABLE` statement will be automatically generated based
on the datatypes of the DataFrame (unless overwritten by
`dtype_overrides`). The `DataFrame` will then be exported to a CSV
compatible with Hive and uploaded (if necessary) to the remote, before
being loaded into Hive using a `LOAD DATA LOCAL INFILE ...` query using
the `hive` cli executable. Note that if a table is not partitioned, you
cannot convert it to a parititioned table without deleting it first.
If `use_hive_cli` (or if not specified `.push_using_hive_cli`) is
`False`, an attempt will be made to push the `DataFrame` to Hive using
`pandas.DataFrame.to_sql` and the SQLAlchemy binding provided by
`pyhive` and `impyla`. This may be slower, does not support older
versions of Hive, and does not support table properties or partitioning.
If if the schema namespace is not specified, `table.schema` will be
defaulted to your username.
Additional Args:
use_hive_cli (bool, None): A local override for the global
`.push_using_hive_cli` attribute. If not specified, the global
default is used. If True, then pushes are performed using the
`hive` CLI executable on the local/remote PATH.
**kwargs (dict): Additional arguments to send to `pandas.DataFrame.to_sql`.
Further Parameters for CLI method (specifying these for the pandas
method will cause a `RuntimeError` exception):
partition (dict): A mapping of column names to values that specify
the partition into which the provided data should be uploaded,
as well as providing the fields by which new tables should be
partitioned.
sep (str): Field delimiter for data (defaults to CTRL-A, or `chr(1)`).
table_props (dict): Properties to set on any newly created tables
(extends `.default_table_props`).
dtype_overrides (dict): Mapping of column names to Hive datatypes to
use instead of default mapping.
"""
use_hive_cli = use_hive_cli or self.push_using_hive_cli
partition = partition or {}
table_props = table_props or {}
dtype_overrides = dtype_overrides or {}
# Try using SQLALchemy method
if not use_hive_cli:
if partition or table_props or dtype_overrides:
raise RuntimeError(
"At least one of `partition` or `table_props` or "
"`dtype_overrides` has been specified. Setting table "
"properties or partition information is not supported "
"via the SQLAlchemy backend. If this is important, please "
"pass `use_hive_cli=True`, otherwise remove these values "
"and try again."
)
try:
return _pandas.to_sql(
df=df, name=table.table, schema=table.schema, con=self._sqlalchemy_engine,
index=False, if_exists=if_exists, **kwargs
)
except Exception as e:
raise RuntimeError(
"Push unsuccessful. Your version of Hive may be too old to "
"support the `INSERT` keyword. You might want to try setting "
"`.push_using_hive_cli = True` if your local or remote "
"machine has access to the `hive` CLI executable. The "
"original exception was: {}".format(e.args[0])
)
# Try using Hive CLI
# If `partition` is specified, the associated columns must not be
# present in the dataframe.
assert len(set(partition).intersection(df.columns)) == 0, "The dataframe to be uploaded must not have any partitioned fields. Please remove the field(s): {}.".format(','.join(set(partition).intersection(df.columns)))
# Save dataframe to file and send it to the remote server if necessary
temp_dir = tempfile.mkdtemp(prefix='omniduct_hiveserver2')
tmp_fname = os.path.join(temp_dir, 'data_{}.csv'.format(time.time()))
logger.info('Saving dataframe to file... {}'.format(tmp_fname))
df.fillna(r'\N').to_csv(tmp_fname, index=False, header=False,
sep=sep, encoding='utf-8')
if self.remote:
logger.info("Uploading data to remote host...")
self.remote.upload(tmp_fname)
# Generate create table statement.
auto_table_props = set(self.default_table_props).difference(table_props)
if len(auto_table_props) > 0:
logger.warning(
"In addition to any specified table properties, this "
"HiveServer2Client has added the following default table "
"properties:\n{default_props}\nTo override them, please "
"specify overrides using: `.push(..., table_props={{...}}).`"
.format(default_props=json.dumps({
prop: value for prop, value in self.default_table_props.items()
if prop in auto_table_props
}, indent=True))
)
tblprops = self.default_table_props.copy()
tblprops.update(table_props or {})
cts = self._create_table_statement_from_df(
df=df,
table=table,
drop=(if_exists == 'replace') and not partition,
text=True,
sep=sep,
table_props=tblprops,
partition_cols=list(partition),
dtype_overrides=dtype_overrides
)
# Generate load data statement.
partition_clause = (
''
if not partition
else 'PARTITION ({})'.format(
','.join("{key} = '{value}'".format(key=key, value=value) for key, value in partition.items())
)
)
lds = '\nLOAD DATA LOCAL INPATH "{path}" {overwrite} INTO TABLE {table} {partition_clause};'.format(
path=os.path.basename(tmp_fname) if self.remote else tmp_fname,
overwrite="OVERWRITE" if if_exists == "replace" else "",
table=table,
partition_clause=partition_clause
)
# Run create table statement and load data statments
logger.info(
"Creating hive table `{table}` if it does not "
"already exist, and inserting the provided data{partition}."
.format(
table=table,
partition=" into {}".format(partition_clause) if partition_clause else ""
)
)
try:
stmts = '\n'.join([cts, lds])
logger.debug(stmts)
proc = self._run_in_hivecli(stmts)
if proc.returncode != 0:
raise RuntimeError(proc.stderr.decode('utf-8'))
finally:
# Clean up files
if self.remote:
self.remote.execute('rm -rf {}'.format(tmp_fname))
shutil.rmtree(temp_dir, ignore_errors=True)
logger.info("Successfully uploaded dataframe {partition}`{table}`.".format(
table=table,
partition="into {} of ".format(partition_clause) if partition_clause else ""
))
@override
def _table_list(self, namespace, like='*', **kwargs):
schema = namespace.name or self.schema
return self.query("SHOW TABLES IN {0} '{1}'".format(schema, like),
**kwargs)
@override
def _table_exists(self, table, **kwargs):
    """
    Report whether `table` exists, by attempting to describe it with
    logging temporarily disabled.

    Returns:
        bool: True if `table_desc` succeeds, False if it raises.
    """
    # Remember the prior state so that a logger which was already disabled
    # is not switched back on as a side effect of this check.
    previously_disabled = getattr(logger, 'disabled', False)
    logger.disabled = True
    try:
        self.table_desc(table, **kwargs)
        return True
    except Exception:
        # Not a bare `except:`: KeyboardInterrupt and SystemExit must not be
        # misreported as "table does not exist".
        return False
    finally:
        logger.disabled = previously_disabled
@override
def _table_drop(self, table, **kwargs):
return self.execute("DROP TABLE {table}".format(table=table))
@override
def _table_desc(self, table, **kwargs):
records = self.query("DESCRIBE {0}".format(table), **kwargs)
# pretty hacky but hive doesn't return DESCRIBE results in a nice format
# TODO is there any information we should pull out of DESCRIBE EXTENDED
for i, record in enumerate(records):
if record[0] == '':
break
columns = ['col_name', 'data_type', 'comment']
fields_df = pd.DataFrame(records[:i], columns=columns)
partitions_df = pd.DataFrame(records[i + 4:], columns=columns)
partitions_df['comment'] = "PARTITION " + partitions_df['comment']
return pd.concat((fields_df, partitions_df))
@override
def _table_head(self, table, n=10, **kwargs):
return self.query("SELECT * FROM {} LIMIT {}".format(table, n), **kwargs)
@override
def _table_props(self, table, **kwargs):
return self.query('SHOW TBLPROPERTIES {0}'.format(table), **kwargs)
def _run_in_hivecli(self, cmd):
"""Run a query using hive cli in a subprocess."""
# Turn hive command into quotable string.
double_escaped = re.sub('\\' * 2, '\\' * 4, cmd)
backtick_escape = '\\\\\\`' if self.remote else '\\`'
sys_cmd = 'hive -e "{0}"'.format(re.sub('"', '\\"', double_escaped)) \
.replace('`', backtick_escape)
# Execute command in a subprocess.
if self.remote:
proc = self.remote.execute(sys_cmd)
else:
proc = run_in_subprocess(sys_cmd, check_output=True)
return proc
@classmethod
def _create_table_statement_from_df(cls, df, table, drop=False,
                                    text=True, sep=chr(1), loc=None,
                                    table_props=None, partition_cols=None,
                                    dtype_overrides=None):
    """
    Return create table statement for new hive table based on pandas dataframe.

    Args:
        df (pandas.DataFrame, pandas.Series): Used to determine column names
            and types for create table statement.
        table (ParsedNamespaces): The parsed name of the target table.
        drop (bool): Whether to include a drop table statement before the
            create table statement.
        text (bool): Whether data will be stored as a textfile.
        sep (str): The separator used by the text data store (defaults to
            CTRL-A, i.e. `chr(1)`, which is the default Hive separator).
        loc (str): Desired HDFS location (if not the default).
        table_props (dict): The table properties (if any) to set on the table.
        partition_cols (list): The columns by which the created table should
            be partitioned.
        dtype_overrides (dict): Mapping of (unsanitised) dataframe column
            names to Hive datatypes to use instead of the default mapping.

    Returns:
        str: The Hive SQL required to create the table with the above
            configuration.
    """
    table_props = table_props or {}
    partition_cols = partition_cols or []
    dtype_overrides = dtype_overrides or {}

    # A Series has a single dtype rather than a per-column mapping, so treat
    # it as a one-column frame.
    if isinstance(df, pd.Series):
        df = df.to_frame()

    # dtype kind to hive type mapping dict.
    DTYPE_KIND_HIVE_TYPE = {
        'b': 'BOOLEAN',  # boolean
        'i': 'BIGINT',   # signed integer
        'u': 'BIGINT',   # unsigned integer
        'f': 'DOUBLE',   # floating-point
        'c': 'STRING',   # complex floating-point
        'O': 'STRING',   # object
        'S': 'STRING',   # (byte-)string
        'U': 'STRING',   # Unicode
        'V': 'STRING'    # void
    }

    # Sanitise column names and map numpy/pandas data-types to hive types.
    columns = []
    # `.items()` rather than `.iteritems()`, which pandas 2.0 removed.
    for col, dtype in df.dtypes.items():
        # Column labels are not necessarily strings (e.g. integer labels).
        col_sanitized = re.sub(r'\W', '', str(col).lower().replace(' ', '_'))
        hive_type = dtype_overrides.get(col) or DTYPE_KIND_HIVE_TYPE.get(dtype.kind)
        if hive_type is None:
            hive_type = DTYPE_KIND_HIVE_TYPE['O']
            logger.warning(
                'Unable to determine hive type for dataframe column {col} of pandas dtype {dtype}. '
                'Defaulting to hive type {hive_type}. If other column type is desired, '
                'please specify via `dtype_overrides`'
                .format(col=col, dtype=dtype, hive_type=hive_type)
            )
        columns.append(
            ' {column} {type}'.format(column=col_sanitized, type=hive_type)
        )

    # Partition columns are always declared as STRING.
    partition_columns = ['{} STRING'.format(col) for col in partition_cols]

    tblprops = ["'{key}' = '{value}'".format(key=key, value=value) for key, value in table_props.items()]
    tblprops = "TBLPROPERTIES({})".format(",".join(tblprops)) if len(tblprops) > 0 else ""

    # Only the names the template actually uses are passed to `render`.
    cmd = Template("""
{% if drop %}
DROP TABLE IF EXISTS {{ table }};
{% endif -%}
CREATE TABLE IF NOT EXISTS {{ table }} (
{%- for col in columns %}
{{ col }} {% if not loop.last %}, {% endif %}
{%- endfor %}
)
{%- if partition_columns %}
PARTITIONED BY (
{%- for col in partition_columns %}
{{ col }} {% if not loop.last %}, {% endif %}
{%- endfor %}
)
{%- endif %}
{%- if text %}
ROW FORMAT DELIMITED
FIELDS TERMINATED BY "{{ sep }}"
STORED AS TEXTFILE
{% endif %}
{%- if loc %}
LOCATION "{{ loc }}"
{%- endif %}
{{ tblprops }}
;
""").render(
        drop=drop,
        table=table,
        columns=columns,
        partition_columns=partition_columns,
        text=text,
        sep=sep,
        loc=loc,
        tblprops=tblprops
    )

    return cmd