class MoePipeError(moe.MoeError):
    """Error signalling that a MoePipeline operation failed."""
class MoeAbortPipeline(Exception):
    """Exception carrying a `skip_to` value (999 unless given)."""

    def __init__(self, skip_to=999):
        # Stored on the instance so whoever catches the exception can read it.
        self.skip_to = skip_to
# Constructor taking `e`, `name` and an optional `skip_to` (default 70).
# Only the `skip_to` assignment is visible in this excerpt.
# NOTE(review): the sibling methods read self.e, self.name, self.pipe and
# self.index; presumably they are initialised here as well -- confirm.
17 def __init__(self, e, name, skip_to = 70):
23 self.skip_to = skip_to
# Add an entry for callable `fun` at time `pri` to self.pipe.
# NOTE(review): the branch headers around the name default, the description
# suffix and the raise are not visible in this excerpt, so the exact
# conditions are not documented here.
25 def insert(self, pri, fun, desc='', name=None):
26 """Insert callable `fun` to time `pri`,
27 `desc` is optional description,
28 `name` is the "module.function" name, guessed by default.
# NOTE(review): assert is stripped under `python -O`, so this type check
# disappears in optimised runs.
30 assert(isinstance(pri, int))
# Guessed name: the callable's module and function name joined by a dot.
33 name = fun.__module__ + '.' + fun.__name__
# The name is appended to the description in square brackets.
36 desc += '[' + name + ']'
# Entries are (pri, desc, fun) triples; bisect.bisect returns the insertion
# point after any equal entries, so inserting there keeps self.pipe ordered.
38 triple = (pri, desc, fun)
39 pos = bisect.bisect(self.pipe, triple)
# NOTE(review): `raise Class, message` is Python 2-only syntax.
41 raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \
42 % (self.name, self.index, pri)
43 self.pipe.insert(pos, triple)
def dump(self, prefix=""):
    """
    Build a debugging listing of the pipe.

    Every line starts with `prefix`: first a heading naming the pipeline,
    then one line per entry showing its time and label.
    Returns the listing as a list of lines.
    """
    heading = "%s >>> Pipeline %s" % (prefix, self.name)
    rows = ["%s% 3d %s" % (prefix, when, label) for when, label, _ in self.pipe]
    return [heading] + rows
# Walk self.pipe by self.index, unpacking each (pri, name, fun) entry and
# reporting progress through self.e.log.debug.
# NOTE(review): the statements that set up self.index and min_pri, call the
# entry and advance the index are not visible in this excerpt.
55 def run(self, *args, **kwargs):
58 while self.index < len(self.pipe):
59 (pri,name,fun) = self.pipe[self.index]
61 self.e.log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
# NOTE(review): `except Class, name` is Python 2-only syntax; `err` is bound
# but not used in the visible lines.
64 except MoeAbortPipeline, err:
# NOTE(review): this reads the pipeline's own skip_to, not the skip_to carried
# by the caught exception -- confirm that is intended.
65 min_pri = self.skip_to
# NOTE(review): "%r:d" gives two conversions for three arguments, so this %
# raises TypeError when evaluated; the "running" message above uses "%r:%d".
67 self.e.log.debug("Pipeline %r:d skipping: %s" % (self.name, pri, name))
70 self.e.log.debug("Pipeline %r finished" % (self.name))