]> mj.ucw.cz Git - home-hw.git/blob - bsb/analyser/analyse.py
BSB: Packet analyser
[home-hw.git] / bsb / analyser / analyse.py
1 import os
2 import sys
3 import datetime
4 import collections
5 from typing import List
6
7 class ParseError(RuntimeError):
8     pass
9
10
11 class Frame:
12
13     def __init__(self):
14         self.reg = None
15
16     def next_u8(self):
17         if len(self.x):
18             return self.x.popleft()
19         else:
20             raise ValueError
21
22     def next_u16(self):
23         hi = self.next_u8()
24         lo = self.next_u8()
25         return (hi << 8) + lo
26
27     def next_u16le(self):
28         lo = self.next_u8()
29         hi = self.next_u8()
30         return (hi << 8) + lo
31
32     def next_u32(self):
33         hi = self.next_u16()
34         lo = self.next_u16()
35         return (hi << 16) + lo
36
37     def next_u32_swapped(self):
38         hi = self.next_u16le()
39         lo = self.next_u16()
40         return (hi << 16) + lo
41
42     def dump_rest(self):
43         return " ".join(["{:02x}".format(b) for b in self.x])
44
45     def _parse_reg(self):
46         self.reg = self.next_u32()
47         self.text = "{:08x}".format(self.reg)
48
49     def _parse_reg_swapped(self):
50         self.reg = self.next_u32_swapped()
51         self.text = "{:08x}".format(self.reg)
52
53     def _parse_null(self):
54         self.text = ""
55
56     op_table = {
57         1: {
58             "name": "REQUEST_INFO",
59             "parser": _parse_reg,
60         },
61         2: {
62             "name": "INFO",
63             "parser": _parse_reg,
64         },
65         3: {
66             "name": "SET",
67             "parser": _parse_reg_swapped,
68         },
69         4: {
70             "name": "ACK",
71             "parser": _parse_null,
72         },
73         5: {
74             "name": "NACK",
75             "parser": _parse_null,
76         },
77         6: {
78             "name": "QUERY",
79             "parser": _parse_reg_swapped,
80         },
81         7: {
82             "name": "ANSWER",
83             "parser": _parse_reg,
84         },
85         8: {
86             "name": "ERROR",
87             "parser": _parse_null,
88         },
89         0x0f: {
90             "name": "QUERY_DEF",
91             "parser": _parse_reg_swapped,
92         },
93         0x10: {
94             "name": "ANSWER_DEF",
95             "parser": _parse_reg_swapped,
96         },
97     }
98
99     def _parse(self):
100         # Parse frame header
101         if self.next_u8() != 0xdc:
102             raise ParseError("Missing SOF")
103         self.src = self.next_u8() ^ 0x80
104         self.dst = self.next_u8()
105         if self.next_u8() != len(self.orig):
106             raise ParseError("Invalid length byte")
107         self.op = self.next_u8()
108
109         # Remove CRC from the end
110         if len(self.x) < 2:
111             raise ParseError("Missing CRC")
112         else:
113             self.x.pop()
114             self.x.pop()
115
116         # Which operation it is?
117         if self.op in Frame.op_table:
118             op_def = Frame.op_table[self.op]
119             self.op_name = op_def["name"]
120             op_def["parser"](self)
121         else:
122             raise ParseError("Unknown OP")
123
124     def __str__(self):
125         rest = self.dump_rest()
126         return ("{:02x} -> {:02x} {}".format(self.src, self.dst, self.op_name) +
127                 (" " if len(self.text) else "") +
128                 self.text +
129                 (": " if len(rest) else "") +
130                 rest)
131
132     def parse(self, values: List[int]):
133         self.orig = values
134         self.x = collections.deque(values)
135         self._parse()
136
137
138 file_output = True
139 output_files = {}
140
141 def get_file(frame):
142     if file_output:
143         if len(output_files) == 0:
144             try:
145                 os.mkdir("out")
146             except FileExistsError:
147                 pass
148         if frame.reg is not None:
149             key = "{:08x}".format(frame.reg)
150         else:
151             key = "OTHER"
152         if key not in output_files:
153             output_files[key] = open("out/" + key, 'w')
154         return output_files[key]
155     else:
156         return sys.stdout
157
158 def close_all():
159     for f in output_files.values():
160         f.close()
161
162
163 def parse_file(name: str):
164     with open(name) as f:
165         for line in f:
166             fields = line.split()
167             timestamp = int(fields.pop(0))
168             dt = datetime.datetime.fromtimestamp(timestamp)
169             values = [int(x, base=16) for x in fields]
170             frame = Frame()
171             try:
172                 frame.parse(values)
173                 f = get_file(frame)
174                 print(dt, frame, file=f)
175             except ParseError as x:
176                 print(dt, 'ERROR({}): '.format(x) + " ".join(fields))
177
178
179 parse_file('/var/log/bsb-frames')
180 close_all()