From: Tomas Gavenciak Date: Tue, 7 Sep 2010 12:16:57 +0000 (+0200) Subject: Renamed conf -> config, refactoring inside config X-Git-Tag: python-dummy-working~36 X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=d8de72934717e8f287301c07d2ec70f1ca332e79;p=moe.git Renamed conf -> config, refactoring inside config Added ConfigTree.parse and ConfigTree.parse_file Minor global renaming changes inside parser and tests --- diff --git a/t/moe/conf.py b/t/moe/conf.py deleted file mode 100644 index 00e0b5a..0000000 --- a/t/moe/conf.py +++ /dev/null @@ -1,389 +0,0 @@ -""" -conf.py -------- - -Lazy conditional string evaluation module for Moe configuration variables. - - -* Each variable has ordered list of operations (definitions), each defining operation either -assigns (SET) or appends (APPEND) value of an expression to the variable. Each operation may be guarded by condition(s). - -NOTE: If no 'SET' applies, a variable is still undefined even if some 'APPEND' applies. This might change. - -* Each condition is a formula (tree consisting of 'AND', 'OR', 'NOT' and '==', '!=' between two expressions. - -* Expression is a list of strings and variables to be expanded. - -NOTE: All expanded data should be (or is converted to) unicode - - -TODO (OPT): Cleanup of unused undefined variables. -TODO (OPT): Better variable name checking (no name '.'-structural prefix of another) -TODO (OPT): Implemet "subtree" listing. -""" - -import types, itertools, re, bisect -import logging as log - -from moe import MoeError - - -"Allowed depth of recursion - includes ALL recursive calls, so should quite high." -c_maxdepth = 256 - -"Maximum attained depth of recursion - for debug/testing" -debug_maxdepth = 0 - -"Variable name regexp, dots (separators) must be separated from edges and each other." -re_VARNAME = re.compile(r'\A([A-Za-z0-9_-]+\.)*[A-Za-z0-9_-]+\Z') - - -def check_depth(depth): - "Helper to check for recursion depth." - global debug_maxdepth - if depth > c_maxdepth: - raise CyclicConfigError('Too deep recursion in config evaluation (cyclic substitution?)') - if depth > debug_maxdepth: - debug_maxdepth = depth - - -class ConfigError(MoeError): - pass - -class UndefinedError(ConfigError): - pass - -class VariableNameError(ConfigError): - pass - -class VariableFixedError(ConfigError): - pass - -class CyclicConfigError(ConfigError): - pass - - -class ConfigTree(object): - """ - Configuration tree containing all the variables. - - The variables in `self.variables` are referenced directly by the full name. - """ - - def __init__(self): - self.variables = {} - - def lookup(self, key, create = True): - """ - Lookup and return a variable. - If not found and `create` set, check the name and transparently create a new one. - """ - if key not in self.variables: - if not re_VARNAME.match(key): - raise VariableNameError('Invalid variable identifier %r in config', key) - if not create: - raise UndefinedError('Config variable %r undefined.', key) - self.variables[key] = ConfigVar(key) - return self.variables[key] - - def __getitem__(self, key) - """ - Return the value of an existing variable. - """ - return self.lookup(key, create=False).value() - - def dump(self, prefix=''): - """ - Pretty printing of the tree. - Returns an iterator of lines (strings). - """ - return itertools.chain(*[ - self.variables[k].dump(prefix) for k in sorted(self.variables.keys()) - ]) - - -class ConfigElem(object): - """ - Base class for cahed config elements - variables and conditions - """ - - def __init__(self, name): - # Full name with separators, definition for conditions - self.name = name - # Vars and conditions depending on value of this one - self.dependants = set([]) - # Cached value (may be None in case of evaluation error) - self.cached = False - self.cached_val = None - - def invalidate(self, depth=0): - """ - Invalidate cached data and invalidate all dependants. - Does nothing if not cached. - """ - check_depth(depth) - if self.cached: - log.debug('invalidating %s', self) - self.cached = False - for d in self.dependants: - d.invalidate(depth + 1) - - def value(self, depth=0): - "Caching helper calling self.evaluate(), returns a value or throws an exception." - check_depth(depth) - if not self.cached: - self.cached_val = self.evaluate(depth=depth+1) - self.cached = True - if self.cached_val == None: - raise UndefinedError("Unable to evaluate %r."%(self.name,)) - return self.cached_val - - def __str__(self): - return self.name - - -class ConfigCondition(ConfigElem): - """ - Condition using equality and logic operators. - Clause is a tuple-tree in the following recursive form: - ('AND', c1, c1), ('OR', c1, c2), ('NOT', c1), - ('==', e1, e2), ('!=', e1, e2) where e1, e2 are `ConfigExpression`s. - """ - - def __init__(self, formula, text=None, parent=None): - """ - Condition defined by `text` (informative), `formula` as in class definition, - `parent` is the parent condition (if any). - """ - if not text: - text = self.formula_string(formula) - super(ConfigCondition, self).__init__(text) - self.formula = formula - self.parent = parent - # Setup dependencies on used variables (not on the parent condition) - for v in self.variables(): - v.dependants.add(self) - if self.parent: - self.parent.dependants.add(self) - - def variables(self, cl=None): - "Return an iterator of variables used in formula `cl`" - if not cl: - cl = self.formula - if cl[0] in ['==','!=']: - return itertools.chain(cl[1].variables(), cl[2].variables()) - if cl[0] in ['AND','OR']: - return itertools.chain(self.variables(cl[1]), self.variables(cl[2])) - return self.variables(cl[1]) # only 'NOT' left - - def remove_dependencies(self): - "Remove self as a dependant from all used variables" - for v in self.variables(): - v.dependants.discard(self) - if self.parent: - self.parent.dependants.discard(self) - - def evaluate(self, cl=None, depth=0): - """Evaluate formula `cl` (or the entire condition). - Partial evaluation for AND and OR. Tests the parent condition first.""" - check_depth(depth) - if not cl: - cl = self.formula - if self.parent and not self.parent.value(): - return False - if cl[0] in ['==','!=']: - v = cl[1].evaluate(depth=depth+1) == cl[2].evaluate(depth=depth+1) - if cl[0] == '!=': v = not v - return v - v1 = self.evaluate(cl=cl[1], depth=depth+1) - if cl[0] == 'NOT': - return not v1 - if cl[0] == 'OR' and v1: return True - if cl[0] == 'AND' and not v1: return False - return self.evaluate(cl=cl[2], depth=depth+1) - - def formula_string(self, formula): - "Create a string representation of a formula." - if formula[0] == 'AND': - return itertools.chain(['('], self.formula_string(formula[1]), [' and '], self.formula_string(formula[2]),[')']) - elif formula[0] == 'OR': - return itertools.chain(['('], self.formula_string(formula[1]), [' or '], self.formula_string(formula[2]),[')']) - elif formula[0] == 'NOT': - return itertools.chain(['(not '], self.formula_string(formula[1]),[')']) - elif formula[0] in ['==', '!=']: - return itertools.chain(formula[1], formula[0], formula[2]) - return iter(['']) - - def str(self, parents=False): - "Retur the defining expression, if `parents` set, then prefixed with parent conditions." - if parents and self.parent: - return self.parent.str(parents=True) + u' && ' + self.name - return self.name - - def __str__(self): - return self.str(parents=False) - - -class Operation(object): - "Helper class for operation data. Must not be present in more variables or present multiple times." - - def __init__(self, operation, condition, expression, level=0, source='?'): - # operation is currently 'SET' and 'APPEND' - self.operation = operation - self.condition = condition - self.expression = expression - self.level = level - self.source = source - - def __str__(self): - return "%s <%d, %s> [%s] %r" % ( {'SET':'=', 'APPEND':'+'}[self.operation], self.level, self.source, - (self.condition and self.condition.str(parents=True)) or '', unicode(self.expression)) - - -class ConfigVar(ConfigElem): - - def __init__(self, name): - super(ConfigVar, self).__init__(name) - # Ordered list of `Operations` (ascending by `level`) - self.operations = [] - # Fixed to value (may be None) - self.fixed = False - self.fixed_val = None - - def variables(self): - "Return a set of variables used in the expressions" - return set(sum([ list(op.expression.variables()) for op in self.operations ], [])) - - def fix(self): - """ - Fixes the value of the variable. Exception is raised should the variable - evaluate to a different value while fixed. - """ - if self.fixed: - return - self.fixed_val = self.value() - self.fixed = True - - def unfix(self): - "Set the variable to be modifiable again." - self.fixed = False - - def value(self, depth=0): - "Handle the case when fixed, raise exc. on different evaluation" - val = super(ConfigVar,self).value(depth) - if self.fixed and self.fixed_val != val: - raise VariableFixedError("value of var %s was fixed to %r but evaluated to %r", self.name, self.fixed_val, val) - return val - - def add_operation(self, operation): - """ - Inserts an operation. The operations are sorted by `level` (ascending), new operation goes last among - these with the same level. - Adds the variable as a dependant of the conditions and variables used in the expressions. - """ - # Invalidate cached value - self.invalidate() - # Add the operation - pos = bisect.bisect_right([o.level for o in self.operations], operation.level) - self.operations.insert(pos, operation) - # Create dependencies - for v in operation.expression.variables(): - v.dependants.add(self) - if operation.condition: - operation.condition.dependants.add(self) - - def remove_operation(self, operation): - """ - Remove the Operation. - Also removes the variable as dependant from all conditions and variables used in this - operation that are no longer used. - """ - # Invalidate cached value - self.invalidate() - # Remove the operation - self.operations.remove(operation) - # Remove dependencies on variables unused in other operations - vs = self.variables() - for v in operation.expression.variables(): - if v not in vs: - v.dependants.remove(self) - # Remove the dependency on the conditions (if not used in another operation) - if operation.condition and operation.condition not in [op.condition for op in self.operations]: - operation.condition.dependants.remove(self) - - def evaluate(self, depth=0): - """ - Find the last 'SET' operation that applies and return the result of concatenating with all - subsequent applicable 'APPEND' operations. The result is the same as performing the operations - first-to-last. - NOTE: undefined if some 'APPEND' apply but no 'SET' applies. - """ - check_depth(depth) - log.debug('evaluating var %r', self.name) - # List of strings to be concatenated - val = [] - # Scan for last applicable expression - try each starting from the end, concatenate extensions - for i in range(len(self.operations)-1, -1, -1): - op = self.operations[i] - # Check the guarding condition - if (not op.condition) or op.condition.value(depth+1): - val.insert(0, op.expression.evaluate(depth=depth+1)) - if op.operation == 'SET': - return u''.join(val) - return None - - def dump(self, prefix=''): - """ - Pretty printing of the variable. Includes all operations. - Returns iterator of lines (unicode strings). - """ - # Try to evaluate the variable, but avoid undefined exceptions - v = None - try: - v = self.value(depth=0) - except ConfigError: - pass - yield prefix+u'%s = %r' % (self.name, v) - for op in self.operations: - #yield prefix+u' %s [%s] %s' % (op.operation, op.condition and op.condition.str(parents=True), op.expression) - yield prefix + u' ' + unicode(op) - - -class ConfigExpression(object): - """ - String expression with some unexpanded config variables. Used in variable operations and conditions. - Expression is given as a list of unicode strings and ConfigVar variables to be expanded. - """ - - def __init__(self, exprlist, original = u''): - self.exprlist = list(exprlist) - # Original defining string - self.original = original - # Replace strings with unicode - for i in range(len(self.exprlist)): - e = self.exprlist[i] - if isinstance(e, types.StringTypes): - if not isinstance(e, unicode): - self.exprlist[i] = unicode(e, 'ascii') - - def variables(self): - "Return an iterator of variables user in the expression" - return itertools.ifilter(lambda e: isinstance(e, ConfigVar), self.exprlist) - - def __str__(self): - return self.original - - def evaluate(self, depth): - check_depth(depth) - "Return unicode result of expansion of the variables." - s = [] - for e in self.exprlist: - if isinstance(e, ConfigVar): - s.append(e.value(depth+1)) - elif isinstance(e, unicode): - s.append(e) - else: - raise ConfigError('Invalid type %s in expression \'%s\'.'%(type(e), self)) - return u''.join(s) - - diff --git a/t/moe/config.py b/t/moe/config.py index a96fdb6..813976c 100644 --- a/t/moe/config.py +++ b/t/moe/config.py @@ -1,211 +1,403 @@ -#!/usr/bin/env python - -import re -import sys -import moe - -key_pattern = re.compile("^[A-Za-z0-9_-]+$") -ref_pattern = re.compile("^[A-Za-z0-9_-]+") - -class MoeConfigInvalid(moe.MoeError): - pass - -class MoeConfigEvalError(moe.MoeError): - pass - -class MoeConfig: - """Moe configuration file. Should be immutable once a part of a stack.""" - - def __init__(self, file=None, name=None, type=""): - self.vars = {} - self.type = type - if file is not None: - self.load(file) - elif name is not None: - self.name = name - try: - file = open(name, "r") - except IOError, err: - raise MoeConfigInvalid, "Cannot open configuration file %s: %s" % (name, err.strerror) - else: - self.load(file) - - def set(self, k, v): - self.vars[k] = [("s", v)] - - def parse_line(self, x): - x = x.rstrip("\n").lstrip(" \t") - if x=="" or x.startswith("#"): - pass - else: - sep = x.find("=") - if sep >= 0: - k = x[:sep] - v = x[sep+1:] - if k.endswith("+"): - k = k[:-1] - if not self.vars.has_key(k): - self.vars[k] = [("a","")]; - else: - self.vars[k] += [("s"," ")] - else: - self.vars[k] = [] - if not key_pattern.match(k): - raise MoeConfigInvalid, "Malformed name of configuration variable" - if v.startswith("'"): - v=v[1:] - if not v.endswith("'"): - raise MoeConfigInvalid, "Misquoted string" - self.vars[k].append(("s", v[:-1])) - elif v.startswith('"'): - v=v[1:] - if not v.endswith('"'): - raise MoeConfigInvalid, "Misquoted string" - self.parse_interpolated(self.vars[k], v[:-1]) - else: - self.parse_interpolated(self.vars[k], v) - else: - raise MoeConfigInvalid, "Parse error" - - def load(self, file): - lino = 0 - for x in file.readlines(): - lino += 1 - try: - self.parse_line(x) - except MoeConfigInvalid, x: - msg = x.message + " at line " + str(lino) - if hasattr(self, "name"): - msg += " of " + self.name - raise MoeConfigInvalid, msg - - def parse_interpolated(self, list, s): - while s<>"": - if s.startswith("$"): - s = s[1:] - if s.startswith("{"): - p = s.find("}") - if not p: - raise MoeConfigInvalid, "Unbalanced braces" - k, s = s[1:p], s[p+1:] - if not key_pattern.match(k): - raise MoeConfigInvalid, "Invalid variable name" - else: - m = ref_pattern.match(s) - if m: - k, s = s[:m.end()], s[m.end():] - else: - raise MoeConfigInvalid, "Invalid variable reference" - list.append(("i", k)) - else: - p = s.find("$") - if p < 0: - p = len(s) - list.append(("s", s[:p])) - s = s[p:] - - def dump(self, file=sys.stdout, prefix=""): - for k,v in self.vars.items(): - file.write(prefix) - file.write(k) - if len(v) > 0 and v[0][0] == "a": - file.write("+") - v = v[1:] - file.write("=") - for t,w in v: - if t == "s": - file.write("'" + w + "'") - elif t == "i": - file.write('"$' + w + '"') - file.write("\n") - -class MoeConfigStack: - """Stack of configuration files.""" - - def __init__(self, base=None): - if base: - self.stk = base.stk[:] - else: - self.stk = [] - self.in_progress = {} - - def push(self, cfg): - self.stk.append(cfg) - - def __getitem__(self, k): - if self.in_progress.has_key(k): - raise MoeConfigEvalError, "Definition of $%s is recursive" % k; - self.in_progress[k] = 1; - v = self.do_get(k, len(self.stk)-1) - del self.in_progress[k] - return v - - def do_get(self, k, pos): - while pos >= 0: - cfg = self.stk[pos] - if cfg.vars.has_key(k): - new = cfg.vars[k] - if len(new) > 0 and new[0][0] == "a": - v = self.do_get(k, pos-1) - if v != "" and not v.endswith(" "): - v += " " - else: - v = "" - for op,arg in new: - if op == "s": - v = v + arg - elif op == "i": - v = v + self[arg] - return v - pos -= 1 - return "" - - def keys(self): - seen = {} - for cfg in self.stk: - for k in cfg.vars.keys(): - seen[k] = None - return seen.keys() - - def dump(self, file=sys.stdout, prefix=""): - for k in sorted(self.keys()): - v = self[k] - file.write("%s%s=%s\n" % (prefix,k,v)) - - def dump_defs(self, file=sys.stdout, prefix=""): - level = 0 - for cfg in self.stk: - level += 1 - file.write("%s(level %d: %s)\n" % (prefix,level,cfg.type)) - cfg.dump(file, prefix + "\t") - file.write("%s(end)\n" % prefix) - - def apply_overrides(self, prefix): - newstk = [] - for cfg in self.stk: - over = MoeConfig(type = cfg.type + '-overrides') - changed = False - for k in cfg.vars.keys(): - if k.startswith(prefix): - over.vars[k[len(prefix):]] = cfg.vars[k] - changed = True - if changed: - clean = MoeConfig(type = cfg.type) - for k in cfg.vars.keys(): - if not k.startswith(prefix): - clean.vars[k] = cfg.vars[k] - newstk.append(clean) - newstk.append(over) - else: - newstk.append(cfg) - self.stk = newstk - -def parse_overrides(argv): - cfg = None - argv0 = argv.pop(0) - while len(argv) > 0 and argv[0].find("=") >= 0: - if cfg is None: - cfg = MoeConfig(type='cmdline') - cfg.parse_line(argv.pop(0)) - argv.insert(0, argv0) - return cfg +""" +config.py +------- + +Lazy conditional string evaluation module for Moe configuration variables. + + +* Each variable has ordered list of operations (definitions), each defining operation either +assigns (SET) or appends (APPEND) value of an expression to the variable. Each operation may be guarded by condition(s). + +NOTE: If no 'SET' applies, a variable is still undefined even if some 'APPEND' applies. This might change. + +* Each condition is a formula (tree consisting of 'AND', 'OR', 'NOT' and '==', '!=' between two expressions. + +* Expression is a list of strings and variables to be expanded. + +NOTE: All expanded data should be (or is converted to) unicode + + +TODO (OPT): Cleanup of unused undefined variables. +TODO (OPT): Better variable name checking (no name '.'-structural prefix of another) +TODO (OPT): Implemet "subtree" listing. +""" + +import types, itertools, re, bisect +import logging as log + +from moe import MoeError + + +"Allowed depth of recursion - includes ALL recursive calls, so should quite high." +c_maxdepth = 256 + +"Maximum attained depth of recursion - for debug/testing" +debug_maxdepth = 0 + +"Variable name regexp, dots (separators) must be separated from edges and each other." +re_VARNAME = re.compile(r'\A([A-Za-z0-9_-]+\.)*[A-Za-z0-9_-]+\Z') + + +def check_depth(depth): + "Helper to check for recursion depth." + global debug_maxdepth + if depth > c_maxdepth: + raise CyclicConfigError('Too deep recursion in config evaluation (cyclic substitution?)') + if depth > debug_maxdepth: + debug_maxdepth = depth + + +class ConfigError(MoeError): + pass + +class UndefinedError(ConfigError): + pass + +class VariableNameError(ConfigError): + pass + +class VariableFixedError(ConfigError): + pass + +class CyclicConfigError(ConfigError): + pass + + +class ConfigTree(object): + """ + Configuration tree containing all the variables. + + The variables in `self.variables` are referenced directly by the full name. + """ + + def __init__(self): + self.variables = {} + + def lookup(self, key, create = True): + """ + Lookup and return a variable. + If not found and `create` set, check the name and transparently create a new one. + """ + if key not in self.variables: + if not re_VARNAME.match(key): + raise VariableNameError('Invalid variable identifier %r in config', key) + if not create: + raise UndefinedError('Config variable %r undefined.', key) + self.variables[key] = ConfigVar(key) + return self.variables[key] + + def __getitem__(self, key): + """ + Return the value of an existing variable. + """ + return self.lookup(key, create=False).value() + + def dump(self, prefix=''): + """ + Pretty printing of the tree. + Returns an iterator of lines (strings). + """ + return itertools.chain(*[ + self.variables[k].dump(prefix) for k in sorted(self.variables.keys()) + ]) + + def parse(self, s, source=None, level=0): + """Parse `s` (stream/string) into the tree, see `moe.confparser.ConfigParser` for details.""" + import moe.confparser + p = moe.confparser.ConfigParser(text, self, source=source, level=level) + p.parse() + + def parse_file(self, filename, desc=None, level=0): + """Parse an utf-8 file into the tree, see `moe.confparser.ConfigParser` for details. + Names the source "`filename` <`desc`>". """ + f = open(filename, 'rt') + if desc: + filename += " <" + desc + ">" + self.parse(f, source=filename, level=level) + + +class ConfigElem(object): + """ + Base class for cahed config elements - variables and conditions + """ + + def __init__(self, name): + # Full name with separators, definition for conditions + self.name = name + # Vars and conditions depending on value of this one + self.dependants = set([]) + # Cached value (may be None in case of evaluation error) + self.cached = False + self.cached_val = None + + def invalidate(self, depth=0): + """ + Invalidate cached data and invalidate all dependants. + Does nothing if not cached. + """ + check_depth(depth) + if self.cached: + log.debug('invalidating %s', self) + self.cached = False + for d in self.dependants: + d.invalidate(depth + 1) + + def value(self, depth=0): + "Caching helper calling self.evaluate(), returns a value or throws an exception." + check_depth(depth) + if not self.cached: + self.cached_val = self.evaluate(depth=depth+1) + self.cached = True + if self.cached_val == None: + raise UndefinedError("Unable to evaluate %r."%(self.name,)) + return self.cached_val + + def __str__(self): + return self.name + + +class ConfigCondition(ConfigElem): + """ + Condition using equality and logic operators. + Clause is a tuple-tree in the following recursive form: + ('AND', c1, c1), ('OR', c1, c2), ('NOT', c1), + ('==', e1, e2), ('!=', e1, e2) where e1, e2 are `ConfigExpression`s. + """ + + def __init__(self, formula, text=None, parent=None): + """ + Condition defined by `text` (informative), `formula` as in class definition, + `parent` is the parent condition (if any). + """ + if not text: + text = self.formula_string(formula) + super(ConfigCondition, self).__init__(text) + self.formula = formula + self.parent = parent + # Setup dependencies on used variables (not on the parent condition) + for v in self.variables(): + v.dependants.add(self) + if self.parent: + self.parent.dependants.add(self) + + def variables(self, cl=None): + "Return an iterator of variables used in formula `cl`" + if not cl: + cl = self.formula + if cl[0] in ['==','!=']: + return itertools.chain(cl[1].variables(), cl[2].variables()) + if cl[0] in ['AND','OR']: + return itertools.chain(self.variables(cl[1]), self.variables(cl[2])) + return self.variables(cl[1]) # only 'NOT' left + + def remove_dependencies(self): + "Remove self as a dependant from all used variables" + for v in self.variables(): + v.dependants.discard(self) + if self.parent: + self.parent.dependants.discard(self) + + def evaluate(self, cl=None, depth=0): + """Evaluate formula `cl` (or the entire condition). + Partial evaluation for AND and OR. Tests the parent condition first.""" + check_depth(depth) + if not cl: + cl = self.formula + if self.parent and not self.parent.value(): + return False + if cl[0] in ['==','!=']: + v = cl[1].evaluate(depth=depth+1) == cl[2].evaluate(depth=depth+1) + if cl[0] == '!=': v = not v + return v + v1 = self.evaluate(cl=cl[1], depth=depth+1) + if cl[0] == 'NOT': + return not v1 + if cl[0] == 'OR' and v1: return True + if cl[0] == 'AND' and not v1: return False + return self.evaluate(cl=cl[2], depth=depth+1) + + def formula_string(self, formula): + "Create a string representation of a formula." + if formula[0] == 'AND': + return itertools.chain(['('], self.formula_string(formula[1]), [' and '], self.formula_string(formula[2]),[')']) + elif formula[0] == 'OR': + return itertools.chain(['('], self.formula_string(formula[1]), [' or '], self.formula_string(formula[2]),[')']) + elif formula[0] == 'NOT': + return itertools.chain(['(not '], self.formula_string(formula[1]),[')']) + elif formula[0] in ['==', '!=']: + return itertools.chain(formula[1], formula[0], formula[2]) + return iter(['']) + + def str(self, parents=False): + "Retur the defining expression, if `parents` set, then prefixed with parent conditions." + if parents and self.parent: + return self.parent.str(parents=True) + u' && ' + self.name + return self.name + + def __str__(self): + return self.str(parents=False) + + +class Operation(object): + "Helper class for operation data. Must not be present in more variables or present multiple times." + + def __init__(self, operation, condition, expression, level=0, source='?'): + # operation is currently 'SET' and 'APPEND' + self.operation = operation + self.condition = condition + self.expression = expression + self.level = level + self.source = source + + def __str__(self): + return "%s <%d, %s> [%s] %r" % ( {'SET':'=', 'APPEND':'+'}[self.operation], self.level, self.source, + (self.condition and self.condition.str(parents=True)) or '', unicode(self.expression)) + + +class ConfigVar(ConfigElem): + + def __init__(self, name): + super(ConfigVar, self).__init__(name) + # Ordered list of `Operations` (ascending by `level`) + self.operations = [] + # Fixed to value (may be None) + self.fixed = False + self.fixed_val = None + + def variables(self): + "Return a set of variables used in the expressions" + return set(sum([ list(op.expression.variables()) for op in self.operations ], [])) + + def fix(self): + """ + Fixes the value of the variable. Exception is raised should the variable + evaluate to a different value while fixed. + """ + if self.fixed: + return + self.fixed_val = self.value() + self.fixed = True + + def unfix(self): + "Set the variable to be modifiable again." + self.fixed = False + + def value(self, depth=0): + "Handle the case when fixed, raise exc. on different evaluation" + val = super(ConfigVar,self).value(depth) + if self.fixed and self.fixed_val != val: + raise VariableFixedError("value of var %s was fixed to %r but evaluated to %r", self.name, self.fixed_val, val) + return val + + def add_operation(self, operation): + """ + Inserts an operation. The operations are sorted by `level` (ascending), new operation goes last among + these with the same level. + Adds the variable as a dependant of the conditions and variables used in the expressions. + """ + # Invalidate cached value + self.invalidate() + # Add the operation + pos = bisect.bisect_right([o.level for o in self.operations], operation.level) + self.operations.insert(pos, operation) + # Create dependencies + for v in operation.expression.variables(): + v.dependants.add(self) + if operation.condition: + operation.condition.dependants.add(self) + + def remove_operation(self, operation): + """ + Remove the Operation. + Also removes the variable as dependant from all conditions and variables used in this + operation that are no longer used. + """ + # Invalidate cached value + self.invalidate() + # Remove the operation + self.operations.remove(operation) + # Remove dependencies on variables unused in other operations + vs = self.variables() + for v in operation.expression.variables(): + if v not in vs: + v.dependants.remove(self) + # Remove the dependency on the conditions (if not used in another operation) + if operation.condition and operation.condition not in [op.condition for op in self.operations]: + operation.condition.dependants.remove(self) + + def evaluate(self, depth=0): + """ + Find the last 'SET' operation that applies and return the result of concatenating with all + subsequent applicable 'APPEND' operations. The result is the same as performing the operations + first-to-last. + NOTE: undefined if some 'APPEND' apply but no 'SET' applies. + """ + check_depth(depth) + log.debug('evaluating var %r', self.name) + # List of strings to be concatenated + val = [] + # Scan for last applicable expression - try each starting from the end, concatenate extensions + for i in range(len(self.operations)-1, -1, -1): + op = self.operations[i] + # Check the guarding condition + if (not op.condition) or op.condition.value(depth+1): + val.insert(0, op.expression.evaluate(depth=depth+1)) + if op.operation == 'SET': + return u''.join(val) + return None + + def dump(self, prefix=''): + """ + Pretty printing of the variable. Includes all operations. + Returns iterator of lines (unicode strings). + """ + # Try to evaluate the variable, but avoid undefined exceptions + v = None + try: + v = self.value(depth=0) + except ConfigError: + pass + yield prefix+u'%s = %r' % (self.name, v) + for op in self.operations: + #yield prefix+u' %s [%s] %s' % (op.operation, op.condition and op.condition.str(parents=True), op.expression) + yield prefix + u' ' + unicode(op) + + +class ConfigExpression(object): + """ + String expression with some unexpanded config variables. Used in variable operations and conditions. + Expression is given as a list of unicode strings and ConfigVar variables to be expanded. + """ + + def __init__(self, exprlist, original = u''): + self.exprlist = list(exprlist) + # Original defining string + self.original = original + # Replace strings with unicode + for i in range(len(self.exprlist)): + e = self.exprlist[i] + if isinstance(e, types.StringTypes): + if not isinstance(e, unicode): + self.exprlist[i] = unicode(e, 'ascii') + + def variables(self): + "Return an iterator of variables user in the expression" + return itertools.ifilter(lambda e: isinstance(e, ConfigVar), self.exprlist) + + def __str__(self): + return self.original + + def evaluate(self, depth): + check_depth(depth) + "Return unicode result of expansion of the variables." + s = [] + for e in self.exprlist: + if isinstance(e, ConfigVar): + s.append(e.value(depth+1)) + elif isinstance(e, unicode): + s.append(e) + else: + raise ConfigError('Invalid type %s in expression \'%s\'.'%(type(e), self)) + return u''.join(s) + + diff --git a/t/moe/config_old.py b/t/moe/config_old.py new file mode 100644 index 0000000..a96fdb6 --- /dev/null +++ b/t/moe/config_old.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python + +import re +import sys +import moe + +key_pattern = re.compile("^[A-Za-z0-9_-]+$") +ref_pattern = re.compile("^[A-Za-z0-9_-]+") + +class MoeConfigInvalid(moe.MoeError): + pass + +class MoeConfigEvalError(moe.MoeError): + pass + +class MoeConfig: + """Moe configuration file. Should be immutable once a part of a stack.""" + + def __init__(self, file=None, name=None, type=""): + self.vars = {} + self.type = type + if file is not None: + self.load(file) + elif name is not None: + self.name = name + try: + file = open(name, "r") + except IOError, err: + raise MoeConfigInvalid, "Cannot open configuration file %s: %s" % (name, err.strerror) + else: + self.load(file) + + def set(self, k, v): + self.vars[k] = [("s", v)] + + def parse_line(self, x): + x = x.rstrip("\n").lstrip(" \t") + if x=="" or x.startswith("#"): + pass + else: + sep = x.find("=") + if sep >= 0: + k = x[:sep] + v = x[sep+1:] + if k.endswith("+"): + k = k[:-1] + if not self.vars.has_key(k): + self.vars[k] = [("a","")]; + else: + self.vars[k] += [("s"," ")] + else: + self.vars[k] = [] + if not key_pattern.match(k): + raise MoeConfigInvalid, "Malformed name of configuration variable" + if v.startswith("'"): + v=v[1:] + if not v.endswith("'"): + raise MoeConfigInvalid, "Misquoted string" + self.vars[k].append(("s", v[:-1])) + elif v.startswith('"'): + v=v[1:] + if not v.endswith('"'): + raise MoeConfigInvalid, "Misquoted string" + self.parse_interpolated(self.vars[k], v[:-1]) + else: + self.parse_interpolated(self.vars[k], v) + else: + raise MoeConfigInvalid, "Parse error" + + def load(self, file): + lino = 0 + for x in file.readlines(): + lino += 1 + try: + self.parse_line(x) + except MoeConfigInvalid, x: + msg = x.message + " at line " + str(lino) + if hasattr(self, "name"): + msg += " of " + self.name + raise MoeConfigInvalid, msg + + def parse_interpolated(self, list, s): + while s<>"": + if s.startswith("$"): + s = s[1:] + if s.startswith("{"): + p = s.find("}") + if not p: + raise MoeConfigInvalid, "Unbalanced braces" + k, s = s[1:p], s[p+1:] + if not key_pattern.match(k): + raise MoeConfigInvalid, "Invalid variable name" + else: + m = ref_pattern.match(s) + if m: + k, s = s[:m.end()], s[m.end():] + else: + raise MoeConfigInvalid, "Invalid variable reference" + list.append(("i", k)) + else: + p = s.find("$") + if p < 0: + p = len(s) + list.append(("s", s[:p])) + s = s[p:] + + def dump(self, file=sys.stdout, prefix=""): + for k,v in self.vars.items(): + file.write(prefix) + file.write(k) + if len(v) > 0 and v[0][0] == "a": + file.write("+") + v = v[1:] + file.write("=") + for t,w in v: + if t == "s": + file.write("'" + w + "'") + elif t == "i": + file.write('"$' + w + '"') + file.write("\n") + +class MoeConfigStack: + """Stack of configuration files.""" + + def __init__(self, base=None): + if base: + self.stk = base.stk[:] + else: + self.stk = [] + self.in_progress = {} + + def push(self, cfg): + self.stk.append(cfg) + + def __getitem__(self, k): + if self.in_progress.has_key(k): + raise MoeConfigEvalError, "Definition of $%s is recursive" % k; + self.in_progress[k] = 1; + v = self.do_get(k, len(self.stk)-1) + del self.in_progress[k] + return v + + def do_get(self, k, pos): + while pos >= 0: + cfg = self.stk[pos] + if cfg.vars.has_key(k): + new = cfg.vars[k] + if len(new) > 0 and new[0][0] == "a": + v = self.do_get(k, pos-1) + if v != "" and not v.endswith(" "): + v += " " + else: + v = "" + for op,arg in new: + if op == "s": + v = v + arg + elif op == "i": + v = v + self[arg] + return v + pos -= 1 + return "" + + def keys(self): + seen = {} + for cfg in self.stk: + for k in cfg.vars.keys(): + seen[k] = None + return seen.keys() + + def dump(self, file=sys.stdout, prefix=""): + for k in sorted(self.keys()): + v = self[k] + file.write("%s%s=%s\n" % (prefix,k,v)) + + def dump_defs(self, file=sys.stdout, prefix=""): + level = 0 + for cfg in self.stk: + level += 1 + file.write("%s(level %d: %s)\n" % (prefix,level,cfg.type)) + cfg.dump(file, prefix + "\t") + file.write("%s(end)\n" % prefix) + + def apply_overrides(self, prefix): + newstk = [] + for cfg in self.stk: + over = MoeConfig(type = cfg.type + '-overrides') + changed = False + for k in cfg.vars.keys(): + if k.startswith(prefix): + over.vars[k[len(prefix):]] = cfg.vars[k] + changed = True + if changed: + clean = MoeConfig(type = cfg.type) + for k in cfg.vars.keys(): + if not k.startswith(prefix): + clean.vars[k] = cfg.vars[k] + newstk.append(clean) + newstk.append(over) + else: + newstk.append(cfg) + self.stk = newstk + +def parse_overrides(argv): + cfg = None + argv0 = argv.pop(0) + while len(argv) > 0 and argv[0].find("=") >= 0: + if cfg is None: + cfg = MoeConfig(type='cmdline') + cfg.parse_line(argv.pop(0)) + argv.insert(0, argv0) + return cfg diff --git a/t/moe/config_parser.py b/t/moe/config_parser.py new file mode 100644 index 0000000..73b5406 --- /dev/null +++ b/t/moe/config_parser.py @@ -0,0 +1,385 @@ +""" +config_parser.py +------------ + +Simple Moe configuration file syntax parser. + +TODO: decide neccessity of '()' in/around formulas +TODO: check escaping in expressions +TODO: should whitespace (incl. '\\n') be allowed (almost) everywhere? + can comment be anywhere whitespace can? + +Generally, whitespace and comments are alowed everywhere except in variable names and inside expressions. +Also, COMMENT must not contain '\\n'. + +FILE, BLOCK, STATEMENT, OPERATION, SUBTREE, CONDITION, FORMULA, AND, OR and NOT eat any preceding whitespace. TODO: check? + +The configuration syntax is the following: + +FILE = BLOCK +BLOCK = WS | STATEMENT ( SEP STATEMENT )* + +SEP = ( '\\n' | ';' ) +WS = ( ' ' | '\\t' | '\\n' | COMMENT )* + +COMMENT = re('#[^\\n]*\\n') + +STATEMENT = CONDITION | OPERATION | SUBTREE + +OPERATION = WS VARNAME WS ( '=' | '+=' ) WS EXPRESSION +SUBTREE = WS VARNAME WS '{' BLOCK WS '}' +CONDITION = WS 'if' FORMULA WS '{' BLOCK WS '}' + +FORMULA = WS (( EXPRESSION WS ( '!=' | '==' ) WS EXPRESSION ) | '(' AND WS ')' | '(' OR WS ')' | NOT ) +AND = FORMULA WS 'and' FORMULA +OR = FORMULA WS 'or' FORMULA +NOT = WS 'not' FORMULA + +NOTE: ';' or '\n' is currently required even after CONDITION and SUBTREE block + TODO: change to OPERATION only +NOTE: Formula may contain additional/extra parentheses + +EXPRESSION = '"' ( ECHAR | '{' VARNAME '}' )* '"' | re"'[^'\\n]*'" | VARNAME +ECHAR = re('([^\\{}]|\\\\|\\{|\\}|\\n)*') +VARNAME = re('[a-zA-Z0-9-_]+(\.[a-zA-Z0-9-_]+)*') +""" + +import re, types, itertools, logging as log +import traceback + +import moe.config as cf + + +class ConfigSyntaxError(cf.ConfigError): + + def __init__(self, msg, source='', line=None, column=None): + self.msg = msg + self.source = source + self.line = line + self.column = column + + def __str__(self): + return('ConfigSyntaxError %s:%d:%d: %s'%(self.source, self.line, self.column, self.msg)) + + +class ConfigParser(object): + c_varname_sep = u'.' + c_comment = u'#' + c_open = u'{' + c_close = u'}' + c_ws = u' \t\n' + c_sep = u';\n' + c_nl = u'\n' + c_if = u'if' + c_and = u'and' + c_or = u'or' + c_not = u'not' + c_eq = u'==' + c_neq = u'!=' + c_set = u'=' + c_append = u'+=' + + def __init__(self, s, tree, source='', level=0): + """Create a config file parser. + `s` is either a string, unicode or an open file. File is assumed to be utf-8, string is converted to unicode. + `tree` is a ConfigTree to fill the operations into. + `source` is an optional name of the file, for debugging and syntax errors. + `level` indicates the precedence the operations should have in the ConfigTree + """ + self.s = s # Unicode, ascii string or an open file + self.buf = u"" # Read-buffer for s file, whole unicode string for s string/unicode + if isinstance(self.s, types.StringTypes): + self.buf = unicode(self.s) + elif (not isinstance(self.s, file)) or self.s.closed: + raise TypeError("Expected unicode, str or open file.") + self.bufpos = 0 + self.source = source # Usually filename + self.line = 1 + self.column = 1 + self.tree = tree # ConfTree to fill + self.level = level # level of the parsed operations + self.prefix = '' # Prefix of variable name, may begin with '.' + self.conditions = [] # Stack of nested conditions, these are chained, so only the last is necessary + self.read_ops = [] # List of parsed operations (varname, `Operation`), returned by `self.parse()` + + def preread(self, l): + "Make sure buf contains at least `l` next characters, return True on succes and False on hitting EOF." + if isinstance(self.s, file): + self.buf = self.buf[self.bufpos:] + self.s.read(max(l, 1024)).decode('utf8') + self.bufpos = 0 + return len(self.buf) >= self.bufpos + l + + def peek(self, l = 1): + "Peek and return next `l` unicode characters or everything until EOF." + self.preread(l) + return self.buf[self.bufpos:self.bufpos+l] + + def peeks(self, s): + "Peek and compare next `len(s)` characters to `s`. Converts `s` to unicode. False on hitting EOF." + s = unicode(s) + return self.peek(len(s)) == s + + def next(self, l = 1): + "Eat and return next `l` unicode characters. Raise exception on EOF." + if not self.preread(l): + self.syntax_error("Unexpected end of file") + s = self.buf[self.bufpos:self.bufpos+l] + self.bufpos += l + rnl = s.rfind('\n') + if rnl<0: + # no newline + self.column += l + else: + # some newlines + self.line += s.count('\n') + self.column = l - rnl - 1 + return s + + def nexts(self, s): + """Compare next `len(s)` characters to `s`. On match, eat them and return True. Otherwise just return False. + Converts `s` to unicode. False on hitting EOF.""" + s = unicode(s) + if self.peeks(s): + self.next(len(s)) + return True + return False + + def eof(self): + "Check for end-of-stream." + return not self.preread(1) + + def expect(self, s, msg=None): + "Eat and compare next `len(s)` characters to `s`. If not equal, raise an error with `msg`. Unicode." + s = unicode(s) + if not self.nexts(s): + self.syntax_error(msg or u"%r expected."%(s,)) + + def syntax_error(self, msg, *args): + "Raise a syntax error with file/line/column info" + raise ConfigSyntaxError(source=self.source, line=self.line, column=self.column, msg=(msg%args)) + + def dbg(self): + n = None; s = '' + for i in traceback.extract_stack(): + if i[2][:2]=='p_': + s += ' ' + n = i[2] + if n: log.debug(s + n + ' ' + repr(self.peek(15)) + '...') + + def parse(self): + self.read_ops = [] + self.p_BLOCK() + return self.read_ops + + def p_BLOCK(self): + self.dbg() # Debug + self.p_WS() + while (not self.eof()) and (not self.peeks(self.c_close)): + self.p_STATEMENT() + l0 = self.line + self.p_WS() + if self.eof() or self.peeks(self.c_close): + break + if self.line == l0: # No newline skipped in p_WS + self.expect(';') + else: + self.nexts(';') # NOTE: this is weird - can ';' occur anywhere? Or at most once, but only after any p_WS debris? + self.p_WS() + + def p_WS(self): + self.dbg() # Debug + while not self.eof(): + if self.peek() in self.c_ws: + self.next() + elif self.peeks(self.c_comment): + self.p_COMMENT() + else: + break + + def p_COMMENT(self): + self.dbg() # Debug + self.expect(self.c_comment, "'#' expected at the beginning of a comment.") + while (not self.eof()) and (not self.nexts(self.c_nl)): + self.next(1) + + def p_STATEMENT(self): + self.dbg() # Debug + self.p_WS() + if self.peeks(self.c_if): + self.p_CONDITION() + else: + # for operation or subtree, read VARNAME + varname = self.p_VARNAME() + self.p_WS() + if self.peeks(self.c_open): + self.p_SUBTREE(varname) + else: + self.p_OPERATION(varname) + + def p_SUBTREE(self, varname=None): + self.dbg() # Debug + if not varname: + self.p_WS() + varname = self.p_VARNAME() + self.p_WS() + self.expect(self.c_open) + # backup and extend the variable name prefix + p = self.prefix + self.prefix = p + self.c_varname_sep + varname + self.p_BLOCK() + self.prefix = p + # close block and + self.p_WS() + self.expect(self.c_close) + + def p_OPERATION(self, varname=None): + self.dbg() # Debug + if not varname: + self.p_WS() + varname = self.p_VARNAME() + self.p_WS() + if self.nexts(self.c_set): + op = 'SET' + elif self.nexts(self.c_append): + op = 'APPEND' + else: + self.syntax_error('Unknown operation.') + self.p_WS() + exp = self.p_EXPRESSION() + vname = (self.prefix+self.c_varname_sep+varname).lstrip(self.c_varname_sep) + v = self.tree.lookup(vname) + if self.conditions: + cnd = self.conditions[-1] + else: + cnd = None + op = cf.Operation(op, cnd, exp, level=self.level, + source="%s:%d:%d"%(self.source, self.line, self.column)) + # NOTE/WARNING: The last character of operation will be reported in case of error. + v.add_operation(op) + self.read_ops.append( (vname, op) ) + + def p_CONDITION(self): + self.dbg() # Debug + self.p_WS() + t = u"condition at %s:%d:%d"%(self.source, self.line, self.column) + self.expect(self.c_if) + self.p_WS() + f = self.p_FORMULA() + cnd = cf.ConfigCondition(f, text=t, parent=(self.conditions and self.conditions[-1]) or None) + self.conditions.append(cnd) + # Parse a block + self.p_WS() + self.expect(self.c_open) + self.p_BLOCK() + self.p_WS() + self.expect(self.c_close) + # Cleanup + self.conditions.pop() + + def p_VARNAME(self): + self.dbg() # Debug + vnl = [] + while self.preread(1) and (self.peek().isalnum() or self.peek() in u'-_.'): + vnl.append(self.next()) + vn = u''.join(vnl) + if not cf.re_VARNAME.match(vn): + self.syntax_error('Invalid variable name %r', vn) + return vn + + def p_EXPRESSION(self): + self.dbg() # Debug + if self.peek() not in '\'"': + # Expect a variable name + varname = self.p_VARNAME() + return cf.ConfigExpression((self.tree.lookup(varname),), varname) + op = self.next() + # Parse literal expression + if op == u'\'': + exl = [] + while not self.peeks(op): + exl.append(self.next()) + self.expect(op) + s = u''.join(exl) + return cf.ConfigExpression((s,), s) + # Parse expression with variables + exl = [op] + expr = [] + while not self.peeks(op): + exl.append(self.peek()) + if self.nexts(u'\\'): + # Escape sequence + c = self.next() + if c not in u'\\"n' + self.c_open + self.c_close: + self.syntax_error('Illeal escape sequence in expression') + if c == 'n': + expr.append(u'\n') + else: + expr.append(c) + exl.append(c) + elif self.nexts(self.c_open): + # Parse a variable name in '{}' + varname = self.p_VARNAME() + self.expect(self.c_close) + exl.append(varname) + expr.append(self.tree.lookup(varname)) + else: + # Regular character + expr.append(self.next()) + self.expect(op) + exs = ''.join(exl) + # Concatenate consecutive characters in expr + expr2 = [] + for i in expr: + if expr2 and isinstance(expr2[-1], unicode) and isinstance(i, unicode): + expr2[-1] = expr2[-1] + i + else: + expr2.append(i) + return cf.ConfigExpression(expr2, exs) + + def p_FORMULA(self): + self.dbg() # Debug + self.p_WS() + # Combined logical formula + if self.nexts(u'('): + f1 = self.p_FORMULA() + self.p_WS() + if self.nexts(self.c_and): + if self.peek(1).isalnum(): + self.syntax_error('trailing characters after %r', self.c_and) + f2 = self.p_FORMULA() + self.p_WS() + self.expect(u')') + return ('AND', f1, f2) + elif self.nexts(self.c_or): + if self.peek(1).isalnum(): + self.syntax_error('trailing characters after %r', self.c_or) + f2 = self.p_FORMULA() + self.p_WS() + self.expect(u')') + return ('OR', f1, f2) + elif self.nexts(u')'): + # Only extra parenthes + return f1 + else: + self.syntax_error("Logic operator or ')' expected") + elif self.nexts(self.c_not): + if self.peek().isalnum(): + self.syntax_error('trailing characters after %r', self.c_not) + # 'not' formula + f = self.p_FORMULA() + return ('NOT', f) + else: + # Should be (in)equality condition + e1 = self.p_EXPRESSION() + self.p_WS() + if self.nexts(self.c_eq): + self.p_WS() + e2 = self.p_EXPRESSION() + return ('==', e1, e2) + elif self.nexts(self.c_neq): + self.p_WS() + e2 = self.p_EXPRESSION() + return ('!=', e1, e2) + else: + self.syntax_error("Comparation operator expected") + diff --git a/t/moe/config_test.py b/t/moe/config_test.py new file mode 100644 index 0000000..573934f --- /dev/null +++ b/t/moe/config_test.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- + +import moe.config as cf +from moe.config_parser import * +import logging as log +import unittest +import tempfile + +class TestConfig(unittest.TestCase): + + def setUp(s): + s.t = cf.ConfigTree() + + def parse(s, string, level=0, fname='test'): + cp = ConfigParser(string, s.t, fname, level) + ops = cp.parse() + cp.p_WS() + assert cp.eof() + return ops + + def var(s, varname, create=True): + return s.t.lookup(varname, create=create) + + def val(s, varname): + return s.var(varname, create=False).value() + + def eqparse(s, string, *args, **kwargs): + "Parse expression `s`, return parts that should be equal to anoter run of `eqparse(s)`." + return [(i[0], i[1].operation) for i in s.parse(string, *args, **kwargs)] + + +class TestParser(TestConfig): + s1 = r"""a="1";z{b='2';w{S.Tr_an-g.e='"'}};c.d='\n';e="{a}{b}";e+='{c.d}';a+="\"\n\{\}";f+='Z{a.b}';e+=c.d;e+=c;e+=a""" + s2 = '\t\n \n ' + s1.replace('=', '= \n ').replace(';', '\t \n\t \n ').replace('+=',' \n += ') + '\n\n ' + + def test_noWS(s): + assert len(s.parse(s.s1)) == 11 + + def test_noWS_COMMENT(s): + assert s.eqparse(s.s1+'#COMMENT') == s.eqparse(s.s1+'#') == s.eqparse(s.s1+'#\n') == s.eqparse(s.s1+'\n#') + + def test_manyWS(s): + assert s.eqparse(s.s2) == s.eqparse(s.s1) + + def test_manyWS_COMMENT(s): + assert s.eqparse(s.s2.replace('\n',' #COMMENT \n')) == s.eqparse(s.s2.replace('\n','#\n')) == s.eqparse(s.s1) + + def test_empty(s): + assert s.eqparse('') == s.eqparse('\n') == s.eqparse('') == s.eqparse('a{}') == \ + s.eqparse('a.b.c{if ""==\'\' {d.e{\n\n#Nothing\n}} }') == [] + + def test_syntax_errors(s): + s.assertRaises(ConfigSyntaxError, s.parse, "a=#") + s.assertRaises(ConfigSyntaxError, s.parse, "a='\"") + s.assertRaises(ConfigSyntaxError, s.parse, 'a="{a@b}"') + s.assertRaises(ConfigSyntaxError, s.parse, 'a="A{A"') + s.assertRaises(ConfigSyntaxError, s.parse, 'a=b"42"') + s.assertRaises(ConfigSyntaxError, s.parse, 'a=b.c.d.') + + def test_error_location(s): + try: s.parse('\t \n \n { \n \n ') + except ConfigSyntaxError, e: + assert e.line == 3 and e.column in range(2,4) + + def test_quoting(s): + s.parse(' a="\\"\\{a$b\\}\'\n\n\'{z}" ') + assert s.var('z', create=False) + # No escaping in '-string + s.assertRaises(ConfigSyntaxError, s.parse, " a='\"\\'\n\n' ") + # Variable should not be created + s.parse(" a='{z2}' ") + s.assertRaises(cf.ConfigError, s.var, 'z2', create=False) + + def test_conditions(s): + s.assertRaises(ConfigSyntaxError, s.parse, "if '{a}'=='{b}' and ''!='' {}") + s.parse('if ((#C\n (\n (not not not""!="")\n#C\n)\t ) ) {}') + s.parse('if (""=="" and not (not not ""!="" or ""=="")){}') + s.parse('if ((a=="{b}" and a.b.c!=c.b.a)or x!=y){}') + s.parse('if(""==""){a{if(""==""){if(""==""){b{if(""==""){if(""==""){}}}}}}}') + s.assertRaises(ConfigSyntaxError, s.parse, "if notnot'{a}'=='{b}' {}") + s.assertRaises(ConfigSyntaxError, s.parse, "if ('{a}'=='{b}' not and ''!='') {}") + s.assertRaises(ConfigSyntaxError, s.parse, "if ('{a}'=='{b}' ornot ''!='') {}") + s.assertRaises(ConfigSyntaxError, s.parse, "if 'a'<>'b' {}") + + +class TestConfigEval(TestConfig): + + def test_ops(s): + s.parse('c+="-C_APP"', level=20) + s.parse('a="A"; b="{a}-B"; c="C1-{b}-C2"; a+="FOO"; a="AA"') + assert s.val('c') == 'C1-AA-B-C2-C_APP' + s.parse('b+="-A:\{{a}\}";a+="A"', level=10) + assert s.val('c') == 'C1-AAA-B-A:{AAA}-C2-C_APP' + + def test_nested(s): + s.parse('a="0"; b{a="1"; b{a="2"; b{a="3"; b{a="4"; b{a="5"}}}}}') + assert s.val('b.b.b.a') == '3' + s.parse('b.b{b.b{b.a="5MOD"}}') + assert s.val('b.b.b.b.b.a') == '5MOD' + + def test_escape_chars(s): + s.parse(r"""a='{a}\\\\#\n'; b="{a}'\"\{\}"; c='\'; c+="\{{b}\}";""") + assert s.val('c') == r"""\{{a}\\\\#\n'"{}}""" + + def test_expressions(s): + s.parse('A="4"; B.B=\'2\'; B.A=A; B.C="{B.A}{B.B}"; if B.C=="42" {D=C}; if C==D{E=D}; C="OK"') + assert s.val('E') == 'OK' + + ts = 'a="A:"; if "{c1}"=="1" {a+="C1"; b="B"; if ("{c2a}"=="1" or not "{c2b}"=="1") { a+="C2"; '\ + 'if ("{c3a}"=="1" and "{c3b}"=="1") { a+="C3" }}}' + + def test_cond_chain(s): + s.parse(s.ts) + s.parse('c1="1"; c2="0"') + # b should have determined value, a should not (since c3a is undefined) + s.assertRaises(cf.UndefinedError, s.val, 'a') + assert s.val('b') == 'B' + s.parse('c1="0"') + # now b should be undefined + s.assertRaises(cf.UndefinedError, s.val, 'b') + # Normal evaluation + s.parse('c1="1"; c2a="1"; c2b="0"; c3a="0"') + assert s.val('a') == 'A:C1C2' + s.parse('c3a="1"; c3b="1"; c2b="1"') + assert s.val('a') == 'A:C1C2C3' + # tests condition invalidating + s.parse('c2a+="0"') + assert s.val('a') == 'A:C1' + + def test_cond_eager(s): + s.parse(s.ts) + # undefined c2b and c3a should not be evaluated + s.parse('c1="1"; c2a="1"; c3a="0"') + assert s.val('a') == 'A:C1C2' + # but now c3b should be evaluated + s.parse('c1="1"; c2a="1"; c3a="1"') + s.assertRaises(cf.UndefinedError, s.val, 'a') + s.parse('c1="1"; c2a="1"; c3b="1"') + assert s.val('a') == 'A:C1C2C3' + + def test_undef(s): + s.assertRaises(cf.UndefinedError, s.val, 'a') + s.parse('a="{b}"') + s.assertRaises(cf.UndefinedError, s.val, 'a') + s.parse('b+="1"') + s.assertRaises(cf.UndefinedError, s.val, 'b') + + def test_loopy_def(s): + s.parse('a="A"; a+="{a}"') + s.assertRaises(cf.CyclicConfigError, s.val, 'a') + s.parse('b="{c}"; c="{b}"') + s.assertRaises(cf.CyclicConfigError, s.val, 'b') + + def test_varname(s): + s.assertRaises(cf.VariableNameError, s.val, 'b/c') + s.assertRaises(cf.VariableNameError, s.val, '.b.c') + s.assertRaises(cf.VariableNameError, s.val, 'b.c.') + s.assertRaises(cf.VariableNameError, s.val, 'b..c') + + def test_remove(s): + l = s.parse('a="A1"; b="B1"; if "{cond}"=="1" {a+="A2"; b+="B2"}; a+="A3"; b+="B3"; cond="1"') + assert s.val('a') == 'A1A2A3' + assert s.val('b') == 'B1B2B3' + # remove b+="B2" + s.var('b').remove_operation(l[3][1]) + assert s.val('a') == 'A1A2A3' + assert s.val('b') == 'B1B3' + # are the dependencies still handled properly? + s.parse('cond+="-invalidated"') + assert s.val('a') == 'A1A3' + s.parse('cond="1"') + assert s.val('a') == 'A1A2A3' + + def test_fix(s): + s.parse(""" A='4'; B="{A}"; C="2"; B+="{C}"; D="{B}"; E="{D}" """) + s.var('D').fix() + # Break by C + l = s.parse('C="3"') + s.assertRaises(cf.VariableFixedError, s.val, "E") + s.assertRaises(cf.VariableFixedError, s.val, "D") + s.var('C').remove_operation(l[0][1]) + # Break directly by D + l = s.parse('D="41"') + s.assertRaises(cf.VariableFixedError, s.val, "D") + s.assertRaises(cf.VariableFixedError, s.val, "E") + # Unfix + s.var('D').unfix() + assert s.val('E') == '41' + s.var('D').remove_operation(l[0][1]) + assert s.val('D') == '42' + + def test_unicode(s): + # Ascii (1b) and unicode (2b) + s.parse(u'A="Ú"; C="ě"') + # String + s.parse('B=\'chyln\'') + # By escapes + s.parse(u'D=\'\u0159e\u017eav\xe1\'') + s.parse(u'E="ŽluŤ"') + # Via utf8 file + f = tempfile.TemporaryFile(mode='w+t') + f.write(u'S1="\xdachyln\u011b \u0159e\u017eav\xe1 \u017dlu\u0164" ; S2="{A}{B}{C} {D} {E}"'.encode('utf8')) + f.seek(0) + s.parse(f) + # Test + s.parse(u'if "{S1}"=="{S2}" { ANS="jó!" } ') + assert s.val('ANS') == u"jó!" + assert s.val('S1') == s.val('S2') == u'\xdachyln\u011b \u0159e\u017eav\xe1 \u017dlu\u0164' + + def test_priority(s): + s.var('a').add_operation(cf.Operation('APPEND', None, cf.ConfigExpression(["4"]), level=4)) + s.var('a').add_operation(cf.Operation('APPEND', None, cf.ConfigExpression(["3a"]), level=3)) + s.var('a').add_operation(cf.Operation('APPEND', None, cf.ConfigExpression(["1"]), level=1)) + s.var('a').add_operation(cf.Operation('SET', None, cf.ConfigExpression(["2"]), level=2)) + s.var('a').add_operation(cf.Operation('APPEND', None, cf.ConfigExpression(["3b"]), level=3)) + s.var('a').add_operation(cf.Operation('SET', None, cf.ConfigExpression(["0"]), level=0)) + s.var('a').add_operation(cf.Operation('APPEND', None, cf.ConfigExpression(["5"]), level=5)) + assert s.val('a')=='23a3b45' + + + +# TODO: Fail on 1st April +# TODO (OPT): Somehow add log.debug('Maximum encountered depth: %d', cf.debug_maxdepth) + +# Coverage via command "nosetests conftest --with-coverage --cover-html-dir=cover --cover-html" + diff --git a/t/moe/confparser.py b/t/moe/confparser.py deleted file mode 100644 index d024912..0000000 --- a/t/moe/confparser.py +++ /dev/null @@ -1,384 +0,0 @@ -""" -confparse.py ------------- - -Simple Moe configuration file syntax parser. - -TODO: decide neccessity of '()' in/around formulas -TODO: check escaping in expressions -TODO: should whitespace (incl. '\\n') be allowed (almost) everywhere? - can comment be anywhere whitespace can? - -Generally, whitespace and comments are alowed everywhere except in variable names and inside expressions. -Also, COMMENT must not contain '\\n'. - -FILE, BLOCK, STATEMENT, OPERATION, SUBTREE, CONDITION, FORMULA, AND, OR and NOT eat any preceding whitespace. TODO: check? - -The configuration syntax is the following: - -FILE = BLOCK -BLOCK = WS | STATEMENT ( SEP STATEMENT )* - -SEP = ( '\\n' | ';' ) -WS = ( ' ' | '\\t' | '\\n' | COMMENT )* - -COMMENT = re('#[^\\n]*\\n') - -STATEMENT = CONDITION | OPERATION | SUBTREE - -OPERATION = WS VARNAME WS ( '=' | '+=' ) WS EXPRESSION -SUBTREE = WS VARNAME WS '{' BLOCK WS '}' -CONDITION = WS 'if' FORMULA WS '{' BLOCK WS '}' - -FORMULA = WS (( EXPRESSION WS ( '!=' | '==' ) WS EXPRESSION ) | '(' AND WS ')' | '(' OR WS ')' | NOT ) -AND = FORMULA WS 'and' FORMULA -OR = FORMULA WS 'or' FORMULA -NOT = WS 'not' FORMULA - -NOTE: ';' or '\n' is currently required even after CONDITION and SUBTREE block - TODO: change to OPERATION only -NOTE: Formula may contain additional/extra parentheses - -EXPRESSION = '"' ( ECHAR | '{' VARNAME '}' )* '"' | re"'[^'\\n]*'" | VARNAME -ECHAR = re('([^\\{}]|\\\\|\\{|\\}|\\n)*') -VARNAME = re('[a-zA-Z0-9-_]+(\.[a-zA-Z0-9-_]+)*') -""" - -import re, types, itertools, logging as log -import traceback -import moe.conf as conf - - -class ConfigSyntaxError(conf.ConfigError): - - def __init__(self, msg, fname='', line=None, column=None): - self.msg = msg - self.fname = fname - self.line = line - self.column = column - - def __str__(self): - return('ConfigSyntaxError %s:%d:%d: %s'%(self.fname, self.line, self.column, self.msg)) - - -class ConfigParser(object): - c_varname_sep = u'.' - c_comment = u'#' - c_open = u'{' - c_close = u'}' - c_ws = u' \t\n' - c_sep = u';\n' - c_nl = u'\n' - c_if = u'if' - c_and = u'and' - c_or = u'or' - c_not = u'not' - c_eq = u'==' - c_neq = u'!=' - c_set = u'=' - c_append = u'+=' - - def __init__(self, s, tree, fname='', level=0): - """Create a config file parser. - `s` is either a string, unicode or an open file. File is assumed to be utf-8, string is converted to unicode. - `tree` is a ConfigTree to fill the operations into. - `fname` is an optional name of the file, for debugging and syntax errors. - `level` indicates the precedence the operations should have in the ConfigTree - """ - self.s = s # Unicode, ascii string or an open file - self.buf = u"" # Read-buffer for s file, whole unicode string for s string/unicode - if isinstance(self.s, types.StringTypes): - self.buf = unicode(self.s) - elif (not isinstance(self.s, file)) or self.s.closed: - raise TypeError("Expected unicode, str or open file.") - self.bufpos = 0 - self.fname = fname # Filename - self.line = 1 - self.column = 1 - self.tree = tree # ConfTree to fill - self.level = level # level of the parsed operations - self.prefix = '' # Prefix of variable name, may begin with '.' - self.conditions = [] # Stack of nested conditions, these are chained, so only the last is necessary - self.read_ops = [] # List of parsed operations (varname, `Operation`), returned by `self.parse()` - - def preread(self, l): - "Make sure buf contains at least `l` next characters, return True on succes and False on hitting EOF." - if isinstance(self.s, file): - self.buf = self.buf[self.bufpos:] + self.s.read(max(l, 1024)).decode('utf8') - self.bufpos = 0 - return len(self.buf) >= self.bufpos + l - - def peek(self, l = 1): - "Peek and return next `l` unicode characters or everything until EOF." - self.preread(l) - return self.buf[self.bufpos:self.bufpos+l] - - def peeks(self, s): - "Peek and compare next `len(s)` characters to `s`. Converts `s` to unicode. False on hitting EOF." - s = unicode(s) - return self.peek(len(s)) == s - - def next(self, l = 1): - "Eat and return next `l` unicode characters. Raise exception on EOF." - if not self.preread(l): - self.syntax_error("Unexpected end of file") - s = self.buf[self.bufpos:self.bufpos+l] - self.bufpos += l - rnl = s.rfind('\n') - if rnl<0: - # no newline - self.column += l - else: - # some newlines - self.line += s.count('\n') - self.column = l - rnl - 1 - return s - - def nexts(self, s): - """Compare next `len(s)` characters to `s`. On match, eat them and return True. Otherwise just return False. - Converts `s` to unicode. False on hitting EOF.""" - s = unicode(s) - if self.peeks(s): - self.next(len(s)) - return True - return False - - def eof(self): - "Check for end-of-stream." - return not self.preread(1) - - def expect(self, s, msg=None): - "Eat and compare next `len(s)` characters to `s`. If not equal, raise an error with `msg`. Unicode." - s = unicode(s) - if not self.nexts(s): - self.syntax_error(msg or u"%r expected."%(s,)) - - def syntax_error(self, msg, *args): - "Raise a syntax error with file/line/column info" - raise ConfigSyntaxError(fname=self.fname, line=self.line, column=self.column, msg=(msg%args)) - - def dbg(self): - n = None; s = '' - for i in traceback.extract_stack(): - if i[2][:2]=='p_': - s += ' ' - n = i[2] - if n: log.debug(s + n + ' ' + repr(self.peek(15)) + '...') - - def parse(self): - self.read_ops = [] - self.p_BLOCK() - return self.read_ops - - def p_BLOCK(self): - self.dbg() # Debug - self.p_WS() - while (not self.eof()) and (not self.peeks(self.c_close)): - self.p_STATEMENT() - l0 = self.line - self.p_WS() - if self.eof() or self.peeks(self.c_close): - break - if self.line == l0: # No newline skipped in p_WS - self.expect(';') - else: - self.nexts(';') # NOTE: this is weird - can ';' occur anywhere? Or at most once, but only after any p_WS debris? - self.p_WS() - - def p_WS(self): - self.dbg() # Debug - while not self.eof(): - if self.peek() in self.c_ws: - self.next() - elif self.peeks(self.c_comment): - self.p_COMMENT() - else: - break - - def p_COMMENT(self): - self.dbg() # Debug - self.expect(self.c_comment, "'#' expected at the beginning of a comment.") - while (not self.eof()) and (not self.nexts(self.c_nl)): - self.next(1) - - def p_STATEMENT(self): - self.dbg() # Debug - self.p_WS() - if self.peeks(self.c_if): - self.p_CONDITION() - else: - # for operation or subtree, read VARNAME - varname = self.p_VARNAME() - self.p_WS() - if self.peeks(self.c_open): - self.p_SUBTREE(varname) - else: - self.p_OPERATION(varname) - - def p_SUBTREE(self, varname=None): - self.dbg() # Debug - if not varname: - self.p_WS() - varname = self.p_VARNAME() - self.p_WS() - self.expect(self.c_open) - # backup and extend the variable name prefix - p = self.prefix - self.prefix = p + self.c_varname_sep + varname - self.p_BLOCK() - self.prefix = p - # close block and - self.p_WS() - self.expect(self.c_close) - - def p_OPERATION(self, varname=None): - self.dbg() # Debug - if not varname: - self.p_WS() - varname = self.p_VARNAME() - self.p_WS() - if self.nexts(self.c_set): - op = 'SET' - elif self.nexts(self.c_append): - op = 'APPEND' - else: - self.syntax_error('Unknown operation.') - self.p_WS() - exp = self.p_EXPRESSION() - vname = (self.prefix+self.c_varname_sep+varname).lstrip(self.c_varname_sep) - v = self.tree.lookup(vname) - if self.conditions: - cnd = self.conditions[-1] - else: - cnd = None - op = conf.Operation(op, cnd, exp, level=self.level, - source="%s:%d:%d"%(self.fname, self.line, self.column)) - # NOTE/WARNING: The last character of operation will be reported in case of error. - v.add_operation(op) - self.read_ops.append( (vname, op) ) - - def p_CONDITION(self): - self.dbg() # Debug - self.p_WS() - t = u"condition at %s:%d:%d"%(self.fname, self.line, self.column) - self.expect(self.c_if) - self.p_WS() - f = self.p_FORMULA() - cnd = conf.ConfigCondition(f, text=t, parent=(self.conditions and self.conditions[-1]) or None) - self.conditions.append(cnd) - # Parse a block - self.p_WS() - self.expect(self.c_open) - self.p_BLOCK() - self.p_WS() - self.expect(self.c_close) - # Cleanup - self.conditions.pop() - - def p_VARNAME(self): - self.dbg() # Debug - vnl = [] - while self.preread(1) and (self.peek().isalnum() or self.peek() in u'-_.'): - vnl.append(self.next()) - vn = u''.join(vnl) - if not conf.re_VARNAME.match(vn): - self.syntax_error('Invalid variable name %r', vn) - return vn - - def p_EXPRESSION(self): - self.dbg() # Debug - if self.peek() not in '\'"': - # Expect a variable name - varname = self.p_VARNAME() - return conf.ConfigExpression((self.tree.lookup(varname),), varname) - op = self.next() - # Parse literal expression - if op == u'\'': - exl = [] - while not self.peeks(op): - exl.append(self.next()) - self.expect(op) - s = u''.join(exl) - return conf.ConfigExpression((s,), s) - # Parse expression with variables - exl = [op] - expr = [] - while not self.peeks(op): - exl.append(self.peek()) - if self.nexts(u'\\'): - # Escape sequence - c = self.next() - if c not in u'\\"n' + self.c_open + self.c_close: - self.syntax_error('Illeal escape sequence in expression') - if c == 'n': - expr.append(u'\n') - else: - expr.append(c) - exl.append(c) - elif self.nexts(self.c_open): - # Parse a variable name in '{}' - varname = self.p_VARNAME() - self.expect(self.c_close) - exl.append(varname) - expr.append(self.tree.lookup(varname)) - else: - # Regular character - expr.append(self.next()) - self.expect(op) - exs = ''.join(exl) - # Concatenate consecutive characters in expr - expr2 = [] - for i in expr: - if expr2 and isinstance(expr2[-1], unicode) and isinstance(i, unicode): - expr2[-1] = expr2[-1] + i - else: - expr2.append(i) - return conf.ConfigExpression(expr2, exs) - - def p_FORMULA(self): - self.dbg() # Debug - self.p_WS() - # Combined logical formula - if self.nexts(u'('): - f1 = self.p_FORMULA() - self.p_WS() - if self.nexts(self.c_and): - if self.peek(1).isalnum(): - self.syntax_error('trailing characters after %r', self.c_and) - f2 = self.p_FORMULA() - self.p_WS() - self.expect(u')') - return ('AND', f1, f2) - elif self.nexts(self.c_or): - if self.peek(1).isalnum(): - self.syntax_error('trailing characters after %r', self.c_or) - f2 = self.p_FORMULA() - self.p_WS() - self.expect(u')') - return ('OR', f1, f2) - elif self.nexts(u')'): - # Only extra parenthes - return f1 - else: - self.syntax_error("Logic operator or ')' expected") - elif self.nexts(self.c_not): - if self.peek().isalnum(): - self.syntax_error('trailing characters after %r', self.c_not) - # 'not' formula - f = self.p_FORMULA() - return ('NOT', f) - else: - # Should be (in)equality condition - e1 = self.p_EXPRESSION() - self.p_WS() - if self.nexts(self.c_eq): - self.p_WS() - e2 = self.p_EXPRESSION() - return ('==', e1, e2) - elif self.nexts(self.c_neq): - self.p_WS() - e2 = self.p_EXPRESSION() - return ('!=', e1, e2) - else: - self.syntax_error("Comparation operator expected") - diff --git a/t/moe/conftest.py b/t/moe/conftest.py deleted file mode 100644 index 341683c..0000000 --- a/t/moe/conftest.py +++ /dev/null @@ -1,226 +0,0 @@ -# -*- coding: utf-8 -*- - -import moe.conf as conf -from moe.confparser import * -import logging as log -import unittest -import tempfile - -class TestConfig(unittest.TestCase): - - def setUp(s): - s.t = conf.ConfigTree() - - def parse(s, string, level=0, fname='test'): - c=ConfigParser(string, s.t, fname, level) - ops = c.parse() - c.p_WS() - assert c.eof() - return ops - - def var(s, varname, create=True): - return s.t.lookup(varname, create=create) - - def val(s, varname): - return s.var(varname, create=False).value() - - def eqparse(s, string, *args, **kwargs): - "Parse expression `s`, return parts that should be equal to anoter run of `eqparse(s)`." - return [(i[0], i[1].operation) for i in s.parse(string, *args, **kwargs)] - - -class TestParser(TestConfig): - s1 = r"""a="1";z{b='2';w{S.Tr_an-g.e='"'}};c.d='\n';e="{a}{b}";e+='{c.d}';a+="\"\n\{\}";f+='Z{a.b}';e+=c.d;e+=c;e+=a""" - s2 = '\t\n \n ' + s1.replace('=', '= \n ').replace(';', '\t \n\t \n ').replace('+=',' \n += ') + '\n\n ' - - def test_noWS(s): - assert len(s.parse(s.s1)) == 11 - - def test_noWS_COMMENT(s): - assert s.eqparse(s.s1+'#COMMENT') == s.eqparse(s.s1+'#') == s.eqparse(s.s1+'#\n') == s.eqparse(s.s1+'\n#') - - def test_manyWS(s): - assert s.eqparse(s.s2) == s.eqparse(s.s1) - - def test_manyWS_COMMENT(s): - assert s.eqparse(s.s2.replace('\n',' #COMMENT \n')) == s.eqparse(s.s2.replace('\n','#\n')) == s.eqparse(s.s1) - - def test_empty(s): - assert s.eqparse('') == s.eqparse('\n') == s.eqparse('') == s.eqparse('a{}') == \ - s.eqparse('a.b.c{if ""==\'\' {d.e{\n\n#Nothing\n}} }') == [] - - def test_syntax_errors(s): - s.assertRaises(ConfigSyntaxError, s.parse, "a=#") - s.assertRaises(ConfigSyntaxError, s.parse, "a='\"") - s.assertRaises(ConfigSyntaxError, s.parse, 'a="{a@b}"') - s.assertRaises(ConfigSyntaxError, s.parse, 'a="A{A"') - s.assertRaises(ConfigSyntaxError, s.parse, 'a=b"42"') - s.assertRaises(ConfigSyntaxError, s.parse, 'a=b.c.d.') - - def test_error_location(s): - try: s.parse('\t \n \n { \n \n ') - except ConfigSyntaxError, e: - assert e.line == 3 and e.column in range(2,4) - - def test_quoting(s): - s.parse(' a="\\"\\{a$b\\}\'\n\n\'{z}" ') - assert s.var('z', create=False) - # No escaping in '-string - s.assertRaises(ConfigSyntaxError, s.parse, " a='\"\\'\n\n' ") - # Variable should not be created - s.parse(" a='{z2}' ") - s.assertRaises(conf.ConfigError, s.var, 'z2', create=False) - - def test_conditions(s): - s.assertRaises(ConfigSyntaxError, s.parse, "if '{a}'=='{b}' and ''!='' {}") - s.parse('if ((#C\n (\n (not not not""!="")\n#C\n)\t ) ) {}') - s.parse('if (""=="" and not (not not ""!="" or ""=="")){}') - s.parse('if ((a=="{b}" and a.b.c!=c.b.a)or x!=y){}') - s.parse('if(""==""){a{if(""==""){if(""==""){b{if(""==""){if(""==""){}}}}}}}') - s.assertRaises(ConfigSyntaxError, s.parse, "if notnot'{a}'=='{b}' {}") - s.assertRaises(ConfigSyntaxError, s.parse, "if ('{a}'=='{b}' not and ''!='') {}") - s.assertRaises(ConfigSyntaxError, s.parse, "if ('{a}'=='{b}' ornot ''!='') {}") - s.assertRaises(ConfigSyntaxError, s.parse, "if 'a'<>'b' {}") - - -class TestConfigEval(TestConfig): - - def test_ops(s): - s.parse('c+="-C_APP"', level=20) - s.parse('a="A"; b="{a}-B"; c="C1-{b}-C2"; a+="FOO"; a="AA"') - assert s.val('c') == 'C1-AA-B-C2-C_APP' - s.parse('b+="-A:\{{a}\}";a+="A"', level=10) - assert s.val('c') == 'C1-AAA-B-A:{AAA}-C2-C_APP' - - def test_nested(s): - s.parse('a="0"; b{a="1"; b{a="2"; b{a="3"; b{a="4"; b{a="5"}}}}}') - assert s.val('b.b.b.a') == '3' - s.parse('b.b{b.b{b.a="5MOD"}}') - assert s.val('b.b.b.b.b.a') == '5MOD' - - def test_escape_chars(s): - s.parse(r"""a='{a}\\\\#\n'; b="{a}'\"\{\}"; c='\'; c+="\{{b}\}";""") - assert s.val('c') == r"""\{{a}\\\\#\n'"{}}""" - - def test_expressions(s): - s.parse('A="4"; B.B=\'2\'; B.A=A; B.C="{B.A}{B.B}"; if B.C=="42" {D=C}; if C==D{E=D}; C="OK"') - assert s.val('E') == 'OK' - - ts = 'a="A:"; if "{c1}"=="1" {a+="C1"; b="B"; if ("{c2a}"=="1" or not "{c2b}"=="1") { a+="C2"; '\ - 'if ("{c3a}"=="1" and "{c3b}"=="1") { a+="C3" }}}' - - def test_cond_chain(s): - s.parse(s.ts) - s.parse('c1="1"; c2="0"') - # b should have determined value, a should not (since c3a is undefined) - s.assertRaises(conf.UndefinedError, s.val, 'a') - assert s.val('b') == 'B' - s.parse('c1="0"') - # now b should be undefined - s.assertRaises(conf.UndefinedError, s.val, 'b') - # Normal evaluation - s.parse('c1="1"; c2a="1"; c2b="0"; c3a="0"') - assert s.val('a') == 'A:C1C2' - s.parse('c3a="1"; c3b="1"; c2b="1"') - assert s.val('a') == 'A:C1C2C3' - # tests condition invalidating - s.parse('c2a+="0"') - assert s.val('a') == 'A:C1' - - def test_cond_eager(s): - s.parse(s.ts) - # undefined c2b and c3a should not be evaluated - s.parse('c1="1"; c2a="1"; c3a="0"') - assert s.val('a') == 'A:C1C2' - # but now c3b should be evaluated - s.parse('c1="1"; c2a="1"; c3a="1"') - s.assertRaises(conf.UndefinedError, s.val, 'a') - s.parse('c1="1"; c2a="1"; c3b="1"') - assert s.val('a') == 'A:C1C2C3' - - def test_undef(s): - s.assertRaises(conf.UndefinedError, s.val, 'a') - s.parse('a="{b}"') - s.assertRaises(conf.UndefinedError, s.val, 'a') - s.parse('b+="1"') - s.assertRaises(conf.UndefinedError, s.val, 'b') - - def test_loopy_def(s): - s.parse('a="A"; a+="{a}"') - s.assertRaises(conf.CyclicConfigError, s.val, 'a') - s.parse('b="{c}"; c="{b}"') - s.assertRaises(conf.CyclicConfigError, s.val, 'b') - - def test_varname(s): - s.assertRaises(conf.VariableNameError, s.val, 'b/c') - s.assertRaises(conf.VariableNameError, s.val, '.b.c') - s.assertRaises(conf.VariableNameError, s.val, 'b.c.') - s.assertRaises(conf.VariableNameError, s.val, 'b..c') - - def test_remove(s): - l = s.parse('a="A1"; b="B1"; if "{cond}"=="1" {a+="A2"; b+="B2"}; a+="A3"; b+="B3"; cond="1"') - assert s.val('a') == 'A1A2A3' - assert s.val('b') == 'B1B2B3' - # remove b+="B2" - s.var('b').remove_operation(l[3][1]) - assert s.val('a') == 'A1A2A3' - assert s.val('b') == 'B1B3' - # are the dependencies still handled properly? - s.parse('cond+="-invalidated"') - assert s.val('a') == 'A1A3' - s.parse('cond="1"') - assert s.val('a') == 'A1A2A3' - - def test_fix(s): - s.parse(""" A='4'; B="{A}"; C="2"; B+="{C}"; D="{B}"; E="{D}" """) - s.var('D').fix() - # Break by C - l = s.parse('C="3"') - s.assertRaises(conf.VariableFixedError, s.val, "E") - s.assertRaises(conf.VariableFixedError, s.val, "D") - s.var('C').remove_operation(l[0][1]) - # Break directly by D - l = s.parse('D="41"') - s.assertRaises(conf.VariableFixedError, s.val, "D") - s.assertRaises(conf.VariableFixedError, s.val, "E") - # Unfix - s.var('D').unfix() - assert s.val('E') == '41' - s.var('D').remove_operation(l[0][1]) - assert s.val('D') == '42' - - def test_unicode(s): - # Ascii (1b) and unicode (2b) - s.parse(u'A="Ú"; C="ě"') - # String - s.parse('B=\'chyln\'') - # By escapes - s.parse(u'D=\'\u0159e\u017eav\xe1\'') - s.parse(u'E="ŽluŤ"') - # Via utf8 file - f = tempfile.TemporaryFile(mode='w+t') - f.write(u'S1="\xdachyln\u011b \u0159e\u017eav\xe1 \u017dlu\u0164" ; S2="{A}{B}{C} {D} {E}"'.encode('utf8')) - f.seek(0) - s.parse(f) - # Test - s.parse(u'if "{S1}"=="{S2}" { ANS="jó!" } ') - assert s.val('ANS') == u"jó!" - assert s.val('S1') == s.val('S2') == u'\xdachyln\u011b \u0159e\u017eav\xe1 \u017dlu\u0164' - - def test_priority(s): - s.var('a').add_operation(conf.Operation('APPEND', None, conf.ConfigExpression(["4"]), level=4)) - s.var('a').add_operation(conf.Operation('APPEND', None, conf.ConfigExpression(["3a"]), level=3)) - s.var('a').add_operation(conf.Operation('APPEND', None, conf.ConfigExpression(["1"]), level=1)) - s.var('a').add_operation(conf.Operation('SET', None, conf.ConfigExpression(["2"]), level=2)) - s.var('a').add_operation(conf.Operation('APPEND', None, conf.ConfigExpression(["3b"]), level=3)) - s.var('a').add_operation(conf.Operation('SET', None, conf.ConfigExpression(["0"]), level=0)) - s.var('a').add_operation(conf.Operation('APPEND', None, conf.ConfigExpression(["5"]), level=5)) - assert s.val('a')=='23a3b45' - - - -# TODO: Fail on 1st April -# TODO (OPT): Somehow add log.debug('Maximum encountered depth: %d', conf.debug_maxdepth) - -# Coverage via command "nosetests conftest --with-coverage --cover-html-dir=cover --cover-html" -