]> mj.ucw.cz Git - moe.git/blob - t/moe/pipeline.py
Rewrite docs of config
[moe.git] / t / moe / pipeline.py
1 #!/usr/bin/env python
2
3 import bisect
4 import moe
5
6 class MoePipeError(moe.MoeError):
7     """Failure of the MoePipeline."""
8
9 class MoeAbortPipeline(Exception):
10
11     def __init__(self, skip_to=999):
12         self.skip_to = skip_to
13
14 class Pipeline:
15     """Moe pipeline."""
16
17     def __init__(self, e, name, skip_to = 70):
18         # e is Eval
19         self.e = e
20         self.pipe = []
21         self.index = -1
22         self.name = name
23         self.skip_to = skip_to
24
25     def insert(self, pri, fun, desc='', name=None):
26         """Insert callable `fun` to time `pri`, 
27         `desc` is optional description, 
28         `name` is the "module.function" name, guessed by default.
29         """
30         assert(isinstance(pri, int))
31         assert(callable(fun))
32         if name is None:
33             name = fun.__module__ + '.' + fun.__name__
34         if desc:
35             desc += ' '
36         desc += '[' + name + ']'
37
38         triple = (pri, desc, fun)
39         pos = bisect.bisect(self.pipe, triple)
40         if pos <= self.index:
41             raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \
42                 % (self.name, self.index, pri)
43         self.pipe.insert(pos, triple)
44
45     def dump(self, prefix=""):
46         """
47         Debugging dump of the pipe.
48         Returns a list of lines.
49         """
50         l=["%s >>> Pipeline %s" % (prefix, self.name)]
51         for pri, name, fun in self.pipe:
52             l.append("%s% 3d %s" % (prefix, pri, name))
53         return l
54
55     def run(self, *args, **kwargs):
56         self.index = 0
57         min_pri = -1
58         while self.index < len(self.pipe):
59             (pri,name,fun) = self.pipe[self.index]
60             if pri >= min_pri:
61                 self.e.log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
62                 try:
63                     fun(*args, **kwargs)
64                 except MoeAbortPipeline, err:
65                     min_pri = self.skip_to
66             else:
67                 self.e.log.debug("Pipeline %r:d skipping: %s" % (self.name, pri, name))
68             self.index += 1
69         self.index = -1
70         self.e.log.debug("Pipeline %r finished" % (self.name))
71