From: Tomas Gavenciak Date: Tue, 11 Jan 2011 18:00:44 +0000 (+0100) Subject: Started rewrite to dependency based pipelines X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=a7d14763379058e1d2b47950623dce641a0b99cc;p=eval.git Started rewrite to dependency based pipelines --- diff --git a/t/moe/pipeline.py b/t/moe/pipeline.py index 1b2d98a..f97b703 100644 --- a/t/moe/pipeline.py +++ b/t/moe/pipeline.py @@ -3,45 +3,96 @@ import bisect import moe -class MoePipeError(moe.MoeError): - """Failure of the MoePipeline.""" +class PipeError(moe.MoeError): + """Failure of the Moe Pipeline.""" -class MoeAbortPipeline(Exception): - def __init__(self, skip_to=999): - self.skip_to = skip_to +class PipelineEvent(object): + def __init__(self, name, virtual=False): + self.name = name + self.virtual = virtual + self.hooks = [] # tuples ('name', callable) + self.after = set() # List of event names this one comes after + self.before = set() # List of event names coming before this one + # Per run: + self.dependencies = set() # unfinished events this one comes 'after' + + def add_hook(self, hook, name=None): + "Add `fun` as a callable hook `name` is the 'module.function' name, guessed by default." + assert(callable(fun)) + if self.virtual: + raise PipeError("Cannot add hook to uninitialised event %r", self.name) + if name is None: + name = fun.__module__ + '.' + fun.__name__ + self.hooks.append( (name, hook) ) + def run(self, *args, **kwargs): + if not self.hooks: + log.debug('Passed virtual event %s', self.name) + log.info('Running pipeline event %s', self.name) + for n,h in self.hooks: + log.debug('[%s] Running hook %s', self.name, n) + h( *args, **kwargs) + + class Pipeline: """Moe pipeline.""" - def __init__(self, e, name, skip_to = 70): - # e is Eval - self.e = e - self.pipe = [] - self.index = -1 + def __init__(self, name, skip_to = 'cleanup'): self.name = name + self.events = {} self.skip_to = skip_to + # per-run: + self.ready = set() + self.finished = set() - def insert(self, pri, fun, desc='', name=None): - """Insert callable `fun` to time `pri`, - `desc` is optional description, - `name` is the "module.function" name, guessed by default. - """ - assert(isinstance(pri, int)) - assert(callable(fun)) - if name is None: - name = fun.__module__ + '.' + fun.__name__ - if desc: - desc += ' ' - desc += '[' + name + ']' + def add_event(self, name, after=None, before=None): + if name in self.events: + e = self[name] + if not e.virtual: + raise PipeError("Reinserting event %r", name) + else: e.virtual = False + else: + self.events[name] = PipelineEvent(name, virtual=False) + self.ready.add(name) - triple = (pri, desc, fun) - pos = bisect.bisect(self.pipe, triple) - if pos <= self.index: - raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \ - % (self.name, self.index, pri) - self.pipe.insert(pos, triple) + if after: + for d in after: + self.add_dependency(d, name) + if before: + for d in before: + self.add_dependency(name, d) + return self[name] + + def add_dependency(first, then): + if first not in self.events: + self.events[first] = PipelineEvent(first, virtual=True) + self.ready.add(first) + if then not in self.events: + self.events[then] = PipelineEvent(then, virtual=True) + self.ready.add(then) + if then in self.finished: + raise PipeError("Cannot alter the past") + + self[first].before.add(then) + self[then].after.add(first) + self[then].dependencies.add(first) + if (first not in self.finished) and (then in self.ready): + self.ready.remove(then) + + def run_next(self, *args, **kwargs): + if not self.ready: + if len(self.finished) < len(self.events): + raise PipeError("Not all events processed -- cyclic dependency found") + raise PipeError("All events processed") + ename = self.ready.pop() + log.debug("[%s] processing event %r", self.name, ename) + e = self[ename] + if e.virtual: + raise PipeError() + for d in e.before: + self[ def dump(self, prefix=""): """ Debugging dump of the pipe. @@ -53,12 +104,7 @@ class Pipeline: return l def run(self, *args, **kwargs): - self.index = 0 - min_pri = -1 - while self.index < len(self.pipe): - (pri,name,fun) = self.pipe[self.index] - if pri >= min_pri: - self.e.log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name)) + log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name)) try: fun(*args, **kwargs) except MoeAbortPipeline, err: