- self.log_config(2, "for the task pipeline")
- self.main_pipe.configure(self.cfgs["HOOKS"])
- if self.log.verbosity >= 2:
- self.main_pipe.dump(self.log.log_file, prefix="\t")
- self.main_pipe.run(self)
-
- def log_config(self, verb, msg):
- if self.log.verbosity >= verb:
- self.log.say("@@@ Configuration stack %s @@@\n" % msg)
- self.cfgs.dump_defs(self.log.log_file, prefix="\t")
- if self.log.verbosity >= 99:
- self.log.say("@@@@@@ Resolved configuration %s @@@@@@\n" % msg)
- self.cfgs.dump(self.log.log_file, prefix="\t")
+ "Run the main pipeline."
+ self.debug_dump_pipe(self.main_pipe)
+ self.log.debug('Running main pipeline')
+ self.main_pipe.run(e=self)
+
+ def debug_dump_config(self):
+ "Dumps config at level DDEBUG (only compiles the dump if main level is low enough)."
+ if self.log.level <= 5:
+ self.log.ddebug(' ****** Config dump: ******')
+ self.log.ddebug('\n'.join(self.config.dump(' * ')))
+ self.log.ddebug(' **************************')
+
+ def debug_dump_pipe(self, pipe):
+ "Dumps pipeline `pipe` at level DDEBUG (only compiles the dump if main level low enough)."
+ if self.log.level <= 5:
+ self.log.ddebug(' ****** Pipeline %r dump: ******'%pipe.name)
+ self.log.ddebug('\n'.join(pipe.dump(prefix=' * ')))
+ self.log.ddebug(' **************************')
+
+ def debug_dump_status(self):
+ "Dumps status metadata at level DDEBUG (only compiles the dump if main level low enough)."
+ if self.log.level <= 5:
+ self.log.ddebug(' ****** Status dump: ******')
+ self.log.ddebug('\n'.join(self.status.dump(prefix=' * ')).rstrip())
+ self.log.ddebug(' **************************')
+
+def hook_init_dirs(e):
+ """(mainline at time 5) Create and check directories, fix directory variables.
+ .. note:: Currently only TDIR."""
+ e.config.fix('TDIR')
+ tdir = e['TDIR']
+ if os.path.isdir(tdir):
+ shutil.rmtree(tdir)
+ moe.util.mkdir_tree(tdir)
+
+def hook_load_task_config(e):
+ """(mainline at time 15) Load `TASK_CONFIG` and check `PDIR`, fixes `TASK`, `PDIR`, `TASK_CONFIG`."""
+ e.config.fix(['TASK', 'PDIR', 'TASK_CONFIG'])
+ e.log.debug('Loading task config %s', e['TASK_CONFIG'])
+ if not os.path.isdir(e['PDIR']):
+ raise moe.MoeError, "No such task %s in %s" % (e['TASK'], e['PDIR'])
+ e.config.parse_file(e['TASK_CONFIG'], level=50)
+ e.debug_dump_config()
+
+ e.status["task"] = e['TASK'] # Metadata
+
+def hook_init_tasktype(e):
+ """(mainline at time 20) Fix `TASK_TYPE`, initialize task type module."""
+
+ e.config.fix('TASK_TYPE')
+ task_type = e['TASK_TYPE']
+ e.log.debug('Loading module for TASK_TYPE: %r', task_type)
+ if not task_type:
+ raise MoeError, "Invalid TASK_TYPE: %r" % e
+ try:
+ e.tasktype_module = moe.util.load_module('moe.tasktypes.' + task_type)
+ except ImportError:
+ e.log.exception()
+ raise MoeError, 'Unknown TASK_TYPE: %r' % task_type
+ e.tasktype_module.init(e)
+
+def hook_write_metadata(e):
+ """(mainline at time 90) Write status metadata into file `STATUS_FILE`."""
+ e.debug_dump_status()
+ e.log.debug('Writing status file %s', e['STATUS_FILE'])
+ with open(e['STATUS_FILE'], 'w') as f:
+ e.status.write(f)
+ # TODO: dump to ddebug
+
+
+