]> mj.ucw.cz Git - moe.git/commitdiff
Merge branch 'python-newpipe' into python
authorTomas Gavenciak <gavento@ucw.cz>
Wed, 12 Jan 2011 13:54:26 +0000 (14:54 +0100)
committerTomas Gavenciak <gavento@ucw.cz>
Wed, 12 Jan 2011 13:57:40 +0000 (14:57 +0100)
Attempt at order-defined pipelines

1  2 
t/moe/pipeline-deps.py

index 0000000000000000000000000000000000000000,0000000000000000000000000000000000000000..9b41691e490d7920257d0f1cffb9fed2c945d486
new file mode 100644 (file)
--- /dev/null
--- /dev/null
@@@ -1,0 -1,0 +1,102 @@@
++#!/usr/bin/env python
++
++import bisect
++import moe
++
++class PipeError(moe.MoeError):
++    """Failure of the Moe Pipeline."""
++
++
++class PipelineEvent(object):
++    def __init__(self, name, virtual=False):
++      self.name = name
++      self.virtual = virtual
++      self.hooks = [] # tuples ('name', callable)
++      self.after = set() # List of event names this one comes after
++      self.before = set() # List of event names coming before this one
++      # Per run: 
++      self.dependencies = set() # unfinished events this one comes 'after'
++
++    def add_hook(self, hook, name=None):
++      "Add `fun` as a callable hook `name` is the 'module.function' name, guessed by default."
++      assert(callable(fun))
++      if self.virtual:
++          raise PipeError("Cannot add hook to uninitialised event %r", self.name)
++      if name is None:
++          name = fun.__module__ + '.' + fun.__name__
++      self.hooks.append( (name, hook) )
++
++    def run(self, *args, **kwargs):
++      if not self.hooks:
++          log.debug('Passed virtual event %s', self.name)
++      log.info('Running pipeline event %s', self.name)
++      for n,h in self.hooks:
++          log.debug('[%s] Running hook %s', self.name, n)
++          h( *args, **kwargs)
++      
++    
++class Pipeline:
++    """Moe pipeline."""
++
++    def __init__(self, name, skip_to = 'cleanup'):
++      self.name = name
++        self.events = {}
++      self.skip_to = skip_to
++      # per-run:
++      self.ready = set()
++      self.finished = set()
++
++    def add_event(self, name, after=None, before=None):
++      if name in self.events:
++          e = self[name]
++          if not e.virtual:
++              raise PipeError("Reinserting event %r", name)
++          else: e.virtual = False
++      else:
++          self.events[name] = PipelineEvent(name, virtual=False)
++          self.ready.add(name)
++
++      if after:
++          for d in after:
++              self.add_dependency(d, name)
++      if before:
++          for d in before:
++              self.add_dependency(name, d)
++
++      return self[name]
++          
++    def add_dependency(first, then):
++      if first not in self.events:
++          self.events[first] = PipelineEvent(first, virtual=True)
++          self.ready.add(first)
++      if then not in self.events:
++          self.events[then] = PipelineEvent(then, virtual=True)
++          self.ready.add(then)
++      if then in self.finished:
++          raise PipeError("Cannot alter the past")
++      
++      self[first].before.add(then)
++      self[then].after.add(first)
++      self[then].dependencies.add(first)
++      if (first not in self.finished) and (then in self.ready):
++          self.ready.remove(then)
++      
++    def get_next(self):
++      "Return next event (or None if exhausted)"
++      if not self.ready:
++          if len(self.finished) < len(self.events):
++              raise PipeError("Not all events processed -- cyclic dependency found")
++          return None # All events processed
++      ename = self.ready.pop()
++      log.debug("[%s] processing event %r", self.name, ename)
++      e = self[ename]
++      if e.virtual:
++          raise PipeError("Schedule error: running uninitialised event")
++      for d in e.before:
++          self[d].dependencies.remove(ename)
++      return e
++
++    def run(self, *args, **kwargs):
++      while e = self.get_next():
++          log.debug("Pipeline %r running %r" % (self.name, e.name))
++          e.run(*args, **kwargs)