]> mj.ucw.cz Git - eval.git/blob - t/moe/pipeline.py
Many changes to pipeline.py
[eval.git] / t / moe / pipeline.py
1 #!/usr/bin/env python
2
3 import bisect
4 import moe
5
6 class MoePipeError(moe.MoeError):
7     """Failure of the MoePipeline."""
8
9 class MoeAbortPipeline(Exception):
10
11     def __init__(self, skip_to=999):
12         self.skip_to = skip_to
13
14 class Pipeline:
15     """Moe pipeline."""
16
17     def __init__(self, e, name, skip_to = 70):
18         # e is Eval
19         self.e = e
20         self.pipe = []
21         self.index = -1
22         self.name = name
23         self.skip_to = skip_to
24
25     def insert(self, pri, name, fun):
26         "Insert callable `fun` to time `pri`, `name` is only informative."
27         assert(isinstance(pri, int))
28         assert(callable(fun))
29         triple = (pri, name, fun)
30         pos = bisect.bisect(self.pipe, triple)
31         if pos <= self.index:
32             raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \
33                 % (self.name, self.index, pri)
34         self.pipe.insert(pos, triple)
35
36     def dump(self, prefix=""):
37         """
38         Debugging dump of the pipe.
39         Returns a list of lines.
40         """
41         l=["%s >>> Pipeline %s\n" % (prefix, self.name)]
42         for pri, name, fun in self.pipe:
43             l.append("%s% 3d %s, %s\n" % (prefix, pri, name, fun))
44
45     def run(self, *args, **kwargs):
46         self.index = 0
47         min_pri = -1
48         while self.index < len(self.pipe):
49             (pri,name,fun) = self.pipe[self.index]
50             if pri >= min_pri:
51                 self.e.log.debug("Pipeline %r:%d running: %s\n" % (self.name, pri, name))
52                 try:
53                     fun(*args, **kwargs)
54                 except MoeAbortPipeline, err:
55                     min_pri = self.skip_to
56             else:
57                 self.e.log.debug("Pipeline %r:d skipping: %s\n" % (self.name, pri, name))
58             self.index += 1
59         self.index = -1
60         self.e.log.debug("Pipeline %r finished\n" % (self.name))
61