]> mj.ucw.cz Git - home-hw.git/blob - bsb/daemon/burrow-bsbd.c
BSB Daemon: MQTT timestamps
[home-hw.git] / bsb / daemon / burrow-bsbd.c
1 /*
2  *      Boiler System Bus Interface Daemon
3  *
4  *      (c) 2020 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/log.h>
9 #include <ucw/opt.h>
10 #include <ucw/string.h>
11 #include <ucw/unaligned.h>
12
13 #include <stdarg.h>
14 #include <stdio.h>
15 #include <stdint.h>
16 #include <stdlib.h>
17 #include <string.h>
18 #include <syslog.h>
19 #include <time.h>
20 #include <unistd.h>
21
22 #include <libusb.h>
23 #include <mosquitto.h>
24
/* Short aliases for the integer types used throughout this file */
typedef unsigned char byte;	// a single raw octet
typedef uint32_t u32;		// unsigned, exactly 32 bits wide
typedef unsigned int uint;	// plain unsigned integer
28
29 #include "../firmware/interface.h"
30
31 /*** MQTT ***/
32
/* Handle of the Mosquitto client; created by mqtt_init() */
static struct mosquitto *mosq;

/*
 * Connection state as tracked by mqtt_conn_callback(). The publishing
 * functions send nothing while this is false.
 */
