Source code for engine.util.cb
"""Callback management utilities."""
import inspect
[docs]def takes_args (func):
"""Determine whether the given function takes any arguments.
:return: ``True`` if the function can take arguments, or if the result could
not be determined (the argument is not a function, or not a
pure-Python function), else ``False``.
"""
try:
args, varargs, kwargs, defaults = inspect.getargspec(func)
except TypeError:
return True
want = 2 if inspect.ismethod(func) else 1
return varargs is not None or len(args) >= want
[docs]def wrap_fn (func):
"""Return a function that calls ``func``, possibly omitting arguments.
wrap_fn(func) -> wrapper
When ``wrapper`` is called, it calls ``func`` (and returns its return value),
but only passes any arguments on to ``func`` if it is determined that ``func``
takes any arguments (using :func:`takes_args`).
"""
pass_args = takes_args(func)
def wrapper (*args, **kwargs):
if pass_args:
return func(*args, **kwargs)
else:
return func()
return wrapper
[docs]class CbManager (object):
"""Simple callback manager."""
def __init__ (self):
# {given cb: cb to call}
self._cbs = {}
@property
def cbs (self):
"""Set-like container of registered callback functions."""
return self._cbs.viewkeys()
[docs] def cb (self, *cbs):
"""Register callbacks.
cb(*cbs) -> self
If a callback is determined not to be able to take arguments, it is not passed
any.
"""
self._cbs.update((cb, wrap_fn(cb)) for cb in cbs)
return self
[docs] def rm_cbs (self, *cbs):
"""Remove any number of callbacks from :attr:`cbs`.
rm_cbs(*cbs) -> self
Missing items are ignored.
"""
all_cbs = self._cbs
for cb in cbs:
if cb in all_cbs:
del all_cbs[cb]
return self
[docs] def call (self, *args, **kwargs):
"""Call all registered callbacks with the given arguments.
call(*args, **kwargs) -> results
:return: value returned by each callback, as ``{cb: result}``.
"""
return dict((cb, real_cb(*args, **kwargs))
for cb, real_cb in self._cbs.iteritems())
[docs]class GroupedCbManager (object):
"""Manage callbacks grouped by type.
GroupedCbManager([groups])
:arg groups: allowed groups; overrides the class's default :attr:`groups`.
A group identifier may be hashable object. Groups are equivalent if they
produce the same hash value.
"""
#: Set of groups available to place callbacks in. If methods are given any
#: other group identifier not in here, ``ValueError`` is raised.
groups = frozenset()
def __init__ (self, groups=None):
if groups is not None:
self.groups = groups
# {group: manager}
try:
self._cbs = dict((group, CbManager()) for group in self.groups)
except TypeError:
raise TypeError('invalid callback groups specified: {}'
.format(self.groups))
@property
def cbs (self):
"""Registered callback functions, as ``{group: callbacks}``."""
return dict((group, mgr.cbs) for group, mgr in self._cbs.iteritems())
def _group (self, group):
# get callback manager for the given group, else throw
if group in self.groups:
return self._cbs[group]
else:
raise ValueError('unknown callback group: {}'.format(group))
[docs] def cb (self, group, *cbs):
"""Register callbacks for a group.
cb(group, *cbs) -> self
If a callback is determined not to be able to take arguments, it is not passed
any.
"""
self._group(group).cb(*cbs)
return self
[docs] def rm_cbs (self, group, *cbs):
"""Remove any number of callbacks in a group from :attr:`cbs`.
rm_cbs(group, *cbs) -> self
Missing items are ignored.
"""
self._group(group).rm_cbs(*cbs)
return self
[docs] def call (self, group, *args, **kwargs):
"""Call all registered callbacks in a group with the given arguments.
call(group, *args, **kwargs) -> results
:return: value returned by each callback, as ``{cb: result}``.
"""
return self._group(group).call(*args, **kwargs)