import moe.log
import moe.status
import moe.pipeline
-import moe.batch
import moe.util
import os.path
import shutil
"""
def __init__(self):
- self.log = moe.log.Logers()
+ self.log = moe.log.Loggers()
self.config = moe.config.ConfigTree()
- self.main_pipe = moe.pipeline.MoePipeline("main")
- self.test_pipe = moe.pipeline.MoePipeline("test")
- self.stat = moe.status.MoeStatus()
+ self.main_pipe = moe.pipeline.Pipeline(self, "main")
+ self.test_pipe = moe.pipeline.Pipeline(self, "test")
+ self.status = moe.status.Status()
def __getitem__(self, key):
return self.config[key]
- def debug_dumpconf(self):
- if self.log.level <= 5:
- self.log.ddebug('****** Config dump: ******'))
- self.log.ddebug(self.config.dump('**** '))
- self.log.ddebug('**************************'))
- def debug_dumppipe(self, pipe):
- if self.log.level <= 5:
- self.log.ddebug('****** Pipeline %r dump: ******'%pipe,name))
- self.log.ddebug(pipe.dump(prefix='**** '))
- self.log.ddebug('**************************'))
-
def init(self, overrides=[]):
+ "Initializes most part of Eval before running the pipeline. See the timeline for details."
self.log.info("Initializing ...")
# set basic builtins
- self.config.parse('HOME=\'%s\'' % os.getcwd(), source="<builtins>", level=0)
- self.config.parse('CONFIG="{HOME}/config"', source="<builtins>", level=0)
- self.config.parse('LOG="{HOME}/log"', source="<builtins>", level=0)
- self.config.parse('DEBUG_LEVEL="0"', source="<builtins>", level=0)
- self.config.parse('VERBOSE=""', source="<builtins>", level=0)
- self.config.parse('TDIR="{HOME}/test"', source="<temp-builtins>", level=0) # -> config
- self.config.parse('USER_LOG="{TDIR}/log"', source="<temp-builtins>", level=0) # -> config
+ self.config.parse('HOME = \'%s\'' % os.getcwd(), source="<builtins>", level=0)
+ self.config.parse('CONFIG = "{HOME}/config"', source="<builtins>", level=0)
+ self.config.parse('LOG = "{HOME}/log"', source="<builtins>", level=0)
+ self.config.parse('DEBUG_LEVEL = "0"', source="<builtins>", level=0)
+ self.config.parse('VERBOSE = ""', source="<builtins>", level=0)
+ self.config.parse('EXTENSIONS = ""', source="<builtins>", level=0)
+
# apply overrides
for ov in overrides:
- self.config.parse(ov, source="<overrides>", level=100)
+ self.config.parse(ov, source="<overrides>", level=100)
# load config file
self.config.fix('CONFIG')
- with open(self['CONFIG'], 'r') as f:
- self.config.parse(f, source=self['CONFIG'], level=30)
+ self.config.parse_file(self['CONFIG'], level=30)
# fix variables
- self.config.fix(['LOG', 'VERBOSE', 'HOME', 'DEBUG_LEVEL', 'TDIR'])
+ self.config.fix(['LOG', 'USER_LOG', 'VERBOSE', 'HOME', 'DEBUG_LEVEL', 'TDIR'])
# start logging
- self.log.open_eval_log(self['LOG'], self['DEBUG_LEVEL'], redirect_fds = True)
+ self.log.open_eval_log(self['LOG'], int(self['DEBUG_LEVEL']), redirect_fds = True)
self.log.open_user_log(self['USER_LOG'])
self.debug_dump_config()
- # init and check TDIR
- self.debug('Cleaning TDIR: %s'%self['TDIR'])
- self.init_TDIR()
-
# insert hooks into main pipeline
- # TODO
+ self.main_pipe.insert(5, hook_init_dirs, "Initialize working directories")
+ self.main_pipe.insert(15, hook_load_task_config, "Load task config")
+ self.main_pipe.insert(20, hook_init_tasktype, "Load tasktype module")
+ self.main_pipe.insert(90, hook_write_metadata, "Write final metadata file")
+
+ # ininialize extensions (let them insert hooks)
+ self.config.fix('EXTENSIONS')
+ exts = self['EXTENSIONS'].split()
+ for e in exts:
+ if not e:
+ raise MoeError, "Invalid extension name: %r" % e
+ self.log.debug("Loading extension %s", e)
+ try:
+ mod = moe.util.load_module('moe.exts.' + e)
+ except ImportError:
+ self.log.exception()
+ raise MoeError, 'Unknown extension: %r' % e
+ mod.init(self)
+
+ def run(self):
+ "Run the main pipeline."
+ self.debug_dump_pipe(self.main_pipe)
+ self.log.debug('Running main pipeline')
+ self.main_pipe.run(e=self)
+
+ def debug_dump_config(self):
+ "Dumps config at level DDEBUG (only compiles the dump if main level is low enough)."
+ if self.log.level <= 5:
+ self.log.ddebug(' ****** Config dump: ******')
+ self.log.ddebug('\n'.join(self.config.dump(' * ')))
+ self.log.ddebug(' **************************')
+
+ def debug_dump_pipe(self, pipe):
+ "Dumps pipeline `pipe` at level DDEBUG (only compiles the dump if main level low enough)."
+ if self.log.level <= 5:
+ self.log.ddebug(' ****** Pipeline %r dump: ******'%pipe.name)
+ self.log.ddebug('\n'.join(pipe.dump(prefix=' * ')))
+ self.log.ddebug(' **************************')
+
+ def debug_dump_status(self):
+ "Dumps status metadata at level DDEBUG (only compiles the dump if main level low enough)."
+ if self.log.level <= 5:
+ self.log.ddebug(' ****** Status dump: ******')
+ self.log.ddebug('\n'.join(self.status.dump(prefix=' * ')).rstrip())
+ self.log.ddebug(' **************************')
+
+def hook_init_dirs(e):
+ """(mainline at time 5) Create and check directories, fix directory variables.
+ .. note:: Currently only TDIR."""
+ e.config.fix('TDIR')
+ tdir = e['TDIR']
+ if os.path.isdir(tdir):
+ shutil.rmtree(tdir)
+ moe.util.mkdir_tree(tdir)
+
+def hook_load_task_config(e):
+ """(mainline at time 15) Load `TASK_CONFIG` and check `PDIR`, fixes `TASK`, `PDIR`, `TASK_CONFIG`."""
+ e.config.fix(['TASK', 'PDIR', 'TASK_CONFIG'])
+ e.log.debug('Loading task config %s', e['TASK_CONFIG'])
+ if not os.path.isdir(e['PDIR']):
+ raise moe.MoeError, "No such task %s in %s" % (e['TASK'], e['PDIR'])
+ e.config.parse_file(e['TASK_CONFIG'], level=50)
+ e.debug_dump_config()
+
+ e.status["task"] = e['TASK'] # Metadata
+
+def hook_init_tasktype(e):
+ """(mainline at time 20) Fix `TASK_TYPE`, initialize task type module."""
+
+ e.config.fix('TASK_TYPE')
+ task_type = e['TASK_TYPE']
+ e.log.debug('Loading module for TASK_TYPE: %r', task_type)
+ if not task_type:
+ raise MoeError, "Invalid TASK_TYPE: %r" % e
+ try:
+ e.tasktype_module = moe.util.load_module('moe.tasktypes.' + task_type)
+ except ImportError:
+ e.log.exception()
+ raise MoeError, 'Unknown TASK_TYPE: %r' % task_type
+ e.tasktype_module.init(e)
+
+def hook_write_metadata(e):
+ """(mainline at time 90) Write status metadata into file `STATUS_FILE`."""
+ e.debug_dump_status()
+ e.log.debug('Writing status file %s', e['STATUS_FILE'])
+ with open(e['STATUS_FILE'], 'w') as f:
+ e.status.write(f)
- # insert custom hooks
- self.conf.fix('HOOKS')
- self.main_pipe.configure(self['HOOKS'])
- # go!
- self.debug_dump_pipe(self.main_pipe)
- self.debug('Running main pipeline')
- self.main_pipe.run(self)
-
- #self.init_task()
- #moe.box.init(self)
-
- def init_TDIR(self):
- test = self['TDIR']
- if os.path.isdir(test):
- shutil.rmtree(test)
- try: #TODO: Remove
- moe.util.mkdir_tree(test)
- except OSError, err:
- raise moe.MoeError, "Cannot create %s: %s" % (test, err.strerror)
-
-
-#TODO ...
- def init_task(self):
- task = self['TASK']
- task_dir = self['PDIR']
- if not os.path.isdir(task_dir):
- raise moe.MoeError, "No such task %s" % task
-
- task_cfg = moe.config.MoeConfig(name = os.path.join(task_dir, "config"), type='task')
- self.cfgs.push(task_cfg)
- self.log_config(3, "after loading the task")
-
- self.stat["task"] = task
-
- type = self['TASK_TYPE']
- if type == "batch" or type == "interactive":
- moe.batch.prepare_pipe(self)
- elif type == "opendata":
- raise moe.MoeError, "Opendata tasks not implemented yet"
- else:
- raise moe.MoeError, "Unknown task type " + type