]> mj.ucw.cz Git - eval.git/commitdiff
Started rewrite to dependency based pipelines
authorTomas Gavenciak <gavento@ucw.cz>
Tue, 11 Jan 2011 18:00:44 +0000 (19:00 +0100)
committerTomas Gavenciak <gavento@ucw.cz>
Tue, 11 Jan 2011 18:00:44 +0000 (19:00 +0100)
t/moe/pipeline.py

index 1b2d98a581fbf9e24a94120e273ec27a5d520900..f97b703e914d7eff921abf1e1fea9b0f1af8d4b8 100644 (file)
@@ -3,45 +3,96 @@
 import bisect
 import moe
 
-class MoePipeError(moe.MoeError):
-    """Failure of the MoePipeline."""
+class PipeError(moe.MoeError):
+    """Failure of the Moe Pipeline."""
 
-class MoeAbortPipeline(Exception):
 
-    def __init__(self, skip_to=999):
-       self.skip_to = skip_to
+class PipelineEvent(object):
+    def __init__(self, name, virtual=False):
+       self.name = name
+       self.virtual = virtual
+       self.hooks = [] # tuples ('name', callable)
+       self.after = set() # List of event names this one comes after
+       self.before = set() # List of event names coming before this one
+       # Per run: 
+       self.dependencies = set() # unfinished events this one comes 'after'
+
+    def add_hook(self, hook, name=None):
+       "Add `fun` as a callable hook `name` is the 'module.function' name, guessed by default."
+       assert(callable(fun))
+       if self.virtual:
+           raise PipeError("Cannot add hook to uninitialised event %r", self.name)
+       if name is None:
+           name = fun.__module__ + '.' + fun.__name__
+       self.hooks.append( (name, hook) )
 
+    def run(self, *args, **kwargs):
+       if not self.hooks:
+           log.debug('Passed virtual event %s', self.name)
+       log.info('Running pipeline event %s', self.name)
+       for n,h in self.hooks:
+           log.debug('[%s] Running hook %s', self.name, n)
+           h( *args, **kwargs)
+       
+    
 class Pipeline:
     """Moe pipeline."""
 
-    def __init__(self, e, name, skip_to = 70):
-       # e is Eval
-       self.e = e
-        self.pipe = []
-       self.index = -1
+    def __init__(self, name, skip_to = 'cleanup'):
        self.name = name
+        self.events = {}
        self.skip_to = skip_to
+       # per-run:
+       self.ready = set()
+       self.finished = set()
 
-    def insert(self, pri, fun, desc='', name=None):
-       """Insert callable `fun` to time `pri`, 
-       `desc` is optional description, 
-       `name` is the "module.function" name, guessed by default.
-       """
-       assert(isinstance(pri, int))
-       assert(callable(fun))
-       if name is None:
-           name = fun.__module__ + '.' + fun.__name__
-       if desc:
-           desc += ' '
-       desc += '[' + name + ']'
+    def add_event(self, name, after=None, before=None):
+       if name in self.events:
+           e = self[name]
+           if not e.virtual:
+               raise PipeError("Reinserting event %r", name)
+           else: e.virtual = False
+       else:
+           self.events[name] = PipelineEvent(name, virtual=False)
+           self.ready.add(name)
 
-       triple = (pri, desc, fun)
-       pos = bisect.bisect(self.pipe, triple)
-       if pos <= self.index:
-           raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \
-               % (self.name, self.index, pri)
-       self.pipe.insert(pos, triple)
+       if after:
+           for d in after:
+               self.add_dependency(d, name)
+       if before:
+           for d in before:
+               self.add_dependency(name, d)
 
+       return self[name]
+           
+    def add_dependency(first, then):
+       if first not in self.events:
+           self.events[first] = PipelineEvent(first, virtual=True)
+           self.ready.add(first)
+       if then not in self.events:
+           self.events[then] = PipelineEvent(then, virtual=True)
+           self.ready.add(then)
+       if then in self.finished:
+           raise PipeError("Cannot alter the past")
+       
+       self[first].before.add(then)
+       self[then].after.add(first)
+       self[then].dependencies.add(first)
+       if (first not in self.finished) and (then in self.ready):
+           self.ready.remove(then)
+      
+    def run_next(self, *args, **kwargs):
+       if not self.ready:
+           if len(self.finished) < len(self.events):
+               raise PipeError("Not all events processed -- cyclic dependency found")
+           raise PipeError("All events processed")
+       ename = self.ready.pop()
+       log.debug("[%s] processing event %r", self.name, ename)
+       e = self[ename]
+       if e.virtual:
+           raise PipeError()
+       for d in e.before:
+           self[
     def dump(self, prefix=""):
        """
        Debugging dump of the pipe.
@@ -53,12 +104,7 @@ class Pipeline:
        return l
 
     def run(self, *args, **kwargs):
-       self.index = 0
-       min_pri = -1
-       while self.index < len(self.pipe):
-           (pri,name,fun) = self.pipe[self.index]
-           if pri >= min_pri:
-               self.e.log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
+       log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
                try:
                    fun(*args, **kwargs)
                except MoeAbortPipeline, err: