]> mj.ucw.cz Git - home-hw.git/blob - influx/burrow-influx.c
Influx: Log loft_fan again
[home-hw.git] / influx / burrow-influx.c
1 /*
2  *      A gateway between MQTT and InfluxDB
3  *
4  *      (c) 2018--2020 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/fastbuf.h>
9 #include <ucw/log.h>
10 #include <ucw/opt.h>
11 #include <ucw/string.h>
12
13 #include <pthread.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <syslog.h>
18 #include <time.h>
19 #include <unistd.h>
20
21 #include <curl/curl.h>
22 #include <mosquitto.h>
23
24 #define MEASUREMENT_TIMEOUT 120
25
26 #define INFLUX_URL "http://localhost:8086/write?db=burrow&precision=s"
27 #define INFLUX_TIMEOUT 30
28 #define INFLUX_VERBOSE 0
29 #define INFLUX_INTERVAL 60
30
/* Handle of the MQTT client; assigned in main() and used by the MQTT helpers. */
static struct mosquitto *mosq;
32
33 struct attr {
34         const char *metric;
35         const char *value_name;
36         const char *topic;
37         uint timeout;
38         bool is_string;
39 };
40
41 static const struct attr attr_table[] = {
42         {
43                 .metric = "temp,where=loft",
44                 .value_name = "t",
45                 .topic = "burrow/temp/loft",
46         },
47         {
48                 .metric = "temp,where=ursarium",
49                 .value_name = "t",
50                 .topic = "burrow/temp/ursarium"
51         },
52         {
53                 .metric = "temp,where=catarium",
54                 .value_name = "t",
55                 .topic = "burrow/temp/catarium"
56         },
57         {
58                 .metric = "temp,where=garage",
59                 .value_name = "t",
60                 .topic = "burrow/temp/garage"
61         },
62         {
63                 .metric = "temp,where=terarium",
64                 .value_name = "t",
65                 .topic = "burrow/temp/terarium"
66         },
67         {
68                 .metric = "rh,where=ursarium",
69                 .value_name = "rh",
70                 .topic = "burrow/temp/ursarium-rh"
71         },
72         {
73                 .metric = "temp,where=catarium_clock",
74                 .value_name = "t",
75                 .topic = "burrow/temp/clock"
76         },
77         {
78                 .metric = "pressure,where=catarium_clock",
79                 .value_name = "pa",
80                 .topic = "burrow/pressure/clock"
81         },
82         {
83                 .metric = "air,where=inside_intake",
84                 .value_name = "t",
85                 .topic = "burrow/air/inside-intake",
86         },
87         {
88                 .metric = "air,where=inside_exhaust",
89                 .value_name = "t",
90                 .topic = "burrow/air/inside-exhaust",
91         },
92         {
93                 .metric = "air,where=outside_intake",
94                 .value_name = "t",
95                 .topic = "burrow/air/outside-intake",
96         },
97         {
98                 .metric = "air,where=outside_exhaust",
99                 .value_name = "t",
100                 .topic = "burrow/air/outside-exhaust",
101         },
102         {
103                 .metric = "air,where=mixed",
104                 .value_name = "t",
105                 .topic = "burrow/air/mixed",
106         },
107         {
108                 .metric = "air,where=inside_intake_avg",
109                 .value_name = "t",
110                 .topic = "burrow/avg/air/inside-intake",
111         },
112         {
113                 .metric = "air,where=outside_intake_avg",
114                 .value_name = "t",
115                 .topic = "burrow/avg/air/outside-intake",
116         },
117         {
118                 .metric = "air_bypass",
119                 .value_name = "on",
120                 .topic = "burrow/air/bypass"
121         },
122         {
123                 .metric = "air_fan_pwm",
124                 .value_name = "pwm",
125                 .topic = "burrow/air/exchanger-fan"
126         },
127         {
128                 .metric = "loft_fan",
129                 .topic = "burrow/loft/fan"
130         },
131         {
132                 .metric = "water_circ",
133                 .value_name = "on",
134                 .topic = "burrow/loft/circulation"
135         },
136         {
137                 .metric = "pm_voltage,phase=L1N",
138                 .value_name = "V",
139                 .topic = "burrow/power/voltage/l1n",
140         },
141         {
142                 .metric = "pm_voltage,phase=L2N",
143                 .value_name = "V",
144                 .topic = "burrow/power/voltage/l2n",
145         },
146         {
147                 .metric = "pm_voltage,phase=L3N",
148                 .value_name = "V",
149                 .topic = "burrow/power/voltage/l3n",
150         },
151         {
152                 .metric = "pm_current,phase=L1",
153                 .value_name = "A",
154                 .topic = "burrow/power/current/l1",
155         },
156         {
157                 .metric = "pm_current,phase=L2",
158                 .value_name = "A",
159                 .topic = "burrow/power/current/l2",
160         },
161         {
162                 .metric = "pm_current,phase=L3",
163                 .value_name = "A",
164                 .topic = "burrow/power/current/l3",
165         },
166         {
167                 .metric = "pm_power",
168                 .value_name = "W",
169                 .topic = "burrow/power/power",
170         },
171         {
172                 .metric = "pm_energy",
173                 .value_name = "kWh",
174                 .topic = "burrow/power/energy",
175         },
176         {
177                 .metric = "pm_reactive_power",
178                 .value_name = "VAr",
179                 .topic = "burrow/power/reactive/power",
180         },
181         {
182                 .metric = "pm_reactive_energy",
183                 .value_name = "kVArh",
184                 .topic = "burrow/power/reactive/energy",
185         },
186         {
187                 .metric = "heating_water_pressure",
188                 .value_name = "p",
189                 .topic = "burrow/heating/water-pressure",
190                 .timeout = 300,
191         },
192         {
193                 .metric = "heating_outside_temp",
194                 .value_name = "t",
195                 .topic = "burrow/heating/outside-temp",
196                 .timeout = 660,
197         },
198         {
199                 .metric = "heating_room_temp,circuit=1",
200                 .value_name = "t",
201                 .topic = "burrow/heating/circuit1/room-temp",
202                 .timeout = 300,
203         },
204         {
205                 .metric = "heating_mix-temp,circuit=1",
206                 .value_name = "t",
207                 .topic = "burrow/heating/circuit1/mix-temp",
208                 .timeout = 300,
209         },
210         {
211                 .metric = "heating_mix-valve,circuit=1",
212                 .value_name = "x",
213                 .topic = "burrow/heating/circuit1/mix-valve",
214                 .timeout = 300,
215         },
216         {
217                 .metric = "heating_active,circuit=1",
218                 .value_name = "x",
219                 .topic = "burrow/heating/circuit1/active",
220                 .timeout = 660,
221         },
222         {
223                 .metric = "heating_pump_active,circuit=1",
224                 .value_name = "x",
225                 .topic = "burrow/heating/circuit1/pump",
226                 .timeout = 300,
227         },
228         {
229                 .metric = "heating_room_temp,circuit=2",
230                 .value_name = "t",
231                 .topic = "burrow/heating/circuit2/room-temp",
232                 .timeout = 300,
233         },
234         {
235                 .metric = "heating_active,circuit=2",
236                 .value_name = "x",
237                 .topic = "burrow/heating/circuit2/active",
238                 .timeout = 660,
239         },
240         {
241                 .metric = "heating_active,circuit=water",
242                 .value_name = "x",
243                 .topic = "burrow/heating/water/active",
244                 .timeout = 660,
245         },
246         {
247                 .metric = "heating_error",
248                 .value_name = "err",
249                 .topic = "burrow/heating/error",
250                 .timeout = 300,
251         },
252         {
253                 .metric = "heating_clock",
254                 .value_name = "t",
255                 .topic = "burrow/heating/clock",
256                 .timeout = 660,
257                 .is_string = true,
258         },
259 };
260
261 /*** MQTT ***/
262
263 static char *attr_values[ARRAY_SIZE(attr_table)];
264 static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
265
266 static void mqtt_publish(const char *topic, const char *fmt, ...)
267 {
268         va_list args;
269         va_start(args, fmt);
270         char m[256];
271         int l = vsnprintf(m, sizeof(m), fmt, args);
272         if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
273                 msg(L_ERROR, "Mosquitto: publish failed");
274         va_end(args);
275 }
276
277 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
278 {
279         if (!status) {
280                 msg(L_DEBUG, "MQTT: Connection established, subscribing");
281                 if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
282                         die("Mosquitto: subscribe failed");
283
284                 mqtt_publish("status/influx", "ok");
285         }
286 }
287
288 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
289 {
290         // msg(L_INFO, "MQTT(%d): %s", level, message);
291 }
292
293 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
294 {
295         char val[256];
296         if (m->payloadlen >= sizeof(val) - 1) {
297                 msg(L_ERROR, "Invalid value for topic %s", m->topic);
298                 return;
299         }
300         memcpy(val, m->payload, m->payloadlen);
301         val[m->payloadlen] = 0;
302         msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
303
304         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
305                 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
306                         pthread_mutex_lock(&attr_mutex);
307                         if (attr_values[i]) {
308                                 xfree(attr_values[i]);
309                                 attr_values[i] = NULL;
310                         }
311                         if (val[0])
312                                 attr_values[i] = xstrdup(val);
313                         pthread_mutex_unlock(&attr_mutex);
314                 }
315         }
316 }
317
318 /*** InfluxDB ***/
319
320 static CURL *curl;
321
322 static struct fastbuf *influx_fb;
323 static struct fastbuf *influx_err_fb;
324
325 static size_t influx_write_callback(char *ptr, size_t size, size_t nmemb, void *userdata UNUSED)
326 {
327         ASSERT(size == 1);
328         for (size_t i=0; i<nmemb; i++) {
329                 int c = *(byte *)ptr;
330                 ptr++;
331                 if (c == '\r')
332                         continue;
333                 if (c == '\n')
334                         c = ' ';
335                 else if (c < 0x20 || c > 0x7e)
336                         c = '?';
337                 bputc(influx_err_fb, c);
338         }
339         return nmemb;
340 }
341
342 static void influx_init(void)
343 {
344         curl = curl_easy_init();
345         if (!curl)
346                 die("libcurl init failed");
347
348         curl_easy_setopt(curl, CURLOPT_URL, INFLUX_URL);
349         curl_easy_setopt(curl, CURLOPT_VERBOSE, (long) INFLUX_VERBOSE);
350         curl_easy_setopt(curl, CURLOPT_TIMEOUT, (long) INFLUX_TIMEOUT);
351         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, influx_write_callback);
352
353         influx_fb = fbgrow_create(4096);
354         influx_err_fb = fbgrow_create(256);
355 }
356
357 static struct fastbuf *influx_write_start(void)
358 {
359         fbgrow_reset(influx_fb);
360         return influx_fb;
361 }
362
363 static void influx_write_flush(void)
364 {
365         bputc(influx_fb, 0);
366         byte *buf;
367         fbgrow_get_buf(influx_fb, &buf);
368         curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf);
369
370         msg(L_DEBUG, "InfluxDB: Sending data (%u bytes)", strlen(buf));
371         if (INFLUX_VERBOSE)
372                 fprintf(stderr, "%s", buf);
373
374         fbgrow_reset(influx_err_fb);
375
376         CURLcode code = curl_easy_perform(curl);
377
378         if (code != CURLE_OK) {
379                 msg(L_ERROR, "Write to InfluxDB failed: %s", curl_easy_strerror(code));
380                 return;
381         }
382
383         long rc = 0;
384         curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &rc);
385         if (rc != 200 && rc != 204) {
386                 bputc(influx_err_fb, 0);
387                 byte *err;
388                 fbgrow_get_buf(influx_err_fb, &err);
389                 msg(L_DEBUG, "InfluxDB HTTP error %ld: %s [...]", rc, err);
390         }
391 }
392
393 /*** Main ***/
394
395 static int use_daemon;
396 static int use_debug;
397
398 static struct opt_section options = {
399         OPT_ITEMS {
400                 OPT_HELP("A daemon for transferring MQTT data to InfluxDB"),
401                 OPT_HELP(""),
402                 OPT_HELP("Options:"),
403                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
404                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
405                 OPT_HELP_OPTION,
406                 OPT_CONF_OPTIONS,
407                 OPT_END
408         }
409 };
410
411 int main(int argc UNUSED, char **argv)
412 {
413         log_init(argv[0]);
414         opt_parse(&options, argv+1);
415
416         if (use_daemon) {
417                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
418                 log_set_default_stream(ls);
419         }
420         if (!use_debug)
421                 log_default_stream()->levels &= ~(1U << L_DEBUG);
422
423         mosquitto_lib_init();
424         mosq = mosquitto_new("influx", 1, NULL);
425         if (!mosq)
426                 die("Mosquitto: initialization failed");
427
428         mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
429         mosquitto_log_callback_set(mosq, mqtt_log_callback);
430         mosquitto_message_callback_set(mosq, mqtt_msg_callback);
431
432         if (mosquitto_will_set(mosq, "status/influx", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
433                 die("Mosquitto: unable to set will");
434
435         if (mosquitto_tls_set(mosq, "/etc/burrow-mqtt/ca.crt", NULL, "/etc/burrow-mqtt/client.crt", "/etc/burrow-mqtt/client.key", NULL) != MOSQ_ERR_SUCCESS)
436                 die("Mosquitto: unable to set TLS parameters");
437
438         if (mosquitto_connect_async(mosq, "burrow-mqtt", 8883, 60) != MOSQ_ERR_SUCCESS)
439                 die("Mosquitto: connect failed");
440
441         if (mosquitto_loop_start(mosq))
442                 die("Mosquitto: cannot start service thread");
443
444         influx_init();
445
446         for (;;) {
447                 struct fastbuf *f = influx_write_start();
448                 char val[256], *w[2];
449                 time_t now = time(NULL);
450
451                 for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
452                         const struct attr *a = &attr_table[i];
453
454                         pthread_mutex_lock(&attr_mutex);
455                         snprintf(val, sizeof(val), "%s", attr_values[i] ? : "");
456                         pthread_mutex_unlock(&attr_mutex);
457
458                         if (!val[0])
459                                 continue;
460                         int fields = str_wordsplit(val, w, ARRAY_SIZE(w));
461                         if (fields < 1)
462                                 continue;
463                         if (fields >= 2) {
464                                 time_t t = atoll(w[1]);
465                                 uint timeout = a->timeout ? : MEASUREMENT_TIMEOUT;
466                                 if (t < now - timeout)
467                                         continue;
468                         }
469
470                         if (!a->is_string)
471                                 bprintf(f, "%s %s=%s\n", a->metric, a->value_name, val);
472                         else
473                                 bprintf(f, "%s %s=\"%s\"\n", a->metric, a->value_name, val);
474                 }
475                 influx_write_flush();
476                 sleep(INFLUX_INTERVAL);
477         }
478 }