]> mj.ucw.cz Git - home-hw.git/commitdiff
BSB: Packet analyser
authorMartin Mares <mj@ucw.cz>
Fri, 28 Feb 2020 21:36:34 +0000 (22:36 +0100)
committerMartin Mares <mj@ucw.cz>
Fri, 28 Feb 2020 21:36:34 +0000 (22:36 +0100)
bsb/analyser/analyse.py [new file with mode: 0644]

diff --git a/bsb/analyser/analyse.py b/bsb/analyser/analyse.py
new file mode 100644 (file)
index 0000000..f5fdd4b
--- /dev/null
@@ -0,0 +1,180 @@
+import os
+import sys
+import datetime
+import collections
+from typing import List
+
+class ParseError(RuntimeError):
+    pass
+
+
+class Frame:
+
+    def __init__(self):
+        self.reg = None
+
+    def next_u8(self):
+        if len(self.x):
+            return self.x.popleft()
+        else:
+            raise ValueError
+
+    def next_u16(self):
+        hi = self.next_u8()
+        lo = self.next_u8()
+        return (hi << 8) + lo
+
+    def next_u16le(self):
+        lo = self.next_u8()
+        hi = self.next_u8()
+        return (hi << 8) + lo
+
+    def next_u32(self):
+        hi = self.next_u16()
+        lo = self.next_u16()
+        return (hi << 16) + lo
+
+    def next_u32_swapped(self):
+        hi = self.next_u16le()
+        lo = self.next_u16()
+        return (hi << 16) + lo
+
+    def dump_rest(self):
+        return " ".join(["{:02x}".format(b) for b in self.x])
+
+    def _parse_reg(self):
+        self.reg = self.next_u32()
+        self.text = "{:08x}".format(self.reg)
+
+    def _parse_reg_swapped(self):
+        self.reg = self.next_u32_swapped()
+        self.text = "{:08x}".format(self.reg)
+
+    def _parse_null(self):
+        self.text = ""
+
+    op_table = {
+        1: {
+            "name": "REQUEST_INFO",
+            "parser": _parse_reg,
+        },
+        2: {
+            "name": "INFO",
+            "parser": _parse_reg,
+        },
+        3: {
+            "name": "SET",
+            "parser": _parse_reg_swapped,
+        },
+        4: {
+            "name": "ACK",
+            "parser": _parse_null,
+        },
+        5: {
+            "name": "NACK",
+            "parser": _parse_null,
+        },
+        6: {
+            "name": "QUERY",
+            "parser": _parse_reg_swapped,
+        },
+        7: {
+            "name": "ANSWER",
+            "parser": _parse_reg,
+        },
+        8: {
+            "name": "ERROR",
+            "parser": _parse_null,
+        },
+        0x0f: {
+            "name": "QUERY_DEF",
+            "parser": _parse_reg_swapped,
+        },
+        0x10: {
+            "name": "ANSWER_DEF",
+            "parser": _parse_reg_swapped,
+        },
+    }
+
+    def _parse(self):
+        # Parse frame header
+        if self.next_u8() != 0xdc:
+            raise ParseError("Missing SOF")
+        self.src = self.next_u8() ^ 0x80
+        self.dst = self.next_u8()
+        if self.next_u8() != len(self.orig):
+            raise ParseError("Invalid length byte")
+        self.op = self.next_u8()
+
+        # Remove CRC from the end
+        if len(self.x) < 2:
+            raise ParseError("Missing CRC")
+        else:
+            self.x.pop()
+            self.x.pop()
+
+        # Which operation it is?
+        if self.op in Frame.op_table:
+            op_def = Frame.op_table[self.op]
+            self.op_name = op_def["name"]
+            op_def["parser"](self)
+        else:
+            raise ParseError("Unknown OP")
+
+    def __str__(self):
+        rest = self.dump_rest()
+        return ("{:02x} -> {:02x} {}".format(self.src, self.dst, self.op_name) +
+                (" " if len(self.text) else "") +
+                self.text +
+                (": " if len(rest) else "") +
+                rest)
+
+    def parse(self, values: List[int]):
+        self.orig = values
+        self.x = collections.deque(values)
+        self._parse()
+
+
+file_output = True
+output_files = {}
+
+def get_file(frame):
+    if file_output:
+        if len(output_files) == 0:
+            try:
+                os.mkdir("out")
+            except FileExistsError:
+                pass
+        if frame.reg is not None:
+            key = "{:08x}".format(frame.reg)
+        else:
+            key = "OTHER"
+        if key not in output_files:
+            output_files[key] = open("out/" + key, 'w')
+        return output_files[key]
+    else:
+        return sys.stdout
+
+def close_all():
+    for f in output_files.values():
+        f.close()
+
+
+def parse_file(name: str):
+    with open(name) as f:
+        for line in f:
+            fields = line.split()
+            timestamp = int(fields.pop(0))
+            dt = datetime.datetime.fromtimestamp(timestamp)
+            values = [int(x, base=16) for x in fields]
+            frame = Frame()
+            try:
+                frame.parse(values)
+                f = get_file(frame)
+                print(dt, frame, file=f)
+            except ParseError as x:
+                print(dt, 'ERROR({}): '.format(x) + " ".join(fields))
+
+
+parse_file('/var/log/bsb-frames')
+close_all()
\ No newline at end of file