]> mj.ucw.cz Git - moe.git/blobdiff - t/moe/eval.py
Pipeline functions now get keyword arguments
[moe.git] / t / moe / eval.py
index 32981440ab80fb245acf7197c7f60d9c21fce0b1..f15f9ee4a11c5713bea0342b6dccbd438f759032 100644 (file)
@@ -2,8 +2,9 @@
 
 import moe
 import moe.config
+import moe.box
 import moe.log
-import moe.stat
+import moe.status
 import moe.pipeline
 import moe.batch
 import moe.util
@@ -11,79 +12,126 @@ import os.path
 import shutil
 
 class Eval:
+    """
+    """
 
     def __init__(self):
-       self.log = moe.log.default
-       self.cfgs = moe.config.MoeConfigStack()
-       self.builtins = moe.config.MoeConfig(type="builtins")
-       self.cfgs.push(self.builtins)
+       self.log = moe.log.Loggers()
+       self.config = moe.config.ConfigTree()
        self.main_pipe = moe.pipeline.MoePipeline("main")
-       self.stat = moe.stat.MoeStatus()
-       pass
-
-    def init(self, overrides=None):
-       self.log.progress("Initializing... ")
-       self.init_global(overrides)
-       self.init_test()
-       self.init_logs()
-       self.init_task()
-       self.log.progress("OK\n")
-
-    def init_global(self, overrides):
-       main_cfg = moe.config.MoeConfig(name = os.path.join(self.cfgs['HOME'], "config"), type="main")
-       self.cfgs.push(main_cfg)
-       if overrides:
-           self.cfgs.push(overrides)
-
-    def init_test(self):
-       test = self.cfgs['TEST_DIR']
-       if os.path.isdir(test):
-           shutil.rmtree(test)
-       try:
-           moe.util.mkdir_tree(test)   
-       except OSError, e:
-           raise moe.MoeErr, "Cannot create %s: %s" % (test, e.strerror)
-
-    def init_logs(self):
-       self.log = moe.log.MoeLog()
-       if self.cfgs["V"]:
-           self.log.verbosity = int(self.cfgs["V"])
-       self.log.log_file = open(os.path.join(self.cfgs["TEST_DIR"], "log"), "w")
-       self.default_log = self.log
-       moe.log.default = self.log
-       self.log_config(3, "before loading the task")
-
-    def init_task(self):
-       task = self.cfgs['TASK']
-       task_dir = self.cfgs['TASK_DIR']
-       if not os.path.isdir(task_dir):
-           raise moe.MoeErr, "No such task %s" % task
-
-       task_cfg = moe.config.MoeConfig(name = os.path.join(task_dir, "config"), type='task')
-       self.cfgs.push(task_cfg)
-       self.log_config(3, "after loading the task")
-
-       self.stat["task"] = task
-
-       type = self.cfgs['TASK_TYPE']
-       if type == "batch" or type == "interactive":
-           moe.batch.prepare_pipe(self)
-       elif type == "opendata":
-           raise moe.MoeErr, "Opendata tasks not implemented yet"
-       else:
-           raise moe.MoeErr, "Unknown task type " + type
+       self.test_pipe = moe.pipeline.MoePipeline("test")
+       self.status = moe.status.MoeStatus()
 
+    def __getitem__(self, key):
+       return self.config[key]
+
+    def init(self, overrides=[]):
+       "Initializes most part of Eval before running the pipeline. See the timeline for details."
+       self.log.info("Initializing ...")
+       
+       # set basic builtins
+       self.config.parse('HOME = \'%s\'' % os.getcwd(), source="<builtins>", level=0)
+       self.config.parse('CONFIG = "{HOME}/config"', source="<builtins>", level=0)
+       self.config.parse('LOG = "{HOME}/log"', source="<builtins>", level=0)
+       self.config.parse('DEBUG_LEVEL = "0"', source="<builtins>", level=0)
+       self.config.parse('VERBOSE = ""', source="<builtins>", level=0)
+       self.config.parse('EXTENSIONS = ""', source="<builtins>", level=0)
+       
+       # apply overrides
+       for ov in overrides:
+           self.config.parse(ov, source="<overrides>", level=100)
+       
+       # load config file
+       self.config.fix('CONFIG')
+       self.config.parse_file(self['CONFIG'], level=30)
+       # fix variables
+       self.config.fix(['LOG', 'USER_LOG', 'VERBOSE', 'HOME', 'DEBUG_LEVEL', 'TDIR'])
+       # start logging
+       self.log.open_eval_log(self['LOG'], int(self['DEBUG_LEVEL']), redirect_fds = True)
+       self.log.open_user_log(self['USER_LOG'])
+       self.debug_dump_config()
+
+       # insert hooks into main pipeline
+       self.main_pipe.insert(5, "Eval.hook_init_dirs", hook_init_dirs)
+       self.main_pipe.insert(15, "Eval.hook_load_task_config", hook_load_task_config)
+       self.main_pipe.insert(20, "Eval.hook_init_tasktype", hook_init_tasktype)
+       self.main_pipe.insert(90, "Eval.hook_write_metadata", hook_write_metadata)
+
+       # ininialize extensions (let them insert hooks) 
+       self.config.fix('EXTENSIONS')
+       exts = self['EXTENSIONS'].split()
+       for e in exts:
+           if not e:
+               raise MoeError, "Invalid extension name: %r" % e
+           self.log.debug("Loading extension %s", e)
+           try:
+               mod = moe.util.load_module('moe.exts.' + e)
+           except ImportError:
+               self.log.exception()
+               raise MoeError, 'Unknown extension: %r' % e
+           mod.init(self)
+       
     def run(self):
