class MoePipeError(moe.MoeError):
    """Failure of the MoePipeline.

    Raised by the pipeline's ``insert`` with the message
    "Insert cannot alter the past" when an insertion is rejected.
    """
class MoeAbortPipeline(Exception):
    """Exception caught by the pipeline's ``run`` loop.

    `skip_to` is stored unchanged on the instance (default ``None``).
    """

    def __init__(self, skip_to=None):
        self.skip_to = skip_to
20 def __init__(self, name, skip_to = 700):
25 self.skip_to = skip_to
27 def insert(self, pri, fun, desc='', name=None):
28 """Insert callable `fun` to time `pri`,
29 `desc` is optional description,
30 `name` is the "module.function" name, guessed by default.
32 assert(isinstance(pri, int))
35 name = fun.__module__ + '.' + fun.__name__
38 desc += '[' + name + ']'
40 triple = (pri, desc, fun)
41 pos = bisect.bisect(self.pipe, triple)
43 raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \
44 % (self.name, self.index, pri)
45 self.pipe.insert(pos, triple)
def dump(self, prefix=""):
    """
    Debugging dump of the pipe.
    Returns a list of lines.

    The first line is a header naming the pipeline; one line follows for
    each pipe entry, showing its priority and description.  Every line
    starts with `prefix`.
    """
    lines = ["%s >>> Pipeline %s" % (prefix, self.name)]
    # Each pipe entry is a (priority, description, callable) triple; the
    # callable itself is not printed.
    lines.extend("%s% 3d %s" % (prefix, pri, desc)
                 for pri, desc, _fun in self.pipe)
    return lines
def run(self, *args, **kwargs):
    """Run the pipe entries in order, calling each with `*args`, `**kwargs`.

    When an entry raises MoeAbortPipeline, the remaining entries whose
    priority is below `self.skip_to` are logged as skipped instead of
    being called.  Any other exception propagates to the caller.

    NOTE(review): several lines of this method were missing from the
    listing under review; the parts marked below were reconstructed from
    the surviving lines and must be checked against the real file.
    """
    # NOTE(review): reconstructed -- start from the first entry, with no
    # skip threshold until an abort happens.
    self.index = 0
    min_pri = None
    while self.index < len(self.pipe):
        (pri, name, fun) = self.pipe[self.index]
        # NOTE(review): reconstructed condition -- an entry at exactly
        # `skip_to` is assumed to run; confirm `>=` versus `>`.
        if min_pri is None or pri >= min_pri:
            log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
            try:
                fun(*args, **kwargs)
            except MoeAbortPipeline:
                # NOTE(review): the threshold is the pipeline's own
                # skip_to; the skip_to carried by the exception is not
                # consulted in the visible lines -- confirm intended.
                min_pri = self.skip_to
        else:
            # Fixed: the format string was "%r:d", which left three
            # arguments for two specifiers and raised TypeError.
            log.debug("Pipeline %r:%d skipping: %s" % (self.name, pri, name))
        self.index += 1
    # NOTE(review): a missing line may have reset self.index here; not
    # reconstructed because nothing visible establishes it.
    log.debug("Pipeline %r finished" % (self.name))