6 class MoePipeError(moe.MoeError):
7 """Failure of the MoePipeline."""
9 class MoeAbortPipeline(Exception):
11 def __init__(self, skip_to=999):
12 self.skip_to = skip_to
17 def __init__(self, e, name, skip_to = 70):
23 self.skip_to = skip_to
25 def insert(self, pri, name, fun):
26 "Insert callable `fun` to time `pri`, `name` is only informative."
27 assert(isinstance(pri, int))
29 triple = (pri, name, fun)
30 pos = bisect.bisect(self.pipe, triple)
32 raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \
33 % (self.name, self.index, pri)
34 self.pipe.insert(pos, triple)
36 def dump(self, prefix=""):
38 Debugging dump of the pipe.
39 Returns a list of lines.
41 l=["%s >>> Pipeline %s" % (prefix, self.name)]
42 for pri, name, fun in self.pipe:
43 l.append("%s% 3d %s, %s" % (prefix, pri, name, fun))
46 def run(self, *args, **kwargs):
49 while self.index < len(self.pipe):
50 (pri,name,fun) = self.pipe[self.index]
52 self.e.log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
55 except MoeAbortPipeline, err:
56 min_pri = self.skip_to
58 self.e.log.debug("Pipeline %r:d skipping: %s" % (self.name, pri, name))
61 self.e.log.debug("Pipeline %r finished" % (self.name))