X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;ds=sidebyside;f=t%2Fmoe%2Fpipeline.py;h=e8138200461b943dd0c7b5007f6b51180321d18f;hb=878693c0f1a0a4cee1ae0478e762b4c57dc44df3;hp=bc407a2e18cc21c47e5547b5a59dcb11c3c1e4ae;hpb=f9b0b2c89eca1c93e70e80c8c7f7775a65fdccbb;p=moe.git diff --git a/t/moe/pipeline.py b/t/moe/pipeline.py index bc407a2..e813820 100644 --- a/t/moe/pipeline.py +++ b/t/moe/pipeline.py @@ -1,58 +1,62 @@ #!/usr/bin/env python -import sys import bisect -import imp import moe -import moe.log -class MoePipeError(moe.MoeErr): +class MoePipeError(moe.MoeError): """Failure of the MoePipeline.""" -class MoePipeline: +class MoeAbortPipeline(Exception): + + def __init__(self, skip_to=999): + self.skip_to = skip_to + +class Pipeline: """Moe pipeline.""" - def __init__(self, name): + def __init__(self, e, name, skip_to = 70): + # e is Eval + self.e = e self.pipe = [] self.index = -1 self.name = name + self.skip_to = skip_to def insert(self, pri, name, fun): - triple = (pri,name,fun) + "Insert callable `fun` to time `pri`, `name` is only informative." + assert(isinstance(pri, int)) + assert(callable(fun)) + triple = (pri, name, fun) pos = bisect.bisect(self.pipe, triple) if pos <= self.index: - raise MoePipeError, "Pipeline insert cannot alter the past" + raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \ + % (self.name, self.index, pri) self.pipe.insert(pos, triple) - def dump(self, file=sys.stdout, prefix=""): - file.write(">>> Pipeline %s\n" % self.name) - for pri,name,fun in self.pipe: - file.write("%s%03d %s\n" % (prefix,pri,name)) - - def run(self, *args): + def dump(self, prefix=""): + """ + Debugging dump of the pipe. + Returns a list of lines. + """ + l=["%s >>> Pipeline %s" % (prefix, self.name)] + for pri, name, fun in self.pipe: + l.append("%s% 3d %s, %s" % (prefix, pri, name, fun)) + return l + + def run(self, *args, **kwargs): self.index = 0 + min_pri = -1 while self.index < len(self.pipe): (pri,name,fun) = self.pipe[self.index] - moe.log.default.verbose(">> Running %s:%s\n" % (self.name,name)) - fun(*args) + if pri >= min_pri: + self.e.log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name)) + try: + fun(*args, **kwargs) + except MoeAbortPipeline, err: + min_pri = self.skip_to + else: + self.e.log.debug("Pipeline %r:d skipping: %s" % (self.name, pri, name)) self.index += 1 self.index = -1 + self.e.log.debug("Pipeline %r finished" % (self.name)) - def add_hook(self, name): - modname = "moe.hooks." + name - moe.log.default.verbose(">> Loading hook %s\n" % name) - if not sys.modules.has_key(modname): - ## FIXME: Configuration variable for the hook directory? - try: - fp, path, desc = imp.find_module(name, ["moe/hooks"]) - except ImportError: - raise MoePipeError, "Cannot find hook module " + modname - try: - imp.load_module(modname, fp, path, desc) - finally: - fp.close() - sys.modules[modname].init(self) - - def configure(self, names): - for name in names.split(): - self.add_hook(name)