-       self.log_config(2, "for the task pipeline")
-       self.main_pipe.configure(self.cfgs["HOOKS"])
-       if self.log.verbosity >= 2:
-           self.main_pipe.dump(self.log.log_file, prefix="\t")
-       self.main_pipe.run(self)
-
-    def log_config(self, verb, msg):
-       if self.log.verbosity >= verb:
-           self.log.say("@@@ Configuration stack %s @@@\n" % msg)
-           self.cfgs.dump_defs(self.log.log_file, prefix="\t")
-       if self.log.verbosity >= 99:
-           self.log.say("@@@@@@ Resolved configuration %s @@@@@@\n" % msg)
-           self.cfgs.dump(self.log.log_file, prefix="\t")
+       "Run the main pipeline."
+       self.debug_dump_pipe(self.main_pipe)
+       self.log.debug('Running main pipeline')
+       self.main_pipe.run(e=self)
+
+    def debug_dump_config(self):
+       "Dumps config at level DDEBUG (only compiles the dump if main level is low enough)."
+       if self.log.level <= 5:
+           self.log.ddebug('****** Config dump: ******')
+           self.log.ddebug('\n'.join(self.config.dump('*  ')))
+           self.log.ddebug('**************************')
+
+    def debug_dump_pipe(self, pipe):
+       "Dumps pipeline `pipe` at level DDEBUG (only compiles the dump if main level low enough)."
+       if self.log.level <= 5:
+           self.log.ddebug('****** Pipeline %r dump: ******'%pipe.name)
+           self.log.ddebug(pipe.pipe)
+           self.log.ddebug(pipe.dump(prefix='*  '))
+           self.log.ddebug('**************************')
+
+def hook_init_dirs(e):
+    """(mainline at time 5) Create and check directories, fix directory variables.
+    .. note:: Currently only TDIR."""
+    e.config.fix('TDIR')
+    tdir = e['TDIR']
+    if os.path.isdir(tdir):
+       shutil.rmtree(tdir)
+    moe.util.mkdir_tree(tdir)
+
+def hook_load_task_config(e):
+    """(mainline at time 15) Load `TASK_CONFIG` and check `PDIR`, fixes `TASK`, `PDIR`, `TASK_CONFIG`."""
+    e.config.fix(['TASK', 'PDIR', 'TASK_CONFIG'])
+    e.log.debug('Loading task config %s', self['TASK_CONFIG'])
+    if not os.path.isdir(e['PDIR']):
+       raise moe.MoeError, "No such task %s in %s" % (e['TASK'], self['PDIR'])
+    e.config.parse_file(self['TASK_CONFIG'], level=50)
+    e.debug_dump_config()
+
+    e.status["task"] = task  # Metadata
+
+def hook_init_tasktype(e):
+    """(mainline at time 20) Fix `TASK_TYPE`, initialize task type module."""
+
+    e.config.fix('TASK_TYPE')
+    task_type = e['TASK_TYPE']
+    e.log.debug('Loading module for TASK_TYPE: %r', task_type)
+    if not task_type:
+       raise MoeError, "Invalid TASK_TYPE: %r" % e
+    try:
+       e.tasktype_module = utils.load_module('moe.tasktypes.' + task_type)
+    except ImportError:
+       e.log.exception()
+       raise MoeError, 'Unknown TASK_TYPE: %r' % task_type
+    mod.tasktype_module.init(e)
+
+def hook_write_metadata(e):
+    """(mainline at time 90) Write status metadata into file `STATUS_FILE`."""
+    e.log.debug('Writing status file %s', self['STATUS_FILE'])
+    e.status.write(self['STATUS_FILE'])
+    # TODO: dump to ddebug     
+
+
+