Source code for omniduct.registry

import six
import yaml

from omniduct.duct import Duct
from omniduct.errors import DuctNotFound, DuctProtocolUnknown
from omniduct.utils.debug import logger
from omniduct.utils.magics import MagicsProvider
from omniduct.utils.proxies import TreeProxy


[docs]class DuctRegistry(object): """ A convenient registry for `Duct` instances. This class provides a simple interface to a pool of configured services, allowing convenient lookups of available services and the creation of new ones. It also allows for the batch creation of services from a shared configuration, which is especially useful in a company deployment. """
[docs] def __init__(self, config=None): """ Args: config (iterable, dict, str, None): Refer to `.import_from_config` for more details (default: `None`). """ self._registry = {} if config: self.register_from_config(config)
def __repr__(self): return "<DuctRegistry with {} registered ducts>".format(len(self._registry)) # Registration methods
[docs] def register(self, duct, name=None, override=False, register_magics=True): """ Register an existing Duct instance into the registry. Names of ducts can consist of any valid Python identifier, and multiple names can be provided as a comma separated list in which case the names will be aliases referring to the same `Duct` instance. Keep in mind that any name must uniquely identify one `Duct` instance. Args: duct (Duct): The `Duct` instance to be registered. name (str): An optional name to use when registering. If not provided this will fall back to `duct.name`. If neither is configured, an error will be thrown. Name can be a comma-separated list of names, in which case the names are aliases and will point to the same `Duct` instance. override (bool): Whether to override any existing `Duct` instance of the same name. If `False`, any overrides will result in an exception. Returns: Duct: The `Duct` instance being registered. """ name = name or duct.name if name is None: raise ValueError("`Duct` instances must be named to be registered. Please either specify a name to this method call, or add a name to the Duct using `duct.name = '...'`.") names = [n.strip() for n in name.split(',')] for name in names: if name in self._registry and not override: raise ValueError("`Duct` with the same name ('{}') already present in the registry. Please pass `override=True` if you want to override the existing instance, or `name='...'` to specify a new name.".format(name)) if register_magics and isinstance(duct, MagicsProvider): duct.register_magics(base_name=name) self._registry[name] = duct return duct
[docs] def new(self, name, protocol, override=False, register_magics=True, **kwargs): """ Create a new service and register it into the registry. Args: name (str): The name (or names) of the target service. If multiple aliases are to be used, names should be a comma separated list. See `.register` for more details. protocol (str): The protocol of the new service. override (bool): Whether to override any existing `Duct` instance of the same name. If `False`, any overrides will result in an exception. register_magics (bool): Whether to register the magics if running in and IPython session (default: `True`). **kwargs (dict): Additional arguments to pass to the constructor of the class associated with the nominated protocol. Returns: Duct: The `Duct` instance registered into the registry. """ return self.register( Duct.for_protocol(protocol)( name=name.split(',')[0].strip(), registry=self, **kwargs ), name=name, override=override, register_magics=register_magics )
# Inspection and retrieval methods @property def names(self): """list: The names of all ducts in the registry.""" return sorted(self._registry.keys()) def __getitem__(self, name): return self._registry[name] def __contains__(self, name): return name in self._registry def __iter__(self): return iter(self._registry.values())
[docs] def lookup(self, name, kind=None): """ Look up an existing registered `Duct` by name and (optionally) kind. Args: name (str): The name of the `Duct` instance. kind (str, Duct.Type): The kind of `Duct` to which the lookup should be restricted. Returns: `Duct`: The looked up `Duct` instance. Raises: DuctNotFound: If no `Duct` can be found for requested name and/or type. """ if kind and not isinstance(kind, Duct.Type): kind = Duct.Type(kind) if name not in self._registry: raise DuctNotFound(name) duct = self._registry[name] if kind and duct.DUCT_TYPE != kind: raise DuctNotFound("Duct named '{}' exists, but is not of kind '{}'.".format(name, kind.value)) return duct
# Exposing `Duct` instances.
[docs] def populate_namespace(self, namespace=None, names=None, kinds=None): """ Populate a nominated namespace with references to a subset of ducts. While a registry object is a great way to store and configure `Duct` instances, it is sometimes desirable to surface frequently used instances in other more convenient namespaces (such as the globals of your module). Args: namespace (dict, None): The namespace to populate. If using from a module you can pass `globals()`. If `None`, a new dictionary is created, populated and then returned. names (list<str>, None): The names to include in the population. If not specified then all names will be exported. kinds (list<str>, None): The kinds of ducts to include in the population. If not specified, all kinds will be exported. Returns: dict: The populated namespace. """ if namespace is None: namespace = {} if kinds is not None: kinds = [Duct.Type(kind) if not isinstance(kind, Duct.Type) else kind for kind in kinds] for name, duct in self._registry.items(): if (kinds is None or duct.DUCT_TYPE in kinds) and (names is None or name in names): namespace[name.split('/')[-1]] = duct return namespace
[docs] def get_proxy(self, by_kind=True): """ Return a structured proxy object for easy exploration of services. This method returns a proxy object to the registry upon which the `Duct` instances are available as attributes. This object is also by default structured such that one first accesses an attribute associated with a kind, which makes larger collections of services more easily navigatable. For example, if you have `DatabaseClient` subclass registered as 'my_service', you could access it on the proxy using: >>> proxy = registry.get_proxy(by_kind=True) >>> proxy.databases.my_service Args: by_kind (bool): Whether to nest proxy of `Duct` instances by kind. Returns: ServicesProxy: The proxy object. """ def key_parser(k, v): keys = k.split('/') if by_kind and getattr(v, 'DUCT_TYPE', None) is not None: keys.insert(0, v.DUCT_TYPE.value) return keys dct = self._registry.copy() dct['registry'] = self return TreeProxy._for_dict(dct, key_parser=key_parser, name='services')
# Batch registration of duct configurations
[docs] def register_from_config(self, config, override=False): """ Register a collection of Duct service configurations. The configuration format must be one of the following: - An iterable sequence of dictionaries containing a mapping between the keyword arguments required to instantiate the `Duct` subclass. - A dictionary mapping names of `Duct` instances to dictionaries of keyword arguments. - A dictionary mapping Duct types ('databases', 'filesystems', etc) to mappings like those immediately above. - A string YAML representation of one of the above (with at least one newline character). - A string filename containing such a YAML representation. There are three special keyword arguments that are required by the `DuctRegistry` instance: - name: Should be present only in the configuration dictionary when config is provided as an iterable sequence of dictionaries. - protocol: Which specifies which `Duct` subclass to fetch. Failure to correctly set this will result in a warning and an ignoring of this configuration. - register_magics (optional): A boolean flag indicating whether to register any magics defined by this Duct class (default: True). Args: config (iterable, dict, str, None): A configuration specified in one of the above described formats. override (bool): Whether to override any existing `Duct` instance of the same name(s). If `False`, any overrides will result in an exception. """ # Extract configuration from a file if necessary, and then process it. if isinstance(config, six.string_types): if '\n' in config: config = yaml.safe_load(config) else: with open(config) as f: config = yaml.safe_load(f.read()) config = self._process_config(config) for duct_config in config: names = duct_config.pop('name') protocol = duct_config.pop('protocol') register_magics = duct_config.pop('register_magics', True) try: self.new(names, protocol, register_magics=register_magics, override=override, **duct_config) except DuctProtocolUnknown as e: logger.error("Failed to configure `Duct` instance(s) '{}'. {}".format("', '".join(names.split(',')), str(e))) return self
def _process_config(self, config, name=None): """ Coerce the configuration into a generator of dictionaries of keyword arguments; each corresponding to a duct instance. """ if isinstance(config, dict) and (name is not None or 'name' in config) and 'protocol' in config and not config.get('__OMNIDUCT_SKIP__', False): kwargs = config.copy() if 'name' not in config: kwargs['name'] = name yield kwargs elif isinstance(config, dict): for name, subconfig in config.items(): for config in self._process_config(subconfig, name=name): yield config elif isinstance(config, list): for subconfig in config: for config in self._process_config(subconfig): yield config