#!/usr/bin/env python
import bisect
+
import moe
+from moe.logs import *
class MoePipeError(moe.MoeError):
"""Failure of the MoePipeline."""
class MoeAbortPipeline(Exception):
- def __init__(self, skip_to=999):
+ def __init__(self, skip_to=None):
self.skip_to = skip_to
+
class Pipeline:
"""Moe pipeline."""
- def __init__(self, e, name, skip_to = 70):
+ def __init__(self, name, skip_to = 700):
# e is Eval
- self.e = e
self.pipe = []
self.index = -1
self.name = name
self.skip_to = skip_to
- def insert(self, pri, name, fun):
- "Insert callable `fun` to time `pri`, `name` is only informative."
+ def insert(self, pri, fun, desc='', name=None):
+ """Insert callable `fun` to time `pri`,
+ `desc` is optional description,
+ `name` is the "module.function" name, guessed by default.
+ """
assert(isinstance(pri, int))
assert(callable(fun))
- triple = (pri, name, fun)
+ if name is None:
+ name = fun.__module__ + '.' + fun.__name__
+ if desc:
+ desc += ' '
+ desc += '[' + name + ']'
+
+ triple = (pri, desc, fun)
pos = bisect.bisect(self.pipe, triple)
if pos <= self.index:
raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \
"""
l=["%s >>> Pipeline %s" % (prefix, self.name)]
for pri, name, fun in self.pipe:
- l.append("%s% 3d %s, %s" % (prefix, pri, name, fun))
+ l.append("%s% 3d %s" % (prefix, pri, name))
return l
def run(self, *args, **kwargs):
while self.index < len(self.pipe):
(pri,name,fun) = self.pipe[self.index]
if pri >= min_pri:
- self.e.log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
+ log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
try:
fun(*args, **kwargs)
except MoeAbortPipeline, err:
min_pri = self.skip_to
else:
- self.e.log.debug("Pipeline %r:d skipping: %s" % (self.name, pri, name))
+ log.debug("Pipeline %r:d skipping: %s" % (self.name, pri, name))
self.index += 1
self.index = -1
- self.e.log.debug("Pipeline %r finished" % (self.name))
+ log.debug("Pipeline %r finished" % (self.name))