"""
import types, itertools, re, bisect
-import logging as log
-
+from logs import log
from moe import MoeError
+++ /dev/null
-#!/usr/bin/env python
-
-import re
-import sys
-import moe
-
-key_pattern = re.compile("^[A-Za-z0-9_-]+$")
-ref_pattern = re.compile("^[A-Za-z0-9_-]+")
-
-class MoeConfigInvalid(moe.MoeError):
- pass
-
-class MoeConfigEvalError(moe.MoeError):
- pass
-
-class MoeConfig:
- """Moe configuration file. Should be immutable once a part of a stack."""
-
- def __init__(self, file=None, name=None, type="<unnamed>"):
- self.vars = {}
- self.type = type
- if file is not None:
- self.load(file)
- elif name is not None:
- self.name = name
- try:
- file = open(name, "r")
- except IOError, err:
- raise MoeConfigInvalid, "Cannot open configuration file %s: %s" % (name, err.strerror)
- else:
- self.load(file)
-
- def set(self, k, v):
- self.vars[k] = [("s", v)]
-
- def parse_line(self, x):
- x = x.rstrip("\n").lstrip(" \t")
- if x=="" or x.startswith("#"):
- pass
- else:
- sep = x.find("=")
- if sep >= 0:
- k = x[:sep]
- v = x[sep+1:]
- if k.endswith("+"):
- k = k[:-1]
- if not self.vars.has_key(k):
- self.vars[k] = [("a","")];
- else:
- self.vars[k] += [("s"," ")]
- else:
- self.vars[k] = []
- if not key_pattern.match(k):
- raise MoeConfigInvalid, "Malformed name of configuration variable"
- if v.startswith("'"):
- v=v[1:]
- if not v.endswith("'"):
- raise MoeConfigInvalid, "Misquoted string"
- self.vars[k].append(("s", v[:-1]))
- elif v.startswith('"'):
- v=v[1:]
- if not v.endswith('"'):
- raise MoeConfigInvalid, "Misquoted string"
- self.parse_interpolated(self.vars[k], v[:-1])
- else:
- self.parse_interpolated(self.vars[k], v)
- else:
- raise MoeConfigInvalid, "Parse error"
-
- def load(self, file):
- lino = 0
- for x in file.readlines():
- lino += 1
- try:
- self.parse_line(x)
- except MoeConfigInvalid, x:
- msg = x.message + " at line " + str(lino)
- if hasattr(self, "name"):
- msg += " of " + self.name
- raise MoeConfigInvalid, msg
-
- def parse_interpolated(self, list, s):
- while s<>"":
- if s.startswith("$"):
- s = s[1:]
- if s.startswith("{"):
- p = s.find("}")
- if not p:
- raise MoeConfigInvalid, "Unbalanced braces"
- k, s = s[1:p], s[p+1:]
- if not key_pattern.match(k):
- raise MoeConfigInvalid, "Invalid variable name"
- else:
- m = ref_pattern.match(s)
- if m:
- k, s = s[:m.end()], s[m.end():]
- else:
- raise MoeConfigInvalid, "Invalid variable reference"
- list.append(("i", k))
- else:
- p = s.find("$")
- if p < 0:
- p = len(s)
- list.append(("s", s[:p]))
- s = s[p:]
-
- def dump(self, file=sys.stdout, prefix=""):
- for k,v in self.vars.items():
- file.write(prefix)
- file.write(k)
- if len(v) > 0 and v[0][0] == "a":
- file.write("+")
- v = v[1:]
- file.write("=")
- for t,w in v:
- if t == "s":
- file.write("'" + w + "'")
- elif t == "i":
- file.write('"$' + w + '"')
- file.write("\n")
-
-class MoeConfigStack:
- """Stack of configuration files."""
-
- def __init__(self, base=None):
- if base:
- self.stk = base.stk[:]
- else:
- self.stk = []
- self.in_progress = {}
-
- def push(self, cfg):
- self.stk.append(cfg)
-
- def __getitem__(self, k):
- if self.in_progress.has_key(k):
- raise MoeConfigEvalError, "Definition of $%s is recursive" % k;
- self.in_progress[k] = 1;
- v = self.do_get(k, len(self.stk)-1)
- del self.in_progress[k]
- return v
-
- def do_get(self, k, pos):
- while pos >= 0:
- cfg = self.stk[pos]
- if cfg.vars.has_key(k):
- new = cfg.vars[k]
- if len(new) > 0 and new[0][0] == "a":
- v = self.do_get(k, pos-1)
- if v != "" and not v.endswith(" "):
- v += " "
- else:
- v = ""
- for op,arg in new:
- if op == "s":
- v = v + arg
- elif op == "i":
- v = v + self[arg]
- return v
- pos -= 1
- return ""
-
- def keys(self):
- seen = {}
- for cfg in self.stk:
- for k in cfg.vars.keys():
- seen[k] = None
- return seen.keys()
-
- def dump(self, file=sys.stdout, prefix=""):
- for k in sorted(self.keys()):
- v = self[k]
- file.write("%s%s=%s\n" % (prefix,k,v))
-
- def dump_defs(self, file=sys.stdout, prefix=""):
- level = 0
- for cfg in self.stk:
- level += 1
- file.write("%s(level %d: %s)\n" % (prefix,level,cfg.type))
- cfg.dump(file, prefix + "\t")
- file.write("%s(end)\n" % prefix)
-
- def apply_overrides(self, prefix):
- newstk = []
- for cfg in self.stk:
- over = MoeConfig(type = cfg.type + '-overrides')
- changed = False
- for k in cfg.vars.keys():
- if k.startswith(prefix):
- over.vars[k[len(prefix):]] = cfg.vars[k]
- changed = True
- if changed:
- clean = MoeConfig(type = cfg.type)
- for k in cfg.vars.keys():
- if not k.startswith(prefix):
- clean.vars[k] = cfg.vars[k]
- newstk.append(clean)
- newstk.append(over)
- else:
- newstk.append(cfg)
- self.stk = newstk
-
-def parse_overrides(argv):
- cfg = None
- argv0 = argv.pop(0)
- while len(argv) > 0 and argv[0].find("=") >= 0:
- if cfg is None:
- cfg = MoeConfig(type='cmdline')
- cfg.parse_line(argv.pop(0))
- argv.insert(0, argv0)
- return cfg
.. note:: Formula can contain additional/unnecessary parentheses
"""
-import re, types, itertools, logging as log
+import re, types, itertools
import traceback
-
+from logs import log
import moe.config as cf
import moe.config as cf
from moe.config_parser import *
-import logging as log
import unittest
import tempfile
#!/usr/bin/env python
+import os.path
+import shutil
+
import moe
import moe.config
import moe.box
-import moe.log
import moe.status
import moe.pipeline
import moe.util
-import os.path
-import shutil
+import moe.logs
+from moe.logs import *
+
class Eval:
"""
"""
def __init__(self):
- self.log = moe.log.Loggers()
self.config = moe.config.ConfigTree()
- self.main_pipe = moe.pipeline.Pipeline(self, "main")
- self.test_pipe = moe.pipeline.Pipeline(self, "test")
+ self.main_pipe = moe.pipeline.Pipeline("main")
+ self.test_pipe = moe.pipeline.Pipeline("test")
self.status = moe.status.Status()
def __getitem__(self, key):
def init(self, overrides=[]):
"Initializes most part of Eval before running the pipeline. See the timeline for details."
- self.log.info("Initializing ...")
+ log.info("Initializing Eval ...")
# set basic builtins
self.config.parse('HOME = \'%s\'' % os.getcwd(), source="<builtins>", level=0)
# fix variables
self.config.fix(['LOG', 'USER_LOG', 'VERBOSE', 'HOME', 'DEBUG_LEVEL', 'TDIR'])
# start logging
- self.log.open_eval_log(self['LOG'], int(self['DEBUG_LEVEL']), redirect_fds = True)
- self.log.open_user_log(self['USER_LOG'])
+ moe.logs.open_eval_log(self['LOG'], level=int(self['DEBUG_LEVEL']), redirect_fds = True)
+ moe.logs.open_user_log(self['USER_LOG'])
self.debug_dump_config()
# insert hooks into main pipeline
- self.main_pipe.insert(5, hook_init_dirs, "Initialize working directories")
- self.main_pipe.insert(15, hook_load_task_config, "Load task config")
- self.main_pipe.insert(20, hook_init_tasktype, "Load tasktype module")
- self.main_pipe.insert(90, hook_write_metadata, "Write final metadata file")
+ self.main_pipe.insert(50, hook_init_dirs, "Initialize working directories")
+ self.main_pipe.insert(100, hook_load_task_config, "Load task config")
+ self.main_pipe.insert(200, hook_init_tasktype, "Load tasktype module")
+ self.main_pipe.insert(900, hook_write_metadata, "Write final metadata file")
# ininialize extensions (let them insert hooks)
self.config.fix('EXTENSIONS')
exts = self['EXTENSIONS'].split()
- for e in exts:
- if not e:
- raise MoeError, "Invalid extension name: %r" % e
- self.log.debug("Loading extension %s", e)
+ for ex in exts:
+ if not ex:
+ raise MoeError, "Invalid extension name: %r" % ex
+ log.debug("Loading extension %s", ex)
try:
- mod = moe.util.load_module('moe.exts.' + e)
+ mod = moe.util.load_module('moe.exts.' + ex)
except ImportError:
- self.log.exception()
- raise MoeError, 'Unknown extension: %r' % e
+ log.exception("Error importing exception %r", ex)
+ raise MoeError('Unknown extension: %r', ex)
mod.init(self)
def run(self):
"Run the main pipeline."
self.debug_dump_pipe(self.main_pipe)
- self.log.debug('Running main pipeline')
+ log.debug('Running main pipeline')
self.main_pipe.run(e=self)
def debug_dump_config(self):
"Dumps config at level DDEBUG (only compiles the dump if main level is low enough)."
- if self.log.level <= 5:
- self.log.ddebug(' ****** Config dump: ******')
- self.log.ddebug('\n'.join(self.config.dump(' * ')))
- self.log.ddebug(' **************************')
+ if log.level <= 5:
+ log.debug(' ****** Config dump: ******')
+ log.debug('\n'.join(self.config.dump(' * ')))
+ log.debug(' **************************')
def debug_dump_pipe(self, pipe):
"Dumps pipeline `pipe` at level DDEBUG (only compiles the dump if main level low enough)."
- if self.log.level <= 5:
- self.log.ddebug(' ****** Pipeline %r dump: ******'%pipe.name)
- self.log.ddebug('\n'.join(pipe.dump(prefix=' * ')))
- self.log.ddebug(' **************************')
+ if log.level <= 5:
+ log.debug(' ****** Pipeline %r dump: ******'%pipe.name)
+ log.debug('\n'.join(pipe.dump(prefix=' * ')))
+ log.debug(' **************************')
def debug_dump_status(self):
"Dumps status metadata at level DDEBUG (only compiles the dump if main level low enough)."
- if self.log.level <= 5:
- self.log.ddebug(' ****** Status dump: ******')
- self.log.ddebug('\n'.join(self.status.dump(prefix=' * ')).rstrip())
- self.log.ddebug(' **************************')
+ if log.level <= 5:
+ log.ddebug(' ****** Status dump: ******')
+ log.ddebug('\n'.join(self.status.dump(prefix=' * ')).rstrip())
+ log.ddebug(' **************************')
def hook_init_dirs(e):
"""(mainline at time 5) Create and check directories, fix directory variables.
def hook_load_task_config(e):
"""(mainline at time 15) Load `TASK_CONFIG` and check `PDIR`, fixes `TASK`, `PDIR`, `TASK_CONFIG`."""
e.config.fix(['TASK', 'PDIR', 'TASK_CONFIG'])
- e.log.debug('Loading task config %s', e['TASK_CONFIG'])
+ log.debug('Loading task config %s', e['TASK_CONFIG'])
if not os.path.isdir(e['PDIR']):
raise moe.MoeError, "No such task %s in %s" % (e['TASK'], e['PDIR'])
e.config.parse_file(e['TASK_CONFIG'], level=50)
e.config.fix('TASK_TYPE')
task_type = e['TASK_TYPE']
- e.log.debug('Loading module for TASK_TYPE: %r', task_type)
+ log.debug('Loading module for TASK_TYPE: %r', task_type)
if not task_type:
raise MoeError, "Invalid TASK_TYPE: %r" % e
try:
e.tasktype_module = moe.util.load_module('moe.tasktypes.' + task_type)
except ImportError:
- e.log.exception()
+ log.exception()
raise MoeError, 'Unknown TASK_TYPE: %r' % task_type
e.tasktype_module.init(e)
def hook_write_metadata(e):
"""(mainline at time 90) Write status metadata into file `STATUS_FILE`."""
e.debug_dump_status()
- e.log.debug('Writing status file %s', e['STATUS_FILE'])
+ log.debug('Writing status file %s', e['STATUS_FILE'])
with open(e['STATUS_FILE'], 'w') as f:
e.status.write(f)
Adds several info-printing hooks to both pipelines.
"""
+from moe.logs import log, userlog
+
def init(e):
def hook_m_0(e):
- e.log.info('Hey! It\'s me, the dummy extension in your main pipeline! (at time 0)')
+ log.info('Hey! It\'s me, the dummy extension in your main pipeline! (at time 0)')
e.main_pipe.insert(0, hook_m_0)
def hook_m_79(e):
- e.log.info('Me, the dummy extension, requies no cleanup! (at time 79)')
+ log.info('Me, the dummy extension, requies no cleanup! (at time 79)')
- e.main_pipe.insert(79, hook_m_79)
+ e.main_pipe.insert(790, hook_m_79)
def hook_t_42(e):
t = 'It\'s test %s and the dummy extension did nothing! (at time 42)' % e['TEST']
- e.log.info(t)
- e.log.user.info(t)
+ log.info(t)
+ userlog.info(t)
- e.test_pipe.insert(42, hook_t_42)
+ e.test_pipe.insert(420, hook_t_42)
+++ /dev/null
-#!/usr/bin/env python
-
-"""
-
-`Loggers` is a collection of logggers for :class:`~moe.eval.Eval`, initializing 4 subloggers (see class description).
-
-Use as `e.log.debug(...)` for main log that goes both to `test` and `main` log,
-`e.log.test.debug(...)` for individual logs.
-
-.. :data:: DDEBUG = 5
- Very verbose debugging level
-
-"""
-
-
-import sys, os
-import logging
-from logging import Logger, StreamHandler, Formatter
-
-logging.addLevelName(5,'DDEBUG')
-
-
-class Loggers(Logger):
- """Defines several logs:
-
- `self`
- root log, which sends messages to `eval` and `test` logs
- `self.eval`
- main log, initially a duplicated fd 2 (stderr)
- `self.user`
- public progress log, initially a duplicated fd 1 (stdout)
- `self.test`
- per-test log, initially no handler, to be directed to file like `{TEST}.log`
-
- .. note:: Currently, the logs and fd's get never closed.
- .. warning:: `Loggers.open_eval_log` changes global fd's 1 and 2 by default.
- """
- def __init__(self):
- "Initialize loggers as described in class description."
- Logger.__init__(self, '')
- # Duplicate the fd's
- self.orig_stdout_fd = os.dup(1)
- self.orig_stdout_file = os.fdopen(self.orig_stdout_fd, 'w', 0)
- self.orig_stderr_fd = os.dup(2)
- self.orig_stderr_file = os.fdopen(self.orig_stderr_fd, 'w', 0)
- # Eval main logger
- self.eval = Logger('eval')
- self.eval.addHandler(StreamHandler(self.orig_stderr_file))
- # per-test logger
- self.test = Logger('test')
- self.test_handler = None
- self.test_file = None
- # user progress logger
- self.user = Logger('user')
- self.user.addHandler(StreamHandler(self.orig_stdout_file))
-
- self.addHandler(self.test)
- self.addHandler(self.eval)
- self.debug('Logging initialized.')
-
- def open_user_log(self, filename, level=logging.INFO):
- """Open user (progress) logfile. Leaves logging to stdout active."""
- h = StreamHandler(open(filename, 'w', 0))
- h.setFormatter(Formatter('%(message)s'))
- self.user.setLevel(level)
- self.user.addHandler(h)
- self.user.debug('Logging started')
-
- def open_eval_log(self, filename, level, redirect_fds = True):
- """Open main logfile.
- Leaves logging to stderr active. If told to, redirects fd's 1 and 2 to this file.
- Sets level of both `self.eval` and `self`."""
- self.eval_file = open(filename, 'w', 0)
- self.eval_handler = StreamHandler(self.eval_file)
- self.eval_handler.setFormatter(Formatter('%(asctime)s [%(levelno)s] %(message)s'))
- self.eval.addHandler(self.eval_handler)
- if redirect_fds:
- os.dup2(self.eval_file.fileno(), 1)
- os.dup2(self.eval_file.fileno(), 2)
- self.eval.setLevel(level)
- self.setLevel(level)
- self.eval.debug('Logging started')
-
-
- def open_test_log(self, filename, level):
- """Open per-test log file, like "{TEST}.log". Also set handler level to `level`."""
- self.debug('Opening per-test log %s' % filename)
- if self.test_handler:
- self.close_test_log()
- self.test_file = open(filename, 'w')
- self.test_handler = StreamHandler(self.test_file)
- self.test.addHandler(self.test_handler)
- self.test.setLevel(level)
- self.test_handler.setFormatter(Formatter('%(asctime)s [%(levelno)s] %(message)s'))
- self.test.debug('Logging started')
-
- def close_test_log(self):
- """Close per-test logfile, leaving only the null handler."""
- if self.test_handler:
- self.test.info('Closing test logfile')
- self.test.removeHandler(self.test_handler)
- self.test_handler = None
- self.test_file.close()
-
- def ddebug(self, msg, *args, **kwargs):
- """Log with priority 5 (normal DEBUG is 10)"""
- self.log(5, msg, *args, **kwargs)
--- /dev/null
+"""
+
+`Loggers` is a collection of logggers for :class:`~moe.eval.Eval`, initializing 4 subloggers (see class description).
+
+Use as `e.log.debug(...)` for main log that goes both to `test` and `main` log,
+`e.log.test.debug(...)` for individual logs.
+
+.. :data:: DDEBUG = 5
+ Very verbose debugging level
+
+Defines several logs:
+
+`log`
+ main log, initially a duplicated fd 2 (stderr)
+`userlog`
+ public progress log, initially a duplicated fd 1 (stdout)
+`testlog`
+ per-test log, initially no handler, to be directed to file like `{TEST}.log`
+
+.. note:: Currently, the logs and fd's get never closed.
+.. warning:: `Loggers.open_eval_log` changes global fd's 1 and 2 by default.
+"""
+
+
+# Global logs to be imported by every module
+
+__all__ = ['log', 'userlog', 'testlog', 'pipelog' ]
+
+log = None
+userlog = None
+testlog = None
+pipelog = None
+
+orig_stdout_fd = None
+orig_stdout_file = None
+orig_stderr_fd = None
+orig_stderr_file = None
+testlog_file = None
+
+import logging, sys, os
+from logging import Formatter, StreamHandler
+
+def __init__():
+ """Very basic loggers setup to stderr."""
+
+ global orig_stdout_fd, orig_stdout_file, orig_stderr_fd, orig_stderr_file
+ global log, userlog
+
+ orig_stdout_fd = os.dup(1)
+ orig_stdout_file = os.fdopen(orig_stdout_fd, 'w', 0)
+ orig_stderr_fd = os.dup(2)
+ orig_stderr_file = os.fdopen(orig_stderr_fd, 'w', 0)
+
+ log = logging.getLogger('mainlog')
+ log.addHandler(StreamHandler(orig_stderr_file))
+
+ userlog = logging.getLogger('userlog')
+ userlog.addHandler(StreamHandler(orig_stdout_file))
+
+ testlog = log
+ pipelog = log
+
+ logging.addLevelName(5,'DDEBUG')
+
+ log.debug('Logging initialized.')
+
+
+def open_user_log(filename, level=logging.INFO):
+ """Open user (progress) logfile. Leaves logging to stdout active."""
+ h = StreamHandler(open(filename, 'w', 0))
+ h.setFormatter(Formatter('%(message)s'))
+ userlog.setLevel(level)
+ userlog.addHandler(h)
+
+ log.debug('User logging to %r started', filename)
+
+def open_eval_log(filename, level=None, redirect_fds = True):
+ """Open main logfile.
+ Leaves logging to stderr active. If told to, redirects fd's 1 and 2 to this file."""
+
+ eval_file = open(filename, 'w', 0)
+ h = StreamHandler(eval_file)
+ h.setFormatter(Formatter('%(asctime)s [%(levelno)s] %(message)s'))
+ log.addHandler(h)
+ if (level): log.setLevel(level)
+ log.debug('Opened eval logfile %r')
+
+ if redirect_fds:
+ os.dup2(eval_file.fileno(), 1)
+ os.dup2(eval_file.fileno(), 2)
+ log.debug('Redirected fds 1,2 -> eval logfile')
+
+
+def open_test_log(filename, level):
+ """Open per-test log file, usually "{TEST}.log", set its level."""
+
+ log.debug('Opening per-test log %r', filename)
+
+ global testlog, testlog_file
+ assert testlog_file is None
+
+ testlog = logging.getLogger('testlog')
+ testlog.setLevel(level)
+ for h in testlog.handlers:
+ testlog.removeHandler(h)
+
+ testlog_file = open(filename, 'w')
+ h = StreamHandler(testlog_file)
+ h.setFormatter(Formatter('%(asctime)s [%(levelno)s] %(message)s'))
+ testlog.addHandler(h)
+
+ testlog.debug('Logging started')
+
+def close_test_log():
+ """Close per-test logfile, set `testlog` to `log`."""
+ global testlog_file
+ assert testlog_file is not None
+ testlog_file.close()
+ testlog_file = None
+ testlog = log
+
+# Initialise
+
+__init__()
--- /dev/null
+#!/usr/bin/env python
+
+import bisect
+import moe
+
+class PipeError(moe.MoeError):
+ """Failure of the Moe Pipeline."""
+
+
+class PipelineEvent(object):
+ def __init__(self, name, virtual=False):
+ self.name = name
+ self.virtual = virtual
+ self.hooks = [] # tuples ('name', callable)
+ self.after = set() # List of event names this one comes after
+ self.before = set() # List of event names coming before this one
+ # Per run:
+ self.dependencies = set() # unfinished events this one comes 'after'
+
+ def add_hook(self, hook, name=None):
+ "Add `fun` as a callable hook `name` is the 'module.function' name, guessed by default."
+ assert(callable(fun))
+ if self.virtual:
+ raise PipeError("Cannot add hook to uninitialised event %r", self.name)
+ if name is None:
+ name = fun.__module__ + '.' + fun.__name__
+ self.hooks.append( (name, hook) )
+
+ def run(self, *args, **kwargs):
+ if not self.hooks:
+ log.debug('Passed virtual event %s', self.name)
+ log.info('Running pipeline event %s', self.name)
+ for n,h in self.hooks:
+ log.debug('[%s] Running hook %s', self.name, n)
+ h( *args, **kwargs)
+
+
+class Pipeline:
+ """Moe pipeline."""
+
+ def __init__(self, name, skip_to = 'cleanup'):
+ self.name = name
+ self.events = {}
+ self.skip_to = skip_to
+ # per-run:
+ self.ready = set()
+ self.finished = set()
+
+ def add_event(self, name, after=None, before=None):
+ if name in self.events:
+ e = self[name]
+ if not e.virtual:
+ raise PipeError("Reinserting event %r", name)
+ else: e.virtual = False
+ else:
+ self.events[name] = PipelineEvent(name, virtual=False)
+ self.ready.add(name)
+
+ if after:
+ for d in after:
+ self.add_dependency(d, name)
+ if before:
+ for d in before:
+ self.add_dependency(name, d)
+
+ return self[name]
+
+ def add_dependency(first, then):
+ if first not in self.events:
+ self.events[first] = PipelineEvent(first, virtual=True)
+ self.ready.add(first)
+ if then not in self.events:
+ self.events[then] = PipelineEvent(then, virtual=True)
+ self.ready.add(then)
+ if then in self.finished:
+ raise PipeError("Cannot alter the past")
+
+ self[first].before.add(then)
+ self[then].after.add(first)
+ self[then].dependencies.add(first)
+ if (first not in self.finished) and (then in self.ready):
+ self.ready.remove(then)
+
+ def get_next(self):
+ "Return next event (or None if exhausted)"
+ if not self.ready:
+ if len(self.finished) < len(self.events):
+ raise PipeError("Not all events processed -- cyclic dependency found")
+ return None # All events processed
+ ename = self.ready.pop()
+ log.debug("[%s] processing event %r", self.name, ename)
+ e = self[ename]
+ if e.virtual:
+ raise PipeError("Schedule error: running uninitialised event")
+ for d in e.before:
+ self[d].dependencies.remove(ename)
+ return e
+
+ def run(self, *args, **kwargs):
+ while e = self.get_next():
+ log.debug("Pipeline %r running %r" % (self.name, e.name))
+ e.run(*args, **kwargs)
#!/usr/bin/env python
import bisect
+
import moe
+from moe.logs import *
-class PipeError(moe.MoeError):
- """Failure of the Moe Pipeline."""
+class MoePipeError(moe.MoeError):
+ """Failure of the MoePipeline."""
+class MoeAbortPipeline(Exception):
-class PipelineEvent(object):
- def __init__(self, name, virtual=False):
- self.name = name
- self.virtual = virtual
- self.hooks = [] # tuples ('name', callable)
- self.after = set() # List of event names this one comes after
- self.before = set() # List of event names coming before this one
- # Per run:
- self.dependencies = set() # unfinished events this one comes 'after'
+ def __init__(self, skip_to=None):
+ self.skip_to = skip_to
- def add_hook(self, hook, name=None):
- "Add `fun` as a callable hook `name` is the 'module.function' name, guessed by default."
- assert(callable(fun))
- if self.virtual:
- raise PipeError("Cannot add hook to uninitialised event %r", self.name)
- if name is None:
- name = fun.__module__ + '.' + fun.__name__
- self.hooks.append( (name, hook) )
- def run(self, *args, **kwargs):
- if not self.hooks:
- log.debug('Passed virtual event %s', self.name)
- log.info('Running pipeline event %s', self.name)
- for n,h in self.hooks:
- log.debug('[%s] Running hook %s', self.name, n)
- h( *args, **kwargs)
-
-
class Pipeline:
"""Moe pipeline."""
- def __init__(self, name, skip_to = 'cleanup'):
+ def __init__(self, name, skip_to = 700):
+ # e is Eval
+ self.pipe = []
+ self.index = -1
self.name = name
- self.events = {}
self.skip_to = skip_to
- # per-run:
- self.ready = set()
- self.finished = set()
- def add_event(self, name, after=None, before=None):
- if name in self.events:
- e = self[name]
- if not e.virtual:
- raise PipeError("Reinserting event %r", name)
- else: e.virtual = False
- else:
- self.events[name] = PipelineEvent(name, virtual=False)
- self.ready.add(name)
+ def insert(self, pri, fun, desc='', name=None):
+ """Insert callable `fun` to time `pri`,
+ `desc` is optional description,
+ `name` is the "module.function" name, guessed by default.
+ """
+ assert(isinstance(pri, int))
+ assert(callable(fun))
+ if name is None:
+ name = fun.__module__ + '.' + fun.__name__
+ if desc:
+ desc += ' '
+ desc += '[' + name + ']'
- if after:
- for d in after:
- self.add_dependency(d, name)
- if before:
- for d in before:
- self.add_dependency(name, d)
+ triple = (pri, desc, fun)
+ pos = bisect.bisect(self.pipe, triple)
+ if pos <= self.index:
+ raise MoePipeError, "Pipeline %r at time %d: Insert cannot alter the past (time %d)" \
+ % (self.name, self.index, pri)
+ self.pipe.insert(pos, triple)
- return self[name]
-
- def add_dependency(first, then):
- if first not in self.events:
- self.events[first] = PipelineEvent(first, virtual=True)
- self.ready.add(first)
- if then not in self.events:
- self.events[then] = PipelineEvent(then, virtual=True)
- self.ready.add(then)
- if then in self.finished:
- raise PipeError("Cannot alter the past")
-
- self[first].before.add(then)
- self[then].after.add(first)
- self[then].dependencies.add(first)
- if (first not in self.finished) and (then in self.ready):
- self.ready.remove(then)
-
- def get_next(self):
- "Return next event (or None if exhausted)"
- if not self.ready:
- if len(self.finished) < len(self.events):
- raise PipeError("Not all events processed -- cyclic dependency found")
- return None # All events processed
- ename = self.ready.pop()
- log.debug("[%s] processing event %r", self.name, ename)
- e = self[ename]
- if e.virtual:
- raise PipeError("Schedule error: running uninitialised event")
- for d in e.before:
- self[d].dependencies.remove(ename)
- return e
+ def dump(self, prefix=""):
+ """
+ Debugging dump of the pipe.
+ Returns a list of lines.
+ """
+ l=["%s >>> Pipeline %s" % (prefix, self.name)]
+ for pri, name, fun in self.pipe:
+ l.append("%s% 3d %s" % (prefix, pri, name))
+ return l
def run(self, *args, **kwargs):
- while e = self.get_next():
- log.debug("Pipeline %r running %r" % (self.name, e.name))
- e.run(*args, **kwargs)
+ self.index = 0
+ min_pri = -1
+ while self.index < len(self.pipe):
+ (pri,name,fun) = self.pipe[self.index]
+ if pri >= min_pri:
+ log.debug("Pipeline %r:%d running: %s" % (self.name, pri, name))
+ try:
+ fun(*args, **kwargs)
+ except MoeAbortPipeline, err:
+ min_pri = self.skip_to
+ else:
+ log.debug("Pipeline %r:d skipping: %s" % (self.name, pri, name))
+ self.index += 1
+ self.index = -1
+ log.debug("Pipeline %r finished" % (self.name))
+
import moe.config
import moe.testutils
+from moe.logs import log
def init(e):
def hook_m_50(e):
- e.log.info('Here should be compiling')
+ log.info('Here should be compiling')
- e.main_pipe.insert(50, hook_m_50)
+ e.main_pipe.insert(500, hook_m_50)
- e.main_pipe.insert(60, moe.testutils.hook_run_tests, desc='Run testcases')
+ e.main_pipe.insert(600, moe.testutils.hook_run_tests, desc='Run testcases')
def hook_t_30(e):
- e.log.info("Maybe we should do something? Nah...")
+ log.info("Maybe we should do something? Nah...")
- e.test_pipe.insert(30, hook_t_30)
+ e.test_pipe.insert(300, hook_t_30)
#!/usr/bin/env python
+import shutil
+import traceback
+import re
import os.path
+
import moe
import moe.config
import moe.eval
-import moe.log
-import shutil
-import traceback
-import re
+import moe.logs
+from moe.logs import *
+
# Allowed test names
testname_regexp = re.compile('\A[\w]+\Z')
.. todo :: Different log-level for per-test log?
"""
- e.log.info('Running test pipeline for each test')
+ log.info('Running test pipeline for each test')
e.config.fix('TESTS')
tests = e['TESTS'].split()
- e.log.debug('TESTS: %r', tests)
+ log.debug('TESTS: %r', tests)
for t in tests:
if not testname_regexp.match(t):
raise MoeError("Invalid test name %r", t)
- e.log.user.info('TEST %s ...' % t)
+ userlog.info('TEST %s ...' % t)
with e.config.parse("TEST='"+t+"'", level=70, source='<hook_run_tests>'):
try:
e.config.fix('TEST')
- e.log.open_test_log(e['TEST_LOG'], e.log.level)
- e.log.info(' *** Test case %s *** ' % t)
+ moe.logs.open_test_log(e['TEST_LOG'], log.level)
+ log.info(' *** Test case %s *** ' % t)
e.debug_dump_config()
e.debug_dump_pipe(e.test_pipe)
e.test_pipe.run(e=e)
except:
- e.log.test.exception()
+ (testlog or log).exception("Exception falls through hook_run_tests()")
raise
finally:
e.config.unfix('TEST')
- e.log.close_test_log()
-
+ moe.logs.close_test_log()
def configure_test(e, test):
test_cf = os.path.join(e["PDIR"], test + ".config")
#sys.path.append('.')
import moe.eval
+from moe.logs import *
import os
e = moe.eval.Eval()
try:
- e.init(['TASK = "sum"; CONTESTANT = "mj"; SOURCE = "som_sol.c"; VERBOSE = "2"'] + sys.argv[1:])
- e.log.debug("### Evaluating task %s of contestant %s ###\n\n" % (e['TASK'], e['CONTESTANT']))
+ e.init(['TASK = "sum"; CONTESTANT = "mj"; SOURCE = "som_sol.c"; VERBOSE = "2";'] + sys.argv[1:])
+ log.debug("### Evaluating task %s of contestant %s ###\n\n" % (e['TASK'], e['CONTESTANT']))
try:
e.run()
except moe.SolutionError, err: # Why are we catching this?
e.status["error"] = err
- e.log.user.error(err)
- e.log.exception()
+ userlog.error(err)
+ log.exception()
except:
- e.log.exception("Moe fatal error")
+ log.exception("Moe fatal error")
sys.exit(1)