]> mj.ucw.cz Git - home-hw.git/blob - influx/burrow-influx.c
Influx: Typo
[home-hw.git] / influx / burrow-influx.c
1 /*
2  *      A gateway between MQTT and InfluxDB
3  *
4  *      (c) 2018--2020 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/fastbuf.h>
9 #include <ucw/log.h>
10 #include <ucw/opt.h>
11 #include <ucw/string.h>
12
13 #include <pthread.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <syslog.h>
18 #include <time.h>
19 #include <unistd.h>
20
21 #include <curl/curl.h>
22 #include <mosquitto.h>
23
24 #define MEASUREMENT_TIMEOUT 120
25
26 #define INFLUX_URL "http://localhost:8086/write?db=burrow&precision=s"
27 #define INFLUX_TIMEOUT 30
28 #define INFLUX_VERBOSE 0
29 #define INFLUX_INTERVAL 60
30
31 static struct mosquitto *mosq;
32
33 struct attr {
34         const char *metric;
35         const char *value_name;
36         const char *topic;
37         uint timeout;
38 };
39
40 static const struct attr attr_table[] = {
41         {
42                 .metric = "temp,where=loft",
43                 .value_name = "t",
44                 .topic = "burrow/temp/loft",
45         },
46         {
47                 .metric = "temp,where=ursarium",
48                 .value_name = "t",
49                 .topic = "burrow/temp/ursarium"
50         },
51         {
52                 .metric = "temp,where=catarium",
53                 .value_name = "t",
54                 .topic = "burrow/temp/catarium"
55         },
56         {
57                 .metric = "temp,where=garage",
58                 .value_name = "t",
59                 .topic = "burrow/temp/garage"
60         },
61         {
62                 .metric = "temp,where=kitchen",
63                 .value_name = "t",
64                 .topic = "burrow/temp/kitchen"
65         },
66         {
67                 .metric = "rh,where=ursarium",
68                 .value_name = "rh",
69                 .topic = "burrow/temp/ursarium-rh"
70         },
71         {
72                 .metric = "temp,where=catarium_clock",
73                 .value_name = "t",
74                 .topic = "burrow/temp/clock"
75         },
76         {
77                 .metric = "pressure,where=catarium_clock",
78                 .value_name = "pa",
79                 .topic = "burrow/pressure/clock"
80         },
81         {
82                 .metric = "air,where=inside_intake",
83                 .value_name = "t",
84                 .topic = "burrow/air/inside-intake",
85         },
86         {
87                 .metric = "air,where=inside_exhaust",
88                 .value_name = "t",
89                 .topic = "burrow/air/inside-exhaust",
90         },
91         {
92                 .metric = "air,where=outside_intake",
93                 .value_name = "t",
94                 .topic = "burrow/air/outside-intake",
95         },
96         {
97                 .metric = "air,where=outside_exhaust",
98                 .value_name = "t",
99                 .topic = "burrow/air/outside-exhaust",
100         },
101         {
102                 .metric = "air,where=mixed",
103                 .value_name = "t",
104                 .topic = "burrow/air/mixed",
105         },
106         {
107                 .metric = "air,where=inside_intake_avg",
108                 .value_name = "t",
109                 .topic = "burrow/avg/air/inside-intake",
110         },
111         {
112                 .metric = "air,where=outside_intake_avg",
113                 .value_name = "t",
114                 .topic = "burrow/avg/air/outside-intake",
115         },
116         {
117                 .metric = "air_bypass",
118                 .value_name = "on",
119                 .topic = "burrow/air/bypass"
120         },
121         {
122                 .metric = "air_fan_pwm",
123                 .value_name = "pwm",
124                 .topic = "burrow/air/exchanger-fan"
125         },
126 #if 0
127         {
128                 .metric = "loft_fan",
129                 .topic = "burrow/loft/fan"
130         },
131 #endif
132         {
133                 .metric = "water_circ",
134                 .value_name = "on",
135                 .topic = "burrow/loft/circulation"
136         },
137         {
138                 .metric = "pm_voltage,phase=L1N",
139                 .value_name = "V",
140                 .topic = "burrow/power/voltage/l1n",
141         },
142         {
143                 .metric = "pm_voltage,phase=L2N",
144                 .value_name = "V",
145                 .topic = "burrow/power/voltage/l2n",
146         },
147         {
148                 .metric = "pm_voltage,phase=L3N",
149                 .value_name = "V",
150                 .topic = "burrow/power/voltage/l3n",
151         },
152         {
153                 .metric = "pm_current,phase=L1",
154                 .value_name = "A",
155                 .topic = "burrow/power/current/l1",
156         },
157         {
158                 .metric = "pm_current,phase=L2",
159                 .value_name = "A",
160                 .topic = "burrow/power/current/l2",
161         },
162         {
163                 .metric = "pm_current,phase=L3",
164                 .value_name = "A",
165                 .topic = "burrow/power/current/l3",
166         },
167         {
168                 .metric = "pm_power",
169                 .value_name = "W",
170                 .topic = "burrow/power/power",
171         },
172         {
173                 .metric = "pm_energy",
174                 .value_name = "kWh",
175                 .topic = "burrow/power/energy",
176         },
177         {
178                 .metric = "pm_reactive_power",
179                 .value_name = "VAr",
180                 .topic = "burrow/power/reactive/power",
181         },
182         {
183                 .metric = "pm_reactive_energy",
184                 .value_name = "kVArh",
185                 .topic = "burrow/power/reactive/energy",
186         },
187         {
188                 .metric = "heating_water_pressure",
189                 .value_name = "p",
190                 .topic = "burrow/heating/water-pressure",
191                 .timeout = 300,
192         },
193         {
194                 .metric = "heating_outside_temp",
195                 .value_name = "t",
196                 .topic = "burrow/heating/outside-temp",
197                 .timeout = 660,
198         },
199         {
200                 .metric = "heating_room_temp,circuit=1",
201                 .value_name = "t",
202                 .topic = "burrow/heating/circuit1/room-temp",
203                 .timeout = 300,
204         },
205         {
206                 .metric = "heating_mix-temp,circuit=1",
207                 .value_name = "t",
208                 .topic = "burrow/heating/circuit1/mix-temp",
209                 .timeout = 300,
210         },
211         {
212                 .metric = "heating_mix-valve,circuit=1",
213                 .value_name = "x",
214                 .topic = "burrow/heating/circuit1/mix-valve",
215                 .timeout = 300,
216         },
217         {
218                 .metric = "heating_active,circuit=1",
219                 .value_name = "x",
220                 .topic = "burrow/heating/circuit1/active",
221                 .timeout = 660,
222         },
223         {
224                 .metric = "heating_pump_active,circuit=1",
225                 .value_name = "x",
226                 .topic = "burrow/heating/circuit1/pump",
227                 .timeout = 300,
228         },
229         {
230                 .metric = "heating_room_temp,circuit=2",
231                 .value_name = "t",
232                 .topic = "burrow/heating/circuit2/room-temp",
233                 .timeout = 300,
234         },
235         {
236                 .metric = "heating_active,circuit=2",
237                 .value_name = "x",
238                 .topic = "burrow/heating/circuit2/active",
239                 .timeout = 660,
240         },
241         {
242                 .metric = "heating_active,circuit=water",
243                 .value_name = "x",
244                 .topic = "burrow/heating/water/active",
245                 .timeout = 660,
246         },
247 };
248
249 /*** MQTT ***/
250
251 static char *attr_values[ARRAY_SIZE(attr_table)];
252 static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
253
254 static void mqtt_publish(const char *topic, const char *fmt, ...)
255 {
256         va_list args;
257         va_start(args, fmt);
258         char m[256];
259         int l = vsnprintf(m, sizeof(m), fmt, args);
260         if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
261                 msg(L_ERROR, "Mosquitto: publish failed");
262         va_end(args);
263 }
264
265 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
266 {
267         if (!status) {
268                 msg(L_DEBUG, "MQTT: Connection established, subscribing");
269                 if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
270                         die("Mosquitto: subscribe failed");
271
272                 mqtt_publish("status/influx", "ok");
273         }
274 }
275
276 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
277 {
278         // msg(L_INFO, "MQTT(%d): %s", level, message);
279 }
280
281 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
282 {
283         char val[256];
284         if (m->payloadlen >= sizeof(val) - 1) {
285                 msg(L_ERROR, "Invalid value for topic %s", m->topic);
286                 return;
287         }
288         memcpy(val, m->payload, m->payloadlen);
289         val[m->payloadlen] = 0;
290         msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
291
292         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
293                 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
294                         pthread_mutex_lock(&attr_mutex);
295                         if (attr_values[i]) {
296                                 xfree(attr_values[i]);
297                                 attr_values[i] = NULL;
298                         }
299                         if (val[0])
300                                 attr_values[i] = xstrdup(val);
301                         pthread_mutex_unlock(&attr_mutex);
302                 }
303         }
304 }
305
306 /*** InfluxDB ***/
307
308 static CURL *curl;
309
310 static struct fastbuf *influx_fb;
311 static struct fastbuf *influx_err_fb;
312
313 static size_t influx_write_callback(char *ptr, size_t size, size_t nmemb, void *userdata UNUSED)
314 {
315         ASSERT(size == 1);
316         for (size_t i=0; i<nmemb; i++) {
317                 int c = *(byte *)ptr;
318                 ptr++;
319                 if (c == '\r')
320                         continue;
321                 if (c == '\n')
322                         c = ' ';
323                 else if (c < 0x20 || c > 0x7e)
324                         c = '?';
325                 bputc(influx_err_fb, c);
326         }
327         return nmemb;
328 }
329
330 static void influx_init(void)
331 {
332         curl = curl_easy_init();
333         if (!curl)
334                 die("libcurl init failed");
335
336         curl_easy_setopt(curl, CURLOPT_URL, INFLUX_URL);
337         curl_easy_setopt(curl, CURLOPT_VERBOSE, (long) INFLUX_VERBOSE);
338         curl_easy_setopt(curl, CURLOPT_TIMEOUT, (long) INFLUX_TIMEOUT);
339         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, influx_write_callback);
340
341         influx_fb = fbgrow_create(4096);
342         influx_err_fb = fbgrow_create(256);
343 }
344
345 static struct fastbuf *influx_write_start(void)
346 {
347         fbgrow_reset(influx_fb);
348         return influx_fb;
349 }
350
351 static void influx_write_flush(void)
352 {
353         bputc(influx_fb, 0);
354         byte *buf;
355         fbgrow_get_buf(influx_fb, &buf);
356         curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf);
357
358         msg(L_DEBUG, "InfluxDB: Sending data (%u bytes)", strlen(buf));
359         if (INFLUX_VERBOSE)
360                 fprintf(stderr, "%s", buf);
361
362         fbgrow_reset(influx_err_fb);
363
364         CURLcode code = curl_easy_perform(curl);
365
366         if (code != CURLE_OK) {
367                 msg(L_ERROR, "Write to InfluxDB failed: %s", curl_easy_strerror(code));
368                 return;
369         }
370
371         long rc = 0;
372         curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &rc);
373         if (rc != 200 && rc != 204) {
374                 bputc(influx_err_fb, 0);
375                 byte *err;
376                 fbgrow_get_buf(influx_err_fb, &err);
377                 msg(L_DEBUG, "InfluxDB HTTP error %ld: %s [...]", rc, err);
378         }
379 }
380
381 /*** Main ***/
382
383 static int use_daemon;
384 static int use_debug;
385
386 static struct opt_section options = {
387         OPT_ITEMS {
388                 OPT_HELP("A daemon for transferring MQTT data to InfluxDB"),
389                 OPT_HELP(""),
390                 OPT_HELP("Options:"),
391                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
392                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
393                 OPT_HELP_OPTION,
394                 OPT_CONF_OPTIONS,
395                 OPT_END
396         }
397 };
398
399 int main(int argc UNUSED, char **argv)
400 {
401         log_init(argv[0]);
402         opt_parse(&options, argv+1);
403
404         if (use_daemon) {
405                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
406                 log_set_default_stream(ls);
407         }
408         if (!use_debug)
409                 log_default_stream()->levels &= ~(1U << L_DEBUG);
410
411         mosquitto_lib_init();
412         mosq = mosquitto_new("influx", 1, NULL);
413         if (!mosq)
414                 die("Mosquitto: initialization failed");
415
416         mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
417         mosquitto_log_callback_set(mosq, mqtt_log_callback);
418         mosquitto_message_callback_set(mosq, mqtt_msg_callback);
419
420         if (mosquitto_will_set(mosq, "status/influx", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
421                 die("Mosquitto: unable to set will");
422
423         if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
424                 die("Mosquitto: connect failed");
425
426         if (mosquitto_loop_start(mosq))
427                 die("Mosquitto: cannot start service thread");
428
429         influx_init();
430
431         for (;;) {
432                 struct fastbuf *f = influx_write_start();
433                 char val[256], *w[2];
434                 time_t now = time(NULL);
435
436                 for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
437                         const struct attr *a = &attr_table[i];
438
439                         pthread_mutex_lock(&attr_mutex);
440                         snprintf(val, sizeof(val), "%s", attr_values[i] ? : "");
441                         pthread_mutex_unlock(&attr_mutex);
442
443                         if (!val[0])
444                                 continue;
445                         int fields = str_wordsplit(val, w, ARRAY_SIZE(w));
446                         if (fields < 1)
447                                 continue;
448                         if (fields >= 2) {
449                                 time_t t = atoll(w[1]);
450                                 uint timeout = a->timeout ? : MEASUREMENT_TIMEOUT;
451                                 if (t < now - timeout)
452                                         continue;
453                         }
454
455                         bprintf(f, "%s %s=%s\n", a->metric, a->value_name, val);
456                 }
457                 influx_write_flush();
458                 sleep(INFLUX_INTERVAL);
459         }
460 }