--- /dev/null
--- /dev/null
++#!/usr/bin/env python
++
++import bisect
++import moe
++
++class PipeError(moe.MoeError):
++ """Failure of the Moe Pipeline."""
++
++
++class PipelineEvent(object):
++ def __init__(self, name, virtual=False):
++ self.name = name
++ self.virtual = virtual
++ self.hooks = [] # tuples ('name', callable)
++ self.after = set() # List of event names this one comes after
++ self.before = set() # List of event names coming before this one
++ # Per run:
++ self.dependencies = set() # unfinished events this one comes 'after'
++
++ def add_hook(self, hook, name=None):
++ "Add `fun` as a callable hook `name` is the 'module.function' name, guessed by default."
++ assert(callable(fun))
++ if self.virtual:
++ raise PipeError("Cannot add hook to uninitialised event %r", self.name)
++ if name is None:
++ name = fun.__module__ + '.' + fun.__name__
++ self.hooks.append( (name, hook) )
++
++ def run(self, *args, **kwargs):
++ if not self.hooks:
++ log.debug('Passed virtual event %s', self.name)
++ log.info('Running pipeline event %s', self.name)
++ for n,h in self.hooks:
++ log.debug('[%s] Running hook %s', self.name, n)
++ h( *args, **kwargs)
++
++
++class Pipeline:
++ """Moe pipeline."""
++
++ def __init__(self, name, skip_to = 'cleanup'):
++ self.name = name
++ self.events = {}
++ self.skip_to = skip_to
++ # per-run:
++ self.ready = set()
++ self.finished = set()
++
++ def add_event(self, name, after=None, before=None):
++ if name in self.events:
++ e = self[name]
++ if not e.virtual:
++ raise PipeError("Reinserting event %r", name)
++ else: e.virtual = False
++ else:
++ self.events[name] = PipelineEvent(name, virtual=False)
++ self.ready.add(name)
++
++ if after:
++ for d in after:
++ self.add_dependency(d, name)
++ if before:
++ for d in before:
++ self.add_dependency(name, d)
++
++ return self[name]
++
++ def add_dependency(first, then):
++ if first not in self.events:
++ self.events[first] = PipelineEvent(first, virtual=True)
++ self.ready.add(first)
++ if then not in self.events:
++ self.events[then] = PipelineEvent(then, virtual=True)
++ self.ready.add(then)
++ if then in self.finished:
++ raise PipeError("Cannot alter the past")
++
++ self[first].before.add(then)
++ self[then].after.add(first)
++ self[then].dependencies.add(first)
++ if (first not in self.finished) and (then in self.ready):
++ self.ready.remove(then)
++
++ def get_next(self):
++ "Return next event (or None if exhausted)"
++ if not self.ready:
++ if len(self.finished) < len(self.events):
++ raise PipeError("Not all events processed -- cyclic dependency found")
++ return None # All events processed
++ ename = self.ready.pop()
++ log.debug("[%s] processing event %r", self.name, ename)
++ e = self[ename]
++ if e.virtual:
++ raise PipeError("Schedule error: running uninitialised event")
++ for d in e.before:
++ self[d].dependencies.remove(ename)
++ return e
++
++ def run(self, *args, **kwargs):
++ while e = self.get_next():
++ log.debug("Pipeline %r running %r" % (self.name, e.name))
++ e.run(*args, **kwargs)