sched—event scheduler and interpolation

interp_linear(*waypoints) → f

Linear interpolation for Scheduler.interp().

Parameters: waypoints – each is (v, t) to set the value to v at time t. t can be omitted for any but the last waypoint: the first is 0, and other gaps are filled in with equal spacing. v is like the arguments taken by call_in_nest, and we interpolate for each number in the nested list structure of v. Some objects in the v structures may be non-numbers, in which case they will not be varied (maybe your function takes another argument you don’t want to vary); objects may be None to always use the initial value in that position.
Returns: a function for which f(t) = v for every waypoint (v, t), with intermediate values linearly interpolated between waypoints.
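
The waypoint behaviour described above can be sketched for the simplest case. This is a minimal stand-in, not the library's implementation: linear_waypoints is a hypothetical name, values are plain numbers rather than nested structures, and every waypoint must carry an explicit time. Returning None past the last waypoint is an assumption, chosen because Scheduler.interp() cancels an interpolation when get_val returns None.

```python
from bisect import bisect_right


def linear_waypoints(*waypoints):
    """Return f(t) that linearly interpolates scalar (v, t) waypoints.

    Hypothetical stand-in: times must be explicit and strictly increasing.
    """
    values = [v for v, _ in waypoints]
    times = [t for _, t in waypoints]

    def f(t):
        if t <= times[0]:
            return values[0]
        if t > times[-1]:
            # assumption: None signals that the interpolation has finished
            return None
        # index of the waypoint at or just before t
        i = bisect_right(times, t) - 1
        if i == len(times) - 1:
            return values[-1]
        ratio = (t - times[i]) / (times[i + 1] - times[i])
        return values[i] + ratio * (values[i + 1] - values[i])

    return f


# value 0 at t=0, value 10 at t=2, value 4 at t=5
f = linear_waypoints((0, 0), (10, 2), (4, 5))
halfway = f(1)  # midway between the first two waypoints
```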
interp_bezier(*pts[, transform_t]) → f

Interpolate along a Bézier curve.

Parameters:
  • pts – points to use in constructing the curve, each with the same nested sequence form as taken by call_in_nest.
  • transform_t – function to use to transform the time before computing the curve point.
Returns:

the interpolation function.

interp_target(v0, target, damp, freq=0, speed=0, threshold=0, divisor=None) → f

Move towards a target.

Parameters:
  • v0 – the initial value (a structure of numbers like arguments to call_in_nest). Elements which are not numbers are ignored.
  • target – the target value (has the same form as v0).
  • damp – rate we move towards the target (> 0).
  • freq – if damp is small, oscillation around target can occur, and this controls the frequency. If 0, there is no oscillation.
  • speed – if freq is non-zero, this is the initial ‘speed’, in the same form as v0.
  • threshold – stop when within this distance of target; in the same form as v0. If None, never stop. If varying more than one number, only stop when every number is within its threshold.
  • divisor – if given, use values of target and v0 that are nearest to each other, modulo this number (has the same form as v0; use None to do nothing). For example, use 2 * pi for angles in radians.
Returns:

a function that returns position given the current time.
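
The effect of divisor can be illustrated for a single number. This is only a sketch: nearest_modulo is a hypothetical helper, and it assumes that target is the value that gets shifted (the description does not say which of the two is adjusted).

```python
import math


def nearest_modulo(v0, target, divisor):
    """Shift target by a multiple of divisor so it lies as close to v0 as possible."""
    diff = (target - v0) % divisor  # always in [0, divisor)
    if diff > divisor / 2:
        diff -= divisor  # going the other way round is shorter
    return v0 + diff


# moving from 350 degrees to 10 degrees should pass through 360, not back through 180
start = math.radians(350)
goal = math.radians(10)
adjusted = nearest_modulo(start, goal, 2 * math.pi)
```

With this adjustment the movement covers 20 degrees rather than 340.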

interp_shake(centre, amplitude = 1, threshold = 0, signed = True) → f

Shake randomly.

Parameters:
  • centre – the value to shake about; a nested list (a structure of numbers like arguments to call_in_nest). Elements which are not numbers are ignored.
  • amplitude – a number to multiply the value by. This can be a function that takes the elapsed time in seconds to vary in time. Has the same form as centre (return value does, if a function).
  • threshold – stop when amplitude is this small; in the same form as centre. If None, never stop. If varying more than one number, only stop when every number is within its threshold.
  • signed – whether to shake around centre. If False, values are always greater than centre (note that amplitude may be signed).
Returns:

a function that returns position given the current time.

interp_round(get_val, do_round=True) → f

Round the output of an existing interpolation function to integers.

Parameters:
  • get_val – the existing function.
  • do_round – determines which values to round. This is in the form of the values get_val returns, a structure of lists and booleans corresponding to each number (see call_in_nest).
Returns:

the get_val wrapper that rounds the returned value.
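
A rough sketch of such a wrapper, assuming get_val returns either a single number or a flat list (the library accepts nested structures). round_wrapper is a hypothetical name, and the sketch uses Python's built-in round, which rounds exact halves to the nearest even integer; the library may round differently.

```python
def round_wrapper(get_val, do_round=True):
    """Wrap get_val so that selected numbers in its result are rounded to integers."""

    def wrapped(t):
        val = get_val(t)
        if val is None:
            return None  # pass the "finished" signal through unchanged
        if isinstance(val, (list, tuple)):
            # a single boolean applies to every element
            if isinstance(do_round, (list, tuple)):
                flags = do_round
            else:
                flags = [do_round] * len(val)
            return [int(round(v)) if flag else v for v, flag in zip(val, flags)]
        return int(round(val)) if do_round else val

    return wrapped


get_pos = lambda t: [10.3 * t, 0.26 * t]
# round only the first number of each result
rounded = round_wrapper(get_pos, [True, False])
```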

interp_repeat(get_val, [period, ]t_min = 0, t_start = t_min) → f

Repeat an existing interpolation function.

Parameters: get_val – an existing interpolation function, as taken by Scheduler.interp().

Times passed to the returned function are looped around to fit in the range [t_min, t_min + period), starting at t_start, and the result is passed to get_val.

If period is not given, repeats end at the end of get_val. Note that this will not be entirely accurate, and you’re probably better off specifying a value if you can easily do so.

Returns: the get_val wrapper that repeats get_val over the given period.
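
The time looping can be sketched as follows. repeat is a hypothetical stand-in that requires an explicit period, and it assumes t_start is the time passed to get_val on the first call.

```python
def repeat(get_val, period, t_min=0, t_start=None):
    """Loop the time passed to get_val into [t_min, t_min + period)."""
    if t_start is None:
        t_start = t_min

    def wrapped(t):
        looped = t_min + (t_start - t_min + t) % period
        return get_val(looped)

    return wrapped


ramp = lambda t: 2 * t
# replay the part of ramp between t=1 and t=4 forever
looping = repeat(ramp, period=3, t_min=1)
```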
interp_oscillate(get_val, [t_max, ]t_min = 0, t_start = t_min) → f

Repeat a linear oscillation over an existing interpolation function.

Parameters: get_val – an existing interpolation function, as taken by Scheduler.interp().

Times passed to the returned function are looped and reversed to fit in the range [t_min, t_max), starting at t_start. If t_start is in the range [t_max, 2 * t_max - t_min), it is mapped to the ‘return journey’ of the oscillation.

If t_max is not given, it is taken to be the end of get_val. Note that this will not be entirely accurate, and you’re probably better off specifying a value if you can easily do so.

Returns: the get_val wrapper that oscillates get_val over the given range.
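
The loop-and-reverse mapping can be sketched as a triangle wave over time. oscillate is a hypothetical stand-in that requires an explicit t_max; how the library treats the exact turning point is not specified here, and this sketch reflects at t_max.

```python
def oscillate(get_val, t_max, t_min=0, t_start=None):
    """Map time onto a there-and-back sweep between t_min and t_max."""
    if t_start is None:
        t_start = t_min
    span = t_max - t_min

    def wrapped(t):
        # position within one full cycle (outward plus return journey)
        u = (t_start - t_min + t) % (2 * span)
        if u >= span:
            u = 2 * span - u  # on the return journey, run time backwards
        return get_val(t_min + u)

    return wrapped


identity = lambda t: t
osc = oscillate(identity, t_max=4)
# a t_start in [t_max, 2 * t_max - t_min) begins on the return journey
osc_back = oscillate(identity, t_max=4, t_start=6)
```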
interp_combine(combine, *get_vals)

Combine multiple interpolation functions.

Parameters:
  • combine – combination function that takes the results from the interpolation functions as arguments.
  • get_vals – any number of interpolation functions.

Ignores all non-number parts of each result (results may be arbitrarily nested sequences). Finishes when all interpolation functions return None.

Returns: interpolation function that combines results.
interp_sum(*get_vals)

Sum over multiple interpolation functions.

Parameters: get_vals – any number of interpolation functions.

Ignores all non-number parts of each result (results may be arbitrarily nested sequences). Finishes when all interpolation functions return None.

Returns: interpolation function that sums over results.
interp_avg(*get_vals)

Average over multiple interpolation functions.

Parameters: get_vals – any number of interpolation functions.

Ignores all non-number parts of each result (results may be arbitrarily nested sequences). Finishes when all interpolation functions return None.

Returns: interpolation function that averages over results.
class Timer(fps=60)

Bases: object

Frame-based timer.

Parameters: fps – frames per second to aim for.
frame = None

The current length of a frame in seconds.

current_frame_time = None

The current average frame time in seconds (like current_fps).

t = None

The amount of time in seconds that has elapsed since the start of the current call to run(), if any.

elapsed = None

How many seconds the last frame took to run (including calling the cb argument to run() and any sleeping to make up a full frame).

fps

The target FPS. Set this directly.

current_fps

The current framerate, an average based on conf.FPS_AVERAGE_RATIO.

If this is less than fps, then the timer isn’t running at full speed because of slow calls to the cb argument to run().

run(cb, *args[, seconds][, frames]) → remain

Run indefinitely or for a specified amount of time.

Parameters:
  • cb – a function to call every frame.
  • args – extra arguments to pass to cb.
  • seconds – the number of seconds to run for; can be a float. Accounts for changes to fps.
  • frames – the number of frames to run for; can be a float. Ignored if seconds is passed.

If neither seconds nor frames is given, run forever (until stop() is called). Time passed is based on the number of frames that have passed, so it does not necessarily reflect real time.

Returns: the number of seconds/frames left until the timer has been running for the requested amount of time (or None, if neither was given). This may be less than 0 if cb took a long time to run.
stop()

Stop the current call to run(), if any.

class Scheduler(fps=60)

Bases: engine.sched.Timer

Frame-based event scheduler.

Parameters: fps – frames per second to aim for.
run([seconds][, frames]) → remain

Start the scheduler.

Arguments and return value are as for Timer.run().

add_timeout(cb[, seconds][, frames][, repeat_seconds][, repeat_frames]) → ident

Call a function after a delay.

Parameters:
  • cb – the function to call.
  • seconds – how long to wait before calling, in seconds (respects changes to Timer.fps). If passed, frames is ignored.
  • frames – how long to wait before calling, in frames (same number of frames even if Timer.fps changes).
  • repeat_seconds – how long to wait between calls, in seconds; time is determined as for seconds. If passed, repeat_frames is ignored; if neither is passed, the initial time delay is used between calls.
  • repeat_frames – how long to wait between calls, in frames (like repeat_seconds).
Returns:

a timeout identifier to pass to rm_timeout(). This is guaranteed to be unique over time.

Times can be floats, in which case part-frames are carried over, and time between calls is actually an average over a large enough number of frames.

cb can return a boolean true object to repeat the timeout; otherwise it will not be called again.
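
The repeat rule can be illustrated with a small frame-stepped stand-in. MiniScheduler and its step() method are hypothetical and cover only whole-frame delays; the sketch exists to show that a callback keeps being scheduled for as long as it returns a true value.

```python
import itertools


class MiniScheduler:
    """Frame-stepped stand-in for the timeout part of a scheduler (hypothetical)."""

    def __init__(self):
        self._ids = itertools.count()
        self._timeouts = {}  # ident -> [frames_left, repeat_interval, cb]

    def add_timeout(self, cb, frames, repeat_frames=None):
        ident = next(self._ids)
        # if no repeat interval is given, reuse the initial delay
        interval = frames if repeat_frames is None else repeat_frames
        self._timeouts[ident] = [frames, interval, cb]
        return ident

    def rm_timeout(self, *ids):
        for ident in ids:
            self._timeouts.pop(ident, None)  # missing IDs are ignored

    def step(self):
        """Advance by one frame, calling any timeouts that are due."""
        for ident, entry in list(self._timeouts.items()):
            entry[0] -= 1
            if entry[0] <= 0:
                if entry[2]():
                    entry[0] += entry[1]  # true result: schedule again
                else:
                    self.rm_timeout(ident)


calls = []


def tick():
    calls.append(len(calls))
    return len(calls) < 3  # ask to be repeated until called three times


sched = MiniScheduler()
sched.add_timeout(tick, frames=2)
for _ in range(10):
    sched.step()
```

After the loop, tick has run three times (on frames 2, 4 and 6) and its timeout has been dropped.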

rm_timeout(*ids)

Remove the timeouts with the given identifiers.

Missing IDs are ignored.

pause_timeout(*ids)

Pause the timeouts with the given identifiers.

unpause_timeout(*ids)

Continue the paused timeouts with the given identifiers.

interp(get_val, set_val[, t_max][, bounds][, end], round_val=False, multi_arg=False[, resolution], override=True) → timeout_id

Vary a value over time.

Parameters:
  • get_val – a function called with the elapsed time in seconds to obtain the current value. If this function returns None, the interpolation will be canceled. The interp_* functions in this module can be used to construct such functions.
  • set_val – a function called with the current value to set it. This may also be an (obj, attr) tuple to do obj.attr = val. In the second form, attr may be a sequence of attribute names corresponding to the sequence of values returned from get_val to set.
  • t_max – if time becomes larger than this, cancel the interpolation.
  • bounds – a function that takes the value returned from get_val and checks if it is outside of some boundaries, and returns the boundary value bdy if so (else None). If the value falls out of bounds, set_val is called with bdy and the interpolation is canceled.
  • end – used to do some cleanup when the interpolation is canceled (when get_val returns None, or when t_max or bounds comes into effect, but not when the rm_timeout method is called with timeout_id). This can be a final value to pass to set_val, or a function to call without arguments. If the function returns a (non-None) value, set_val is called with it.
  • round_val – whether to round the value(s) (see interp_round() do_round argument for details).
  • multi_arg – whether values should be interpreted as lists of arguments to pass to set_val instead of a single argument. If set_val is (obj, attr), it is the same as a one-argument function.
  • resolution – ‘framerate’ to update the value at. If not given, the value is set every frame it changes; if given, this sets an upper limit on the number of times per second the value may be updated. The current value of fps (which may change over the interpolation) also puts an upper limit on the rate.
  • override – whether to override (abort) previous interpolations with the same set_val. This works for identical functions, and (obj, attr) for the identical objects and the exact same sets of attributes (since attr can be a sequence). The end action for aborted interpolations is not called.
Returns:

an identifier that can be passed to rm_timeout() to remove the callback that continues the interpolation. In this case end is not respected.
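
The bounds contract described above can be sketched for a single number. make_bounds and run_interp are hypothetical: run_interp is a hand-driven loop standing in for the per-frame work of the scheduler, and it handles only the get_val, set_val and bounds arguments.

```python
def make_bounds(lo, hi):
    """Build a bounds function for a single number (hypothetical helper)."""

    def bounds(val):
        if val < lo:
            return lo
        if val > hi:
            return hi
        return None  # still inside the boundaries

    return bounds


def run_interp(get_val, set_val, bounds, times):
    """Drive one interpolation by hand over the given times (stand-in loop)."""
    for t in times:
        val = get_val(t)
        if val is None:
            return  # get_val asked for the interpolation to end
        bdy = bounds(val)
        if bdy is not None:
            set_val(bdy)  # set the boundary value, then cancel
            return
        set_val(val)


seen = []
run_interp(lambda t: 30 * t, seen.append, make_bounds(0, 100), [0, 1, 2, 3, 4, 5])
```

Here the value would reach 120 at t = 4, so set_val receives 100 instead and no further values are set.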

interp_simple(obj, attr, target, t, [end_cb, ]round_val=False, override=True) → timeout_id

A simple version of interp().

Varies an object’s attribute linearly from its current value to a target value in a set amount of time.

Parameters:
  • obj – vary an attribute of this object.
  • attr – the attribute name of obj to vary, or a sequence of attributes to set (if target is also a sequence).
  • target – a target value, in the same form as the current value in the given attribute (see call_in_nest).
  • t – the amount of time to take to reach the target value, in seconds.
  • end_cb – a function to call when the target value has been reached.
  • round_val – whether to round the value(s) (see interp_round() for details).
  • override – whether to override (abort) previous interpolations with the same obj and attr. This works for the exact same sets of attributes (since attr can be a sequence). end_cb is not called for aborted interpolations.
Returns:

an identifier that can be passed to rm_timeout() to remove the callback that continues the interpolation. In this case end_cb is not called.

interp_locked(*args, **kwargs)

Generate an interp() wrapper that allows only one running interpolation.

With each successive call, the current interpolation is aborted and a new one started. (For most cases, just passing the override argument to interp() should suffice.)

The wrapper is partially applied using the positional and keyword arguments passed to this function. Typical usage is as follows:

# create the wrapper that knows how to set values
interp = scheduler.interp_locked(set_val=set_val)

[...]

# call it at some point with an interpolation function
interp(get_val)

[...]

# call it again later with a different interpolation function
interp(get_val2)
# only one interpolation is running
interp_simple_locked(*args, **kwargs)

Like interp_locked(), but wraps interp_simple().

countdown(t, autoreset=False) → new_countdown

Create and return a Countdown that uses this instance for timing.

Arguments are as taken by Countdown.

counter([limit]) → new_counter

Create and return a Counter that uses this instance for timing.

Arguments are as taken by Counter.

class Countdown(scheduler, t, autoreset=False)

Bases: engine.util.cb.CbManager

A simple way of counting down to an event.

Parameters:
  • scheduler – Scheduler instance to use for timing.
  • t – how long a countdown lasts, in seconds.
  • autoreset – whether to reset and count down from the beginning again when the countdown ends. This is only useful with cbs (the finished state never becomes True).

An instance is boolean True if the countdown has finished, else False. The initial state is finished—use reset() to start the countdown.

This is a CbManager; callbacks added are called when the countdown ends, and take no arguments.

See also Scheduler.countdown().
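
The boolean state described above can be illustrated with a manually ticked stand-in. MiniCountdown, on_finish and tick are hypothetical names; the real class takes its timing from a Scheduler and its callbacks from CbManager.

```python
class MiniCountdown:
    """Manually ticked stand-in for a countdown (hypothetical)."""

    def __init__(self, t, autoreset=False):
        self.t = t
        self.autoreset = autoreset
        self._left = 0
        self._finished = True  # the initial state is finished
        self._cbs = []

    def on_finish(self, f):
        self._cbs.append(f)
        return self

    def reset(self):
        self._left = self.t
        self._finished = False
        return self

    def tick(self, dt):
        """Advance the countdown by dt seconds."""
        if self._finished:
            return
        self._left -= dt
        if self._left <= 0:
            for f in self._cbs:
                f()  # callbacks take no arguments
            if self.autoreset:
                self._left += self.t  # start again; never becomes finished
            else:
                self._finished = True

    def __bool__(self):
        return self._finished


fired = []
cd = MiniCountdown(1.0).on_finish(lambda: fired.append("done"))
before = bool(cd)  # finished, because reset() has not been called yet
cd.reset()
cd.tick(0.5)
running = bool(cd)
cd.tick(0.5)
after = bool(cd)
```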

autoreset = None

As passed to the constructor.

t

How long a countdown lasts, in seconds.

Changing this resets the countdown (if running).

reset() → self

Start counting down from the beginning again.

Starts counting down even if the countdown wasn’t already running.

cancel() → self

Stop counting down and set the finished state to False.

finish() → self

Stop counting down and set the finished state to True.

pause() → self

Pause the countdown, if running.

unpause() → self

Unpause the countdown, if paused.

class Counter(scheduler[, limit])

Bases: engine.util.cb.CbManager

A simple way of keeping track of time.

Parameters:
  • scheduler – Scheduler instance to use for timing.
  • limit – if given, stop the counter once it reaches this many seconds.

An instance is boolean True if the counter has reached limit, else False. The initial state is finished—use reset() to start the counter.

This is a CbManager; callbacks added are called when the counter reaches limit, and take no arguments.

See also Scheduler.counter().

t = None

How far the counter has counted, in seconds.

limit = None

As passed to the constructor, or None.

reset() → self

Start counting from 0 again.

Starts counting even if the counter wasn’t already running.

cancel() → self

Stop counting and set the finished state to False.

t is not changed.

finish() → self

Stop counting and set the finished state to True.

pause() → self

Pause the counter, if running.

unpause() → self

Unpause the counter, if paused.