import datetime
import functools
import sys
from abc import abstractmethod
import dateutil
import pandas
import six
import yaml
from decorator import decorator
from interface_meta import quirk_docs
from omniduct.duct import Duct
from omniduct.utils.config import config
from omniduct.utils.debug import logger
from omniduct.utils.decorators import function_args_as_kwargs, require_connection
from ._serializers import PickleSerializer
# Global switch controlling whether cache write failures propagate to callers
# (True) or are merely logged (False); read by `cached_method` below.
config.register(
    'cache_fail_hard',
    default=False,
    description='Raise an exception if a cache fails to save (otherwise errors are logged and suppressed).',
)
def cached_method(
        key,
        namespace=lambda self, kwargs: (
            self.cache_namespace or "{}.{}".format(self.__class__.__name__, self.name)
        ),
        cache=lambda self, kwargs: self.cache,
        use_cache=lambda self, kwargs: kwargs.pop('use_cache', True),
        renew=lambda self, kwargs: kwargs.pop('renew', False),
        serializer=lambda self, kwargs: PickleSerializer(),
        metadata=lambda self, kwargs: None
):
    """
    Wrap a method of a `Duct` class and add caching capabilities.

    All arguments of this function are expected to be functions taking two
    arguments: a reference to current instance of the class (`self`) and a
    dictionary of arguments passed to the function (`kwargs`).

    Args:
        key (function -> str): The key under which the value returned by the
            wrapped function should be stored.
        namespace (function -> str): The namespace under which the key should be
            stored (default: `"<duct class name>.<duct instance name>"`).
        cache (function -> Cache): The instance of cache via which to store the
            output of the wrapped function (default: `self.cache`).
        use_cache (function -> bool): Whether or not to use the caching
            functionality (default: `True`, popping a `use_cache` keyword
            argument from the call if present).
        renew (function -> bool): Whether to renew the stored cache, overriding
            if a value has already been stored (default: `False`, popping a
            `renew` keyword argument from the call if present).
        serializer (function -> Serializer): The `Serializer` subclass to use
            when storing the return object (default: `PickleSerializer`).
        metadata (function -> None, dict): A dictionary of additional metadata
            to be stored alongside the wrapped function's output
            (default: `None`).

    Returns:
        object: The (potentially cached) object returned when calling the
            wrapped function. If the wrapped function returns `None`, nothing
            is cached and `None` is returned.

    Raises:
        Exception: If cache fails to retrieve or store the output of the
            wrapped function, and the omniduct configuration key
            `cache_fail_hard` is `True`, then the underlying exceptions raised
            by the Cache instance will be reraised. Otherwise such failures
            are logged and the wrapped function's output is used directly.
    """

    @decorator
    def wrapped(method, self, *args, **kwargs):
        # Normalise positional and keyword arguments into a single dict so the
        # key/namespace/... callables can inspect all arguments uniformly.
        kwargs = function_args_as_kwargs(method, self, *args, **kwargs)
        kwargs.pop('self')

        # NOTE: the default `use_cache` and `renew` callables pop their flags
        # out of `kwargs`, so the order of these calls determines which
        # arguments the later callables (and the wrapped method) see.
        _key = key(self, kwargs)
        _namespace = namespace(self, kwargs)
        _cache = cache(self, kwargs)
        _use_cache = use_cache(self, kwargs)
        _renew = renew(self, kwargs)
        _serializer = serializer(self, kwargs)
        _metadata = metadata(self, kwargs)

        if _cache is None or not _use_cache:
            return method(self, **kwargs)

        # Only consult the cache when we are not forcing a renewal.
        if not _renew and _cache.has_key(_key, namespace=_namespace):  # noqa: has_key is not of a dictionary here
            try:
                cached_value = _cache.get(
                    _key,
                    namespace=_namespace,
                    serializer=_serializer
                )
            except Exception:
                logger.warning("Failed to retrieve results from cache. Renewing the cache...")
                if config.cache_fail_hard:
                    six.reraise(*sys.exc_info())
                # Otherwise fall through and regenerate the value below.
            else:
                # Only report a cache hit when retrieval actually succeeded.
                logger.caveat('Loaded from cache')
                return cached_value

        # Renewing/creating cache
        value = method(self, **kwargs)
        if value is None:
            logger.warning("Method value returned None. Not saving to cache.")
            return None

        try:
            _cache.set(
                _key,
                value=value,
                namespace=_namespace,
                serializer=_serializer,
                metadata=_metadata
            )
            # Return from cache every time, just in case serialization operation was
            # destructive (e.g. reading from cursors)
            return _cache.get(
                _key,
                namespace=_namespace,
                serializer=_serializer
            )
        except Exception:
            logger.warning("Failed to save results to cache. If needed, please save them manually.")
            if config.cache_fail_hard:
                six.reraise(*sys.exc_info())
            return value  # As a last resort, return value object (which could be mutated by serialization).

    return wrapped