static bool mqtt_connected;
35
36 static void mqtt_publish(const char *topic, const char *fmt, ...)
37 {
38         va_list args;
39         va_start(args, fmt);
40
41         if (mqtt_connected) {
42                 char m[256];
43                 int l = vsnprintf(m, sizeof(m), fmt, args);
44                 int err = mosquitto_publish(mosq, NULL, topic, l, m, 0, true);
45                 if (err != MOSQ_ERR_SUCCESS)
46                         msg(L_ERROR, "Mosquitto: Publish failed, error=%d", err);
47         }
48
49         va_end(args);
50 }
51
52 static void mqtt_publish_ephemeral(const char *topic, const char *fmt, ...)
53 {
54         va_list args;
55         va_start(args, fmt);
56
57         if (mqtt_connected) {
58                 char m[256];
59                 int l = vsnprintf(m, sizeof(m), fmt, args);
60                 int err = mosquitto_publish(mosq, NULL, topic, l, m, 0, false);
61                 if (err != MOSQ_ERR_SUCCESS)
62                         msg(L_ERROR, "Mosquitto: Publish failed, error=%d", err);
63         }
64
65         va_end(args);
66 }
67
68 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
69 {
70         if (!status) {
71                 msg(L_DEBUG, "MQTT: Connection established");
72                 mqtt_publish("status/bsb", "ok");
73                 mqtt_connected = true;
74         } else if (mqtt_connected) {
75                 msg(L_DEBUG, "MQTT: Connection lost");
76                 mqtt_connected = false;
77         }
78 }
79
80 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
81 {
82         msg(L_DEBUG, "MQTT(%d): %s", level, message);
83 }
84
85 static void mqtt_init(void)
86 {
87         mosquitto_lib_init();
88
89         mosq = mosquitto_new("bsbd", 1, NULL);
90         if (!mosq)
91                 die("Mosquitto: Initialization failed");
92
93         mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
94         mosquitto_log_callback_set(mosq, mqtt_log_callback);
95
96         if (mosquitto_will_set(mosq, "status/bsb", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
97                 die("Mosquitto: Unable to set will");
98
99         if (mosquitto_connect_async(mosq, "127.0.0.1", 1883, 60) != MOSQ_ERR_SUCCESS)
100                 die("Mosquitto: Unable to connect");
101
102         if (mosquitto_loop_start(mosq))
103                 die("Mosquitto: Cannot start service thread");
104 }
105
106 /*** USB ***/
107
/* libusb context; initialized by init_usb() */
static struct libusb_context *usb_ctxt;

/*
 * Handle of the opened device, or NULL when no device is open.
 * Set by open_device(), reset to NULL by usb_error().
 */
static struct libusb_device_handle *devh;
110
111 static void usb_error(const char *msg, ...)
112 {
113         va_list args;
114         va_start(args, msg);
115         ucw_vmsg(L_ERROR, msg, args);
116         fputc('\n', stderr);
117         va_end(args);
118
119         if (devh) {
120                 libusb_close(devh);
121                 devh = NULL;
122         }
123 }
124
125 static void open_device(void)
126 {
127         int err;
128         libusb_device **devlist;
129         ssize_t devn = libusb_get_device_list(usb_ctxt, &devlist);
130         if (devn < 0)
131                 die("Cannot enumerate USB devices: error %d", (int) devn);
132
133         for (ssize_t i=0; i<devn; i++) {
134                 struct libusb_device_descriptor desc;
135                 libusb_device *dev = devlist[i];
136                 if (!libusb_get_device_descriptor(dev, &desc)) {
137                         if (desc.idVendor == BSB_USB_VENDOR && desc.idProduct == BSB_USB_PRODUCT) {
138                                 msg(L_INFO, "Found device at usb%d.%d", libusb_get_bus_number(dev), libusb_get_device_address(dev));
139
140                                 if (err = libusb_open(dev, &devh)) {
141                                         usb_error("Cannot open device: error %d", err);
142                                         goto out;
143                                 }
144                                 libusb_reset_device(devh);
145                                 if (err = libusb_claim_interface(devh, 0)) {
146                                         usb_error("Cannot claim interface: error %d", err);
147                                         goto out;
148                                 }
149
150                                 goto out;
151                         }
152                 }
153         }
154
155 out:
156         libusb_free_device_list(devlist, 1);
157 }
158
159 static void init_usb(void)
160 {
161         int err;
162         if (err = libusb_init(&usb_ctxt))
163                 die("Cannot initialize libusb: error %d", err);
164         // libusb_set_debug(usb_ctxt, 3);
165         open_device();
166
167 }
168
169 /*** Protocol ***/
170
171 static const char * const stat_names[] = {
172 #define P(x) #x,
173         BSB_STATS
174 #undef P
175 };
176
177 static int get_s16_be(const byte *x)
178 {
179         int val = get_u16_be(x);
180         if (val >= 0x8000)
181                 return val - 0x10000;
182         else
183                 return val;
184 }
185
186 static void process_stats(time_t t, byte *resp, uint len)
187 {
188         for (uint i=0; i < ARRAY_SIZE(stat_names) && 4*i + 3 < (uint) len; i++) {
189                 char item[64];
190                 snprintf(item, sizeof(item), "bsb/stats/%s", stat_names[i]);
191                 mqtt_publish(item, "%u %lld", (uint) get_u32_le(resp+4*i), (long long) t);
192         }
193 }
194
195 static void process_info(time_t t, byte *p, uint len)
196 {
197         if (len < 4)
198                 return;
199
200         u32 addr = get_u32_be(p);
201         p += 4;
202         len -= 4;
203
204         switch (addr) {
205                 case 0x05000219:
206                         if (len >= 4) {
207                                 int temp = get_s16_be(p);
208                                 uint press = get_u16_be(p + 2);
209                                 mqtt_publish("burrow/heating/outside-temp", "%.2f %lld", temp / 64., (long long) t);
210                                 mqtt_publish("burrow/heating/water-pressure", "%.1f %lld", press / 10., (long long) t);
211                         }
212                         break;
213                 case 0x05000229:
214                         // AGU.2 status
215                         if (len >= 2) {
216                                 int temp = get_s16_be(p);
217                                 mqtt_publish("burrow/heating/circuit1/mix-temp", "%.2f %lld", temp / 64., (long long) t);
218                         }
219                         break;
220                 case 0x05040227:
221                         // AGU.2 control
222                         if (len >= 2) {
223                                 uint m = get_u16_be(p);
224                                 mqtt_publish("burrow/heating/circuit1/mix-valve", "%u %lld", m, (long long) t);
225                         }
226                         break;
227                 case 0x3d2d0215:
228                         // Room 1 status
229                         if (len >= 2) {
230                                 int temp = get_s16_be(p);
231                                 mqtt_publish("burrow/heating/circuit1/room-temp", "%.2f %lld", temp / 64., (long long) t);
232                         }
233                         break;
234                 case 0x3e2e0215:
235                         // Room 2 status
236                         if (len >= 2) {
237                                 int temp = get_s16_be(p);
238                                 mqtt_publish("burrow/heating/circuit2/room-temp", "%.2f %lld", temp / 64., (long long) t);
239                         }
240                         break;
241         }
242 }
243
244 static void process_answer(time_t t, byte *p, uint len)
245 {
246         if (len < 4)
247                 return;
248
249         u32 addr = get_u32_be(p);
250         p += 4;
251         len -= 4;
252
253         switch (addr) {
254         }
255 }
256
257 static void process_frame(time_t t, byte *pkt, uint len)
258 {
259         if (!len) {
260                 msg(L_ERROR, "Received empty frame");
261                 return;
262         }
263
264         byte status = *pkt++;
265         len--;
266
267         if (!len) {
268                 msg(L_ERROR, "Frame transmit status: %u", status);
269                 return;
270         }
271
272         if (len < 6) {
273                 msg(L_ERROR, "Received truncated frame");
274                 return;
275         }
276
277         char hex[3*len + 1];
278         mem_to_hex(hex, pkt, len, ' ');
279         mqtt_publish_ephemeral("bsb/frame", "%s", hex, t);
280
281         msg(L_DEBUG, "<< %s", hex);
282
283         byte *body = pkt + BF_BODY;
284         uint body_len = len - BF_BODY - 2;      // 2 for CRC
285
286         switch (pkt[BF_OP]) {
287                 case BSB_OP_INFO:
288                         process_info(t, body, body_len);
289                         break;
290                 case BSB_OP_ANSWER:
291                         process_answer(t, body, body_len);
292                         break;
293         }
294 }
295
296 static int use_daemon;
297 static int use_debug;
298
299 static struct opt_section options = {
300         OPT_ITEMS {
301                 OPT_HELP("A daemon for controlling the air conditioning controller via MQTT"),
302                 OPT_HELP(""),
303                 OPT_HELP("Options:"),
304                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
305                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
306                 OPT_HELP_OPTION,
307                 OPT_CONF_OPTIONS,
308                 OPT_END
309         }
310 };
311
312 int main(int argc UNUSED, char **argv)
313 {
314         log_init(argv[0]);
315         opt_parse(&options, argv+1);
316
317         if (use_daemon) {
318                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
319                 log_set_default_stream(ls);
320         }
321         if (!use_debug)
322                 log_default_stream()->levels &= ~(1U << L_DEBUG);
323
324         mqtt_init();
325         init_usb();
326
327         time_t now = time(NULL);
328         time_t last_stats = 0;
329         // time_t last_query = now;
330         int err, received;
331         byte resp[64];
332
333         for (;;) {
334                 if (!devh) {
335                         msg(L_INFO, "Waiting for device to appear...");
336                         while (!devh) {
337                                 sleep(5);
338                                 open_device();
339                         }
340                 }
341
342                 now = time(NULL);
343                 if (last_stats + 60 < now) {
344                         if ((received = libusb_control_transfer(devh, 0xc0, 0x00, 0, 0, resp, sizeof(resp), 1000)) < 0) {
345                                 usb_error("Receive failed: error %d", received);
346                                 continue;
347                         }
348
349                         process_stats(now, resp, received);
350                         last_stats = now;
351                 }
352
353 #if 0
354                 if (last_query + 10 < now) {
355                         byte pkt[] = { 0xdc, 0xc2, 0x00, 0x0b, 0x06, 0x3d, 0x2e, 0x11, 0x25, 0x00, 0x00 };
356                         if (err = libusb_bulk_transfer(devh, 0x01, pkt, sizeof(pkt), &received, 2000)) {
357                                 usb_error("Send failed: error %d", err);
358                                 continue;
359                         } else {
360                                 // msg(L_DEBUG"Send OK: %d bytes", received);
361                         }
362                         last_query = now;
363                 }
364 #endif
365
366                 if (err = libusb_interrupt_transfer(devh, 0x82, resp, 64, &received, 1000)) {
367                         if (err != LIBUSB_ERROR_TIMEOUT)
368                                 usb_error("Receive failed: error %d", err);
369                         continue;
370                 }
371
372                 process_frame(now, resp, received);
373         }
374 }