Source code for omniduct.databases.base

from __future__ import absolute_import, print_function

import hashlib
import inspect
import itertools
import logging
import os
import sys
from abc import abstractmethod

import jinja2
import jinja2.meta
import sqlparse
from decorator import decorator
from interface_meta import quirk_docs, override

from omniduct.caches.base import cached_method
from omniduct.duct import Duct
from omniduct.filesystems.local import LocalFsClient
from omniduct.utils.debug import logger, logging_scope
from omniduct.utils.decorators import require_connection
from omniduct.utils.magics import (MagicsProvider, process_line_arguments,
                                   process_line_cell_arguments)

from . import _cursor_formatters
from ._cursor_serializer import CursorSerializer
from ._namespaces import ParsedNamespaces

logging.getLogger('requests').setLevel(logging.WARNING)


@decorator
def render_statement(method, self, statement, *args, **kwargs):
    """
    Pre-render a statement template prior to wrapped function execution.

    This decorator expects to act as wrapper on functions which
    takes statements as the second argument.
    """
    if kwargs.pop('template', True):
        statement = self.template_render(
            statement,
            context=kwargs.pop('context', {}),
            by_name=False,
        )
    return method(self, statement, *args, **kwargs)


[docs]class DatabaseClient(Duct, MagicsProvider): """ An abstract class providing the common API for all database clients. Note: `DatabaseClient` subclasses are callable, so that one can use `DatabaseClient(...)` as a short-hand for `DatabaseClient.query(...)`. Class Attributes: DUCT_TYPE (`Duct.Type`): The type of `Duct` protocol implemented by this class. DEFAULT_PORT (int): The default port for the database service (defined by subclasses). CURSOR_FORMATTERS (dict<str, CursorFormatter): asdsd DEFAULT_CURSOR_FORMATTER (str): ... """ DUCT_TYPE = Duct.Type.DATABASE DEFAULT_PORT = None CURSOR_FORMATTERS = { 'pandas': _cursor_formatters.PandasCursorFormatter, 'hive': _cursor_formatters.HiveCursorFormatter, 'csv': _cursor_formatters.CsvCursorFormatter, 'tuple': _cursor_formatters.TupleCursorFormatter, 'dict': _cursor_formatters.DictCursorFormatter, 'raw': _cursor_formatters.RawCursorFormatter, } DEFAULT_CURSOR_FORMATTER = 'pandas' SUPPORTS_SESSION_PROPERTIES = False NAMESPACE_NAMES = ['database', 'table'] NAMESPACE_QUOTECHAR = '"' NAMESPACE_SEPARATOR = '.' NAMESPACE_DEFAULT = None # DEPRECATED (use NAMESPACE_DEFAULTS_READ instead): Will be removed in Omniduct 2.0.0 @property def NAMESPACE_DEFAULTS_READ(self): """ Backwards compatible shim for `NAMESPACE_DEFAULTS`. """ return self.NAMESPACE_DEFAULT @property def NAMESPACE_DEFAULTS_WRITE(self): """ Unless overridden, this is the same as `NAMESPACE_DEFAULTS_READ`. """ return self.NAMESPACE_DEFAULTS_READ
[docs] @quirk_docs('_init', mro=True) def __init__( self, session_properties=None, templates=None, template_context=None, default_format_opts=None, **kwargs ): """ session_properties (dict): A mapping of default session properties to values. Interpretation is left up to implementations. templates (dict): A dictionary of name to template mappings. Additional templates can be added using `.template_add`. template_context (dict): The default template context to use when rendering templates. default_format_opts (dict): The default formatting options passed to cursor formatter. """ Duct.__init_with_kwargs__(self, kwargs, port=self.DEFAULT_PORT) self.session_properties = session_properties or {} self._templates = templates or {} self._template_context = template_context or {} self._sqlalchemy_engine = None self._sqlalchemy_metadata = None self._default_format_opts = default_format_opts or {} self._init(**kwargs)
@abstractmethod def _init(self): pass # Session property management and configuration @property def session_properties(self): """dict: The default session properties used in statement executions.""" return self._session_properties @session_properties.setter def session_properties(self, properties): self._session_properties = self._get_session_properties(default=properties) def _get_session_properties(self, overrides=None, default=None): """ Retrieve the default session properties with optional overrides. Properties with a value of None will be skipped, in order to allow overrides to remove default properties. Args: overrides (dict, None): A dictionary of session property overrides. default (dict, None): A dictionary of default session properties, if it is necessary to override `self.session_properties`. Returns: dict: A dictionary of session properties. """ if (default or overrides) and not self.SUPPORTS_SESSION_PROPERTIES: raise RuntimeError("Session properties are not supported by this backend.") props = (default if default is not None else self.session_properties).copy() props.update(overrides or {}) # Remove any properties with value set to None. for key, value in props.items(): if value is None: del props[key] return props def __call__(self, query, **kwargs): return self.query(query, **kwargs) # Querying def _statement_prepare(self, statement, session_properties, **kwargs): """ Prepare a statement for execution. This is a hook that can be used by subclasses to transform and/or validate statements before execution in `_execute`; for example inserting session properties in query headers. Args: statement (str): The statement to be executed. session_properties (dict): A mutable dictionary of session properties and their values (this method can mutate it depending on statement contents). **kwargs (dict): Any additional keyword arguments passed through to `self.execute` (will match the extra keyword arguments added to `self._execute`). Returns: statement (str): The statement to be executed (potentially transformed). """ return statement def _statement_split(self, statements): """ Split a statement into separate SQL statements. This method converts a single string containing one or more SQL statements into an iterator of strings, each corresponding to one SQL statement. If the statement's language is not to be SQL, this method should be overloaded appropriately. Args: statements (str): A string containing one or more SQL statements. Returns: iterator<str>: An iterator of SQL statements. """ for statement in sqlparse.split(statements): statement = statement.strip() if statement.endswith(';'): statement = statement[:-1].strip() if statement: # remove empty statements yield statement
[docs] @classmethod def statement_hash(cls, statement, cleanup=True): """ Retrieve the hash to use to identify query statements to the cache. Args: statement (str): A string representation of the statement to be hashed. cleanup (bool): Whether the statement should first be consistently reformatted using `statement_cleanup`. Returns: str: The hash used to identify a statement to the cache. """ if cleanup: statement = cls.statement_cleanup(statement) if ( sys.version_info.major == 3 or sys.version_info.major == 2 and isinstance(statement, unicode) # noqa: F821 ): statement = statement.encode('utf8') return hashlib.sha256(statement).hexdigest()
[docs] @classmethod def statement_cleanup(cls, statement): """ Clean up statements prior to hash computation. This classmethod takes an SQL statement and reformats it by consistently removing comments and replacing all whitespace. It is used by the `statement_hash` method to avoid functionally identical queries hitting different cache keys. If the statement's language is not to be SQL, this method should be overloaded appropriately. Args: statement (str): The statement to be reformatted/cleaned-up. Returns: str: The new statement, consistently reformatted. """ statement = sqlparse.format(statement, strip_comments=True, reindent=True) statement = os.linesep.join([line for line in statement.splitlines() if line]) return statement
[docs] @render_statement @cached_method( key=lambda self, kwargs: self.statement_hash( statement=kwargs['statement'], cleanup=kwargs.pop('cleanup', True) ), serializer=lambda self, kwargs: CursorSerializer(), use_cache=lambda self, kwargs: kwargs.pop('use_cache', False), metadata=lambda self, kwargs: { 'statement': kwargs['statement'], 'session_properties': kwargs['session_properties'] } ) @quirk_docs('_execute') @require_connection def execute(self, statement, wait=True, cursor=None, session_properties=None, **kwargs): """ Execute a statement against this database and return a cursor object. Where supported by database implementations, this cursor can the be used in future executions, by passing it as the `cursor` keyword argument. Args: statement (str): The statement to be executed by the query client (possibly templated). wait (bool): Whether the cursor should be returned before the server-side query computation is complete and the relevant results downloaded. cursor (DBAPI2 cursor): Rather than creating a new cursor, execute the statement against the provided cursor. session_properties (dict): Additional session properties and/or overrides to use for this query. Setting a session property value to `None` will cause it to be omitted. **kwargs (dict): Extra keyword arguments to be passed on to `_execute`, as implemented by subclasses. template (bool): Whether the statement should be treated as a Jinja2 template. [Used by `render_statement` decorator.] context (dict): The context in which the template should be evaluated (a dictionary of parameters to values). [Used by `render_statement` decorator.] use_cache (bool): True or False (default). Whether to use the cache (if present). [Used by `cached_method` decorator.] renew (bool): True or False (default). If cache is being used, renew it before returning stored value. [Used by `cached_method` decorator.] cleanup (bool): Whether statement should be cleaned up before computing the hash used to cache results. [Used by `cached_method` decorator.] Returns: DBAPI2 cursor: A DBAPI2 compatible cursor instance. """ session_properties = self._get_session_properties(overrides=session_properties) statements = list(self._statement_split( self._statement_prepare(statement, session_properties=session_properties, **kwargs) )) assert len(statements) > 0, "No non-empty statements were provided." for statement in statements[:-1]: cursor = self._execute(statement, cursor=cursor, wait=True, session_properties=session_properties, **kwargs) cursor = self._execute(statements[-1], cursor=cursor, wait=wait, session_properties=session_properties, **kwargs) return cursor
[docs] @logging_scope("Query", timed=True) @render_statement def query(self, statement, format=None, format_opts={}, use_cache=True, **kwargs): """ Execute a statement against this database and collect formatted data. Args: statement (str): The statement to be executed by the query client (possibly templated). format (str): A subclass of CursorFormatter, or one of: 'pandas', 'hive', 'csv', 'tuple' or 'dict'. Defaults to `self.DEFAULT_CURSOR_FORMATTER`. format_opts (dict): A dictionary of format-specific options. use_cache (bool): Whether to cache the cursor returned by `DatabaseClient.execute()` (overrides the default of False for `.execute()`). (default=True) **kwargs (dict): Additional arguments to pass on to `DatabaseClient.execute()`. Returns: The results of the query formatted as nominated. """ cursor = self.execute(statement, wait=True, template=False, use_cache=use_cache, **kwargs) # Some DBAPI2 cursor implementations error if attempting to extract # data from an empty cursor, and if so, we simply return None. if self._cursor_empty(cursor): return None formatter = self._get_formatter(format, cursor, **format_opts) return formatter.dump()
[docs] def stream(self, statement, format=None, format_opts={}, batch=None, **kwargs): """ Execute a statement against this database and stream formatted results. This method returns a generator over objects representing rows in the result set. If `batch` is not `None`, then the iterator will be over lists of length `batch` containing formatted rows. Args: statement (str): The statement to be executed against the database. format (str): A subclass of CursorFormatter, or one of: 'pandas', 'hive', 'csv', 'tuple' or 'dict'. Defaults to `self.DEFAULT_CURSOR_FORMATTER`. format_opts (dict): A dictionary of format-specific options. batch (int): If not `None`, the number of rows from the resulting cursor to be returned at once. **kwargs (dict): Additional keyword arguments to pass onto `DatabaseClient.execute`. Returns: iterator: An iterator over objects of the nominated format or, if batched, a list of such objects. """ cursor = self.execute(statement, wait=True, **kwargs) formatter = self._get_formatter(format, cursor, **format_opts) for row in formatter.stream(batch=batch): yield row
def _get_formatter(self, formatter, cursor, **kwargs): formatter = formatter or self.DEFAULT_CURSOR_FORMATTER if not (inspect.isclass(formatter) and issubclass(formatter, _cursor_formatters.CursorFormatter)): assert formatter in self.CURSOR_FORMATTERS, "Invalid format '{}'. Choose from: {}".format(formatter, ','.join(self.CURSOR_FORMATTERS.keys())) formatter = self.CURSOR_FORMATTERS[formatter] format_opts = dict(itertools.chain(self._default_format_opts.items(), kwargs.items())) return formatter(cursor, **format_opts)
[docs] def stream_to_file(self, statement, file, format='csv', fs=None, **kwargs): """ Execute a statement against this database and stream results to a file. This method is a wrapper around `DatabaseClient.stream` that enables the iterative writing of cursor results to a file. This is especially useful when there are a very large number of results, and loading them all into memory would require considerable resources. Note that 'csv' is the default format for this method (rather than `pandas`). Args: statement (str): The statement to be executed against the database. file (str, file-like-object): The filename where the data should be written, or an open file-like resource. format (str): The format to be used ('csv' by default). Format options can be passed via `**kwargs`. fs (None, FileSystemClient): The filesystem wihin which the nominated file should be found. If `None`, the local filesystem will be used. **kwargs: Additional keyword arguments to pass onto `DatabaseClient.stream`. """ close_later = False if isinstance(file, str): file = (fs or LocalFsClient()).open(file, 'w') close_later = True try: file.writelines(self.stream(statement, format=format, **kwargs)) finally: if close_later: file.close()
[docs] def execute_from_file(self, file, fs=None, **kwargs): """ Execute a statement stored in a file. Args: file (str, file-like-object): The path of the file containing the query statement to be executed against the database, or an open file-like resource. fs (None, FileSystemClient): The filesystem wihin which the nominated file should be found. If `None`, the local filesystem will be used. **kwargs (dict): Extra keyword arguments to pass on to `DatabaseClient.execute`. Returns: DBAPI2 cursor: A DBAPI2 compatible cursor instance. """ close_later = False if isinstance(file, str): file = (fs or LocalFsClient()).open(file, 'r') close_later = True try: return self.execute(file.read(), **kwargs) finally: if close_later: file.close()
[docs] def query_from_file(self, file, fs=None, **kwargs): """ Query using a statement stored in a file. Args: file (str, file-like-object): The path of the file containing the query statement to be executed against the database, or an open file-like resource. fs (None, FileSystemClient): The filesystem wihin which the nominated file should be found. If `None`, the local filesystem will be used. **kwargs (dict): Extra keyword arguments to pass on to `DatabaseClient.query`. Returns: object: The results of the query formatted as nominated. """ close_later = False if isinstance(file, str): file = (fs or LocalFsClient()).open(file, 'r') close_later = True try: return self.query(file.read(), **kwargs) finally: if close_later: file.close()
@property def template_names(self): """ list: A list of names associated with the templates associated with this client. """ return list(self._templates)
[docs] def template_add(self, name, body): """ Add a named template to the internal dictionary of templates. Note: Templates are interpreted as `jinja2` templates. See `.template_render` for more information. Args: name (str): The name of the template. body (str): The (typically) multiline body of the template. Returns: PrestoClient: A reference to this object. """ self._templates[name] = body return self
[docs] def template_get(self, name): """ Retrieve a named template. Args: name (str): The name of the template to retrieve. Raises: ValueError: If `name` is not associated with a template. Returns: str: The requested template. """ if name not in self._templates: raise ValueError("No such template named: '{}'.".format(name)) return self._templates[name]
[docs] def template_variables(self, name_or_statement, by_name=False): """ Return the set of undeclared variables required for this template. Args: name_or_statement (str): The name of a template (if `by_name` is True) or else a string representation of a `jinja2` template. by_name (bool): `True` if `name_or_statement` should be interpreted as a template name, or `False` (default) if `name_or_statement` should be interpreted as a template body. Returns: set<str>: A set of names which the template requires to be rendered. """ ast = jinja2.Environment().parse( self.template_render(name_or_statement, by_name=by_name, meta_only=True) ) return jinja2.meta.find_undeclared_variables(ast)
[docs] def template_render(self, name_or_statement, context=None, by_name=False, cleanup=False, meta_only=False): """ Render a template by name or value. In addition to the `jinja2` templating syntax, described in more detail in the official `jinja2` documentation, a meta-templating extension is also provided. This meta-templating allows you to reference other reference other templates. For example, if you had two SQL templates named 'template_a' and 'template_b', then you could render them into one SQL query using (for example): .. code-block:: python .template_render(''' WITH a AS ( {{{template_a}}} ), b AS ( {{{template_b}}} ) SELECT * FROM a JOIN b ON a.x = b.x ''') Note that template substitution in this way is iterative, so you can chain template embedding, provided that such embedding is not recursive. Args: name_or_statement (str): The name of a template (if `by_name` is True) or else a string representation of a `jinja2` template. context (dict, None): A dictionary to use as the template context. If not specified, an empty dictionary is used. by_name (bool): `True` if `name_or_statement` should be interpreted as a template name, or `False` (default) if `name_or_statement` should be interpreted as a template body. cleanup (bool): `True` if the rendered statement should be formatted, `False` (default) otherwise meta_only (bool): `True` if rendering should only progress as far as rendering nested templates (i.e. don't actually substitute in variables from the context); `False` (default) otherwise. Returns: str: The rendered template. """ if by_name: if name_or_statement not in self._templates: raise ValueError("No such template of name: '{}'.".format(name_or_statement)) statement = self._templates[name_or_statement] else: statement = name_or_statement try: from sqlalchemy.sql.base import Executable if isinstance(statement, Executable): statement = str(statement.compile(compile_kwargs={"literal_binds": True})) except ImportError: pass if context is None or context is False: context = {} template_context = {} template_context.update(self._template_context) # default context template_context.update(context) # context passed in intersection = set(self._template_context.keys()) & set(context.keys()) if intersection: logger.warning( "The following default template context keys have been overridden " "by the local context: {}." .format(intersection) ) # Substitute in any other named statements recursively while '{{{' in statement or '{{%' in statement: statement = ( jinja2.Template( statement, block_start_string='{{%', block_end_string='%}}', variable_start_string='{{{', variable_end_string='}}}', comment_start_string='{{#', comment_end_string='#}}', undefined=jinja2.StrictUndefined ) .render(getattr(self, '_templates', {})) ) if not meta_only: statement = ( jinja2.Template(statement, undefined=jinja2.StrictUndefined) .render(template_context) ) if cleanup: statement = self.statement_cleanup(statement) return statement
[docs] def execute_from_template(self, name, context=None, **kwargs): """ Render and then execute a named template. Args: name (str): The name of the template to be rendered and executed. context (dict): The context in which the template should be rendered. **kwargs (dict): Additional parameters to pass to `.execute()`. Returns: DBAPI2 cursor: A DBAPI2 compatible cursor instance. """ statement = self.template_render(name, context, by_name=True) return self.execute(statement, **kwargs)
[docs] def query_from_template(self, name, context=None, **kwargs): """ Render and then query using a named tempalte. Args: name (str): The name of the template to be rendered and used to query the database. context (dict): The context in which the template should be rendered. **kwargs (dict): Additional parameters to pass to `.query()`. Returns: object: The results of the query formatted as nominated. """ statement = self.template_render(name, context, by_name=True) return self.query(statement, **kwargs)
# Uploading/querying data into data store
[docs] @logging_scope('Query [CTAS]', timed=True) @quirk_docs('_query_to_table') def query_to_table(self, statement, table, if_exists='fail', **kwargs): """ Run a query and store the results in a table in this database. Args: statement: The statement to be executed. table (str): The name of the table into which the dataframe should be uploaded. if_exists (str): if nominated table already exists: 'fail' to do nothing, 'replace' to drop, recreate and insert data into new table, and 'append' to add data from this table into the existing table. **kwargs (dict): Additional keyword arguments to pass onto `DatabaseClient._query_to_table`. Returns: DB-API cursor: The cursor object associated with the execution. """ assert if_exists in {'fail', 'replace', 'append'} table = self._parse_namespaces(table, write=True) return self._query_to_table(statement, table, if_exists=if_exists, **kwargs)
[docs] @logging_scope('Dataframe Upload', timed=True) @quirk_docs('_dataframe_to_table') @require_connection def dataframe_to_table(self, df, table, if_exists='fail', **kwargs): """ Upload a local pandas dataframe into a table in this database. Args: df (pandas.DataFrame): The dataframe to upload into the database. table (str, ParsedNamespaces): The name of the table into which the dataframe should be uploaded. if_exists (str): if nominated table already exists: 'fail' to do nothing, 'replace' to drop, recreate and insert data into new table, and 'append' to add data from this table into the existing table. **kwargs (dict): Additional keyword arguments to pass onto `DatabaseClient._dataframe_to_table`. """ assert if_exists in {'fail', 'replace', 'append'} self._dataframe_to_table(df, self._parse_namespaces(table, write=True), if_exists=if_exists, **kwargs)
# Table properties @abstractmethod def _execute(self, statement, cursor, wait, session_properties): pass def _query_to_table(self, statement, table, if_exists, **kwargs): raise NotImplementedError def _dataframe_to_table(self, df, table, if_exists='fail', **kwargs): raise NotImplementedError def _cursor_empty(self, cursor): return cursor is None def _parse_namespaces(self, name, level=0, defaults=None, write=False): return ParsedNamespaces.from_name( name, self.NAMESPACE_NAMES[:-level] if level > 0 else self.NAMESPACE_NAMES, quote_char=self.NAMESPACE_QUOTECHAR, separator=self.NAMESPACE_SEPARATOR, defaults=defaults if defaults else (self.NAMESPACE_DEFAULTS_WRITE if write else self.NAMESPACE_DEFAULTS_READ), )
[docs] @quirk_docs('_table_list') def table_list(self, namespace=None, renew=True, **kwargs): """ Return a list of table names in the data source as a DataFrame. Args: namespace (str): The namespace in which to look for tables. renew (bool): Whether to renew the table list or use cached results (default: True). **kwargs (dict): Additional arguments passed through to implementation. Returns: list<str>: The names of schemas in this database. """ return self._table_list(self._parse_namespaces(namespace, level=1), renew=renew, **kwargs)
@abstractmethod def _table_list(self, namespace, **kwargs): pass
[docs] @quirk_docs('_table_exists') def table_exists(self, table, renew=True, **kwargs): """ Check whether a table exists. Args: table (str): The table for which to check. renew (bool): Whether to renew the table list or use cached results (default: True). **kwargs (dict): Additional arguments passed through to implementation. Returns: bool: `True` if table exists, and `False` otherwise. """ return self._table_exists(table=self._parse_namespaces(table), renew=renew, **kwargs)
@abstractmethod def _table_exists(self, table, **kwargs): pass
[docs] @quirk_docs('_table_drop') def table_drop(self, table, **kwargs): """ Remove a table from the database. Args: table (str): The table to drop. **kwargs (dict): Additional arguments passed through to implementation. Returns: DB-API cursor: The cursor associated with this execution. """ return self._table_drop(table=self._parse_namespaces(table, write=True), **kwargs)
@abstractmethod def _table_drop(self, table, **kwargs): pass
[docs] @quirk_docs('_table_desc') def table_desc(self, table, renew=True, **kwargs): """ Describe a table in the database. Args: table (str): The table to describe. renew (bool): Whether to renew the results (default: True). **kwargs (dict): Additional arguments passed through to implementation. Returns: pandas.DataFrame: A dataframe description of the table. """ return self._table_desc(table=self._parse_namespaces(table), renew=renew, **kwargs)
@abstractmethod def _table_desc(self, table, **kwargs): pass
[docs] @quirk_docs('_table_partition_cols') def table_partition_cols(self, table, renew=True, **kwargs): """ Extract the columns by which a table is partitioned (if database supports partitions). Args: table (str): The table from which to extract data. renew (bool): Whether to renew the results (default: True). **kwargs (dict): Additional arguments passed through to implementation. Returns: list<str>: A list of columns by which table is partitioned. """ return self._table_partition_cols(table=self._parse_namespaces(table), renew=renew, **kwargs)
def _table_partition_cols(self, table, **kwargs): raise NotImplementedError( "Database backend `{}` does not support, or has not implemented, " "support for extracting partition columns." .format(self.__class__.__name__) )
[docs] @quirk_docs('_table_head') def table_head(self, table, n=10, renew=True, **kwargs): """ Retrieve the first `n` rows from a table. Args: table (str): The table from which to extract data. n (int): The number of rows to extract. renew (bool): Whether to renew the table list or use cached results (default: True). **kwargs (dict): Additional arguments passed through to implementation. Returns: pandas.DataFrame: A dataframe representation of the first `n` rows of the nominated table. """ return self._table_head(table=self._parse_namespaces(table), n=n, renew=renew, **kwargs)
@abstractmethod def _table_head(self, table, n=10, **kwargs): pass
[docs] @quirk_docs('_table_props') def table_props(self, table, renew=True, **kwargs): """ Retrieve the properties associated with a table. Args: table (str): The table from which to extract data. renew (bool): Whether to renew the table list or use cached results (default: True). **kwargs (dict): Additional arguments passed through to implementation. Returns: pandas.DataFrame: A dataframe representation of the table properties. """ return self._table_props(table=self._parse_namespaces(table), renew=renew, **kwargs)
@abstractmethod def _table_props(self, table, **kwargs): pass @override def _register_magics(self, base_name): """ The following magic functions will be registered (assuming that the base name is chosen to be 'hive'): - Cell Magics: - `%%hive`: For querying the database. - `%%hive.execute`: For executing a statement against the database. - `%%hive.stream`: For executing a statement against the database, and streaming the results. - `%%hive.template`: The defining a new template. - `%%hive.render`: Render a provided query statement. - Line Magics: - `%hive`: For querying the database using a named template. - `%hive.execute`: For executing a named template statement against the database. - `%hive.stream`: For executing a named template against the database, and streaming the results. - `%hive.render`: Render a provided a named template. - `%hive.desc`: Describe the table nominated. - `%hive.head`: Return the first rows in a specified table. - `%hive.props`: Show the properties specified for a nominated table. Documentation for these magics is provided online. """ from IPython import get_ipython from IPython.core.magic import register_line_magic, register_cell_magic, register_line_cell_magic def statement_executor_magic(executor, statement, variable=None, show='head', transpose=False, template=True, context=None, **kwargs): ip = get_ipython() if context is None: context = ip.user_ns # Line magic if statement is None: return self.query_from_template(variable, context=context, **kwargs) # Cell magic result = getattr(self, executor)(statement, template=template, context=context, **kwargs) if variable is not None: ip.user_ns[variable] = result if executor != 'query': if variable is None: return result return elif variable is None: return result format = kwargs.get('format', self.DEFAULT_CURSOR_FORMATTER) if show == 'head': show = 10 if isinstance(show, int): r = result.head(show) if format == 'pandas' else result[:show] elif show == 'all': r = result elif show in (None, 'none'): return None else: raise ValueError("Omniduct does not recognise the argument show='{0}' in cell magic.".format(show)) if format == 'pandas' and transpose: return r.T return r @register_line_cell_magic(base_name) @process_line_cell_arguments def query_magic(*args, **kwargs): return statement_executor_magic('query', *args, **kwargs) @register_line_cell_magic("{}.{}".format(base_name, 'execute')) @process_line_cell_arguments def execute_magic(*args, **kwargs): return statement_executor_magic('execute', *args, **kwargs) @register_line_cell_magic("{}.{}".format(base_name, 'stream')) @process_line_cell_arguments def stream_magic(*args, **kwargs): return statement_executor_magic('stream', *args, **kwargs) @register_cell_magic("{}.{}".format(base_name, 'template')) @process_line_arguments def template_add(body, name): self.template_add(name, body) @register_line_cell_magic("{}.{}".format(base_name, 'render')) @process_line_cell_arguments def template_render_magic(body=None, name=None, context=None, show=True, cleanup=False, meta_only=False): ip = get_ipython() if body is None: assert name is not None, "Name must be specified in line-mode." rendered = self.template_render( name, context=context or ip.user_ns, by_name=True, cleanup=cleanup, meta_only=meta_only ) else: rendered = self.template_render( body, context=context or ip.user_ns, by_name=False, cleanup=cleanup, meta_only=meta_only ) if name is not None: ip.user_ns[name] = rendered if show: print(rendered) else: return rendered @register_line_magic("{}.{}".format(base_name, 'desc')) @process_line_arguments def table_desc(table_name, **kwargs): return self.table_desc(table_name, **kwargs) @register_line_magic("{}.{}".format(base_name, 'head')) @process_line_arguments def table_head(table_name, **kwargs): return self.table_head(table_name, **kwargs) @register_line_magic("{}.{}".format(base_name, 'props')) @process_line_arguments def table_props(table_name, **kwargs): return self.table_props(table_name, **kwargs)