]> mj.ucw.cz Git - moe.git/blob - t/moe/pipeline.py
df4b229a1000f3a410cd06569e1343e43283bfdc
[moe.git] / t / moe / pipeline.py
1 #!/usr/bin/env python
2
3 import bisect
4
5 import moe
6 from moe.logs import *
7
8 class MoePipeError(moe.MoeError):
9     """Failure of the MoePipeline."""
10
11 class MoeAbortPipeline(Exception):
12
13     def __init__(self, skip_to=None):
14         self.skip_to = skip_to
15
16
17 class Pipeline:
18     """Moe pipeline."""
19
20     def __init__(self, name, skip_to = 700):
21         # e is Eval
22         self.pipe = []
23         self.index = -1
24         self.name = name
25         self.skip_to = skip_to
26
27     def insert(self, pri, fun, desc='', name=None):
28         """Insert callable `fun` to time `pri`, 
29         `desc` is optional description, 
30         `name` is the "module.function" name, guessed by default.
31         """
32         assert(isinstance(pri, int))
33         assert(callable(fun))
34         if name is None:
35             name = fun.__module__ + '.' + fun.__name__
36         if desc:
37             desc += ' '
38         desc += '[' + name + ']'
39
40         triple = (pri, desc, fun)
41         pos = bisect.bisect(self.pipe, triple)
42         if pos <= self.index:
43             raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \
44                 % (self.name, self.index, pri)
45         self.pipe.insert(pos, triple)
46
47     def dump(self, prefix=""):
48         """
49         Debugging dump of the pipe.
50         Returns a list of lines.
51         """
52         l=["%s >>> Pipeline %s" % (prefix, self.name)]
53         for pri, name, fun in self.pipe:
54             l.append("%s% 3d %s" % (prefix, pri, name))
55         return l
56
57     def run(self, *args, **kwargs):
58         self.index = 0
59         min_pri = -1
60         while self.index < len(self.pipe):
61             (pri,name,fun) = self.pipe[self.index]
62             if pri >= min_pri:
63                 log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
64                 try:
65                     fun(*args, **kwargs)
66                 except MoeAbortPipeline, err:
67                     min_pri = self.skip_to
68             else:
69                 log.debug("Pipeline %r:d skipping: %s" % (self.name, pri, name))
70             self.index += 1
71         self.index = -1
72         log.debug("Pipeline %r finished" % (self.name))
73