test:
- nosetests moe.config_test
+ nosetests moe.status_test
+ nosetests moe.config_test
key_pattern = re.compile("\A[A-Za-z0-9_-]+\Z")
-class MoeInvalidStatusFile(Exception):
+class InvalidStatusFile(Exception):
pass
class Status:
def __setitem__(self, k, v):
self.d[k] = v
+ def __eq__(self, s):
+ return self.d == s.d
+
def keys(self):
return self.d.keys()
for k,v in self.d.items():
if isinstance(v, Status):
l.append(prefix + k + " (")
- l.extend(self.dump(prefix+" "))
+ l.extend(v.dump(prefix+" "))
l.append(prefix + ")")
else:
d = str(v).split('\n')
for l in self.dump():
f.write(l+"\n")
- def read(self, f=None, name=None):
+ def read(self, f=None, name=None, lines=None):
"""
- Parse Status from File ``f`` or from file ``name`` or from ``stdin`` (otherwise)
- Deletes all contents of the Status.
+ Parse Status file
+ * from File ``f``
+ * or from file ``name`` opened for reading 8-bit ASCII
+ * or from ``lines`` (a list/iterator of lines)
+
+ Deletes all previous contents of the Status.
"""
self.d = {}
- if not f and name is not None:
+ if f is not None:
+ return self.do_read(f.readlines())
+ if name is not None:
with open(name, 'r') as f:
- self.do_read(f)
- else:
- if not f:
- f = sys.stdin
- self.do_read(f)
+ return self.do_read(f.readlines())
+ if lines is not None:
+ return self.do_read(lines)
+ raise ValueError('Provide at least one parameter to Status.read()')
def read_val(self, k, v):
"""
- Internal: Safely add a new value
+ Internal: Safely add a new value to Status
"""
if not key_pattern.match(k):
- raise MoeInvalidStatusFile("Parse error: invalid key %r"%k)
+ raise InvalidStatusFile("Parse error: invalid key %r"%k)
if k in self.d:
- raise MoeInvalidStatusFile("Multiple occurences of key %r"%k)
+ raise InvalidStatusFile("Multiple occurences of key %r"%k)
self.d[k]=v
- def do_read(self, f):
+ def do_read(self, lines):
"""
- Internal: Parse an open file
+ Internal: Parse a status file given as list/iterator of lines
"""
- stk = []
- this = self
- for x in f.readlines():
+ stk = [] # stack
+ this = self # currently read nested Status
+ lastk = None # for multiline appending
+ for x in lines:
x = x.rstrip("\n").lstrip(" \t")
if x=="" or x.startswith("#"):
- pass
+ lastk = None
else:
sep = x.find(":")
- if sep >= 0:
+ if sep > 0: # key:value
k = x[:sep].rstrip(" \t")
v = x[sep+1:]
this.read_val(k, v)
- elif x.endswith("("):
+ lastk = k
+ elif sep == 0: # continuation of multiline :value
+ if not lastk:
+ raise InvalidStatusFile("Parse error: key expected before ':'")
+ v = x[sep+1:]
+ this[lastk] += '\n' + v
+ elif x.endswith("("): # close subtree
k = x[:-1].rstrip(" \t")
new = Status()
this.read_val(k, new)
stk.append(this)
this = new
+ lastk = None
elif x == ")":
+ lastk = None
if len(stk) == 0:
- raise MoeInvalidStatusFile("Parse error: incorrect nesting")
+ raise InvalidStatusFile("Parse error: incorrect nesting")
else:
this = stk.pop()
else:
- raise MoeInvalidStatusFile("Parse error: malformed line")
+ raise InvalidStatusFile("Parse error: malformed line")
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from moe.status import Status, InvalidStatusFile
+import unittest
+import tempfile
+
+class TestStatus(unittest.TestCase):
+
+# def setUp(s):
+# s.st = st.Status()
+
+ def parse(s, string):
+ st = Status()
+ st.read(lines=string.split('\n'))
+ return st
+
+ def test_basic_read(s):
+ st = s.parse(""); assert(not st.d)
+ st = s.parse("\t\t \n \t a\t \t :\t b\t"); assert(st['a'] == '\t b\t')
+ st = s.parse("a:\t\n"); assert(st['a'] == '\t')
+ st = s.parse("\t\t \n \t a\t \t (\nb:\n)"); assert(st['a']['b'] == '')
+ st = s.parse("a(\nz:\n:\na(\na(\na(\na(\n)\n)\nx:y\n)\n)\n)")
+ assert(st['a']['a']['a']['x'] == 'y')
+ assert(st['a']['z'] == '\n')
+ s.assertRaises(InvalidStatusFile, s.parse, "a")
+ s.assertRaises(InvalidStatusFile, s.parse, "a\n(\n)")
+
+ def test_unique(s):
+ s.assertRaises(InvalidStatusFile, s.parse, "a:1\na:1")
+ s.assertRaises(InvalidStatusFile, s.parse, "a(\n)\na(\n)")
+ s.assertRaises(InvalidStatusFile, s.parse, "a(\n)\na:0")
+
+ def test_multiline(s):
+ s.assertRaises(InvalidStatusFile, s.parse, ":\na:b\n")
+ s.assertRaises(InvalidStatusFile, s.parse, "x:1\na(\n:\n)\n")
+ s.assertRaises(InvalidStatusFile, s.parse, "a(\na:b\n)\n:c")
+ # the continuation of entry must be at consecutive lines
+ s.assertRaises(InvalidStatusFile, s.parse, "a:\n:\n\n:z")
+ s.assertRaises(InvalidStatusFile, s.parse, "a:\n#\n:\n")
+ # Valid entries
+ st = s.parse("a:\t\n\t \t : \t\n:"); assert(st['a'] == '\t\n \t\n')
+ st = s.parse(" a : \n :x"); assert(st['a'] == ' \nx')
+ st = s.parse("a :\n :\n \t:"); assert(st['a'] == '\n\n')
+
+ def test_comments(s):
+ st = s.parse("#a:b"); assert(not st.d)
+ st = s.parse("a:b#c:d"); assert(st['a'] == 'b#c:d')
+ s.assertRaises(InvalidStatusFile, s.parse, "a( #comm\n)")
+ s.assertRaises(InvalidStatusFile, s.parse, "a(\n)#comm")
+
+ def test_file_utf8(s):
+ st = Status()
+ f = tempfile.TemporaryFile(mode='w+t')
+ f.write(u'a:ášďëå'.encode('utf8'))
+ f.seek(0)
+ st.read(f=f)
+ assert(st['a'] == '\xc3\xa1\xc5\xa1\xc4\x8f\xc3\xab\xc3\xa5')
+
+ def test_file_wr_eq_l2(s):
+ st = s.parse('a:' + u'ášďě'.encode('l2') + """
+ b(
+ c(
+ d:esdf
+ :frty
+ )
+ d:
+ x:
+ :
+ :
+ )
+ """)
+ f = tempfile.NamedTemporaryFile()
+ st.write(name = f.name)
+ st2 = Status()
+ st2.read(name = f.name)
+ assert(st['a'] == '\xe1\xb9\xef\xec')
+ print('\n'.join(st.dump()), '\n'.join(st2.dump()))
+ assert(st == st2)
+
+