class Cache(Duct):
    """
    An abstract class providing the common API for all cache clients.

    Subclasses implement the private storage primitives at the bottom of this
    class (`_get_namespaces`, `_remove_namespace`, `_get_keys`, `_remove_key`,
    `_get_bytecount_for_key`, `_get_stream_for_key`) and may override
    `_namespace`/`_key` to normalise user-supplied names. The public methods
    below are implemented in terms of those primitives.
    """

    DUCT_TYPE = Duct.Type.CACHE

    @quirk_docs('_init', mro=True)
    def __init__(self, **kwargs):
        Duct.__init_with_kwargs__(self, kwargs)
        self._init(**kwargs)

    @abstractmethod
    def _init(self):
        pass

    # Data insertion and retrieval
    # NOTE(review): `set_metadata` / `get_metadata` are called below but are
    # not defined in this class body as shown; confirm they are provided
    # elsewhere (e.g. later in the file or by a base/mixin class).

    @require_connection
    def set(self, key, value, namespace=None, serializer=None, metadata=None):
        """
        Set the value of a key.

        Args:
            key (str): The key for which `value` should be stored.
            value (object): The value to be stored.
            namespace (str, None): The namespace to be used.
            serializer (Serializer): The `Serializer` subclass to use for the
                serialisation of value into the cache. (default=PickleSerializer)
            metadata (dict, None): Additional metadata to be stored with the value
                in the cache. Values must be serializable via `yaml.safe_dump`.

        Raises:
            Exception: Any error raised while writing the value or its
                metadata is re-raised after a best-effort removal of the
                partially written entry.
        """
        namespace, key = self._namespace(namespace), self._key(key)
        serializer = serializer or PickleSerializer()
        try:
            with self._get_stream_for_key(namespace, key, 'data{}'.format(serializer.file_extension), mode='wb', create=True) as fh:
                serializer.serialize(value, fh)
            self.set_metadata(key, metadata, namespace=namespace, replace=True)
        except BaseException:
            # Deliberately broad: clean up on any failure (including
            # interruption), then always re-raise the original error.
            exc_info = sys.exc_info()
            try:
                # `namespace`/`key` are already normalised, so use the
                # primitives directly; only remove the entry if it exists,
                # since `unset` would raise KeyError and mask the real error.
                if self._has_key(namespace, key):
                    self._remove_key(namespace, key)
            except Exception:
                logger.warning(
                    "Failed to clean up partially written cache key '{}' (namespace: '{}').".format(key, namespace)
                )
            six.reraise(*exc_info)

    @require_connection
    def get(self, key, namespace=None, serializer=None):
        """
        Retrieve the value associated with the nominated key from the cache.

        Args:
            key (str): The key for which `value` should be retrieved.
            namespace (str, None): The namespace to be used.
            serializer (Serializer): The `Serializer` subclass to use for the
                deserialisation of value from the cache. (default=PickleSerializer)

        Returns:
            object: The (appropriately deserialized) object stored in the cache.

        Raises:
            KeyError: If the key does not exist in the nominated namespace.
        """
        namespace, key = self._namespace(namespace), self._key(key)
        serializer = serializer or PickleSerializer()
        if not self._has_key(namespace, key):
            raise KeyError("{} (namespace: {})".format(key, namespace))
        try:
            with self._get_stream_for_key(namespace, key, 'data{}'.format(serializer.file_extension), mode='rb', create=False) as fh:
                return serializer.deserialize(fh)
        finally:
            # The access time is recorded in UTC, whether or not
            # deserialization succeeded.
            self.set_metadata(key, namespace=namespace, metadata={'last_accessed': datetime.datetime.utcnow()})

    @require_connection
    def get_bytecount(self, key, namespace=None):
        """
        Retrieve the number of bytes used by a stored key.

        This bytecount may or may not include metadata storage, depending on
        the backend.

        Args:
            key (str): The key for which to extract the bytecount.
            namespace (str, None): The namespace to be used.

        Returns:
            int: The number of bytes used by the stored value associated with
                the nominated key and namespace.

        Raises:
            KeyError: If the key does not exist in the nominated namespace.
        """
        namespace, key = self._namespace(namespace), self._key(key)
        if not self._has_key(namespace, key):
            raise KeyError("{} (namespace: {})".format(key, namespace))
        return self._get_bytecount_for_key(namespace, key)

    @require_connection
    def unset(self, key, namespace=None):
        """
        Remove the nominated key from the cache.

        Args:
            key (str): The key which should be unset.
            namespace (str, None): The namespace to be used.

        Raises:
            KeyError: If the key does not exist in the nominated namespace.
        """
        namespace, key = self._namespace(namespace), self._key(key)
        if not self._has_key(namespace, key):
            raise KeyError("{} (namespace: {})".format(key, namespace))
        self._remove_key(namespace, key)

    @require_connection
    def unset_namespace(self, namespace=None):
        """
        Remove an entire namespace from the cache.

        Args:
            namespace (str, None): The namespace to be removed.

        Raises:
            KeyError: If the namespace does not exist.
        """
        namespace = self._namespace(namespace)
        if not self._has_namespace(namespace):
            raise KeyError("namespace: {}".format(namespace))
        self._remove_namespace(namespace)

    # Top-level descriptions

    @property
    @require_connection
    def namespaces(self):
        "list <str,None>: A list of the namespaces stored in the cache."
        return self._get_namespaces()

    @require_connection
    def has_namespace(self, namespace=None):
        """
        Check whether the cache has the nominated namespace.

        Args:
            namespace (str,None): The namespace for which to check for existence.

        Returns:
            bool: Whether the cache has the nominated namespaces.
        """
        namespace = self._namespace(namespace)
        return self._has_namespace(namespace)

    @require_connection
    def keys(self, namespace=None):
        """
        Collect a list of all the keys present in the nominated namespaces.

        Args:
            namespace (str,None): The namespace from which to extract all of the
                keys.

        Returns:
            list<str>: The keys stored in the cache for the nominated namespace.
        """
        namespace = self._namespace(namespace)
        return self._get_keys(namespace)

    @require_connection
    def has_key(self, key, namespace=None):
        """
        Check whether the cache has a nominated key.

        Args:
            key (str): The key for which to check existence.
            namespace (str,None): The namespace in which to look for the key.

        Returns:
            bool: Whether the cache has a value for the nominated namespace and
                key.
        """
        namespace, key = self._namespace(namespace), self._key(key)
        return self._has_key(namespace, key)

    def get_total_bytecount(self, namespaces=None):
        """
        Retrieve the total number of bytes used by the cache.

        This method iterates over all (nominated) namespaces and the keys
        therein, summing the result of `.get_bytecount(...)` on each.

        Args:
            namespaces (list<str,None>): The namespaces to which the bytecount
                should be restricted.

        Returns:
            int: The total number of bytes used by the nominated namespaces.
        """
        total_bytes = 0
        if namespaces is None:
            namespaces = self.namespaces
        for namespace in namespaces:
            for key in self.keys(namespace=namespace):
                total_bytes += self.get_bytecount(key, namespace=namespace)
        return total_bytes

    def describe(self, namespaces=None):
        """
        Return a pandas DataFrame showing all keys and their metadata.

        Args:
            namespaces (list<str,None>): The namespaces to which the summary
                should be restricted.

        Returns:
            pandas.DataFrame: A representation of keys in the cache, sorted by
                `last_accessed` (most recent first). Will include at least the
                following columns: ['bytes', 'namespace', 'key', 'created',
                'last_accessed']. Any additional metadata for keys will be
                appended to these columns (sorted by name).
        """
        out = []
        if namespaces is None:
            namespaces = self.namespaces
        for namespace in namespaces:
            for key in self.keys(namespace=namespace):
                usage = {
                    'bytes': self.get_bytecount(key, namespace=namespace),
                    'namespace': namespace,
                    'key': key,
                    'created': None,
                    'last_accessed': None
                }
                usage.update(self.get_metadata(key, namespace=namespace))
                out.append(usage)

        required_columns = ['bytes', 'namespace', 'key', 'created', 'last_accessed']
        if out:
            df = pandas.DataFrame(out)
            order = required_columns + sorted(set(df.columns).difference(required_columns))
            return (
                df
                .sort_values('last_accessed', ascending=False)
                .reset_index(drop=True)
                [order]
            )
        return pandas.DataFrame(
            data=[],
            columns=required_columns
        )

    # Cache pruning

    def prune(self, namespaces=None, max_age=None, max_bytes=None, total_bytes=None, total_count=None):
        """
        Remove keys from the cache in order to satisfy nominated constraints.

        Args:
            namespaces (list<str, None>): The namespaces to consider for pruning.
            max_age (None, int, timedelta, relativedelta, date, datetime): The
                number of days, a timedelta, or a relativedelta, indicating the
                maximum age of items in the cache (based on last accessed date).
                Deltas are expected to be positive and are measured back from
                the current UTC time (matching how `get` records
                `last_accessed`). An explicit date/datetime is used as the
                cutoff as-is, and should therefore be expressed in UTC.
            max_bytes (None, int): The maximum number of bytes for *each* key,
                allowing the pruning of larger keys.
            total_bytes (None, int): The total number of bytes for the entire
                cache. Keys will be removed from least recently accessed to most
                recently accessed until the constraint is satisfied. This
                constraint will be applied after max_age and max_bytes.
            total_count (None, int): The maximum number of items to keep in the
                cache. Keys will be removed from least recently accessed to most
                recently accessed until the constraint is satisfied. This
                constraint will be applied after max_age and max_bytes.

        Raises:
            ValueError: If any constraint is of an unsupported type.
        """
        # Import the submodule explicitly: `import dateutil` alone does not
        # guarantee that `dateutil.relativedelta` is loaded.
        from dateutil.relativedelta import relativedelta

        usage = self.describe(namespaces=namespaces)
        if usage.shape[0] == 0:  # Abort early if the cache is empty (and hence has no index, which would cause problems later on)
            return

        constraints = []

        # Unset keys according to per-key constraints
        if max_age is not None:
            if isinstance(max_age, int):
                max_age = datetime.timedelta(max_age)
            if isinstance(max_age, (datetime.timedelta, relativedelta)):
                # `last_accessed` is recorded with `utcnow()` in `get`, so the
                # cutoff must be computed in UTC as well.
                max_age = datetime.datetime.utcnow() - max_age
            if not isinstance(max_age, (datetime.datetime, datetime.date)):
                raise ValueError("Invalid type specified for `max_age`: {}".format(max_age.__repr__()))
            constraints.append(usage.last_accessed < max_age)

        if max_bytes is not None:
            if not isinstance(max_bytes, int):
                raise ValueError("Invalid type specified for `max_bytes`: {}".format(max_bytes.__repr__()))
            constraints.append(usage.bytes > max_bytes)

        if constraints:
            to_unset = usage[functools.reduce(lambda x, y: x | y, constraints, False)]
            for i, row in to_unset.iterrows():
                logger.info("Unsetting key '{}' (namespace: '{}')...".format(row.key, row.namespace))
                self.unset(row.key, namespace=row.namespace)

        # Unset keys according to global constraints
        if total_bytes is not None or total_count is not None:
            if total_bytes is not None and not isinstance(total_bytes, int):
                raise ValueError("Invalid type specified for `total_bytes`: {}".format(total_bytes.__repr__()))
            if total_count is not None and not isinstance(total_count, int):
                raise ValueError("Invalid type specified for `total_count`: {}".format(total_count.__repr__()))
            # `describe` orders rows most-recently-accessed first, so the
            # cumulative byte count grows from the newest entry and everything
            # from `unset_index` onwards is the least recently accessed tail.
            usage = self.describe(namespaces=namespaces).assign(cum_bytes=lambda x: x.bytes.cumsum())
            unset_index = total_count if total_count is not None else len(usage)
            if total_bytes is not None:
                unset_index = min(unset_index, usage.cum_bytes.searchsorted(total_bytes, side='right'))
            for i, row in usage.loc[unset_index:].iterrows():
                logger.info("Unsetting key '{}' (namespace: '{}')...".format(row.key, row.namespace))
                self.unset(row.key, namespace=row.namespace)

    # Methods for subclasses to implement

    def _namespace(self, namespace):
        # Hook for subclasses to normalise namespace names; identity by default.
        return namespace

    def _key(self, key):
        # Hook for subclasses to normalise key names; identity by default.
        return key

    @abstractmethod
    def _get_namespaces(self):
        raise NotImplementedError

    def _has_namespace(self, namespace):
        return namespace in self._get_namespaces()

    @abstractmethod
    def _remove_namespace(self, namespace):
        raise NotImplementedError

    @abstractmethod
    def _get_keys(self, namespace):
        raise NotImplementedError

    def _has_key(self, namespace, key):
        return key in self._get_keys(namespace=namespace)

    @abstractmethod
    def _remove_key(self, namespace, key):
        raise NotImplementedError

    @abstractmethod
    def _get_bytecount_for_key(self, namespace, key):
        raise NotImplementedError

    @abstractmethod
    def _get_stream_for_key(self, namespace, key, stream_name, mode, create):
        # Expected to return a context manager yielding a file-like stream
        # (used with `with` in `set`/`get`).
        raise NotImplementedError