]> mj.ucw.cz Git - moe.git/blob - t/moe/testcase.py
e129a5305d88ac87d40ac9b3638302a561d8d307
[moe.git] / t / moe / testcase.py
1 #!/usr/bin/env python
2
3 import os.path
4 import moe
5 import moe.config
6 import moe.eval
7 import moe.log
8 import shutil
9
10 def configure_test(e, test):
11     e.cfgs = moe.config.MoeConfigStack(e.cfgs)
12     e.test_builtins = moe.config.MoeConfig(type="test-builtins")
13     e.test_builtins.set("TEST", test)
14     e.cfgs.push(e.test_builtins)
15
16     test_cf = os.path.join(e["PDIR"], test + ".config")
17     if os.path.exists(test_cf):
18         cfg = moe.config.MoeConfig(name=test_cf, type="test")
19         e.cfgs.push(cfg)
20
21     e.cfgs.apply_overrides("TEST_" + test + "_")
22     ext = e["EXT"]
23     if ext != "":
24         e.cfgs.apply_overrides("EXT_" + ext + "_")
25
26     log = moe.log.MoeLog()
27     log.verbosity = e.log.verbosity
28     log.open(os.path.join(e["TDIR"], test + ".log"))
29     log.say("Test case %s\n\n" % test)
30     e.log = log
31     moe.log.default = log
32
33     e.log_config(2, "for the test")
34
35     e.test_stat = moe.status.MoeStatus()
36     e.test_stat["id"] = test
37     e.stat.get_list("tests").append(e.test_stat)
38
39 def collect_status(e):
40     sf = os.path.join(e["TDIR"], e["TESTCASE_STATUS"])
41     if os.path.exists(sf):
42         e.log.verbose("Reading status from %s\n" % sf)
43         stat = moe.status.MoeStatus()
44         stat.read(name=sf)
45         if e.log.verbosity > 1:
46             stat.write_nested(e.log.log_file, 1)
47         for k in stat.keys():
48             e.test_stat[k] = stat[k]
49         os.unlink(sf)
50     else:
51         e.log.verbose("No status file present\n")
52
53 def tmpname(e):
54     return os.path.join(e["TDIR"], e["TEST"] + ".tmp")
55
56 def collect_tmp_line(tmp_file):
57     if os.path.exists(tmp_file):
58         f = open(tmp_file, "r")
59         v = f.readline().rstrip("\n")
60         f.close()
61         os.unlink(tmp_file)
62     else:
63         v = ""
64     return v
65
66 def collect_verdict(e, verdict_file):
67     v = collect_tmp_line(verdict_file)
68     if len(v) >= 4 and v[0].isalnum and v[1].isalnum and v[2] == ":" and v[3] == " ":
69         e.test_stat["status"] = v[0:2]
70         e.log.verbose("Judge's status: %s\n" % v[0:2])
71         v = v[4:]
72     v.strip()
73     if v != "":
74         e.test_stat["message"] = v
75         e.log.verbose("Judge's verdict: %s\n" % v)
76
77 def collect_points(e):
78     p = collect_tmp_line(os.path.join(e["TDIR"], e["TESTCASE_PTS"]))
79     if p != "":
80         e.log.verbose("Judge has supplied points: %s\n" % p)
81         e.test_stat["points"] = p
82
83 def setup(e):
84     pdir = e["PDIR"]
85     tdir = e["TDIR"]
86     inn = e["TESTCASE_IN"]
87     out = e["TESTCASE_OUT"]
88     ok = e["TESTCASE_OK"]
89
90     if os.path.exists(os.path.join(pdir, inn)):
91         moe.util.link_or_copy(os.path.join(pdir, inn), os.path.join(tdir, inn))
92     if os.path.exists(os.path.join(pdir, out)):
93         moe.util.link_or_copy(os.path.join(pdir, out), os.path.join(tdir, ok))
94
95 def judge(e):
96     cmd = e["OUTPUT_CHECK"]
97     if cmd == "":
98         return
99     verdict_file = tmpname(e)
100     cmd = "exec 2>%s ; %s" % (verdict_file,cmd)
101
102     e.log.progress("<check> ")
103     e.log.verbose("Checking output: %s\n" % cmd)
104     e.log.flush()
105     rc = os.system(cmd)
106     collect_verdict(e, verdict_file)
107     collect_points(e)
108     collect_status(e)
109     if os.WIFEXITED(rc):
110         if os.WEXITSTATUS(rc) == 0:
111             return
112         elif os.WEXITSTATUS(rc) == 1:
113             raise moe.TestError("Wrong answer", "WA")
114     raise moe.MoeError("Judge failure")
115
116 def points(e):
117     if e.test_stat["points"] is None:
118         e.test_stat["points"] = e["POINTS_PER_TEST"]
119
120 def filter(e):
121     cmd = e["OUTPUT_FILTER"]
122     if cmd == "":
123         return
124
125     os.rename(os.path.join(e["TDIR"], e["TESTCASE_OUT"]), os.path.join(e["TDIR"], e["TESTCASE_RAW"]))
126     e.log.progress("<filter> ")
127     e.log.verbose("Filtering output: %s\n" % cmd)
128     e.log.flush()
129     rc = os.system(cmd)
130     if os.WIFEXITED(rc) and os.WEXITSTATUS(rc) == 0:
131         if not os.path.exists(os.path.join(e["TDIR"], e["TESTCASE_OUT"])):
132             raise moe.MoeError("Filter has generated no output")
133     else:
134         raise moe.MoeError("Filter failure")
135
136 def syntax(e):
137     cmd = e["SYNTAX_CHECK"]
138     if cmd == "":
139         return
140     verdict_file = tmpname(e)
141     cmd = "exec 2>%s ; %s" % (verdict_file,cmd)
142
143     e.log.progress("<syntax> ")
144     e.log.verbose("Checking syntax: %s\n" % cmd)
145     e.log.flush()
146     rc = os.system(cmd)
147     collect_verdict(e, verdict_file)
148     collect_status(e)
149     if os.WIFEXITED(rc):
150         if os.WEXITSTATUS(rc) == 0:
151             return
152         elif os.WEXITSTATUS(rc) == 1:
153             raise moe.TestError("Wrong syntax", "SY")
154     raise moe.MoeError("Syntax checker failure")
155
156 def run_test(e, test):
157     configure_test(e, test)
158
159     ## FIXME: interactive tasks
160     e.test_pipe.configure(e["TESTCASE_HOOKS"])
161     if e.log.verbosity >= 2:
162         e.test_pipe.dump(e.log.log_file, prefix="\t")
163     e.test_pipe.run(e)
164
165 def wrap_run_test(e, test):
166     try:
167         run_test(e, test)
168     except moe.MoeError, err:
169         raise moe.TestError(err, "XX")
170
171 def conclude_test(e):
172     stat = e.test_stat
173     if not stat["points"]:
174         stat["points"] = 0
175
176     if e.log.verbosity > 1:
177         e.log.verbose("Final status:\n")
178         stat.write_nested(e.log.log_file, 1)
179
180     if stat["status"]:
181         msg = "%s: %s" % (stat["status"], e.test_stat["message"])
182     else:
183         msg = "OK"
184     msg += " (%s points" % stat["points"]
185     if stat["time"]:
186         msg += ", %s s" % stat["time"]
187     if stat["mem"]:
188         msg += ", %d MB" % ((int(stat["mem"]) + 524288) // 1048576)
189     msg += ")\n"
190     e.log.progress(msg)
191     e.log.say(msg)
192
193 def run_tests(e):
194     e.test_pipe.insert(0, "setup", setup)
195     e.test_pipe.insert(400, "filter", filter)
196     e.test_pipe.insert(500, "syntax", syntax)
197     e.test_pipe.insert(600, "judge", judge)
198     e.test_pipe.insert(700, "points", points)
199
200     for test in e["TESTS"].split():
201         e.log.progress("Test %s: " % test)
202         old_cfgs = e.cfgs
203         old_log = e.log
204
205         try:
206             wrap_run_test(e, test)
207         except moe.TestError, err:
208             if not e.test_stat["status"]:
209                 e.test_stat["status"] = err.stat_code
210             if not e.test_stat["message"]:
211                 e.test_stat["message"] = err.message
212
213         conclude_test(e)
214         
215         e.cfgs = old_cfgs
216         e.log = old_log
217         moe.log.default = old_log