# coding=utf-8
"""Event scheduler and interpolation."""
from time import time
from bisect import bisect
from math import cos, atan, exp
from random import randrange, expovariate
from functools import partial
from pygame.time import wait
from .conf import conf
from .util import ir, call_in_nest, bezier
from .util.cb import CbManager
def _match_in_nest (obj, x):
"""Check if every object in a data structure is equal to some given object.
_match_in_nest(obj, x)
obj: data structure to look in: an arbitrarily nested list of lists.
x: object to compare to (not a list or tuple).
"""
if isinstance(obj, (tuple, list)):
return all(_match_in_nest(o, x) == x for o in obj)
else:
return obj == x
def _cmp_structure (x, y):
"""Find whether the (nested list) structure of two objects is the same."""
is_list = isinstance(x, (tuple, list))
if is_list != isinstance(y, (tuple, list)):
# one is a list, one isn't
return False
elif is_list:
# both are lists: check length and contents
return len(x) == len(y) and \
all(_cmp_structure(xi, yi) for xi, yi in zip(x, y))
else:
# neither is a list
return True
[docs]def interp_linear (*waypoints):
"""Linear interpolation for :meth:`Scheduler.interp`.
interp_linear(*waypoints) -> f
:arg waypoints: each is ``(v, t)`` to set the value to ``v`` at time ``t``.
``t`` can be omitted for any but the last waypoint: the first
is ``0``, and other gaps are filled in with equal spacing.
``v`` is like the arguments taken by
:func:`call_in_nest <engine.util.call_in_nest>`, and we
interpolate for each number in the nested list structure of
``v``. Some objects in the ``v`` structures may be
non-numbers, in which case they will not be varied (maybe your
function takes another argument you don't want to vary);
objects may be ``None`` to always use the initial value in that
position.
:return: a function for which ``f(t) = v`` for every waypoint ``(v, t)``, with
intermediate values linearly interpolated between waypoints.
"""
# fill in missing times
vs = []
ts = []
last = waypoints[-1]
for w in waypoints:
if w is last or _cmp_structure(w, last):
vs.append(w[0])
ts.append(w[1])
else:
vs.append(w)
ts.append(None)
ts[0] = 0
# get groups with time = None
groups = []
group = None
for i, (v, t) in enumerate(zip(vs, ts)):
if t is None:
if group is None:
group = [i]
groups.append(group)
else:
if group is not None:
group.append(i)
group = None
# and assign times within those groups
for i0, i1 in groups:
t0 = ts[i0 - 1]
dt = float(ts[i1] - t0) / (i1 - (i0 - 1))
for i in xrange(i0, i1):
ts[i] = t0 + dt * (i - (i0 - 1))
interp_val = lambda r, v1, v2, v0: (r * (v2 - v1) + v1) \
if isinstance(v2, (int, float)) else v0
def val_gen ():
t = yield
while True:
# get waypoints we're between
i = bisect(ts, t)
if i == 0:
# before start
t = yield vs[0]
elif i == len(ts):
# past end: use final value, then end
last_val = lambda vl, v0: vl if isinstance(vl, (int, float)) \
else v0
t = yield call_in_nest(last_val, vs[-1], vs[0])
yield None
else:
v1 = vs[i - 1]
v2 = vs[i]
t1 = ts[i - 1]
t2 = ts[i]
# get ratio of the way between waypoints
r = 1 if t2 == t1 else (t - t1) / (t2 - t1) # t is always float
t = yield call_in_nest(interp_val, r, v1, v2, vs[0])
# start the generator; get_val is its send method
g = val_gen()
g.next()
return g.send
[docs]def interp_bezier (*pts, **kwargs):
"""Interpolate along a Bézier curve.
interp_bezier(*pts[, transform_t]) -> f
:arg pts: points to use in constructing the curve, each with the same nested
sequence form as taken by
:func:`call_in_nest <engine.util.call_in_nest>`.
:arg transform_t: function to use to transform the time before computing the
curve point.
:return: the interpolation function.
"""
transform_t = kwargs.get('transform_t')
if isinstance(transform_t, (int, float)):
scale = transform_t
transform_t = lambda t: scale * t
def get_val (t):
if transform_t is not None:
t = transform_t(t)
if t is None or t > 1:
return None
else:
return call_in_nest(bezier, t, *pts)
return get_val
[docs]def interp_target (v0, target, damp, freq=0, speed=0, threshold=0,
divisor=None):
"""Move towards a target.
interp_target(v0, target, damp, freq=0, speed=0, threshold=0, divisor=None)
-> f
:arg v0: the initial value (a structure of numbers like arguments to
:func:`call_in_nest <engine.util.call_in_nest>`). Elements which are
not numbers are ignored.
:arg target: the target value (has the same form as ``v0``).
:arg damp: rate we move towards the target (``> 0``).
:arg freq: if ``damp`` is small, oscillation around ``target`` can occur, and
this controls the frequency. If ``0``, there is no oscillation.
:arg speed: if ``freq`` is non-zero, this is the initial 'speed', in the same
form as ``v0``.
:arg threshold: stop when within this distance of ``target``; in the same form
as ``v0``. If ``None``, never stop. If varying more than one
number, only stop when every number is within its threshold.
:arg divisor: if given, use values of ``target`` and ``v0`` that are nearest to
each other, modulo this number (has the same form as ``v0``; use
``None`` to do nothing) -- eg. use ``2 * pi`` for angles in
radians.
:return: a function that returns position given the current time.
"""
if v0 == target: # nothing to do
return lambda t: None
def modulo_v0 (v0, divisor):
return v0 if divisor is None else v0 % divisor
def find_nearest (v0, target, divisor):
if divisor is None:
return target
else:
target %= divisor
alt_target = target + divisor
return (target if abs(target - v0) <= abs(alt_target - v0)
else alt_target)
v0 = modulo_v0(v0, divisor)
target = find_nearest(v0, target, divisor)
def get_phase (v0, target, speed):
if freq == 0 or not isinstance(v0, (int, float)) or v0 == target:
return 0
else:
return atan(-(float(speed) / (v0 - target) + damp) / freq)
phase = call_in_nest(get_phase, v0, target, speed)
def get_amplitude (v0, target, phase):
if isinstance(v0, (int, float)):
return (v0 - target) / cos(phase) # cos(atan(x)) is never 0
amplitude = call_in_nest(get_amplitude, v0, target, phase)
def get_val (t):
def interp_val (v0, target, amplitude, phase, threshold):
if not isinstance(v0, (int, float)):
return v0
# amplitude is None if non-number
if amplitude is None or v0 == target:
if threshold is not None:
return None
return v0
else:
dist = amplitude * exp(-damp * t)
if threshold is not None and abs(dist) <= threshold:
return None
return dist * cos(freq * t + phase) + target
rtn = call_in_nest(interp_val, v0, target, amplitude, phase, threshold)
if _match_in_nest(rtn, None):
# all done
rtn = None
return rtn
return get_val
[docs]def interp_shake (centre, amplitude = 1, threshold = 0, signed = True):
"""Shake randomly.
interp_shake(centre, amplitude = 1, threshold = 0, signed = True) -> f
:arg centre: the value to shake about; a nested list (a structure of numbers
like arguments to
:func:`call_in_nest <engine.util.call_in_nest>`). Elements which
are not numbers are ignored.
:arg amplitude: a number to multiply the value by. This can be a function that
takes the elapsed time in seconds to vary in time. Has the
same form as ``centre`` (return value does, if a function).
:arg threshold: stop when ``amplitude`` is this small; in the same form as
``centre``. If ``None``, never stop. If varying more than one
number, only stop when every number is within its threshold.
:arg signed: whether to shake around ``centre``. If ``False``, values are
always greater than ``centre`` (note that ``amplitude`` may be
signed).
:return: a function that returns position given the current time.
"""
def get_val (t):
def interp_val (centre, amplitude, threshold):
if not isinstance(centre, (int, float)):
return centre
if threshold is not None and abs(amplitude) <= threshold:
return None
val = amplitude * expovariate(1)
if signed:
val *= 2 * randrange(2) - 1
return centre + val
a = amplitude(t) if callable(amplitude) else amplitude
rtn = call_in_nest(interp_val, centre, a, threshold)
if _match_in_nest(rtn, None):
# all done
rtn = None
return rtn
return get_val
[docs]def interp_round (get_val, do_round = True):
"""Round the output of an existing interpolation function to integers.
interp_round(get_val, do_round=True) -> f
:arg get_val: the existing function.
:arg do_round: determines which values to round. This is in the form of the
values ``get_val`` returns, a structure of lists and booleans
corresponding to each number (see
:func:`call_in_nest <engine.util.call_in_nest>`).
:return: the ``get_val`` wrapper that rounds the returned value.
"""
def round_val (do, v):
return ir(v) if isinstance(v, (int, float)) and do else v
def round_get_val (t):
return call_in_nest(round_val, do_round, get_val(t))
return round_get_val
[docs]def interp_repeat (get_val, period = None, t_min = 0, t_start = None):
"""Repeat an existing interpolation function.
interp_repeat(get_val[, period], t_min = 0, t_start = t_min) -> f
:arg get_val: an existing interpolation function, as taken by
:meth:`Scheduler.interp`.
Times passed to the returned function are looped around to fit in the range
[``t_min``, ``t_min + period``), starting at ``t_start``, and the result is
passed to ``get_val``.
If ``period`` is not given, repeats end at the end of ``get_val``. Note that
this will not be entirely accurate, and you're probably better off specifying a
value if you can easily do so.
:return: the ``get_val`` wrapper that repeats ``get_val`` over the given
period.
"""
if t_start is None:
t_start = t_min
def val_gen ():
pd = period
val = None
t = yield
while True:
# transform time and get the corresponding value
t = t_min + (t_start - t_min + t)
if pd is not None:
t %= pd
# else still in the first period (and want the whole thing)
new_val = get_val(t)
# if we got a value, yield it
if new_val is not None:
val = new_val
elif pd is None:
# else get_val has ended: we know the period size now
pd = t - t_min
# else yield the previous value (which may be None: if get_val
#: returns None on the first call, we want to yield None)
t = yield val
# start the generator
g = val_gen()
g.next()
return g.send
[docs]def interp_oscillate (get_val, t_max = None, t_min = 0, t_start = None):
"""Repeat a linear oscillation over an existing interpolation function.
interp_oscillate(get_val[, t_max], t_min = 0, t_start = t_min) -> f
:arg get_val: an existing interpolation function, as taken by
:meth:`Scheduler.interp`.
Times passed to the returned function are looped and reversed to fit in the
range [``t_min``, ``t_max``), starting at ``t_start``. If ``t_start`` is in
the range [``t_max``, ``2 * t_max - t_min``), it is mapped to the 'return
journey' of the oscillation.
If ``t_max`` is not given, it is taken to be the end of ``get_val``. Note that
this will not be entirely accurate, and you're probably better off specifying a
value if you can easily do so.
:return: the ``get_val`` wrapper that oscillates ``get_val`` over the given
range.
"""
if t_start is None:
t_start = t_min
if t_max is not None:
period = t_max - t_min
else:
period = None
def val_gen ():
pd = period
val = None
t = yield
while True:
# transform time and get the corresponding value
t = t_start - t_min + t
if pd is not None:
t %= 2 * pd
if t >= pd:
t = 2 * pd - t
# else still in the first period (and want the whole thing)
new_val = get_val(t)
# if we got a value, yield it
if new_val is not None:
val = new_val
elif pd is None:
# else get_val has ended: we know the period size now
pd = t - t_min
# else yield the previous value (which may be None: if get_val
#: returns None on the first call, we want to yield None)
t = yield val
# start the generator
g = val_gen()
g.next()
return g.send
[docs]def interp_combine (combine, *get_vals):
"""Combine multiple interpolation functions.
:arg combine: combination function that takes the results from the
interpolation functions as arguments.
:arg get_vals: any number of interpolation functions.
Ignores all non-number parts of each result (results may be arbitrarily nested
sequences). Finishes when all interpolation functions return ``None``.
:return: interpolation function that combines results.
"""
def get_val (t):
vals = [g(t) for g in get_vals]
if all(v is None for v in vals):
return None
return call_in_nest(combine, *vals)
return get_val
[docs]def interp_sum (*get_vals):
"""Sum over multiple interpolation functions.
:arg get_vals: any number of interpolation functions.
Ignores all non-number parts of each result (results may be arbitrarily nested
sequences). Finishes when all interpolation functions return ``None``.
:return: interpolation function that sums over results.
"""
def add (*vals):
vals = [v for v in vals if isinstance(v, (int, float))]
return sum(vals, 0)
return interp_combine(add, *get_vals)
[docs]def interp_avg (*get_vals):
"""Average over multiple interpolation functions.
:arg get_vals: any number of interpolation functions.
Ignores all non-number parts of each result (results may be arbitrarily nested
sequences). Finishes when all interpolation functions return ``None``.
:return: interpolation function that averages over results.
"""
def avg (*vals):
safe_vals = [v for v in vals if isinstance(v, (int, float))]
return sum(safe_vals, 0.) / len(vals) if vals else 0
return interp_combine(avg, *get_vals)
[docs]class Timer (object):
"""Frame-based timer.
Timer(fps=60)
:arg fps: frames per second to aim for.
"""
def __init__ (self, fps=60):
#: The current length of a frame in seconds.
self.frame = None
#: The current average frame time in seconds (like
#: :attr:`current_fps`).
self.current_frame_time = None
self.fps = fps
#: The amount of time in seconds that has elapsed since the start of
#: the current call to :meth:`run`, if any.
self.t = 0
#: How many seconds the last frame took to run (including calling the
#: ``cb`` argument to :meth:`run` and any sleeping to make up a full
#: frame).
self.elapsed = None
@property
def fps (self):
"""The target FPS. Set this directly."""
return self._fps
@fps.setter
def fps (self, fps):
self._fps = int(round(fps))
self.current_frame_time = self.frame = 1. / fps
@property
def current_fps (self):
"""The current framerate, an average based on
:data:`conf.FPS_AVERAGE_RATIO`.
If this is less than :attr:`fps`, then the timer isn't running at full speed
because of slow calls to the ``cb`` argument to :meth:`run`.
"""
return 1 / self.current_frame_time
[docs] def run (self, cb, *args, **kwargs):
"""Run indefinitely or for a specified amount of time.
run(cb, *args[, seconds][, frames]) -> remain
:arg cb: a function to call every frame.
:arg args: extra arguments to pass to cb.
:arg seconds: the number of seconds to run for; can be a float. Accounts for
changes to :attr:`fps`.
:arg frames: the number of frames to run for; can be a float. Ignored if
``seconds`` is passed.
If neither ``seconds`` nor ``frames`` is given, run forever (until :meth:`stop`
is called). Time passed is based on the number of frames that have passed, so
it does not necessarily reflect real time.
:return: the number of seconds/frames left until the timer has been running for
the requested amount of time (or ``None``, if neither were given).
This may be less than ``0`` if ``cb`` took a long time to run.
"""
r = conf.FPS_AVERAGE_RATIO
self.t = 0
self._stopped = False
seconds = kwargs.get('seconds')
frames = kwargs.get('frames')
if seconds is not None:
seconds = max(seconds, 0)
elif frames is not None:
frames = max(frames, 0)
# main loop
t0 = time()
while True:
# call the callback
frame = self.frame
cb(*args)
t_gone = time() - t0
# return if necessary
if self._stopped:
if seconds is not None:
return seconds - t_gone
elif frames is not None:
return frames - t_gone / frame
else:
return None
# check how long to wait until the end of the frame by aiming for a
# rolling frame average equal to the target frame time
frame_t = (1 - r) * self.current_frame_time + r * t_gone
t_left = (frame - frame_t) / r
# reduce wait if we would go over the requested running time
if seconds is not None:
t_left = min(seconds, t_left)
elif frames is not None:
t_left = min(frames * frame, t_left)
# wait
if t_left > 0:
wait(int(1000 * t_left))
t_gone += t_left
frame_t += r * t_left
# update some attributes
t0 += t_gone
self.elapsed = t_gone
self.current_frame_time = frame_t
self.t += t_gone
# return if necessary
if seconds is not None:
seconds -= t_gone
if seconds <= 0:
return seconds
elif frames is not None:
frames -= t_gone / frame
if frames <= 0:
return frames
[docs] def stop (self):
"""Stop the current call to :meth:`run`, if any."""
self._stopped = True
[docs]class Scheduler (Timer):
"""Frame-based event scheduler.
Scheduler(fps=60)
:arg fps: frames per second to aim for.
"""
def __init__ (self, fps=60):
Timer.__init__(self, fps)
self._cbs = {}
self._max_id = 0
self._interps = {}
self._interp_timers = {}
[docs] def run (self, seconds = None, frames = None):
"""Start the scheduler.
run([seconds][, frames]) -> remain
Arguments and return value are as for :meth:`Timer.run`.
"""
return Timer.run(self, self._update, seconds = seconds,
frames = frames)
[docs] def add_timeout (self, cb, seconds=None, frames=None, repeat_seconds=None,
repeat_frames=None):
"""Call a function after a delay.
add_timeout(cb[, seconds][, frames][, repeat_seconds][, repeat_frames])
-> ident
:arg cb: the function to call.
:arg seconds: how long to wait before calling, in seconds (respects changes to
:attr:`Timer.fps`). If passed, ``frames`` is ignored.
:arg frames: how long to wait before calling, in frames (same number of frames
even if :attr:`Timer.fps` changes).
:arg repeat_seconds: how long to wait between calls, in seconds; time is
determined as for ``seconds``. If passed,
``repeat_frames`` is ignored; if neither is passed, the
initial time delay is used between calls.
:arg repeat_frames: how long to wait between calls, in frames (like
``repeat_seconds``).
:return: a timeout identifier to pass to :meth:`rm_timeout`. This is
guaranteed to be unique over time.
Times can be floats, in which case part-frames are carried over, and time
between calls is actually an average over a large enough number of frames.
``cb`` can return a boolean true object to repeat the timeout; otherwise it
will not be called again.
"""
if seconds is not None:
frames = None
elif frames is None:
raise TypeError('expected \'seconds\' or \'frames\' argument')
if repeat_seconds is not None:
repeat_frames = None
elif repeat_frames is None:
repeat_seconds = seconds
repeat_frames = frames
self._cbs[self._max_id] = [seconds, frames, repeat_seconds,
repeat_frames, True, cb]
self._max_id += 1
# ID is key in self._cbs
return self._max_id - 1
[docs] def rm_timeout (self, *ids):
"""Remove the timeouts with the given identifiers.
Missing IDs are ignored.
"""
cbs = self._cbs
interps = self._interps
interp_timers = self._interp_timers
for i in ids:
if i in cbs:
del cbs[i]
if i in interps:
interp_timers[interps[i]].remove(i)
del interps[i]
[docs] def pause_timeout (self, *ids):
"""Pause the timeouts with the given identifiers."""
cbs = self._cbs
for i in ids:
if i in cbs:
cbs[i][4] = False
[docs] def unpause_timeout (self, *ids):
"""Continue the paused timeouts with the given identifiers."""
cbs = self._cbs
for i in ids:
if i in cbs:
cbs[i][4] = True
def _update (self):
"""Handle callbacks this frame."""
cbs = self._cbs
frame = self.frame
# cbs might add/remove cbs, so use items instead of iteritems
for i, data in cbs.items():
if i not in cbs:
# removed since we called .items()
continue
if data[0] is not None:
remain = 0
dt = frame
else:
remain = 1
dt = 1
if data[4]:
data[remain] -= dt
if data[remain] <= 0:
# call callback
if data[5]():
# add on delay
total = data[2] is None
data[total] += data[total + 2]
elif i in cbs: # else removed in above call
self.rm_timeout(i)
# else paused
[docs] def interp (self, get_val, set_val, t_max=None, bounds=None, end=None,
round_val=False, multi_arg=False, resolution=None,
override=True):
"""Vary a value over time.
interp(get_val, set_val[, t_max][, bounds][, end], round_val=False,
multi_arg=False[, resolution], override=True) -> timeout_id
:arg get_val: a function called with the elapsed time in seconds to obtain the
current value. If this function returns ``None``, the
interpolation will be canceled. The ``interp_*`` functions in
this module can be used to construct such functions.
:arg set_val: a function called with the current value to set it. This may
also be an ``(obj, attr)`` tuple to do ``obj.attr = val``. In
the second form, ``attr`` may be a sequence of attribute names
corresponding to the sequence of values returned from
``get_val`` to set.
:arg t_max: if time becomes larger than this, cancel the interpolation.
:arg bounds: a function that takes the value returned from ``get_val`` and
checks if it is outside of some boundaries, and returns the
boundary value ``bdy`` if so (else None). If the value falls out
of bounds, ``set_val`` is called with ``bdy`` and the
interpolation is canceled.
:arg end: used to do some cleanup when the interpolation is canceled (when
``get_val`` returns ``None`` or ``t_max``, ``val_min`` or ``val_max``
comes into effect, but not when the ``rm_timeout`` method is called
with ``timeout_id``). This can be a final value to pass to
``set_val``, or a function to call without arguments. If the
function returns a (non-``None``) value, ``set_val`` is called with
it.
:arg round_val: whether to round the value(s) (see :func:`interp_round`
`do_round` argument for details).
:arg multi_arg: whether values should be interpreted as lists of arguments to
pass to ``set_val`` instead of a single argument. If
``set_val`` is ``(obj, attr)``, it is the same as a
one-argument function.
:arg resolution: 'framerate' to update the value at. If not given, the value
is set every frame it changes; if given, this sets an upper
limit on the number of times per second the value may updated.
The current value of :attr:`fps <Timer.fps>` (which may change
over the interpolation) also puts an upper limit on the rate.
:arg override: whether to override (abort) previous interpolations with the
same ``set_val``. This works for identical functions, and
``(obj, attr)`` for the identical objects and the exact same
sets of attributes (since ``attr`` can be a sequence). The
``end`` action for aborted interpolations is not called.
:return: an identifier that can be passed to :meth:`rm_timeout` to remove the
callback that continues the interpolation. In this case ``end`` is not
respected.
"""
if round_val:
get_val = interp_round(get_val, round_val)
if callable(set_val):
key = set_val
else:
obj, attr = set_val
if isinstance(attr, basestring):
key = (obj, attr)
set_val = lambda val: setattr(obj, attr, val)
else:
# attr is a sequence of attributes
key = (obj, frozenset(attr))
def set_val (vals):
for a, val in zip(attr, vals):
setattr(obj, a, val)
def timeout_cb ():
if resolution is not None:
update_frame = 1. / resolution
t = 0
dt = 0
last_v = None
done = False
while True:
frame = self.frame
t += frame
dt += frame
if resolution is None or dt >= update_frame:
if resolution is not None:
dt -= update_frame
# perform an update
v = get_val(t)
if v is None:
done = True
# check bounds
elif t_max is not None and t > t_max:
done = True
else:
if bounds is not None:
bdy = bounds(v)
if bdy is not None:
done = True
v = bdy
if v != last_v:
set_val(*v) if multi_arg else set_val(v)
last_v = v
if done:
# canceling for some reason
if callable(end):
v = end()
else:
v = end
# set final value if want to
if v is not None and v != last_v:
set_val(*v) if multi_arg else set_val(v)
yield False
# just in case we get called again (should never happen)
return
else:
yield True
else:
yield True
timeout_id = self.add_timeout(timeout_cb().next, frames=1)
if override and key in self._interp_timers:
self.rm_timeout(*self._interp_timers[key])
assert (key not in self._interp_timers,
'rm_timeout should\'ve removed other interpolations')
self._interp_timers.setdefault(key, []).append(timeout_id)
self._interps[timeout_id] = key
return timeout_id
[docs] def interp_simple (self, obj, attr, target, t, end_cb=None,
round_val=False, override=True):
"""A simple version of :meth:`interp`.
Varies an object's attribute linearly from its current value to a target value
in a set amount of time.
interp_simple(obj, attr, target, t[, end_cb], round_val=False, override=True)
-> timeout_id
:arg obj: vary an attribute of this object.
:arg attr: the attribute name of ``obj`` to vary, or a sequence of attributes
to set (if ``target`` is also a sequence).
:arg target: a target value, in the same form as the current value in the given
attribute (see
:func:`call_in_nest <engine.util.call_in_nest>`).
:arg t: the amount of time to take to reach the target value, in seconds.
:arg end_cb: a function to call when the target value has been reached.
:arg round_val: whether to round the value(s) (see :func:`interp_round` for
details).
:arg override: whether to override (abort) previous interpolations with the
same ``obj`` and ``attr``. This works for the exact same sets
of attributes (since ``attr`` can be a sequence). ``end_cb``
is not called for aborted interpolations.
:return: an identifier that can be passed to :meth:`rm_timeout` to remove the
callback that continues the interpolation. In this case ``end_cb`` is
not called.
"""
get_val = interp_linear(getattr(obj, attr), (target, t))
return self.interp(get_val, (obj, attr), end=end_cb,
round_val=round_val, override=override)
def _interp_locked (self, interp_fn, *args, **kwargs):
# HACK: Python 2 closures are immutable
timeout_id = [None]
def interp (*args, **kwargs):
if timeout_id[0] is not None:
self.rm_timeout(timeout_id[0])
timeout_id[0] = interp_fn(*args, **kwargs)
return timeout_id[0]
return partial(interp, *args, **kwargs)
[docs] def interp_locked (self, *args, **kwargs):
"""Generate a :meth:`interp` wrapper that allows only one running
interpolation.
With each successive call, the current interpolation is aborted and a new one
started. (For most cases, just passing the ``override`` argument to
:meth:`interp` should suffice.)
The wrapper is partially applied using the positional and keyword arguments
passed to this function. Typical usage is as follows::
# create the wrapper that knows how to set values
interp = scheduler.interp_locked(set_val=set_val)
[...]
# call it at some point with an interpolation function
interp(get_val)
[...]
# call it again later with a different interpolation function
interp(get_val2)
# only one interpolation is running
"""
return self._interp_locked(self.interp, *args, **kwargs)
[docs] def interp_simple_locked (self, *args, **kwargs):
"""Like :meth:`interp_locked`, but wraps :meth:`interp_simple`."""
return self._interp_locked(self.interp_simple, *args, **kwargs)
[docs] def countdown (self, t, autoreset=False):
"""Create and return a :class:`Countdown` that uses this instance for
timing.
countdown(t, autoreset=False) -> new_countdown
Arguments are as taken by :class:`Countdown`.
"""
return Countdown(self, t, autoreset)
[docs] def counter (self, limit=None):
"""Create and return a :class:`Counter` that uses this instance for
timing.
counter([limit]) -> new_counter
Arguments are as taken by :class:`Counter`.
"""
return Counter(self, limit)
[docs]class Countdown (CbManager):
"""A simple way of counting down to an event.
Countdown(scheduler, t, autoreset=False)
:arg scheduler: :class:`Scheduler` instance to use for timing.
:arg t: how long a countdown lasts, in seconds.
:arg autoreset: whether to reset and count down from the beginning again when
the countdown ends. This is only useful with :attr:`cbs` (the
finished state never becomes ``True``).
An instance is boolean ``True`` if the countdown has finished, else ``False``.
The initial state is finished---use :meth:`reset` to start the countdown.
This is a :class:`CbManager <engine.util.cb.CbManager>`; callbacks added are
called when the countdown ends, and take no arguments.
See also :meth:`Scheduler.countdown`.
"""
def __init__ (self, scheduler, t, autoreset=False):
CbManager.__init__(self)
self._scheduler = scheduler
self._t = t
#: As passed to the constructor.
self.autoreset = autoreset
self._timer_id = None
self._finished = True
@property
def t (self):
"""How long a countdown lasts, in seconds.
Changing this resets the countdown (if running).
"""
return self._t
@t.setter
def t (self, t):
self._t = t
if self._timer_id is not None:
self.reset()
def __nonzero__ (self):
return self._finished
def _end_cb (self):
# called when the timeout ends
if not self.autoreset:
self._timer_id = None
self._finished = True
self.call()
return self.autoreset
[docs] def reset (self):
"""Start counting down from the beginning again.
reset() -> self
Starts counting down even if the countdown wasn't already running.
"""
if self._timer_id is not None:
self._scheduler.rm_timeout(self._timer_id)
self._finished = False
self._timer_id = self._scheduler.add_timeout(self._end_cb, self.t)
return self
[docs] def cancel (self):
"""Stop counting down and set the finished state to ``False``.
cancel() -> self
"""
if self._timer_id is not None:
self._scheduler.rm_timeout(self._timer_id)
self._timer_id = None
self._finished = False
return self
[docs] def finish (self):
"""Stop counting down and set the finished state to ``True``.
finish() -> self
"""
self.cancel()
self._finished = True
return self
[docs] def pause (self):
"""Pause the countdown, if running.
pause() -> self
"""
if self._timer_id is not None:
self._scheduler.pause_timeout(self._timer_id)
return self
[docs] def unpause (self):
"""Unpause the countdown, if paused.
unpause() -> self
"""
if self._timer_id is not None:
self._scheduler.unpause_timeout(self._timer_id)
return self
[docs]class Counter (CbManager):
"""A simple way of keeping track of time.
Counter(scheduler[, limit])
:arg scheduler: :class:`Scheduler` instance to use for timing.
:arg limit: if given, stop the counter once it reaches this many seconds.
An instance is boolean ``True`` if the counter has reached ``limit``, else
``False``. The initial state is finished---use :meth:`reset` to start the
counter.
This is a :class:`CbManager <engine.util.cb.CbManager>`; callbacks added are
called when the counter reacher :attr:`limit`, and take no arguments.
See also :meth:`Scheduler.counter`.
"""
def __init__ (self, scheduler, limit=None):
CbManager.__init__(self)
self._scheduler = scheduler
#: How far the counter has counted, in seconds.
self.t = 0
#: As passed to the constructor, or ``None``.
self.limit = limit
self._timer_id = None
self._finished = True
def __nonzero__ (self):
return self._finished
def _update (self):
# called every frame
self.t += self._scheduler.frame
if self.limit is not None and self.t >= self.limit:
self.t = self.limit
self._finished = True
self.call()
return False
else:
return True
[docs] def reset (self):
"""Start counting from ``0`` again.
reset() -> self
Starts counting even if the counter wasn't already running.
"""
if self._timer_id is not None:
self._scheduler.rm_timeout(self._timer_id)
self.t = 0
self._finished = False
self._timer_id = self._scheduler.add_timeout(self._update, frames=1)
return self
[docs] def cancel (self):
"""Stop counting down and set the finished state to ``False``.
cancel() -> self
:attr:`t` is not changed.
"""
if self._timer_id is not None:
self._scheduler.rm_timeout(self._timer_id)
self._timer_id = None
self._finished = False
return self
[docs] def finish (self):
"""Stop counting down and set the finished state to ``True``.
finish() -> self
"""
self.cancel()
self._finished = True
return self
[docs] def pause (self):
"""Pause the countdown, if running.
pause() -> self
"""
if self._timer_id is not None:
self._scheduler.pause_timeout(self._timer_id)
return self
[docs] def unpause (self):
"""Unpause the countdown, if paused.
unpause() -> self
"""
if self._timer_id is not None:
self._scheduler.unpause_timeout(self._timer_id)
return self