The variables in `self.variables` are referenced directly by the full name.
"""
+
def __init__(self):
self.variables = {}
+
def lookup(self, key, create = True):
"""
Lookup and return a variable.
raise UndefinedError('Config variable %r undefined.', key)
self.variables[key] = ConfigVar(key)
return self.variables[key]
+
def dump(self, prefix=''):
"""
Pretty printing of the tree.
self.variables[k].dump(prefix) for k in sorted(self.variables.keys())
])
+
class ConfigElem(object):
"""
Base class for cahed config elements - variables and conditions
"""
+
def __init__(self, name):
# Full name with separators, definition for conditions
self.name = name
# Cached value (may be None in case of evaluation error)
self.cached = False
self.cached_val = None
+
def invalidate(self, depth=0):
"""
Invalidate cached data and invalidate all dependants.
self.cached = False
for d in self.dependants:
d.invalidate(depth + 1)
+
def value(self, depth=0):
"Caching helper calling self.evaluate(), returns a value or throws an exception."
check_depth(depth)
if self.cached_val == None:
raise UndefinedError("Unable to evaluate %r."%(self.name,))
return self.cached_val
+
def __str__(self):
return self.name
+
class ConfigCondition(ConfigElem):
"""
Condition using equality and logic operators.
('AND', c1, c1), ('OR', c1, c2), ('NOT', c1),
('==', e1, e2), ('!=', e1, e2) where e1, e2 are `ConfigExpression`s.
"""
+
def __init__(self, formula, text=None, parent=None):
"""
Condition defined by `text` (informative), `formula` as in class definition,
v.dependants.add(self)
if self.parent:
self.parent.dependants.add(self)
+
def variables(self, cl=None):
"Return an iterator of variables used in formula `cl`"
if not cl:
if cl[0] in ['AND','OR']:
return itertools.chain(self.variables(cl[1]), self.variables(cl[2]))
return self.variables(cl[1]) # only 'NOT' left
+
def remove_dependencies(self):
"Remove self as a dependant from all used variables"
for v in self.variables():
v.dependants.discard(self)
if self.parent:
self.parent.dependants.discard(self)
+
def evaluate(self, cl=None, depth=0):
"""Evaluate formula `cl` (or the entire condition).
Partial evaluation for AND and OR. Tests the parent condition first."""
if cl[0] == 'OR' and v1: return True
if cl[0] == 'AND' and not v1: return False
return self.evaluate(cl=cl[2], depth=depth+1)
+
def formula_string(self, formula):
"Create a string representation of a formula."
if formula[0] == 'AND':
elif formula[0] in ['==', '!=']:
return itertools.chain(formula[1], formula[0], formula[2])
return iter(['<invalid formula>'])
+
def str(self, parents=False):
"Retur the defining expression, if `parents` set, then prefixed with parent conditions."
if parents and self.parent:
return self.parent.str(parents=True) + u' && ' + self.name
return self.name
+
def __str__(self):
return self.str(parents=False)
+
class Operation(object):
"Helper class for operation data. Must not be present in more variables or present multiple times."
+
def __init__(self, operation, condition, expression, level=0, source='?'):
# operation is currently 'SET' and 'APPEND'
self.operation = operation
self.expression = expression
self.level = level
self.source = source
+
def __str__(self):
return "%s <%d, %s> [%s] %r" % ( {'SET':'=', 'APPEND':'+'}[self.operation], self.level, self.source,
(self.condition and self.condition.str(parents=True)) or '', unicode(self.expression))
+
class ConfigVar(ConfigElem):
+
def __init__(self, name):
super(ConfigVar, self).__init__(name)
# Ordered list of `Operations` (ascending by `level`)
# Fixed to value (may be None)
self.fixed = False
self.fixed_val = None
+
def variables(self):
"Return a set of variables used in the expressions"
return set(sum([ list(op.expression.variables()) for op in self.operations ], []))
+
def fix(self):
"""
Fixes the value of the variable. Exception is raised should the variable
return
self.fixed = True
self.fixed_val = self.value()
+
def unfix(self):
"Set the variable to be modifiable again."
self.fixed = False
+
def value(self, depth=0):
"Handle the case when fixed, raise exc. on different evaluation"
val = super(ConfigVar,self).value(depth)
if self.fixed and self.fixed_val != val:
raise VariableFixedError("value of var %s was fixed to %r but evaluated to %r", self.name, self.fixed_val, val)
return val
+
def add_operation(self, operation):
"""
Inserts an operation. The operations are sorted by `level` (ascending), new operation goes last among
v.dependants.add(self)
if operation.condition:
operation.condition.dependants.add(self)
+
def remove_operation(self, operation):
"""
Remove the Operation.
# Remove the dependency on the conditions (if not used in another operation)
if operation.condition and operation.condition not in [op.condition for op in self.operations]:
operation.condition.dependants.remove(self)
+
def evaluate(self, depth=0):
"""
Find the last 'SET' operation that applies and return the result of concatenating with all
if op.operation == 'SET':
return u''.join(val)
return None
+
def dump(self, prefix=''):
"""
Pretty printing of the variable. Includes all operations.
#yield prefix+u' %s [%s] %s' % (op.operation, op.condition and op.condition.str(parents=True), op.expression)
yield prefix + u' ' + unicode(op)
+
class ConfigExpression(object):
"""
String expression with some unexpanded config variables. Used in variable operations and conditions.
Expression is given as a list of unicode strings and ConfigVar variables to be expanded.
"""
+
def __init__(self, exprlist, original = u'<unknown>'):
self.exprlist = exprlist
# Original defining string
if isinstance(e, types.StringTypes):
if not isinstance(e, unicode):
self.exprlist[i] = unicode(e, 'ascii')
+
def variables(self):
"Return an iterator of variables user in the expression"
return itertools.ifilter(lambda e: isinstance(e, ConfigVar), self.exprlist)
+
def __str__(self):
return self.original
+
def evaluate(self, depth):
check_depth(depth)
"Return unicode result of expansion of the variables."
import traceback
import moe.conf as conf
+
class ConfigSyntaxError(conf.ConfigError):
+
def __init__(self, msg, fname='<unknown>', line=None, column=None):
self.msg = msg
self.fname = fname
self.line = line
self.column = column
+
def __str__(self):
return('ConfigSyntaxError %s:%d:%d: %s'%(self.fname, self.line, self.column, self.msg))
+
class ConfigParser(object):
c_varname_sep = u'.'
c_comment = u'#'
c_neq = u'!='
c_set = u'='
c_append = u'+='
+
def __init__(self, s, tree, fname='<unknown>', level=0):
"""Create a config file parser.
`s` is either a string, unicode or an open file. File is assumed to be utf-8, string is converted to unicode.
self.prefix = '' # Prefix of variable name, may begin with '.'
self.conditions = [] # Stack of nested conditions, these are chained, so only the last is necessary
self.read_ops = [] # List of parsed operations (varname, `Operation`), returned by `self.parse()`
+
def preread(self, l):
"Make sure buf contains at least `l` next characters, return True on succes and False on hitting EOF."
if isinstance(self.s, file):
self.buf = self.buf[self.bufpos:] + self.s.read(max(l, 1024)).decode('utf8')
self.bufpos = 0
return len(self.buf) >= self.bufpos + l
+
def peek(self, l = 1):
"Peek and return next `l` unicode characters or everything until EOF."
self.preread(l)
return self.buf[self.bufpos:self.bufpos+l]
+
def peeks(self, s):
"Peek and compare next `len(s)` characters to `s`. Converts `s` to unicode. False on hitting EOF."
s = unicode(s)
return self.peek(len(s)) == s
+
def next(self, l = 1):
"Eat and return next `l` unicode characters. Raise exception on EOF."
if not self.preread(l):
self.line += s.count('\n')
self.column = l - rnl - 1
return s
+
def nexts(self, s):
"""Compare next `len(s)` characters to `s`. On match, eat them and return True. Otherwise just return False.
Converts `s` to unicode. False on hitting EOF."""
self.next(len(s))
return True
return False
+
def eof(self):
"Check for end-of-stream."
return not self.preread(1)
+
def expect(self, s, msg=None):
"Eat and compare next `len(s)` characters to `s`. If not equal, raise an error with `msg`. Unicode."
s = unicode(s)
if not self.nexts(s):
self.syntax_error(msg or u"%r expected."%(s,))
+
def syntax_error(self, msg, *args):
"Raise a syntax error with file/line/column info"
raise ConfigSyntaxError(fname=self.fname, line=self.line, column=self.column, msg=(msg%args))
+
def dbg(self):
n = None; s = ''
for i in traceback.extract_stack():
s += ' '
n = i[2]
if n: log.debug(s + n + ' ' + repr(self.peek(15)) + '...')
+
def parse(self):
self.read_ops = []
self.p_BLOCK()
return self.read_ops
+
def p_BLOCK(self):
self.dbg() # Debug
self.p_WS()
else:
self.nexts(';') # NOTE: this is weird - can ';' occur anywhere? Or at most once, but only after any p_WS debris?
self.p_WS()
+
def p_WS(self):
self.dbg() # Debug
while not self.eof():
self.p_COMMENT()
else:
break
+
def p_COMMENT(self):
self.dbg() # Debug
self.expect(self.c_comment, "'#' expected at the beginning of a comment.")
while (not self.eof()) and (not self.nexts(self.c_nl)):
self.next(1)
+
def p_STATEMENT(self):
self.dbg() # Debug
self.p_WS()
self.p_SUBTREE(varname)
else:
self.p_OPERATION(varname)
+
def p_SUBTREE(self, varname=None):
self.dbg() # Debug
if not varname:
# close block and
self.p_WS()
self.expect(self.c_close)
+
def p_OPERATION(self, varname=None):
self.dbg() # Debug
if not varname:
# NOTE/WARNING: The last character of operation will be reported in case of error.
v.add_operation(op)
self.read_ops.append( (vname, op) )
+
def p_CONDITION(self):
self.dbg() # Debug
self.p_WS()
self.expect(self.c_close)
# Cleanup
self.conditions.pop()
+
def p_VARNAME(self):
self.dbg() # Debug
vnl = []
if not conf.re_VARNAME.match(vn):
self.syntax_error('Invalid variable name %r', vn)
return vn
+
def p_EXPRESSION(self):
self.dbg() # Debug
op = self.next()
else:
expr2.append(i)
return conf.ConfigExpression(tuple(expr2), exs)
+
def p_FORMULA(self):
self.dbg() # Debug
self.p_WS()
import unittest
class TestConfig(unittest.TestCase):
+
def setUp(s):
s.t = conf.ConfigTree()
+
def parse(s, string, level=0, fname='test'):
c=ConfigParser(string, s.t, fname, level)
ops = c.parse()
c.p_WS()
assert c.eof()
return ops
+
def val(s, varname):
return s.t.lookup(varname, create=False).value()
+
def eqparse(s, string, *args, **kwargs):
return [(i[0], i[1].operation) for i in s.parse(string, *args, **kwargs)]
+
class TestParser(TestConfig):
s1 = r"""a="1";z{b='2';w{S.Tr_an-g.e='""'}};c.d='\n';e="{a}{b}";e+='{c.d}';a+="\"\n\{\}";f+='Z{a.b}'"""
s2 = '\t\n \n ' + s1.replace('=', '= \n ').replace(';', '\t \n\t \n ').replace('+=',' \n += ') + '\n\n '
+
def test_noWS(s):
assert len(s.parse(s.s1)) == 8
+
def test_noWS_COMMENT(s):
assert s.eqparse(s.s1+'#COMMENT') == s.eqparse(s.s1+'#') == s.eqparse(s.s1+'#\n') == s.eqparse(s.s1+'\n#')
+
def test_manyWS(s):
assert s.eqparse(s.s2) == s.eqparse(s.s1)
+
def test_manyWS_COMMENT(s):
assert s.eqparse(s.s2.replace('\n',' #COMMENT \n')) == s.eqparse(s.s2.replace('\n','#\n')) == s.eqparse(s.s1)
+
def test_empty(s):
assert s.eqparse('') == s.eqparse('\n') == s.eqparse('') == s.eqparse('a{}') == \
s.eqparse('a.b.c{if ""==\'\' {d.e{\n\n#Nothing\n}} }') == []
+
def test_syntax_errors(s):
s.assertRaises(ConfigSyntaxError, s.parse, "a=#")
s.assertRaises(ConfigSyntaxError, s.parse, "a='\"")
s.assertRaises(ConfigSyntaxError, s.parse, 'a="{a@b}"')
s.assertRaises(ConfigSyntaxError, s.parse, 'a="A{A"')
+
def test_error_location(s):
try: s.parse('\t \n \n { \n \n ')
except ConfigSyntaxError, e:
assert e.line == 3 and e.column in range(2,4)
+
def test_quoting(s):
s.parse(' a="\\"\\{a$b\\}\'\n\n\'{z}" ')
assert s.t.lookup('z', create=False)
# Variable should not be created
s.parse(" a='{z2}' ")
s.assertRaises(conf.ConfigError, s.t.lookup, 'z2', create=False)
+
def test_conditions(s):
s.assertRaises(ConfigSyntaxError, s.parse, "if '{a}'=='{b}' and ''!='' {}")
s.parse('if ((#C\n (\n (not not not""!="")\n#C\n)\t ) ) {}')
s.assertRaises(ConfigSyntaxError, s.parse, "if ('{a}'=='{b}' not and ''!='') {}")
s.assertRaises(ConfigSyntaxError, s.parse, "if ('{a}'=='{b}' ornot ''!='') {}")
+
class TestConfigEval(TestConfig):
+
def test_ops(s):
s.parse('c+="-C_APP"', level=20)
s.parse('a="A"; b="{a}-B"; c="C1-{b}-C2"; a+="FOO"; a="AA"')
assert s.val('c') == 'C1-AA-B-C2-C_APP'
s.parse('b+="-A:\{{a}\}";a+="A"', level=10)
assert s.val('c') == 'C1-AAA-B-A:{AAA}-C2-C_APP'
+
def test_nested(s):
s.parse('a="0"; b{a="1"; b{a="2"; b{a="3"; b{a="4"; b{a="5"}}}}}')
assert s.val('b.b.b.a') == '3'
s.parse('b.b{b.b{b.a="5MOD"}}')
assert s.val('b.b.b.b.b.a') == '5MOD'
+
def test_escape_chars(s):
s.parse(r"""a='{a}\\\\#\n'; b="{a}'\"\{\}"; c='\'; c+="\{{b}\}";""")
assert s.val('c') == r"""\{{a}\\\\#\n'"{}}"""
ts = 'a="A:"; if "{c1}"=="1" {a+="C1"; b="B"; if ("{c2a}"=="1" or not "{c2b}"=="1") { a+="C2"; '\
'if ("{c3a}"=="1" and "{c3b}"=="1") { a+="C3" }}}'
+
def test_cond_chain(s):
s.parse(s.ts)
s.parse('c1="1"; c2="0"')
# tests condition invalidating
s.parse('c2a+="0"')
assert s.val('a') == 'A:C1'
+
def test_cond_eager(s):
s.parse(s.ts)
# undefined c2b and c3a should not be evaluated
s.assertRaises(conf.UndefinedError, s.val, 'a')
s.parse('c1="1"; c2a="1"; c3b="1"')
assert s.val('a') == 'A:C1C2C3'
+
def test_undef(s):
s.assertRaises(conf.UndefinedError, s.val, 'a')
s.parse('a="{b}"')
s.assertRaises(conf.UndefinedError, s.val, 'a')
s.parse('b+="1"')
s.assertRaises(conf.UndefinedError, s.val, 'b')
+
def test_loopy_def(s):
s.parse('a="A"; a+="{a}"')
s.assertRaises(conf.CyclicConfigError, s.val, 'a')
s.parse('b="{c}"; c="{b}"')
s.assertRaises(conf.CyclicConfigError, s.val, 'b')
+
def test_varname(s):
s.assertRaises(conf.VariableNameError, s.val, 'b/c')
s.assertRaises(conf.VariableNameError, s.val, '.b.c')
s.assertRaises(conf.VariableNameError, s.val, 'b.c.')
s.assertRaises(conf.VariableNameError, s.val, 'b..c')
+
def test_remove(s):
l = s.parse('a="A1"; b="B1"; if "{cond}"=="1" {a+="A2"; b+="B2"}; a+="A3"; b+="B3"; cond="1"')
assert s.val('a') == 'A1A2A3'
# TODO: fixing, fail on 1st April
# TODO: coverage
+
if __name__ == '__main__':
log.getLogger().setLevel(log.WARN)
#log.getLogger().setLevel(log.DEBUG)