From b7c9d960143220d6e92a77546bdc9a603517fdd5 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Fri, 28 Feb 2020 22:36:34 +0100 Subject: [PATCH] BSB: Packet analyser --- bsb/analyser/analyse.py | 180 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 bsb/analyser/analyse.py diff --git a/bsb/analyser/analyse.py b/bsb/analyser/analyse.py new file mode 100644 index 0000000..f5fdd4b --- /dev/null +++ b/bsb/analyser/analyse.py @@ -0,0 +1,180 @@ +import os +import sys +import datetime +import collections +from typing import List + +class ParseError(RuntimeError): + pass + + +class Frame: + + def __init__(self): + self.reg = None + + def next_u8(self): + if len(self.x): + return self.x.popleft() + else: + raise ValueError + + def next_u16(self): + hi = self.next_u8() + lo = self.next_u8() + return (hi << 8) + lo + + def next_u16le(self): + lo = self.next_u8() + hi = self.next_u8() + return (hi << 8) + lo + + def next_u32(self): + hi = self.next_u16() + lo = self.next_u16() + return (hi << 16) + lo + + def next_u32_swapped(self): + hi = self.next_u16le() + lo = self.next_u16() + return (hi << 16) + lo + + def dump_rest(self): + return " ".join(["{:02x}".format(b) for b in self.x]) + + def _parse_reg(self): + self.reg = self.next_u32() + self.text = "{:08x}".format(self.reg) + + def _parse_reg_swapped(self): + self.reg = self.next_u32_swapped() + self.text = "{:08x}".format(self.reg) + + def _parse_null(self): + self.text = "" + + op_table = { + 1: { + "name": "REQUEST_INFO", + "parser": _parse_reg, + }, + 2: { + "name": "INFO", + "parser": _parse_reg, + }, + 3: { + "name": "SET", + "parser": _parse_reg_swapped, + }, + 4: { + "name": "ACK", + "parser": _parse_null, + }, + 5: { + "name": "NACK", + "parser": _parse_null, + }, + 6: { + "name": "QUERY", + "parser": _parse_reg_swapped, + }, + 7: { + "name": "ANSWER", + "parser": _parse_reg, + }, + 8: { + "name": "ERROR", + "parser": _parse_null, + }, + 0x0f: { + "name": "QUERY_DEF", + "parser": _parse_reg_swapped, + }, + 0x10: { + "name": "ANSWER_DEF", + "parser": _parse_reg_swapped, + }, + } + + def _parse(self): + # Parse frame header + if self.next_u8() != 0xdc: + raise ParseError("Missing SOF") + self.src = self.next_u8() ^ 0x80 + self.dst = self.next_u8() + if self.next_u8() != len(self.orig): + raise ParseError("Invalid length byte") + self.op = self.next_u8() + + # Remove CRC from the end + if len(self.x) < 2: + raise ParseError("Missing CRC") + else: + self.x.pop() + self.x.pop() + + # Which operation it is? + if self.op in Frame.op_table: + op_def = Frame.op_table[self.op] + self.op_name = op_def["name"] + op_def["parser"](self) + else: + raise ParseError("Unknown OP") + + def __str__(self): + rest = self.dump_rest() + return ("{:02x} -> {:02x} {}".format(self.src, self.dst, self.op_name) + + (" " if len(self.text) else "") + + self.text + + (": " if len(rest) else "") + + rest) + + def parse(self, values: List[int]): + self.orig = values + self.x = collections.deque(values) + self._parse() + + +file_output = True +output_files = {} + +def get_file(frame): + if file_output: + if len(output_files) == 0: + try: + os.mkdir("out") + except FileExistsError: + pass + if frame.reg is not None: + key = "{:08x}".format(frame.reg) + else: + key = "OTHER" + if key not in output_files: + output_files[key] = open("out/" + key, 'w') + return output_files[key] + else: + return sys.stdout + +def close_all(): + for f in output_files.values(): + f.close() + + +def parse_file(name: str): + with open(name) as f: + for line in f: + fields = line.split() + timestamp = int(fields.pop(0)) + dt = datetime.datetime.fromtimestamp(timestamp) + values = [int(x, base=16) for x in fields] + frame = Frame() + try: + frame.parse(values) + f = get_file(frame) + print(dt, frame, file=f) + except ParseError as x: + print(dt, 'ERROR({}): '.format(x) + " ".join(fields)) + + +parse_file('/var/log/bsb-frames') +close_all() \ No newline at end of file -- 2.39.2