class PipeError(moe.MoeError):
    """Raised when the Moe pipeline fails."""
class PipelineEvent(object):
    """One named event of a pipeline, with its hooks and ordering links.

    Attributes:
        name: the event name, used in log and error messages.
        virtual: flag passed to the constructor; `add_hook` refuses to
            attach hooks while it is set.
        hooks: list of ``(name, callable)`` tuples, run in insertion order.
        after: set of event names this event comes after.
        before: set of event names this event comes before.
        dependencies: set of still unfinished events this one comes after.
    """

    def __init__(self, name, virtual=False):
        # NOTE(review): this assignment is not visible in the reviewed excerpt;
        # it is restored because every other method reads `self.name` -- confirm.
        self.name = name
        self.virtual = virtual
        self.hooks = []  # tuples ('name', callable)
        self.after = set()  # names of events this one comes after
        self.before = set()  # names of events this one comes before
        self.dependencies = set()  # unfinished events this one comes 'after'

    def add_hook(self, hook, name=None):
        """Add `hook` as a callable hook of this event.

        `name` is the 'module.function' label shown when the hook is run;
        by default it is derived from the hook's ``__module__`` and
        ``__name__``.

        Raises:
            PipeError: if the event is still uninitialised.
        """
        # NOTE(review): the condition guarding this raise is not visible in the
        # excerpt; `self.virtual` is presumed from the error message -- confirm.
        if self.virtual:
            raise PipeError("Cannot add hook to uninitialised event %r", self.name)
        # NOTE(review): the `name is None` test is likewise presumed from the
        # `name=None` default and the "guessed by default" wording -- confirm.
        if name is None:
            # The original read the undefined name `fun` here; the callable is `hook`.
            name = hook.__module__ + '.' + hook.__name__
        self.hooks.append((name, hook))

    def run(self, *args, **kwargs):
        """Run all hooks of this event in order, passing the arguments through."""
        # NOTE(review): the condition in front of this debug message is not
        # visible in the excerpt; presumably `self.virtual` -- confirm.
        if self.virtual:
            log.debug('Passed virtual event %s', self.name)
        log.info('Running pipeline event %s', self.name)
        for n, h in self.hooks:
            log.debug('[%s] Running hook %s', self.name, n)
            # NOTE(review): the hook invocation itself lies outside the excerpt;
            # presumably each hook is called with the run arguments -- confirm.
            h(*args, **kwargs)
# Constructor of the pipeline object; the enclosing class header lies outside this excerpt.
# `skip_to` defaults to 'cleanup' and is only stored here; how it is used is not visible.
41 def __init__(self, name, skip_to = 'cleanup'):
# NOTE(review): `self.name`, `self.events`, `self.ready` and `self.finished` are read by
# the sibling methods, so they are presumably initialised in lines missing here -- confirm.
44 self.skip_to = skip_to
# Register the event `name`, optionally ordered relative to other events.
# NOTE(review): several lines of this method are missing from the excerpt (the binding
# of `e`, the condition guarding the raise, and the loops that bind `d`) -- confirm
# against the full file before relying on the notes below.
49 def add_event(self, name, after=None, before=None):
50 if name in self.events:
# Presumably reached only when the existing event is already non-virtual -- TODO confirm.
53 raise PipeError("Reinserting event %r", name)
# Clears the virtual flag on `e`, presumably the existing event stored under `name`.
54 else: e.virtual = False
# Stores a fresh non-virtual event under `name` (presumably the branch for unknown names).
56 self.events[name] = PipelineEvent(name, virtual=False)
# add_dependency(first, then) orders `first` before `then`: here `d` comes before `name`
# (presumably for each `d` taken from `after`) ...
61 self.add_dependency(d, name)
# ... and here `name` comes before `d` (presumably for each `d` taken from `before`).
64 self.add_dependency(name, d)
def add_dependency(self, first, then):
    """Record that event `first` must come before event `then`.

    Either name may be unknown so far; such events are created with
    ``virtual=True``.

    Raises:
        PipeError: if `then` is already in `self.finished`.
    """
    # Create virtual events for names that are not registered yet.
    for ename in (first, then):
        if ename not in self.events:
            self.events[ename] = PipelineEvent(ename, virtual=True)

    if then in self.finished:
        raise PipeError("Cannot alter the past")

    self[first].before.add(then)
    self[then].after.add(first)
    # `dependencies` holds only unfinished predecessors. A `first` that has
    # already finished must not be added, because nothing would remove it again.
    if first not in self.finished:
        self[then].dependencies.add(first)
        # `then` now has to wait, so it is no longer ready to run.
        if then in self.ready:
            self.ready.remove(then)
# NOTE(review): the `def` line of this method and several of its guards are missing
# from the excerpt; run() calls it as self.get_next() -- confirm the signature.
85 "Return next event (or None if exhausted)"
# Presumably reached only when no event is ready (guard not visible): if some events
# never finished, the remaining ones can never become ready.
87 if len(self.finished) < len(self.events):
88 raise PipeError("Not all events processed -- cyclic dependency found")
89 return None # All events processed
# Take one ready event name and process it.
90 ename = self.ready.pop()
91 log.debug("[%s] processing event %r", self.name, ename)
# Presumably guarded by a check that the event is still virtual -- TODO confirm.
94 raise PipeError("Schedule error: running uninitialised event")
# Drops `ename` from the unfinished dependencies of event `d`; presumably `d` iterates
# over the events that come after `ename` (loop not visible).
96 self[d].dependencies.remove(ename)
def run(self, *args, **kwargs):
    """Run the events returned by `get_next` until it returns None.

    All positional and keyword arguments are passed unchanged to the
    `run` method of every event.
    """
    # An assignment cannot be used as a `while` condition in Python, so the
    # next event is fetched explicitly and the loop ends on None.
    while True:
        e = self.get_next()
        if e is None:
            break
        log.debug("Pipeline %r running %r", self.name, e.name)
        e.run(*args, **kwargs)