From: Tomas Gavenciak Date: Wed, 12 Jan 2011 13:54:26 +0000 (+0100) Subject: Merge branch 'python-newpipe' into python X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=976c301650130b3c6c7aa0df83e8c29ea2ffb650;p=moe.git Merge branch 'python-newpipe' into python Attempt at order-defined pipelines --- 976c301650130b3c6c7aa0df83e8c29ea2ffb650 diff --cc t/moe/pipeline-deps.py index 0000000,0000000..9b41691 new file mode 100644 --- /dev/null +++ b/t/moe/pipeline-deps.py @@@ -1,0 -1,0 +1,102 @@@ ++#!/usr/bin/env python ++ ++import bisect ++import moe ++ ++class PipeError(moe.MoeError): ++ """Failure of the Moe Pipeline.""" ++ ++ ++class PipelineEvent(object): ++ def __init__(self, name, virtual=False): ++ self.name = name ++ self.virtual = virtual ++ self.hooks = [] # tuples ('name', callable) ++ self.after = set() # List of event names this one comes after ++ self.before = set() # List of event names coming before this one ++ # Per run: ++ self.dependencies = set() # unfinished events this one comes 'after' ++ ++ def add_hook(self, hook, name=None): ++ "Add `fun` as a callable hook `name` is the 'module.function' name, guessed by default." ++ assert(callable(fun)) ++ if self.virtual: ++ raise PipeError("Cannot add hook to uninitialised event %r", self.name) ++ if name is None: ++ name = fun.__module__ + '.' + fun.__name__ ++ self.hooks.append( (name, hook) ) ++ ++ def run(self, *args, **kwargs): ++ if not self.hooks: ++ log.debug('Passed virtual event %s', self.name) ++ log.info('Running pipeline event %s', self.name) ++ for n,h in self.hooks: ++ log.debug('[%s] Running hook %s', self.name, n) ++ h( *args, **kwargs) ++ ++ ++class Pipeline: ++ """Moe pipeline.""" ++ ++ def __init__(self, name, skip_to = 'cleanup'): ++ self.name = name ++ self.events = {} ++ self.skip_to = skip_to ++ # per-run: ++ self.ready = set() ++ self.finished = set() ++ ++ def add_event(self, name, after=None, before=None): ++ if name in self.events: ++ e = self[name] ++ if not e.virtual: ++ raise PipeError("Reinserting event %r", name) ++ else: e.virtual = False ++ else: ++ self.events[name] = PipelineEvent(name, virtual=False) ++ self.ready.add(name) ++ ++ if after: ++ for d in after: ++ self.add_dependency(d, name) ++ if before: ++ for d in before: ++ self.add_dependency(name, d) ++ ++ return self[name] ++ ++ def add_dependency(first, then): ++ if first not in self.events: ++ self.events[first] = PipelineEvent(first, virtual=True) ++ self.ready.add(first) ++ if then not in self.events: ++ self.events[then] = PipelineEvent(then, virtual=True) ++ self.ready.add(then) ++ if then in self.finished: ++ raise PipeError("Cannot alter the past") ++ ++ self[first].before.add(then) ++ self[then].after.add(first) ++ self[then].dependencies.add(first) ++ if (first not in self.finished) and (then in self.ready): ++ self.ready.remove(then) ++ ++ def get_next(self): ++ "Return next event (or None if exhausted)" ++ if not self.ready: ++ if len(self.finished) < len(self.events): ++ raise PipeError("Not all events processed -- cyclic dependency found") ++ return None # All events processed ++ ename = self.ready.pop() ++ log.debug("[%s] processing event %r", self.name, ename) ++ e = self[ename] ++ if e.virtual: ++ raise PipeError("Schedule error: running uninitialised event") ++ for d in e.before: ++ self[d].dependencies.remove(ename) ++ return e ++ ++ def run(self, *args, **kwargs): ++ while e = self.get_next(): ++ log.debug("Pipeline %r running %r" % (self.name, e.name)) ++ e.run(*args, **kwargs)