if (first not in self.finished) and (then in self.ready):
self.ready.remove(then)
- def run_next(self, *args, **kwargs):
+ def get_next(self):
+ "Return next event (or None if exhausted)"
if not self.ready:
if len(self.finished) < len(self.events):
raise PipeError("Not all events processed -- cyclic dependency found")
- raise PipeError("All events processed")
+ return None # All events processed
ename = self.ready.pop()
log.debug("[%s] processing event %r", self.name, ename)
e = self[ename]
if e.virtual:
- raise PipeError()
+ raise PipeError("Schedule error: running uninitialised event")
for d in e.before:
- self[
- def dump(self, prefix=""):
- """
- Debugging dump of the pipe.
- Returns a list of lines.
- """
- l=["%s >>> Pipeline %s" % (prefix, self.name)]
- for pri, name, fun in self.pipe:
- l.append("%s% 3d %s" % (prefix, pri, name))
- return l
+ self[d].dependencies.remove(ename)
+ return e
def run(self, *args, **kwargs):
- log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
- try:
- fun(*args, **kwargs)
- except MoeAbortPipeline, err:
- min_pri = self.skip_to
- else:
- self.e.log.debug("Pipeline %r:d skipping: %s" % (self.name, pri, name))
- self.index += 1
- self.index = -1
- self.e.log.debug("Pipeline %r finished" % (self.name))
-
+ while e = self.get_next():
+ log.debug("Pipeline %r running %r" % (self.name, e.name))
+ e.run(*args, **kwargs)