Source code for pystellibs.helpers

"""
This is a first collection of tools making the design easier
"""
import sys
from functools import partial, wraps, update_wrapper
from inspect import getfullargspec, ismethod
import warnings
import numpy as np
import itertools

from .ezunits import unit, hasUnit


__all__ = ['NameSpace', 'Pipe', 'Pipeable', 'Pipegroup', 'chunks',
           'deprecated', 'generator', 'isNestedInstance', 'keywords_first',
           'kfpartial', 'merge_records', 'missing_units_warning', 'nbytes',
           'path_of_module', 'pretty_size_print', 'type_checker',
           'val_in_unit']


class NameSpace(dict):
    """A dict subclass that exposes its items as attributes. """
    def __init__(self, name, *args, **kwargs):
        self.__name__ = name
        dict.__init__(self, *args, **kwargs)

    def __dir__(self):
        return tuple(self)

    def __repr__(self):
        names = ', '.join([k for k in dir(self) if k[0] != '_'])
        return "{s.__name__:s}: {r:s}".format(s=self, r=names)

    def __getattribute__(self, name):
        try:
            return self[name]
        except KeyError:
            msg = "'{s.__name__:s}' has no attribute '{name:s}'"
            raise AttributeError(msg.format(s=self, name=name))

    def __setattr__(self, name, value):
        self[name] = value

    def __delattr__(self, name):
        del self[name]

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        return False
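
# Minimal usage sketch (illustrative only; the _example_* helper below is not
# part of the original module): items stored in a NameSpace are reachable
# both as dictionary keys and as attributes.
def _example_namespace():
    ns = NameSpace('demo', a=1)
    ns.b = 2                  # __setattr__ stores the value as a dict item
    assert ns.a == 1          # __getattribute__ looks the name up as a key
    assert ns['b'] == 2
    return ns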
def generator(func):
    """ A dummy decorator that only makes the code more readable.

    It allows one to explicitly mark a function as a generator (yielding
    values) and does nothing more than calling the initial function.
    """
    @wraps(func)
    def deco(*args, **kwargs):
        return func(*args, **kwargs)
    return deco
def deprecated(func):
    """ A dummy decorator that warns against using a deprecated function """
    @wraps(func)
    def deco(*args, **kwargs):
        txt = 'Function {0:s} is deprecated. You should avoid its usage'
        warnings.warn(txt.format(func.__name__))
        return func(*args, **kwargs)
    return deco
@generator
def chunks(l, n):
    """ Yield successive n-sized chunks from l.

    Parameters
    ----------
    l: iterable
        object to iter over

    n: int
        number of elements per slice

    Returns
    -------
    chunk: tuple
        n values from l
    """
    it = iter(l)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if chunk:
            yield chunk
        else:
            # PEP 479: raising StopIteration inside a generator is an error
            # in Python 3.7+, so simply return to end the iteration.
            return
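
# Minimal usage sketch (illustrative only; this helper is not part of the
# original module): consuming an iterable three elements at a time; the last
# chunk may be shorter.
def _example_chunks():
    return list(chunks(range(7), 3))   # [(0, 1, 2), (3, 4, 5), (6,)]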
def isNestedInstance(obj, cl):
    """ Test for sub-classes types.
    I could not find a universal test.

    Parameters
    ----------
    obj: object instance
        object to test

    cl: Class
        top level class to test

    Returns
    -------
    r: bool
        True if obj is indeed an instance or subclass instance of cl
    """
    tree = [cl]
    if hasattr(cl, '__subclasses__'):
        for k in cl.__subclasses__():
            if hasattr(k, '__subclasses__'):
                tree += k.__subclasses__()
    return issubclass(obj.__class__, tuple(tree))
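
# Minimal usage sketch (illustrative only; not part of the original module):
# the test accepts instances of the class itself as well as instances of its
# subclasses.
def _example_is_nested_instance():
    assert isNestedInstance(1.0, float)
    assert isNestedInstance(True, int)     # bool derives from int
    assert not isNestedInstance('a', int)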
def type_checker(name, obj, tp):
    """ Check a given type and raise a type error if not correct

    Parameters
    ----------
    name: str
        name of the variable to show in the exception text

    obj: object
        object to check

    tp: type
        expected type of obj

    Raises
    ------
    :exc:`TypeError`:
        raised if the object is not of the correct type or a subclass of it
    """
    if not isNestedInstance(obj, tp):
        txt = 'Expected "{0:s}" of type {1:s}, got {2:s} instead.'
        raise TypeError(txt.format(name, str(tp.__name__),
                                   str(type(obj).__name__)))
class Pipeable(object):
    """ Decorator overloading the | operator (__ror__) such that you can pipe
    functions where the first argument is the variable on the left side of
    the | operator.
    This decorator allows you to use the decorated function normally and
    uses the provided values when used in pipes.

    >>> import pylab as plt
    >>> _p = Pipeable(plt.plot, color='red', linestyle='--')
    >>> _p(range(10), 'o-')   # works
    >>> range(10) | _p        # will plot a red dashed line
    """
    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def __ror__(self, lhs):
        return self.func(lhs, *self.args, **self.kwargs)

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def __repr__(self):
        return self.func.__repr__()
class Pipe(object):
    """ Decorator overloading the | operator (__ror__) such that you can pipe
    functions where the first argument is the variable on the left side of
    the | operator.
    The difference with Pipeable is that you cannot use the decorated
    function outside of pipes, but you gain the possibility to update the
    calling parameters.

    Used together with keywords_first, this makes a powerful Task.
    """
    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs
        update_wrapper(self, func)

    def __or__(self, other):
        if isinstance(other, Pipe):
            return Pipegroup((self, other), mode='multi')

    def __and__(self, other):
        if isinstance(other, Pipe):
            return Pipegroup((self, other))

    def __ror__(self, other):
        return self.func(other, *self.args, **self.kwargs)

    def __call__(self, *args, **kwargs):
        return Pipeable(self.func, *args, **kwargs)

    def __repr__(self):
        return 'Pipe: {}'.format(self.func.__repr__())

    def __str__(self):
        return '{}'.format(self.func.__name__)
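
# Minimal usage sketch (illustrative only; np.sqrt is merely a convenient
# one-argument callable): the value on the left of | becomes the first
# argument of the wrapped function.
def _example_pipe():
    sqrt_pipe = Pipe(np.sqrt)
    return 4.0 | sqrt_pipe    # -> 2.0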
class Pipegroup(object):
    """ Group of Pipe objects, applied either one after the other
    ('sequential' mode, built with &) or independently on the same input
    ('multi' mode, built with |). """
    def __init__(self, pipes, mode='sequential'):
        self.pipes = list(pipes)
        self.mode = mode
        self.func = self

    def __len__(self):
        return len(self.pipes)

    def seq_call(self, val, *args, **kwargs):
        r = self.pipes[0].func(val)
        if len(self) > 1:
            for pk in self.pipes[1:]:
                r = pk.func(r)
        return r

    def append(self, other):
        self.pipes.append(other)

    def __or__(self, other):
        if isinstance(other, Pipe):
            if self.mode in ['multi', '|']:
                self.append(other)
                return self
            else:
                return Pipegroup((self, other), mode='multi')

    def __and__(self, other):
        if isinstance(other, Pipe):
            if self.mode in ['sequential', '&']:
                self.append(other)
                return self
            else:
                return Pipegroup((self, other), mode='sequential')

    def multi_call(self, vals, iter=True):
        return [pk.func(vals) for pk in self.pipes]

    def __call__(self, val, *args, **kwargs):
        mode = kwargs.get('mode', self.mode).lower()
        if mode in ['sequential', '&']:
            return self.seq_call(val, *args, **kwargs)
        elif mode in ['multi', '|']:
            return self.multi_call(val, *args, **kwargs)
        else:
            raise NotImplementedError

    def __ror__(self, other):
        return self(other)

    def __repr__(self):
        txt = 'Pipegroup: mode={},\n\t | '.format(self.mode)
        if self.mode == 'sequential':
            txt += ' & '.join([str(pk) for pk in self.pipes])
        else:
            txt += '\n\t | '.join([str(pk) for pk in self.pipes])
        return txt

    def __str__(self):
        if self.mode == 'sequential':
            delim = ' & '
        else:
            delim = ' | '
        pipes = delim.join([str(pk) for pk in self.pipes])
        return '({})'.format(pipes).replace('Pipe: ', '')
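
# Minimal usage sketch (illustrative only; not part of the original module):
# two Pipe objects combined with & run sequentially, while | applies both
# pipes to the same input.
def _example_pipegroup():
    p_sqrt = Pipe(np.sqrt)
    p_log = Pipe(np.log10)
    seq = p_sqrt & p_log                 # Pipegroup with mode='sequential'
    both = p_sqrt | p_log                # Pipegroup with mode='multi'
    return 100.0 | seq, 100.0 | both     # -> (1.0, [10.0, 2.0])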
def keywords_first(f):
    """ Decorator that enables access to any argument or keyword as a keyword """
    # http://code.activestate.com/recipes/577922/ (r2)
    @wraps(f)
    def wrapper(*a, **k):
        a = list(a)
        for idx, arg in enumerate(getfullargspec(f).args, -ismethod(f)):
            if arg in k:
                if idx < len(a):
                    a.insert(idx, k.pop(arg))
                else:
                    break
        return f(*a, **k)
    return wrapper
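
# Minimal usage sketch (illustrative only; the decorated function below is
# hypothetical): a positional argument of the wrapped function can be passed
# by name and is re-inserted at its positional slot.
def _example_keywords_first():
    @keywords_first
    def divide(a, b):
        return a / b
    return divide(2.0, a=10.0)   # -> 5.0, 'a' is inserted in front of 2.0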
def kfpartial(fun, *args, **kwargs):
    """ Allows the creation of partial functions with arbitrary
    arguments/keywords """
    return partial(keywords_first(fun), *args, **kwargs)
def warning_on_one_line(message, category, filename, lineno, file=None,
                        line=None):
    """ Format a warning on a single line """
    return " {0:s}:{1:d} {2:s}:{3:s}".format(filename, lineno,
                                             category.__name__, str(message))
def missing_units_warning(name, defaultunit):
    """ Warn if any unit is missing

    Parameters
    ----------
    name: str
        name of the variable

    defaultunit: str
        default unit definition

    Raises
    ------
    warning: warnings.warn
        warn if units are assumed
    """
    warnings.formatwarning = warning_on_one_line
    msg = 'Variable {0:s} does not have explicit units. Assuming `{1:s}`\n'
    # stacklevel makes the warning point at the caller's code
    warnings.warn(msg.format(name, defaultunit), stacklevel=4)
def val_in_unit(varname, value, defaultunit):
    """ Check the units of a value and convert it to defaultunit, or attach
    the default unit information when the value is unitless

    Parameters
    ----------
    varname: str
        name of the variable

    value: value
        value of the variable, which may be unitless

    defaultunit: str
        unit to assume (and attach) when the value is unitless

    Returns
    -------
    quantity: ezunits.Quantity
        value with units

    Example
    -------
    >>> r = 0.5
    >>> print(val_in_unit('r', r, 'degree'))
    # UserWarning: Variable r does not have explicit units. Assuming `degree`
    <Quantity(0.5, 'degree')>

    >>> r = 0.5 * unit['degree']
    >>> print(val_in_unit('r', r, 'degree'))
    <Quantity(0.5, 'degree')>
    """
    if not hasUnit(value):
        missing_units_warning(varname, defaultunit)
        return value * unit[defaultunit]
    else:
        return value.to(defaultunit)
def merge_records(lst):
    """ Generate a stack of records, even with slightly different but
    compatible dtypes

    Parameters
    ----------
    lst: sequence of np.recarray
        sequence of individual records

    Returns
    -------
    val: np.recarray
        array of stacked records

    Note: if lst is empty, returns an empty list
    """
    r = []
    for rk in lst:
        r.append(rk.tolist()[0])
        names = rk.dtype.names

    if len(r) > 0:
        return np.rec.fromrecords(r, names=names)
    else:
        return []
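
# Minimal usage sketch (illustrative only; not part of the original module):
# stacking two single-row record arrays sharing the same field names.
def _example_merge_records():
    a = np.rec.fromrecords([(1, 2.0)], names='x,y')
    b = np.rec.fromrecords([(3, 4.0)], names='x,y')
    return merge_records([a, b])   # recarray with fields x, y and two rows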
def path_of_module(mod=None):
    """ Returns the path where a given module, object or function is defined.

    If nothing is provided, the current frame will be queried, i.e., the
    current working directory of the calling function.

    Parameters
    ----------
    mod: module, class, function
        object to find the definition of.
        If None, inspect.currentframe is used

    Returns
    -------
    path: str
        path of the definition
    """
    import os
    import inspect

    if mod is None:
        mod = inspect.currentframe()

    return '/'.join(os.path.abspath(inspect.getfile(mod)).split('/')[:-1])
def pretty_size_print(num_bytes):
    """ Output number of bytes in a human readable format

    Parameters
    ----------
    num_bytes: int
        number of bytes to convert

    Returns
    -------
    output: str
        string representation of the size with appropriate unit scale
    """
    if num_bytes is None:
        return

    KiB = 1024
    MiB = KiB * KiB
    GiB = KiB * MiB
    TiB = KiB * GiB
    PiB = KiB * TiB
    EiB = KiB * PiB
    ZiB = KiB * EiB
    YiB = KiB * ZiB

    if num_bytes > YiB:
        output = '%.3g YB' % (num_bytes / YiB)
    elif num_bytes > ZiB:
        output = '%.3g ZB' % (num_bytes / ZiB)
    elif num_bytes > EiB:
        output = '%.3g EB' % (num_bytes / EiB)
    elif num_bytes > PiB:
        output = '%.3g PB' % (num_bytes / PiB)
    elif num_bytes > TiB:
        output = '%.3g TB' % (num_bytes / TiB)
    elif num_bytes > GiB:
        output = '%.3g GB' % (num_bytes / GiB)
    elif num_bytes > MiB:
        output = '%.3g MB' % (num_bytes / MiB)
    elif num_bytes > KiB:
        output = '%.3g KB' % (num_bytes / KiB)
    else:
        output = '%.3g Bytes' % (num_bytes)

    return output
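
# Minimal usage sketch (illustrative only; not part of the original module):
# sizes are scaled by powers of 1024 and printed with 3 significant digits.
def _example_pretty_size_print():
    return pretty_size_print(123456)   # -> '121 KB'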
def nbytes(obj, pprint=False):
    """ Return the number of bytes of the object, which includes the size of
    nested structures

    Parameters
    ----------
    obj: object
        object to find the size of

    pprint: bool, optional (default=False)
        if set, returns the result after calling pretty_size_print

    Returns
    -------
    num_bytes: int or str
        total number of bytes or human readable corresponding string
    """
    num_bytes = sum(k.nbytes if hasattr(k, 'nbytes') else sys.getsizeof(k)
                    for k in obj.__dict__.values())
    if pprint:
        return pretty_size_print(num_bytes)
    else:
        return num_bytes
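
# Minimal usage sketch (illustrative only; the Container class below is
# hypothetical): numpy attributes contribute their nbytes, any other
# attribute falls back to sys.getsizeof, so the exact total may vary
# slightly between Python versions.
def _example_nbytes():
    class Container(object):
        def __init__(self):
            self.data = np.zeros(1000, dtype=float)   # 8000 bytes
            self.label = 'demo'
    return nbytes(Container(), pprint=True)   # e.g. '7.86 KB'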