]> mj.ucw.cz Git - home-hw.git/blob - bsb/daemon/burrow-bsbd.c
BSB daemon: Frames should be ephemeral in MQTT
[home-hw.git] / bsb / daemon / burrow-bsbd.c
1 /*
2  *      Boiler System Bus Interface Daemon
3  *
4  *      (c) 2020 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/log.h>
9 #include <ucw/opt.h>
10 #include <ucw/string.h>
11 #include <ucw/unaligned.h>
12
13 #include <stdarg.h>
14 #include <stdio.h>
15 #include <stdint.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <syslog.h>
19 #include <time.h>
20 #include <unistd.h>
21
22 #include <libusb.h>
23 #include <mosquitto.h>
24
25 typedef unsigned char byte;
26 typedef uint32_t u32;
27 typedef unsigned int uint;
28
29 #include "../firmware/interface.h"
30
31 /*** MQTT ***/
32
33 static struct mosquitto *mosq;
34 static bool mqtt_connected;
35
36 static void mqtt_publish(const char *topic, const char *fmt, ...)
37 {
38         va_list args;
39         va_start(args, fmt);
40
41         if (mqtt_connected) {
42                 char m[256];
43                 int l = vsnprintf(m, sizeof(m), fmt, args);
44                 int err = mosquitto_publish(mosq, NULL, topic, l, m, 0, true);
45                 if (err != MOSQ_ERR_SUCCESS)
46                         msg(L_ERROR, "Mosquitto: Publish failed, error=%d", err);
47         }
48
49         va_end(args);
50 }
51
52 static void mqtt_publish_ephemeral(const char *topic, const char *fmt, ...)
53 {
54         va_list args;
55         va_start(args, fmt);
56
57         if (mqtt_connected) {
58                 char m[256];
59                 int l = vsnprintf(m, sizeof(m), fmt, args);
60                 int err = mosquitto_publish(mosq, NULL, topic, l, m, 0, false);
61                 if (err != MOSQ_ERR_SUCCESS)
62                         msg(L_ERROR, "Mosquitto: Publish failed, error=%d", err);
63         }
64
65         va_end(args);
66 }
67
68 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
69 {
70         if (!status) {
71                 msg(L_DEBUG, "MQTT: Connection established");
72                 mqtt_publish("status/bsb", "ok");
73                 mqtt_connected = true;
74         } else if (mqtt_connected) {
75                 msg(L_DEBUG, "MQTT: Connection lost");
76                 mqtt_connected = false;
77         }
78 }
79
80 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
81 {
82         msg(L_DEBUG, "MQTT(%d): %s", level, message);
83 }
84
85 static void mqtt_init(void)
86 {
87         mosquitto_lib_init();
88
89         mosq = mosquitto_new("bsbd", 1, NULL);
90         if (!mosq)
91                 die("Mosquitto: Initialization failed");
92
93         mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
94         mosquitto_log_callback_set(mosq, mqtt_log_callback);
95
96         if (mosquitto_will_set(mosq, "status/bsb", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
97                 die("Mosquitto: Unable to set will");
98
99         if (mosquitto_connect_async(mosq, "127.0.0.1", 1883, 60) != MOSQ_ERR_SUCCESS)
100                 die("Mosquitto: Unable to connect");
101
102         if (mosquitto_loop_start(mosq))
103                 die("Mosquitto: Cannot start service thread");
104 }
105
106 /*** USB ***/
107
108 static struct libusb_context *usb_ctxt;
109 static struct libusb_device_handle *devh;
110
111 static void usb_error(const char *msg, ...)
112 {
113         va_list args;
114         va_start(args, msg);
115         ucw_vmsg(L_ERROR, msg, args);
116         fputc('\n', stderr);
117         va_end(args);
118
119         if (devh) {
120                 libusb_close(devh);
121                 devh = NULL;
122         }
123 }
124
125 static void open_device(void)
126 {
127         int err;
128         libusb_device **devlist;
129         ssize_t devn = libusb_get_device_list(usb_ctxt, &devlist);
130         if (devn < 0)
131                 die("Cannot enumerate USB devices: error %d", (int) devn);
132
133         for (ssize_t i=0; i<devn; i++) {
134                 struct libusb_device_descriptor desc;
135                 libusb_device *dev = devlist[i];
136                 if (!libusb_get_device_descriptor(dev, &desc)) {
137                         if (desc.idVendor == BSB_USB_VENDOR && desc.idProduct == BSB_USB_PRODUCT) {
138                                 msg(L_INFO, "Found device at usb%d.%d", libusb_get_bus_number(dev), libusb_get_device_address(dev));
139
140                                 if (err = libusb_open(dev, &devh)) {
141                                         usb_error("Cannot open device: error %d", err);
142                                         goto out;
143                                 }
144                                 libusb_reset_device(devh);
145                                 if (err = libusb_claim_interface(devh, 0)) {
146                                         usb_error("Cannot claim interface: error %d", err);
147                                         goto out;
148                                 }
149
150                                 goto out;
151                         }
152                 }
153         }
154
155 out:
156         libusb_free_device_list(devlist, 1);
157 }
158
159 static void init_usb(void)
160 {
161         int err;
162         if (err = libusb_init(&usb_ctxt))
163                 die("Cannot initialize libusb: error %d", err);
164         // libusb_set_debug(usb_ctxt, 3);
165         open_device();
166
167 }
168
169 /*** Protocol ***/
170
171 static const char * const stat_names[] = {
172 #define P(x) #x,
173         BSB_STATS
174 #undef P
175 };
176
177 static void process_stats(time_t t, byte *resp, uint len)
178 {
179         for (uint i=0; i < ARRAY_SIZE(stat_names) && 4*i + 3 < (uint) len; i++) {
180                 char item[64];
181                 snprintf(item, sizeof(item), "bsb/stats/%s", stat_names[i]);
182                 mqtt_publish(item, "%u", (uint) get_u32_le(resp+4*i), t);
183         }
184 }
185
186 static void process_info(byte *p, uint len)
187 {
188         if (len < 4)
189                 return;
190
191         u32 addr = get_u32_be(p);
192         p += 4;
193         len -= 4;
194
195         switch (addr) {
196                 case 0x05000219:
197                         if (len >= 4) {
198                                 uint temp = get_u16_be(p);
199                                 uint press = get_u16_be(p + 2);
200                                 mqtt_publish("burrow/heating/outside-temp", "%.2f", temp / 64.);
201                                 mqtt_publish("burrow/heating/water-pressure", "%.1f", press / 10.);
202                         }
203                         break;
204                 case 0x05000229:
205                         // AGU.2 status
206                         if (len >= 2) {
207                                 uint temp = get_u16_be(p);
208                                 mqtt_publish("burrow/heating/circuit1/mix-temp", "%.2f", temp / 64.);
209                         }
210                         break;
211                 case 0x05040227:
212                         // AGU.2 control
213                         if (len >= 2) {
214                                 uint m = get_u16_be(p);
215                                 mqtt_publish("burrow/heating/circuit1/mix-valve", "%u", m);
216                         }
217                         break;
218                 case 0x3d2d0215:
219                         // Room 1 status
220                         if (len >= 2) {
221                                 uint temp = get_u16_be(p);
222                                 mqtt_publish("burrow/heating/circuit1/room-temp", "%.2f", temp / 64.);
223                         }
224                         break;
225                 case 0x3e2e0215:
226                         // Room 2 status
227                         if (len >= 2) {
228                                 uint temp = get_u16_be(p);
229                                 mqtt_publish("burrow/heating/circuit2/room-temp", "%.2f", temp / 64.);
230                         }
231                         break;
232         }
233 }
234
235 static void process_answer(byte *p, uint len)
236 {
237         if (len < 4)
238                 return;
239
240         u32 addr = get_u32_be(p);
241         p += 4;
242         len -= 4;
243
244         switch (addr) {
245         }
246 }
247
248 static void process_frame(time_t t, byte *pkt, uint len)
249 {
250         if (!len) {
251                 msg(L_ERROR, "Received empty frame");
252                 return;
253         }
254
255         byte status = *pkt++;
256         len--;
257
258         if (!len) {
259                 msg(L_ERROR, "Frame transmit status: %u", status);
260                 return;
261         }
262
263         if (len < 6) {
264                 msg(L_ERROR, "Received truncated frame");
265                 return;
266         }
267
268         char hex[3*len + 1];
269         mem_to_hex(hex, pkt, len, ' ');
270         mqtt_publish_ephemeral("bsb/frame", "%s", hex, t);
271
272         msg(L_DEBUG, "<< %s", hex);
273
274         byte *body = pkt + BF_BODY;
275         uint body_len = len - BF_BODY - 2;      // 2 for CRC
276
277         switch (pkt[BF_OP]) {
278                 case BSB_OP_INFO:
279                         process_info(body, body_len);
280                         break;
281                 case BSB_OP_ANSWER:
282                         process_answer(body, body_len);
283                         break;
284         }
285 }
286
287 static int use_daemon;
288 static int use_debug;
289
290 static struct opt_section options = {
291         OPT_ITEMS {
292                 OPT_HELP("A daemon for controlling the air conditioning controller via MQTT"),
293                 OPT_HELP(""),
294                 OPT_HELP("Options:"),
295                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
296                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
297                 OPT_HELP_OPTION,
298                 OPT_CONF_OPTIONS,
299                 OPT_END
300         }
301 };
302
303 int main(int argc UNUSED, char **argv)
304 {
305         log_init(argv[0]);
306         opt_parse(&options, argv+1);
307
308         if (use_daemon) {
309                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
310                 log_set_default_stream(ls);
311         }
312         if (!use_debug)
313                 log_default_stream()->levels &= ~(1U << L_DEBUG);
314
315         mqtt_init();
316         init_usb();
317
318         time_t now = time(NULL);
319         time_t last_stats = 0;
320         // time_t last_query = now;
321         int err, received;
322         byte resp[64];
323
324         for (;;) {
325                 if (!devh) {
326                         msg(L_INFO, "Waiting for device to appear...");
327                         while (!devh) {
328                                 sleep(5);
329                                 open_device();
330                         }
331                 }
332
333                 now = time(NULL);
334                 if (last_stats + 60 < now) {
335                         if ((received = libusb_control_transfer(devh, 0xc0, 0x00, 0, 0, resp, sizeof(resp), 1000)) < 0) {
336                                 usb_error("Receive failed: error %d", received);
337                                 continue;
338                         }
339
340                         process_stats(now, resp, received);
341                         last_stats = now;
342                 }
343
344 #if 0
345                 if (last_query + 10 < now) {
346                         byte pkt[] = { 0xdc, 0xc2, 0x00, 0x0b, 0x06, 0x3d, 0x2e, 0x11, 0x25, 0x00, 0x00 };
347                         if (err = libusb_bulk_transfer(devh, 0x01, pkt, sizeof(pkt), &received, 2000)) {
348                                 usb_error("Send failed: error %d", err);
349                                 continue;
350                         } else {
351                                 // msg(L_DEBUG"Send OK: %d bytes", received);
352                         }
353                         last_query = now;
354                 }
355 #endif
356
357                 if (err = libusb_interrupt_transfer(devh, 0x82, resp, 64, &received, 1000)) {
358                         if (err != LIBUSB_ERROR_TIMEOUT)
359                                 usb_error("Receive failed: error %d", err);
360                         continue;
361                 }
362
363                 process_frame(now, resp, received);
364         }
365 }