]> mj.ucw.cz Git - moe.git/blob - t/moe/pipeline-deps.py
Merge branch 'python-newpipe' into python
[moe.git] / t / moe / pipeline-deps.py
1 #!/usr/bin/env python
2
3 import bisect
4 import moe
5
6 class PipeError(moe.MoeError):
7     """Failure of the Moe Pipeline."""
8
9
10 class PipelineEvent(object):
11     def __init__(self, name, virtual=False):
12         self.name = name
13         self.virtual = virtual
14         self.hooks = [] # tuples ('name', callable)
15         self.after = set() # List of event names this one comes after
16         self.before = set() # List of event names coming before this one
17         # Per run: 
18         self.dependencies = set() # unfinished events this one comes 'after'
19
20     def add_hook(self, hook, name=None):
21         "Add `fun` as a callable hook `name` is the 'module.function' name, guessed by default."
22         assert(callable(fun))
23         if self.virtual:
24             raise PipeError("Cannot add hook to uninitialised event %r", self.name)
25         if name is None:
26             name = fun.__module__ + '.' + fun.__name__
27         self.hooks.append( (name, hook) )
28
29     def run(self, *args, **kwargs):
30         if not self.hooks:
31             log.debug('Passed virtual event %s', self.name)
32         log.info('Running pipeline event %s', self.name)
33         for n,h in self.hooks:
34             log.debug('[%s] Running hook %s', self.name, n)
35             h( *args, **kwargs)
36         
37     
38 class Pipeline:
39     """Moe pipeline."""
40
41     def __init__(self, name, skip_to = 'cleanup'):
42         self.name = name
43         self.events = {}
44         self.skip_to = skip_to
45         # per-run:
46         self.ready = set()
47         self.finished = set()
48
49     def add_event(self, name, after=None, before=None):
50         if name in self.events:
51             e = self[name]
52             if not e.virtual:
53                 raise PipeError("Reinserting event %r", name)
54             else: e.virtual = False
55         else:
56             self.events[name] = PipelineEvent(name, virtual=False)
57             self.ready.add(name)
58
59         if after:
60             for d in after:
61                 self.add_dependency(d, name)
62         if before:
63             for d in before:
64                 self.add_dependency(name, d)
65
66         return self[name]
67             
68     def add_dependency(first, then):
69         if first not in self.events:
70             self.events[first] = PipelineEvent(first, virtual=True)
71             self.ready.add(first)
72         if then not in self.events:
73             self.events[then] = PipelineEvent(then, virtual=True)
74             self.ready.add(then)
75         if then in self.finished:
76             raise PipeError("Cannot alter the past")
77         
78         self[first].before.add(then)
79         self[then].after.add(first)
80         self[then].dependencies.add(first)
81         if (first not in self.finished) and (then in self.ready):
82             self.ready.remove(then)
83       
84     def get_next(self):
85         "Return next event (or None if exhausted)"
86         if not self.ready:
87             if len(self.finished) < len(self.events):
88                 raise PipeError("Not all events processed -- cyclic dependency found")
89             return None # All events processed
90         ename = self.ready.pop()
91         log.debug("[%s] processing event %r", self.name, ename)
92         e = self[ename]
93         if e.virtual:
94             raise PipeError("Schedule error: running uninitialised event")
95         for d in e.before:
96             self[d].dependencies.remove(ename)
97         return e
98
99     def run(self, *args, **kwargs):
100         while e = self.get_next():
101             log.debug("Pipeline %r running %r" % (self.name, e.name))
102             e.run(*args, **kwargs)