Source code for omniduct.duct

import atexit
import functools
import getpass
import inspect
import os
import pwd
import re
from abc import abstractmethod
from builtins import input
from enum import Enum

import six
from future.utils import raise_with_traceback, with_metaclass
from interface_meta import InterfaceMeta, quirk_docs

from omniduct.errors import DuctProtocolUnknown, DuctServerUnreachable
from omniduct.utils.debug import logger, logging_scope
from omniduct.utils.dependencies import check_dependencies
from omniduct.utils.ports import is_port_bound, naive_load_balancer

[docs]class Duct(with_metaclass(InterfaceMeta, object)): """ The abstract base class for all protocol implementations. This class defines the basic lifecycle of service connections, along with some magic that provides automatic registration of Duct protocol implementations. All connections made by `Duct` instances are lazy, meaning that instantiation is "free", and no protocol connections are made until required by subsequent interactions (i.e. when the value of any attribute in the list of `connection_fields` is accessed). All `Ducts` will automatically connnect and disconnect as required, and so manual intervention is not typically required to maintain connections. """ __doc_attrs = """ protocol (str): The name of the protocol for which this instance was created (especially useful if a `Duct` subclass supports multiple protocols). name (str): The name given to this `Duct` instance (defaults to class name). host (str): The host name providing the service (will be '', if service is port forwarded from remote; use `._host` to see remote host). port (int): The port number of the service (will be the port-forwarded local port, if relevant; for remote port use `._port`). username (str, bool): The username to use for the service. password (str, bool): The password to use for the service. registry (None, omniduct.registry.DuctRegistry): A reference to a `DuctRegistry` instance for runtime lookup of other services. remote (None, omniduct.remotes.base.RemoteClient): A reference to a `RemoteClient` instance to manage connections to remote services. cache (None, omniduct.caches.base.Cache): A reference to a `Cache` instance to add support for caching, if applicable. connection_fields (tuple<str>, list<str>): A list of instance attributes to monitor for changes, whereupon the `Duct` instance should automatically disconnect. By default, the following attributes are monitored: 'host', 'port', 'remote', 'username', and 'password'. prepared_fields (tuple<str>, list<str>): A list of instance attributes to be populated (if their values are callable) when the instance first connects to a service. Refer to `Duct.prepare` and `Duct._prepare` for more details. By default, the following attributes are prepared: '_host', '_port', '_username', and '_password'. Additional attributes including `host`, `port`, `username` and `password` are documented inline. Class Attributes: AUTO_LOGGING_SCOPE (bool): Whether this class should be used by omniduct logging code as a "scope". Should be overridden by subclasses as appropriate. DUCT_TYPE (Duct.Type): The type of `Duct` service that is provided by this Duct instance. Should be overridden by subclasses as appropriate. PROTOCOLS (list<str>): The name(s) of any protocols that should be associated with this class. Should be overridden by subclasses as appropriate. """ __doc_cls_attrs__ = None INTERFACE_SKIPPED_NAMES = {'__init__', '_init'}
[docs] class Type(Enum): """ The `Duct.Type` enum specifies all of the permissible values of `Duct.DUCT_TYPE`. Also determines the order in which ducts are loaded by DuctRegistry. """ REMOTE = 'remotes' FILESYSTEM = 'filesystems' CACHE = 'caches' RESTFUL = 'rest_clients' DATABASE = 'databases' OTHER = 'other'
[docs] def __init__(self, protocol=None, name=None, registry=None, remote=None, host=None, port=None, username=None, password=None, cache=None, cache_namespace=None): """ protocol (str, None): Name of protocol (used by Duct registries to inform Duct instances of how they were instantiated). name (str, None): The name to used by the `Duct` instance (defaults to class name if not specified). registry (DuctRegistry, None): The registry to use to lookup remote and/or cache instance specified by name. remote (str, RemoteClient): The remote by which the ducted service should be contacted. host (str): The hostname of the service to be used by this client. port (int): The port of the service to be used by this client. username (str, bool, None): The username to authenticate with if necessary. If True, then users will be prompted at runtime for credentials. password (str, bool, None): The password to authenticate with if necessary. If True, then users will be prompted at runtime for credentials. cache(Cache, None): The cache client to be attached to this instance. Cache will only used by specific methods as configured by the client. cache_namespace(str, None): The namespace to use by default when writing to the cache. """ check_dependencies([protocol]) self.protocol = protocol = name or self.__class__.__name__ self.registry = registry self.remote = remote = host self.port = port self.username = username self.password = password self.cache = cache self.cache_namespace = cache_namespace self.connection_fields = ('host', 'port', 'remote', 'username', 'password') self.prepared_fields = ('_host', '_port', '_username', '_password') atexit.register(self.disconnect) self.__prepared = False self.__getting = False self.__connected = False self.__disconnecting = False self.__cached_auth = {} self.__prepreparation_values = {}
@classmethod def __register_implementation__(cls): if not hasattr(cls, '_protocols'): cls._protocols = {} cls._protocols[cls.__name__] = cls registry_keys = getattr(cls, 'PROTOCOLS', []) or [] if registry_keys: for key in registry_keys: if key in cls._protocols and cls.__name__ != cls._protocols[key].__name__:"Ignoring attempt by class `{}` to register key '{}', which is already registered for class `{}`.".format(cls.__name__, key, cls._protocols[key].__name__)) else: cls._protocols[key] = cls
[docs] @classmethod def for_protocol(cls, protocol): """ Retrieve a `Duct` subclass for a given protocol. Args: protocol (str): The protocol of interest. Returns: functools.partial object: The appropriate class for the provided, partially constructed with the `protocol` keyword argument set appropriately. Raises: DuctProtocolUnknown: If no class has been defined that offers the named protocol. """ if protocol not in cls._protocols: raise DuctProtocolUnknown("Missing `Duct` implementation for protocol: '{}'.".format(protocol)) return functools.partial(cls._protocols[protocol], protocol=protocol)
@property def __prepare_triggers(self): return ( ('cache',) + object.__getattribute__(self, 'connection_fields') ) @classmethod def __init_with_kwargs__(cls, self, kwargs, **fallbacks): if not hasattr(self, '_Duct__inited_using_kwargs'): self._Duct__inited_using_kwargs = {} for cls_parent in reversed([ parent for parent in inspect.getmro(cls) if issubclass(parent, Duct) and parent not in self._Duct__inited_using_kwargs and '__init__' in parent.__dict__ ]): self._Duct__inited_using_kwargs[cls_parent] = True if six.PY3: argspec = inspect.getfullargspec(cls_parent.__init__) keys = argspec.args[1:] + argspec.kwonlyargs else: keys = inspect.getargspec(cls_parent.__init__).args[1:] params = {} for key in keys: if key in kwargs: params[key] = kwargs.pop(key) elif key in fallbacks: params[key] = fallbacks[key] cls_parent.__init__(self, **params) def __getattribute__(self, key): try: if (not object.__getattribute__(self, '_Duct__prepared') and not object.__getattribute__(self, '_Duct__getting') and not object.__getattribute__(self, '_Duct__disconnecting') and key in object.__getattribute__(self, '_Duct__prepare_triggers')): object.__setattr__(self, '_Duct__getting', True) object.__getattribute__(self, 'prepare')() object.__setattr__(self, '_Duct__getting', False) except AttributeError: pass except Exception as e: object.__setattr__(self, '_Duct__getting', False) raise_with_traceback(e) return object.__getattribute__(self, key) def __setattr__(self, key, value): try: if (getattr(self, '_Duct__prepared', False) and getattr(self, 'connection_fields', None) and key in self.connection_fields and self.is_connected()): logger.warn('Disconnecting prior to changing field that connection is based on: {}.'.format(key)) self.disconnect() self.__prepared = False except AttributeError: pass object.__setattr__(self, key, value)
[docs] @quirk_docs('_prepare') def prepare(self): """ Prepare a Duct subclass for use (if not already prepared). This method is called before the value of any of the fields referenced in `self.connection_fields` are retrieved. The fields include, by default: 'host', 'port', 'remote', 'cache', 'username', and 'password'. Subclasses may add or subtract from these special fields. When called, it first checks whether the instance has already been prepared, and if not calls `_prepare` and then records that the instance has been successfully prepared. """ if not self.__prepared: self._prepare() self.__prepared = True
[docs] def _prepare(self): """ This method may be overridden by subclasses, but provides the following default behaviour: - Ensures `self.registry`, `self.remote` and `self.cache` values are instances of the right types. - It replaces string values of `self.remote` and `self.cache` with remotes and caches looked up using `self.registry.lookup`. - It looks through each of the fields nominated in `self.prepared_fields` and, if the corresponding value is callable, sets the value of that field to result of calling that value with a reference to `self`. By default, `prepared_fields` contains '_host', '_port', '_username', and '_password'. - Ensures value of self.port is an integer (or None). """ # Import necessary classes lazily (to prevent dependency cycles) from omniduct.registry import DuctRegistry from omniduct.caches.base import Cache from omniduct.remotes.base import RemoteClient # Check registry is of an appropriate type (if present) assert (self.registry is None) or isinstance(self.registry, DuctRegistry), "Provided registry is not an instance of `omniduct.registry.DuctRegistry`." # If registry is present, lookup remotes and caches if necessary if self.registry is not None: if self.remote and isinstance(self.remote, six.string_types): self.__prepreparation_values['remote'] = self.remote self.remote = self.registry.lookup(self.remote, kind=Duct.Type.REMOTE) if self.cache and isinstance(self.cache, six.string_types): self.__prepreparation_values['cache'] = self.cache self.cache = self.registry.lookup(self.cache, kind=Duct.Type.CACHE) # Check if remote and cache objects are of correct type (if present) assert (self.remote is None) or isinstance(self.remote, RemoteClient), "Provided remote is not an instance of `omniduct.remotes.base.RemoteClient`." assert (self.cache is None) or isinstance(self.cache, Cache), "Provided cache is not an instance of `omniduct.caches.base.Cache`." # Replace prepared fields with the result of calling existing values # with a reference to `self`. for field in self.prepared_fields: value = getattr(self, field) if hasattr(value, '__call__'): self.__prepreparation_values[field] = value setattr(self, field, value(self)) if isinstance(self._host, (list, tuple)): if '_host' not in self.__prepreparation_values: self.__prepreparation_values['_host'] = self._host self._host = naive_load_balancer(self._host, port=self._port) # If host has a port included in it, override the value of self._port if self._host is not None and re.match(r'[^\:]+:[0-9]{1,5}', self._host): self._host, self._port = self._host.split(':') # Ensure port is an integer value self.port = int(self._port) if self._port else None
[docs] def reset(self): """ Reset this `Duct` instance to its pre-preparation state. This method disconnects from the service, resets any temporary authentication and restores the values of the attributes listed in `prepared_fields` to their values as of when `Duct.prepare` was called. Returns: `Duct` instance: A reference to this object. """ self.disconnect() self.__cached_auth = {} for key, value in self.__prepreparation_values.items(): setattr(self, key, value) self.__prepreparation_values = {} self.__prepared = False return self
@property def host(self): """ str: The host name providing the service, or '' if `self.remote` is not `None`, whereupon the service will be port-forwarded locally. You can view the remote hostname using `duct._host`, and change the remote host at runtime using: ` = '<host>'`. """ if self.remote: return '' # TODO: Make this configurable. return self._host @host.setter def host(self, host): self._host = host @property def port(self): """ int: The local port for the service. If `self.remote` is not `None`, the port will be port-forwarded from the remote host. To see the port used on the remote host refer to `duct._port`. You can change the remote port at runtime using: `duct.port = <port>`. """ if self.remote: return self.remote.port_forward('{}:{}'.format(self._host, self._port)) return self._port @port.setter def port(self, port): self._port = port @property def username(self): """ str: Some services require authentication in order to connect to the service, in which case the appropriate username can be specified. If not specified at instantiation, your local login name will be used. If `True` was provided, you will be prompted to type your username at runtime as necessary. If `False` was provided, then `None` will be returned. You can specify a different username at runtime using: `duct.username = '<username>'`. """ if self._username is True: if 'username' not in self.__cached_auth: self.__cached_auth['username'] = input("Enter username for '{}':".format( return self.__cached_auth['username'] elif self._username is False: return None elif not self._username: try: username = os.getlogin() except OSError: username = pwd.getpwuid(os.geteuid()).pw_name return username return self._username @username.setter def username(self, username): self._username = username @property def password(self): """ str: Some services require authentication in order to connect to the service, in which case the appropriate password can be specified. If `True` was provided at instantiation, you will be prompted to type your password at runtime when necessary. If `False` was provided, then `None` will be returned. You can specify a different password at runtime using: `duct.password = '<password>'`. """ if self._password is True: if 'password' not in self.__cached_auth: self.__cached_auth['password'] = getpass.getpass("Enter password for '{}':".format( return self.__cached_auth['password'] elif self._password is False: return None return self._password @password.setter def password(self, password): self._password = password def __assert_server_reachable(self): if is not None or self.port is not None: if is None: raise ValueError("Port specified but no host provided.") if self.port is None: raise ValueError("Host specified but no port specified.") else: return if not is_port_bound(, self.port): if self.remote and not self.remote.is_port_bound(self._host, self._port): self.disconnect() raise DuctServerUnreachable( "Remote '{}' cannot connect to '{}:{}'. Please check your settings before trying again.".format(, self._host, self._port)) elif not self.remote: self.disconnect() raise DuctServerUnreachable( "Cannot connect to '{}:{}' on your current connection. Please check your connection before trying again.".format(, self.port)) # Connection
[docs] @logging_scope("Connecting") @quirk_docs('_connect') def connect(self): """ Connect to the service backing this client. It is not normally necessary for a user to manually call this function, since when a connection is required, it is automatically created. Returns: `Duct` instance: A reference to the current object. """ if "Connecting to {host}:{port}{remote}.".format( host=self._host, port=self._port, remote="on {}".format( if self.remote else "" ) ) self.__assert_server_reachable() if not self.is_connected(): try: self._connect() except Exception as e: self.reset() raise_with_traceback(e) self.__connected = True if "Connected to {host}:{port}{remote}.".format( host=self._host, port=self._port, remote="on {}".format( if self.remote else "" ) ) return self
@abstractmethod def _connect(self): raise NotImplementedError
[docs] @quirk_docs('_is_connected') def is_connected(self): """ Check whether this `Duct` instances is currently connected. This method checks to see whether a `Duct` instance is currently connected. This is performed by verifying that the remote host and port are still accessible, and then by calling `Duct._is_connected`, which should be implemented by subclasses. Returns: bool: Whether this `Duct` instance is currently connected. """ if not self.__connected: return False if self.remote: if not self.remote.has_port_forward(self._host, self._port): return False elif not is_port_bound(, self.port): self.disconnect() return False return self._is_connected()
@abstractmethod def _is_connected(self): raise NotImplementedError
[docs] @quirk_docs('_disconnect') def disconnect(self): """ Disconnect this client from backing service. This method is automatically called during reconnections and/or at Python interpreter shutdown. It first calls `Duct._disconnect` (which should be implemented by subclasses) and then notifies the `RemoteClient` subclass, if present, to stop port-forwarding the remote service. Returns: `Duct` instance: A reference to this object. """ if not self.__prepared: return self.__disconnecting = True self.__connected = False try: self._disconnect() if self.remote and self.remote.has_port_forward(self._host, self._port):'Freeing up local port {0}...'.format(self.port)) self.remote.port_forward_stop(local_port=self.port) finally: self.__disconnecting = False return self
@abstractmethod def _disconnect(self): raise NotImplementedError
[docs] def reconnect(self): """ Disconnects, and then reconnects, this client. Note: This is equivalent to `duct.disconnect().connect()`. Returns: `Duct` instance: A reference to this object. """ return self.disconnect().connect()