]> mj.ucw.cz Git - home-hw.git/blob - analyse.py
86a52c1cbceb4a1af9a2fa63a4229291c01f4b6b
[home-hw.git] / analyse.py
1 #!/usr/bin/python3
2
3 import os
4 import sys
5 import datetime
6 import collections
7 from typing import List
8
9 class ParseError(RuntimeError):
10     pass
11
12
13 class Frame:
14
15     def __init__(self):
16         self.reg = None
17
18     def next_u8(self):
19         if len(self.x):
20             return self.x.popleft()
21         else:
22             raise ValueError
23
24     def next_u16(self):
25         hi = self.next_u8()
26         lo = self.next_u8()
27         return (hi << 8) + lo
28
29     def next_u16le(self):
30         lo = self.next_u8()
31         hi = self.next_u8()
32         return (hi << 8) + lo
33
34     def next_u32(self):
35         hi = self.next_u16()
36         lo = self.next_u16()
37         return (hi << 16) + lo
38
39     def next_u32_swapped(self):
40         hi = self.next_u16le()
41         lo = self.next_u16()
42         return (hi << 16) + lo
43
44     def dump_rest(self):
45         return " ".join(["{:02x}".format(b) for b in self.x])
46
47     def _parse_reg(self):
48         self.reg = self.next_u32()
49         self.text = "{:08x}".format(self.reg)
50
51     def _parse_reg_swapped(self):
52         self.reg = self.next_u32_swapped()
53         self.text = "{:08x}".format(self.reg)
54
55     def _parse_null(self):
56         self.text = ""
57
58     op_table = {
59         1: {
60             "name": "REQUEST_INFO",
61             "parser": _parse_reg,
62         },
63         2: {
64             "name": "INFO",
65             "parser": _parse_reg,
66         },
67         3: {
68             "name": "SET",
69             "parser": _parse_reg_swapped,
70         },
71         4: {
72             "name": "ACK",
73             "parser": _parse_null,
74         },
75         5: {
76             "name": "NACK",
77             "parser": _parse_null,
78         },
79         6: {
80             "name": "QUERY",
81             "parser": _parse_reg_swapped,
82         },
83         7: {
84             "name": "ANSWER",
85             "parser": _parse_reg,
86         },
87         8: {
88             "name": "ERROR",
89             "parser": _parse_null,
90         },
91         0x0f: {
92             "name": "QUERY_DEF",
93             "parser": _parse_reg_swapped,
94         },
95         0x10: {
96             "name": "ANSWER_DEF",
97             "parser": _parse_reg_swapped,
98         },
99     }
100
101     def _parse(self):
102         # Parse frame header
103         if self.next_u8() != 0xdc:
104             raise ParseError("Missing SOF")
105         self.src = self.next_u8() ^ 0x80
106         self.dst = self.next_u8()
107         if self.next_u8() != len(self.orig):
108             raise ParseError("Invalid length byte")
109         self.op = self.next_u8()
110
111         # Remove CRC from the end
112         if len(self.x) < 2:
113             raise ParseError("Missing CRC")
114         else:
115             self.x.pop()
116             self.x.pop()
117
118         # Which operation it is?
119         if self.op in Frame.op_table:
120             op_def = Frame.op_table[self.op]
121             self.op_name = op_def["name"]
122             op_def["parser"](self)
123         else:
124             raise ParseError("Unknown OP")
125
126     def __str__(self):
127         rest = self.dump_rest()
128         return ("{:02x} -> {:02x} {}".format(self.src, self.dst, self.op_name) +
129                 (" " if len(self.text) else "") +
130                 self.text +
131                 (": " if len(rest) else "") +
132                 rest)
133
134     def parse(self, values: List[int]):
135         self.orig = values
136         self.x = collections.deque(values)
137         self._parse()
138
139
140 file_output = True
141 output_files = {}
142
143 def get_file(frame):
144     if file_output:
145         if len(output_files) == 0:
146             try:
147                 os.mkdir("out")
148             except FileExistsError:
149                 pass
150         if frame.reg is not None:
151             key = "{:08x}".format(frame.reg)
152         else:
153             key = "OTHER"
154         if key not in output_files:
155             output_files[key] = open("out/" + key, 'w')
156         return output_files[key]
157     else:
158         return sys.stdout
159
160 def close_all():
161     for f in output_files.values():
162         f.close()
163
164
165 def parse_file(name: str):
166     with open(name) as f:
167         for line in f:
168             fields = line.split()
169             timestamp = int(fields.pop(0))
170             dt = datetime.datetime.fromtimestamp(timestamp)
171             values = [int(x, base=16) for x in fields]
172             frame = Frame()
173             try:
174                 frame.parse(values)
175                 f = get_file(frame)
176                 print(dt, frame, file=f)
177             except ParseError as x:
178                 print(dt, 'ERROR({}): '.format(x) + " ".join(fields))
179
180
181 if len(sys.argv) > 1:
182     parse_file(sys.argv[1])
183 else:
184     parse_file('/var/log/bsb-frames')
185 close_all()