import bisect
import moe
-class MoePipeError(moe.MoeError):
- """Failure of the MoePipeline."""
+class PipeError(moe.MoeError):
+ """Failure of the Moe Pipeline."""
-class MoeAbortPipeline(Exception):
- def __init__(self, skip_to=999):
- self.skip_to = skip_to
+class PipelineEvent(object):
+ def __init__(self, name, virtual=False):
+ self.name = name
+ self.virtual = virtual
+ self.hooks = [] # tuples ('name', callable)
+ self.after = set() # List of event names this one comes after
+ self.before = set() # List of event names coming before this one
+ # Per run:
+ self.dependencies = set() # unfinished events this one comes 'after'
+
+ def add_hook(self, hook, name=None):
+ "Add `fun` as a callable hook `name` is the 'module.function' name, guessed by default."
+ assert(callable(fun))
+ if self.virtual:
+ raise PipeError("Cannot add hook to uninitialised event %r", self.name)
+ if name is None:
+ name = fun.__module__ + '.' + fun.__name__
+ self.hooks.append( (name, hook) )
+ def run(self, *args, **kwargs):
+ if not self.hooks:
+ log.debug('Passed virtual event %s', self.name)
+ log.info('Running pipeline event %s', self.name)
+ for n,h in self.hooks:
+ log.debug('[%s] Running hook %s', self.name, n)
+ h( *args, **kwargs)
+
+
class Pipeline:
"""Moe pipeline."""
- def __init__(self, e, name, skip_to = 70):
- # e is Eval
- self.e = e
- self.pipe = []
- self.index = -1
+ def __init__(self, name, skip_to = 'cleanup'):
self.name = name
+ self.events = {}
self.skip_to = skip_to
+ # per-run:
+ self.ready = set()
+ self.finished = set()
- def insert(self, pri, fun, desc='', name=None):
- """Insert callable `fun` to time `pri`,
- `desc` is optional description,
- `name` is the "module.function" name, guessed by default.
- """
- assert(isinstance(pri, int))
- assert(callable(fun))
- if name is None:
- name = fun.__module__ + '.' + fun.__name__
- if desc:
- desc += ' '
- desc += '[' + name + ']'
+ def add_event(self, name, after=None, before=None):
+ if name in self.events:
+ e = self[name]
+ if not e.virtual:
+ raise PipeError("Reinserting event %r", name)
+ else: e.virtual = False
+ else:
+ self.events[name] = PipelineEvent(name, virtual=False)
+ self.ready.add(name)
- triple = (pri, desc, fun)
- pos = bisect.bisect(self.pipe, triple)
- if pos <= self.index:
- raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \
- % (self.name, self.index, pri)
- self.pipe.insert(pos, triple)
+ if after:
+ for d in after:
+ self.add_dependency(d, name)
+ if before:
+ for d in before:
+ self.add_dependency(name, d)
+ return self[name]
+
+ def add_dependency(first, then):
+ if first not in self.events:
+ self.events[first] = PipelineEvent(first, virtual=True)
+ self.ready.add(first)
+ if then not in self.events:
+ self.events[then] = PipelineEvent(then, virtual=True)
+ self.ready.add(then)
+ if then in self.finished:
+ raise PipeError("Cannot alter the past")
+
+ self[first].before.add(then)
+ self[then].after.add(first)
+ self[then].dependencies.add(first)
+ if (first not in self.finished) and (then in self.ready):
+ self.ready.remove(then)
+
+ def run_next(self, *args, **kwargs):
+ if not self.ready:
+ if len(self.finished) < len(self.events):
+ raise PipeError("Not all events processed -- cyclic dependency found")
+ raise PipeError("All events processed")
+ ename = self.ready.pop()
+ log.debug("[%s] processing event %r", self.name, ename)
+ e = self[ename]
+ if e.virtual:
+ raise PipeError()
+ for d in e.before:
+ self[
def dump(self, prefix=""):
"""
Debugging dump of the pipe.
return l
def run(self, *args, **kwargs):
- self.index = 0
- min_pri = -1
- while self.index < len(self.pipe):
- (pri,name,fun) = self.pipe[self.index]
- if pri >= min_pri:
- self.e.log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
+ log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
try:
fun(*args, **kwargs)
except MoeAbortPipeline, err: