Source code for engine.game

"""Main loop and world handling.

Only one :class:`Game` instance should ever exist, and it stores itself in
:data:`conf.GAME`.  Start the game with :func:`run` and use the :class:`Game`
instance for changing worlds and handling the display.

"""

import sys
import os
from random import choice, randrange
from math import exp

import pygame as pg
from pygame.display import update as update_display

from .conf import conf
from .sched import Scheduler
from . import evt, gfx, res, text
from .util import ir, convert_sfc


[docs]def run (*args, **kwargs): """Run the game. Takes the same arguments as :class:`Game`, with an optional keyword-only argument ``t`` to run for this many seconds. """ t = kwargs.pop('t', None) global restarting restarting = True while restarting: restarting = False Game(*args, **kwargs).run(t)
class _ClassProperty (property): """Decorator to create a static property.""" def __get__(self, cls, owner): return self.fget.__get__(None, owner)()
[docs]class World (object): """A world base class; to be subclassed. World(scheduler, evthandler, resources) :arg scheduler: the :class:`sched.Scheduler <engine.sched.Scheduler>` instance this world should use for timing. :arg evthandler: the :class:`evt.EventHandler <engine.evt.handler.EventHandler>` instance this world should use for input. Event names prefixed with ``_game`` are reserved. :arg resources: the :class:`res.ResourceManager <engine.res.ResourceManager>` instance this world should use for loading resources. .. attribute:: id A unique identifier used for some settings in :mod:`conf`. This is a class property---it is independent of the instance. A subclass may define an ``_id`` class attribute (not instance attribute). If so, that is returned; if not, ``world_class.__name__.lower()`` is returned. """ def __init__ (self, scheduler, evthandler, resources, *args, **kwargs): #: :class:`sched.Scheduler <engine.sched.Scheduler>` instance taken by #: the constructor. self.scheduler = scheduler #: :class:`evt.EventHandler <engine.evt.handler.EventHandler>` instance #: taken by the constructor. self.evthandler = evthandler #: :class:`gfx.GraphicsManager <engine.gfx.container.GraphicsManager>` #: instance used for drawing by default (by, eg. entities). self.graphics = gfx.GraphicsManager(scheduler) #: :class:`gfx.GraphicsManager <engine.gfx.container.GraphicsManager>` #: instance used as the world's output to the screen. This is the same #: instance as :attr:`graphics` by default. self.display = self.graphics #: :class:`res.ResourceManager <engine.res.ResourceManager>` instance #: taken by the constructor. self.resources = resources #: ``set`` of :class:`Entity <engine.entity.Entity>` instances in this #: world. self.entities = set() self._initialised = False self._extra_args = (args, kwargs) self._music_evt = self.evthandler.add((conf.EVENT_ENDMUSIC,))[0] # {sound_id: [(sound, vol)]}, vol excluding the world's sound volume self._sounds = {} self._avg_draw_time = scheduler.frame self._since_last_draw = 0 @_ClassProperty @classmethod def id (cls): # doc is in the class(!) if hasattr(cls, '_id'): return cls._id else: return cls.__name__.lower() @property def fps (self): """The current draw rate, an average based on :data:`conf.FPS_AVERAGE_RATIO`. If this is less than :data:`conf.FPS`, then we're dropping frames. For the current update FPS, use the :attr:`Timer.current_fps <engine.sched.Timer.current_fps>` of :attr:`scheduler`. (If this indicates the scheduler isn't running at full speed, it may mean the draw rate is dropping to :data:`conf.MIN_FPS`.) """ return 1 / self._avg_draw_time
[docs] def init (self, *args, **kwargs): """Called when this first becomes the active world (before :meth:`select`). This receives the extra arguments passed in constructing the world through the :class:`Game` instance. """ pass
[docs] def select (self): """Called whenever this becomes the active world.""" pass
def _select (self): """Called by the game when becomes the active world.""" ident = self.id pg.event.set_grab(conf.GRAB_EVENTS[ident]) pg.mouse.set_visible(conf.MOUSE_VISIBLE[ident]) if conf.MUSIC_AUTOPLAY[ident]: self.play_music() else: pg.mixer.music.stop() pg.mixer.music.set_volume(self.scale_volume(self.music_volume)) if not self._initialised: self.init(*self._extra_args[0], **self._extra_args[1]) self._initialised = True del self._extra_args self.evthandler.normalise() self.select()
[docs] def pause (self): """Called to pause the game when the window loses focus.""" pass
[docs] def update (self): """Called every frame to makes any necessary changes.""" pass
def _update (self): """Called by the game to update.""" for e in list(self.entities): e.update() self.update() def _handle_slowdown (self): """Return whether to draw this frame.""" s = self.scheduler elapsed = s.elapsed if elapsed is None: # haven't completed a frame yet return True frame_t = s.current_frame_time target_t = s.frame # compute rolling frame average for drawing, but don't store it just # yet r = conf.FPS_AVERAGE_RATIO draw_t = ((1 - r) * self._avg_draw_time + r * (self._since_last_draw + elapsed)) if frame_t <= target_t or abs(frame_t - target_t) / target_t < .1: # running at (near enough (within 1% of)) full speed, so draw draw = True else: if draw_t >= 1. / conf.MIN_FPS[self.id]: # not drawing would make the draw FPS too low, so draw anyway draw = True else: draw = False draw |= not conf.DROP_FRAMES if draw: # update rolling draw frame average self._avg_draw_time = draw_t self._since_last_draw = 0 else: # remember frame time for when we next draw self._since_last_draw += elapsed return draw
[docs] def draw (self): """Draw to the screen. :return: a flag indicating what changes were made: ``True`` if the whole display needs to be updated, something falsy if nothing needs to be updated, else a list of rects to update the display in. This method should not change the state of the world, because it is not guaranteed to be called every frame. """ dirty = self.display.draw(False) return dirty
[docs] def quit (self): """Called when this is removed from the currently running worlds. Called before removal---when :attr:`Game.world` is still this world. """ pass
[docs] def add (self, *entities): """Add any number of :class:`Entity <engine.entity.Entity>` instances to the world. An entity may be in only one world at a time. If a given entity is already in another world, it is removed from that world. Each entity passed may also be a sequence of entities to add. """ entities = list(entities) all_entities = self.entities for e in entities: if hasattr(e, '__len__') and hasattr(e, '__getitem__'): entities.extend(e) else: if e.world is not None: e.world.rm(e) if e.graphics.manager is None: e.graphics.manager = self.graphics # else manager was explicitly set, so don't change it all_entities.add(e) e.world = self e.added()
[docs] def rm (self, *entities): """Remove any number of entities from the world. Missing entities are ignored. Each entity passed may also be a sequence of entities to remove. """ entities = list(entities) all_entities = self.entities for e in entities: if hasattr(e, '__len__') and hasattr(e, '__getitem__'): entities.extend(e) else: if e in all_entities: all_entities.remove(e) e.world = None # unset gm even if it's not this world's main manager e.graphics.manager = None
[docs] def use_pools (self, *pools): """Tell the resource manager that this world is using the given pools. This means the resources in the pool will not be removed from cache until this world drops the pool. """ for pool in pools: self.resources.use(pool, self)
[docs] def drop_pools (self, *pools): """Stop using the given pools of the resource manager.""" for pool in pools: self.resources.drop(pool, self)
@property def music_volume (self): """The world's music volume, before scaling. This is actually :data:`conf.MUSIC_VOLUME`, and changing it alters that value, and also changes the volume of currently playing music. """ return conf.MUSIC_VOLUME[self.id] @music_volume.setter def music_volume (self, volume): i = self.id if volume > 1: print >> sys.stderr, 'warning: music volume greater than 1' if volume != conf.MUSIC_VOLUME[i]: conf.MUSIC_VOLUME[i] = volume conf.changed('MUSIC_VOLUME') pg.mixer.music.set_volume(volume) @property def snd_volume (self): """The world's base sound volume. This is actually :data:`conf.SOUND_VOLUME`, and changing it alters that value, and also changes the volume of currently playing sounds. """ return conf.SOUND_VOLUME[self.id] @snd_volume.setter def snd_volume (self, volume): i = self.id if volume != conf.SOUND_VOLUME[i]: conf.SOUND_VOLUME[i] = volume conf.changed('SOUND_VOLUME') # reset playing sound volumes for base_id, snds in self._sounds.iteritems(): for snd, vol in snds: # vol excludes the world's volume vol *= volume vol = self.scale_volume(vol) if vol > 1: print >> sys.stderr, ('warning: sound volume greater ' 'than 1') snd.set_volume(volume * vol)
[docs] def play_music (self, group=None, loop=True, cb=None): """Randomly play music from a group. play_music([group], loop=True[, cb]) :arg group: music group to play from, as keys in :data:`conf.MUSIC`; defaults to :attr:`id`, and then `''` (the root directory of :data:`conf.MUSIC_DIR`) if there is no such group. :arg loop: whether to play multiple tracks. If ``True``, play random tracks sequentially until the active world changes, music from a different group is played, or the Pygame mixer is manually stopped. If a number, play that many randomly selected tracks (if falsy, do nothing). :arg cb: a function to call when all the music has been played, according to the value of ``loop``. Called even if no music is played (if there is none in this group, or ``loop`` is falsy). Raises ``KeyError`` if the given group does not exist. """ if group is None: group = self.id if group not in conf.MUSIC: group = '' # raises KeyError fns = conf.MUSIC[group] if not fns or not loop: # no files or don't want to play anything: do nothing if cb is not None: cb() return # modifying variables in closures is painful loop = [loop] def end_cb (): if loop[0] is not True: loop[0] -= 1 if loop[0]: play_next() elif cb is not None: cb() def play_next (): pg.mixer.music.load(choice(fns)) pg.mixer.music.play() play_next() self._music_evt.rm_cbs(*self._music_evt.cbs) self._music_evt.cb(end_cb)
[docs] def play_snd (self, base_id, volume=1): """Play a sound. play_snd(base_id, volume=1) :arg base_id: the identifier of the sound to play (we look for ``base_id + i`` for a number ``i``---there are as many sounds as set in :data:`conf.SOUNDS`). If this is not in :data:`conf.SOUNDS`, it is used as the whole filename (without ``'.ogg'``). :arg volume: amount to scale the playback volume by. """ alias = base_id if base_id in conf.SOUND_ALIASES: base_id = conf.SOUND_ALIASES[base_id] volume *= conf.SOUND_VOLUMES[alias] if base_id in conf.SOUNDS: ident = randrange(conf.SOUNDS[base_id]) base_id += str(ident) # else not a random sound # load sound, and make a copy so we can play/stop instances separately # (without managing channels, at least) snd = self.resources.snd(base_id + '.ogg') snd = pg.mixer.Sound(snd.get_buffer() if hasattr(snd, 'get_buffer') else snd.get_raw()) # store sound, and stop oldest if necessary playing = self._sounds.setdefault(alias, []) if alias in conf.MAX_SOUNDS: assert len(playing) <= conf.MAX_SOUNDS[alias] if len(playing) == conf.MAX_SOUNDS[alias] and playing: playing.pop(0)[0].stop() else: i = 0 while i < len(playing): if playing[i][0].get_num_channels() == 0: # sound is no longer playing, so remove it playing.pop(i) else: i += 1 playing.append((snd, volume)) # play volume *= conf.SOUND_VOLUME[self.id] volume = self.scale_volume(volume) if volume > 1: print >> sys.stderr, 'warning: sound volume greater than 1' snd.set_volume(volume) snd.play()
def _get_base_ids (self, *base_ids, **kwargs): # takes (*base_ids, exclude=False) to get the base_ids this represents if not base_ids: return self._sounds.keys() if kwargs.get('exclude', False): return list(set(self._sounds.keys()).difference(base_ids)) else: return base_ids def _get_playing_snds (self): # get {sound: channel} for all playing sounds C = pg.mixer.Channel snds = {} for i in xrange(pg.mixer.get_num_channels()): c = C(i) s = c.get_sound() if s is not None: snds[s] = c def _with_channels (self, method, *base_ids, **kwargs): # call a method on matching sounds' channels # avoids code duplication in .*pause_snds() base_ids = self._get_base_ids(*base_ids, exclude=kwargs.get('exclude', False)) playing = self._get_playing_snds() all_snds = self._sounds if not base_ids: base_ids = self._sounds.keys() for base_id in base_ids: for snd, vol in all_snds[base_id]: if snd in playing: getattr(playing[snd], method)()
[docs] def pause_snds (self, *base_ids, **kwargs): """Pause sounds with the given IDs, else pause all sounds. pause_snds(*base_ids, exclude=False) :arg base_ids: any number of ``base_id`` arguments as taken by :meth:`play_snd`; if none are given, apply to all. :arg exclude: if ``True``, apply to all but those in ``base_ids``. """ self._with_channels('pause', *base_ids, exclude=kwargs.get('exclude', False))
[docs] def unpause_snds (self, *base_ids, **kwargs): """Unpause sounds with the given IDs, else unpause all sounds. unpause_snds(*base_ids, exclude=False) :arg base_ids: any number of ``base_id`` arguments as taken by :meth:`play_snd`; if none are given, apply to all. :arg exclude: if ``True``, apply to all but those in ``base_ids``. """ self._with_channels('unpause', *base_ids, exclude=kwargs.get('exclude', False))
[docs] def stop_snds (self, *base_ids, **kwargs): """Stop all playing sounds with the given IDs, else stop all sounds. stop_snds(*base_ids, exclude=False) :arg base_ids: any number of ``base_id`` arguments as taken by :meth:`play_snd`; if none are given, apply to all. :arg exclude: if ``True``, apply to all but those in ``base_ids``. """ all_snds = self._sounds for base_id in self._get_base_ids( *base_ids, exclude=kwargs.get('exclude', False) ): for snd, vol in all_snds.pop(base_id, ()): snd.stop()
[docs] def scale_volume (self, vol): """Called to scale audio volumes before using them. The result should be between ``0`` and ``1``. The default implementation does :: (exp(conf.VOLUME_SCALING * vol) - 1) / (exp(conf.VOLUME_SCALING) - 1) or no scaling if :data:`conf.VOLUME_SCALING` is ``0``. """ scale = conf.VOLUME_SCALING if scale == 0: return vol else: return (exp(scale * vol) - 1) / (exp(scale) - 1)
[docs]class Game (object): """Handles worlds. Takes the same arguments as :meth:`create_world` and passes them to it. """ def __init__ (self, *args, **kwargs): conf.GAME = self conf.RES_F = pg.display.list_modes()[0] self._quit = False self._update_again = False #: The currently running world. self.world = None #: A list of previous (nested) worlds, most 'recent' last. self.worlds = [] # load display settings #: The main Pygame surface. self.screen = None self.refresh_display() #: :class:`res.ResourceManager <engine.res.ResourceManager>` instance #: used for caching resources. self.resources = res.ResourceManager() self.resources.use(conf.DEFAULT_RESOURCE_POOL, self) self._using_pool = conf.DEFAULT_RESOURCE_POOL #: ``{name: renderer}`` dict of #: :class:`text.TextRenderer <engine.text.TextRenderer>` instances #: available for referral by name in the ``'text'`` resource loader. self.text_renderers = {} self._init_cbs() # set up music pg.mixer.music.set_endevent(conf.EVENT_ENDMUSIC) # start first world self.start_world(*args, **kwargs) def _init_cbs (self): # set up settings callbacks conf.on_change('DEFAULT_RESOURCE_POOL', self._change_resource_pool, source=self) conf.on_change('FULLSCREEN', self.refresh_display, lambda: conf.RESIZABLE, source=self) def change_res_w (): if not conf.FULLSCREEN: self.refresh_display() conf.on_change('RES_W', change_res_w, source=self) def change_res_f (): if conf.FULLSCREEN: self.refresh_display() conf.on_change('RES_F', change_res_f, source=self) def _change_resource_pool (self, new_pool): # callback: after conf.DEFAULT_RESOURCE_POOL change self.resources.drop(self._using_pool, self) self.resources.use(new_pool, self) self._using_pool = new_pool # world handling
[docs] def create_world (self, cls, *args, **kwargs): """Create a world. create_world(cls, *args, **kwargs) -> world :arg cls: the world class to instantiate; must be a :class:`World` subclass. :arg args: positional arguments to pass to the constructor. :arg kwargs: keyword arguments to pass to the constructor. :return: the created world. A world is constructed by:: cls(scheduler, evthandler, *args, **kwargs) where ``scheduler`` and ``evthandler`` are as taken by :class:`World` (and should be passed to that base class). """ scheduler = Scheduler() scheduler.add_timeout(self._update, frames=1) eh = evt.EventHandler(scheduler) eh.add( (pg.QUIT, self.quit), (pg.ACTIVEEVENT, self._active_cb), (pg.VIDEORESIZE, self._resize_cb) ) eh.load_s(conf.GAME_EVENTS) eh['_game_quit'].cb(self.quit) eh['_game_minimise'].cb(self.minimise) eh['_game_fullscreen'].cb(self._toggle_fullscreen) # instantiate class world = cls(scheduler, eh, self.resources, *args, **kwargs) scheduler.fps = conf.FPS[world.id] return world
def _select_world (self, world): """Set the given world as the current world.""" if self.world is not None: self._update_again = True self.world.scheduler.stop() self.world = world world.display.orig_sfc = self.screen world.display.dirty() # create text renderers required by this world for name, r in conf.TEXT_RENDERERS[world.id].iteritems(): if not isinstance(r, text.TextRenderer): if isinstance(r, basestring): r = (r,) r = text.TextRenderer(*r) self.text_renderers[name] = r world._select()
[docs] def start_world (self, *args, **kwargs): """Store the current world (if any) and switch to a new one. Takes a :class:`World` instance, or the same arguments as :meth:`create_world` to create a new one. :return: the new current world. """ if self.world is not None: self.worlds.append(self.world) return self.switch_world(*args, **kwargs)
[docs] def switch_world (self, world, *args, **kwargs): """End the current world and start a new one. Takes a :class:`World` instance, or the same arguments as :meth:`create_world` to create a new one. :return: the new current world. """ if not isinstance(world, World): world = self.create_world(world, *args, **kwargs) self._select_world(world) return world
[docs] def get_worlds (self, ident, current = True): """Get a list of running worlds, filtered by identifier. get_worlds(ident, current = True) -> worlds :arg ident: the world identifier (:attr:`World.id`) to look for. :arg current: include the current world in the search. :return: the world list, in order of time started, most recent last. """ worlds = [] current = [{'world': self.world}] if current else [] for data in self.worlds + current: world = data['world'] if world.id == ident: worlds.append(world) return worlds
[docs] def quit_world (self, depth = 1): """Quit the currently running world. quit_world(depth = 1) -> worlds :arg depth: quit this many (nested) worlds. :return: a list of worlds that were quit, in the order they were quit. If this quits the last (root) world, exit the game. """ if depth < 1: return [] old_world = self.world old_world.quit() if self.worlds: self._select_world(self.worlds.pop()) else: self.quit() return [old_world] + self.quit_world(depth - 1)
# display
[docs] def refresh_display (self): """Update the display mode from :mod:`conf`.""" # get resolution and flags flags = conf.FLAGS if conf.FULLSCREEN: flags |= pg.FULLSCREEN r = conf.RES_F else: w = max(conf.MIN_RES_W[0], conf.RES_W[0]) h = max(conf.MIN_RES_W[1], conf.RES_W[1]) r = (w, h) if conf.RESIZABLE: flags |= pg.RESIZABLE ratio = conf.ASPECT_RATIO if ratio is not None: # lock aspect ratio r = list(r) r[0] = min(r[0], r[1] * ratio) r[1] = min(r[1], r[0] / ratio) conf.RES = r self.screen = pg.display.set_mode(conf.RES, flags) if self.world is not None: self.world.display.dirty()
[docs] def toggle_fullscreen (self): """Toggle fullscreen mode.""" conf.FULLSCREEN = not conf.FULLSCREEN
def _toggle_fullscreen (self, *args): # callback: keyboard shortcut pressed if conf.RESIZABLE: self.toggle_fullscreen()
[docs] def minimise (self): """Minimise the display.""" pg.display.iconify()
def _active_cb (self, event): """Callback to handle window focus loss.""" if event.state == 2 and not event.gain: self.world.pause() def _resize_cb (self, event): """Callback to handle a window resize.""" conf.RES_W = (event.w, event.h) self.refresh_display() def _update (self): """Update worlds and draw.""" self._update_again = True while self._update_again: self._update_again = False self.world.evthandler.update() # if a new world was created during the above call, we'll end up # updating twice before drawing if not self._update_again: self.world._update() if self.world._handle_slowdown(): drawn = self.world.draw() # update display if drawn is True: update_display() elif drawn: if len(drawn) > 60: # empirical - faster to update everything update_display() else: update_display(drawn) return True # running
[docs] def run (self, t = None): """Main loop. run([t]) :arg t: stop after this many seconds (else run forever). """ self.resources.use(conf.DEFAULT_RESOURCE_POOL, self) self._using_pool = conf.DEFAULT_RESOURCE_POOL self._init_cbs() while not self._quit and (t is None or t > 0): t = self.world.scheduler.run(seconds = t) self.resources.drop(conf.DEFAULT_RESOURCE_POOL, self) self._using_pool = None conf.rm_cbs(self)
[docs] def quit (self): """Quit the game.""" self.world.scheduler.stop() self._quit = True
[docs] def restart (self): """Restart the game.""" global restarting restarting = True self.quit()