Source code for engine.evt.handler

"""An event handler, which stores events and passes Pygame events to them."""

import pygame as pg

from ..conf import conf
from . import inputs
from .evts import BaseEvent, Event
from . import conffile


[docs]class EventHandler (object): """Handles events. EventHandler(scheduler) :arg scheduler: :class:`sched.Scheduler <engine.sched.Scheduler>` instance to use for determining the current framerate. You probably want to call :meth:`normalise`, then call :meth:`update` every frame to process and progagate Pygame events and call callbacks. Some notes: - An event may be placed in a 'domain', which is represented by a string name. - Events are named or unnamed, and an :class:`EventHandler` acts like a ``dict`` of named events (only supports getting, setting and deleting items). - The ``'domain'`` name is reserved. - The ``__contains__`` method (``event in event_handler``) works for :class:`BaseEvent <engine.evt.evts.BaseEvent>` instances as well as names. """ def __init__ (self, scheduler): #: As passed to the constructor. self.scheduler = scheduler #: A ``set`` of domains that will receive relevant events. self.active_domains = set() #: A ``set`` of domains that have been disabled through #: :meth:`disable`. self.inactive_domains = set() self._evts_by_domain = {} #: A ``set`` of all registered unnamed events. self.evts = set() # {name: event} for named events; wrapped by this class like a dict self._named_evts = {} #: All inputs registered with events in this handler. self.inputs = set() # inputs prefiltered by Input.filters self._filtered_inputs = ('type', {inputs.UNFILTERABLE: set()}) # identifiers for initialised devices self._init_data = set() # all registered modifiers self._mods = {} #: Whether to capture the mouse cursor by centring it on the window #: every frame. You might also want to grab all input #: (``pygame.event.set_grab``). self.autocentre_mouse = False def __contains__ (self, item): return (item in self._named_evts or item in self.evts or item in self._named_evts.itervalues()) def __getitem__ (self, item): return self._named_evts[item] def __setitem__ (self, item, val): self.add(**{item: val}) def __delitem__ (self, item): self.rm(item)
[docs] def add (self, *evts, **named_evts): """Register events. add(*evts, **named_evts) -> unnamed Arguments are any number of events. Keyword arguments define named events with the key as the name. An event can be a :class:`BaseEvent <engine.evt.evts.BaseEvent>` instance, or a sequence of Pygame event IDs and functions to create an :class:`Event <engine.evt.evts.BaseEvent>` that listens for the given Pygame events and has the functions as callbacks. For example, :: handler.add( (pygame.KEYDOWN, f1, f2), (f3, pygame.KEYDOWN, f4, pygame.KEYUP) ) will register callbacks ``f1`` and ``f2`` for ``keydown`` events, and ``f3`` and ``f4`` for both ``keydown`` and ``keyup`` events. :return: a list of added unnamed events (positional arguments) (possibly created in this call). """ new_unnamed = [] unnamed = self.evts all_named = self._named_evts by_domain = self._evts_by_domain # extract domain from keyword args if 'domain' in named_evts: domain = named_evts['domain'] if domain is not None and not isinstance(domain, basestring): raise ValueError('invalid domain (or, \'domain\' is an ' 'invalid event name)') del named_evts['domain'] else: domain = None if domain not in by_domain: # domain doesn't exist yet by_domain[domain] = set() if domain is not None: self.active_domains.add(domain) for evts in (((None, evt) for evt in evts), named_evts.iteritems()): for name, evt in evts: if not isinstance(evt, BaseEvent): # got (possibly mixed) list of pgevts/cbs: create event pgevts = [] cbs = [] for item in evt: (cbs if callable(item) else pgevts).append(item) evt = Event(*(inputs.BasicInput(pgevt) for pgevt in pgevts)).cb(*cbs) if evt.eh is not None: if evt.eh is self: # already own this event prev_domain = evt._domain if domain != prev_domain: # change registered domain by_domain[prev_domain].remove(evt) if not by_domain[prev_domain]: del by_domain[prev_domain] evt._domain = domain by_domain[domain].add(evt) prev_name = evt._regname if name != prev_name: # change registered name if prev_name is None: unnamed.remove(evt) else: del all_named[prev_name] evt._regname = name if name is None: unnamed.add(evt) else: all_named[name] = evt else: # owned by another handler raise RuntimeError('an event should not be added to ' 'more than one EventHandler') else: # new event evt.eh = self evt._changed = False evt._domain = domain evt._regname = name by_domain[domain].add(evt) if name is None: unnamed.add(evt) new_unnamed.append(evt) else: all_named[name] = evt self._add_inputs(*evt.inputs) return new_unnamed
[docs] def rm (self, *evts): """Takes any number of registered event names or events to remove them. Raises ``KeyError`` if any arguments are missing. """ unnamed = self.evts named = self._named_evts by_domain = self._evts_by_domain active = self.active_domains inactive = self.inactive_domains for evt in evts: if isinstance(evt, basestring): # got name evt = named[evt] # raises KeyError if evt.eh is self: evt.eh = None domain = evt._domain by_domain[domain].remove(evt) if not by_domain[domain]: del by_domain[domain] if domain in active: active.remove(domain) else: inactive.remove(domain) evt._domain = None if evt._regname is None: unnamed.remove(evt) else: del named[evt._regname] evt._regname = None self._rm_inputs(*evt.inputs) else: raise KeyError(evt)
[docs] def cb (self, pos_cbs={}, **kw_cbs): """Attach callbacks to named events. Each dict has keys as event names and values as callback functions or sequences of callback functions. For example:: evthandler.cb({'jump': jump}, walk=[e.walk for e in entities]) """ for evt_cbs in (pos_cbs, kw_cbs): for evt_name, cbs in evt_cbs.iteritems(): if callable(cbs): cbs = [cbs] self[evt_name].cb(*cbs)
def _prefilter (self, filtered, filters, i): attr, filtered = filtered filters = dict(filters) # Input guarantees that this is non-empty vals = filters.pop(attr, (inputs.UNFILTERABLE,)) for val in vals: if val in filtered: child = filtered[val] else: # create new branch filtered[val] = child = set() # add input to child if isinstance(child, tuple): self._prefilter(child, filters, i) else: # reached the end of a branch: child is a set of inputs if filters: # create new levels for each remaining filter for attr, vals in filters.iteritems(): child = (attr, {inputs.UNFILTERABLE: child}) filtered[val] = child self._prefilter(child, filters, i) else: child.add(i) def _unprefilter (self, filtered, filters, i): filters = dict(filters) attr, filtered = filtered # Input guarantees that this is non-empty vals = filters.pop(attr, (inputs.UNFILTERABLE,)) for val in vals: assert val in filtered child = filtered[val] if isinstance(child, tuple): self._unprefilter(child, filters, i) child = child[1] else: # reached the end of a branch: child is a set of inputs assert i in child child.remove(i) if not child: # child is now empty if val is inputs.UNFILTERABLE: # retain the UNFILTERABLE branch filtered[val] = set() else: del filtered[val] if attr != 'type' and not any(filtered.itervalues()): # all branches are empty (but always retain the 'type' branch) filtered.clear() def _add_inputs (self, *inps): mods = self._mods inps = list(inps) while inps: i = inps.pop() if isinstance(i, BaseEvent): inps.extend(i.inputs) if i in self.inputs: # already added (might happen if events share an input) continue i._init() if isinstance(i, inputs.ButtonInput): # add mods, sorted by device and device ID for m in i.mods: added = False if m.device in inputs.mod_devices[i.device]: this_mods = (mods.setdefault(m.device, {}) .setdefault(i._device_id, {})) if m in this_mods: this_mods[m].add(i) # already added as an input else: this_mods[m] = set((i,)) if not added: added = True self._add_inputs(m) self.inputs.add(i) self._prefilter(self._filtered_inputs, i.filters, i) def _rm_inputs (self, *inps): mods = self._mods for i in inps: if i not in self.inputs: # already removed (might happen if events share an input) continue if isinstance(i, inputs.ButtonInput): for m in i.mods: rmd = False if m.device in inputs.mod_devices[i.device]: d1 = mods[m.device] d2 = d1[i._device_id] d3 = d2[m] assert i in d3 d3.remove(i) if not d3: del d2[m] if not rmd: rmd = True self._rm_inputs(m) if not d2: del d1[i._device_id] if not d1: del mods[m.device] self.inputs.remove(i) self._unprefilter(self._filtered_inputs, i.filters, i)
[docs] def update (self): """Process Pygame events and call callbacks.""" all_inputs = self._filtered_inputs mods = self._mods pgevts = pg.event.get() # centre mouse if self.autocentre_mouse: sfc = pg.display.get_surface() if sfc is not None: pg.mouse.set_pos(sfc.get_rect().center) # remove the Pygame event this sends pg.event.clear(pg.MOUSEMOTION) for pgevt in pgevts: # find matching inputs sources = [all_inputs] inps = [] while sources: source = sources.pop() if isinstance(source, tuple): attr, filtered = source if hasattr(pgevt, attr): val = getattr(pgevt, attr) if val in filtered: sources.append(filtered[val]) sources.append(filtered[inputs.UNFILTERABLE]) else: inps.append(source) for i in set().union(*inps): if i.handle(pgevt) and i.evt is not None: evt = i.evt while evt is not None: evt._changed = True evt = evt.evt # call callbacks by_domain = self._evts_by_domain for domains in ((None,), self.active_domains): for domain in domains: if domain is not None or domain in by_domain: for evt in by_domain[domain]: changed = evt._changed evt._changed = False evt.respond(changed)
[docs] def domains (self, *domains): """Get a set of all events in the given domains. domains(*domains) -> evts """ evts = set() for domain in domains: if domain is None: raise KeyError(domain) evts.update(self._evts_by_domain[domain]) # raises KeyError return evts
def _load_evts (self, evts, domain): # load events as parsed from config file if 'domain' in evts: raise ValueError('\'domain\' may not be used as an event name') evts['domain'] = domain self.add(**evts) return evts
[docs] def load (self, filename, domain = None): """Load events from a configuration file (see :mod:`conffile <engine.evt.conffile>`). load(filename, domain = None) -> evts :arg filename: a filename to load as the configuration file, under :data:`conf.EVT_DIR`. :arg domain: domain to place loaded events in. :return: ``{name: event}`` for loaded events. """ with open(conf.EVT_DIR + filename) as f: evts = conffile.parse(f) return self._load_evts(evts, domain)
[docs] def load_s (self, s, domain = None): """Load events from a configuration string (see :mod:`conffile <engine.evt.conffile>`). load_s(s, domain = None) -> evts :arg s: string to parse as an event configuration. :arg domain: domain to place loaded events in. :return: ``{name: event}`` for loaded events. """ return self._load_evts(conffile.parse_s(s), domain)
[docs] def save (self, name, *domains): """Not implemented.""" # save everything in the domains to file pass
[docs] def save_s (self, *domains): """Not implemented.""" pass
[docs] def unload (self, *domains): """Remove all events in the given domains. unload(*domains) -> evts :return: all removed events as ``(unnamed, named)``, like :meth:`domains`. Raises ``KeyError`` if a domain is missing. """ unnamed, named = self.domains(*domains) # raises KeyError # now all domains exist so we can safely make changes # this removes empty domains self.rm(*unnamed) self.rm(*named) return (unnamed, named)
[docs] def disable (self, *domains): """Disable event handling in all of the given domains. Missing or already disabled domains are ignored (a domain is missing if it is empty). """ active = self.active_domains inactive = self.inactive_domains for domain in domains: if domain in active: active.remove(domain) inactive.add(domain)
[docs] def enable (self, *domains): """Re-enable event handling in all of the given domains. Missing or already active domains are ignored. Beware that state is preserved, so buttons that were held when disabled remain held when enabled, no matter how much time has passed, without sending a :data:`DOWN <engine.evt.evts.bmode.DOWN>`. """ active = self.active_domains inactive = self.inactive_domains for domain in domains: if domain in inactive: inactive.remove(domain) active.add(domain)
[docs] def assign_devices (self, **devices): """Assign device IDs to inputs by device variable. :arg devices: keyword arguments with the argument name the variable and the value the new device ID for each input with this device variable. See :attr:`Input.device_var <engine.evt.inputs.Input.device_var>` and :attr:`Input.device_id <engine.evt.inputs.Input.device_id>` for details (including possible device ID values). """ for i in self.inputs: if i.device_var is not None and i.device_var in devices: i.device_id = devices[i.device_var]
[docs] def grab (self, cb, *types): """Not implemented.""" # grabs next button-type input from given devices/types and passes it to cb # types are device name or (device, type_name) (see inputs_by_name) # need to be able to track _every_ button-type input, so, eg. axes can be used pass
[docs] def normalise (self): """Determine and set states of all inputs, where possible. This includes axis positions, button held states, etc.. You should generally call this whenever you start using this event handler, either for the first time, or after a period of letting something else handle events. """ for i in self.inputs: i.normalise() for es in (self._named_evts.itervalues(), self.evts): for e in es: e._changed = True
[docs] def monitor_deadzones (self, *deadzones): """Not implemented.""" # takes list of (device, id, *args); do for all if none given pass
[docs] def stop_monitor_deadzones (self): """Not implemented.""" # returns {(device, id, attrs): deadzone}, attrs is required attribute values on the input (pad axis: {'axis': axis_input.axis}) # can register other deadzone events? pass
[docs] def set_deadzones (self, *deadzones): """Set deadzones for all registered inputs that support it. :attr deadzones: any number of ``((device, device_id=True, attrs={}), deadzone)`` tuples to set the ``deadzone`` attribute of each matching input to ``deadzone``. ``device_id`` may be a variable (:attr:`Input.device_var <engine.evt.inputs.Input.device_var>`) or non-string ID (:attr:`Input.device_id <engine.evt.inputs.Input.device_id>`). ``attrs`` is a dict of attributes the input must have. See also :attr:`Input.device_id <engine.evt.inputs.Input.device_id>`. An item may also be just ``(device, deadzone)``. """ for ident, dz in deadzones: if isinstance(ident, basestring): # just got a device ident = [ident] else: ident = list(ident) if len(ident) == 0: raise ValueError('invalid input identifier: empty sequence') if len(ident) == 1: # accept any device ID ident.append(True) if len(ident) == 2: # no more constraints ident.append({}) device, dev_id, attrs = ident got_var = isinstance(dev_id, basestring) for i in self.inputs: if got_var: match_dev_id = i.device_var == dev_id else: match_dev_id = (i.device_id is not None and (dev_id is True or i.device_id == dev_id)) if i.device == device and match_dev_id: if all(getattr(i, attr) == val for attr, val in attrs.iteritems()): try: getattr(i, 'deadzone') except AttributeError: # doesn't have a deadzone pass else: i.deadzone = dz