]> mj.ucw.cz Git - home-hw.git/blob - prometheus/burrow-prometheus.c
Influx: make clean
[home-hw.git] / prometheus / burrow-prometheus.c
1 /*
2  *      A gateway between MQTT and Prometheus
3  *
4  *      (c) 2018--2019 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/fastbuf.h>
9 #include <ucw/log.h>
10 #include <ucw/mempool.h>
11 #include <ucw/opt.h>
12 #include <ucw/string.h>
13
14 #include <netinet/in.h>
15 #include <pthread.h>
16 #include <stdarg.h>
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <sys/poll.h>
21 #include <sys/socket.h>
22 #include <syslog.h>
23 #include <time.h>
24 #include <unistd.h>
25
26 #include <mosquitto.h>
27
28 #define MEASUREMENT_TIMEOUT 120
29
30 static struct mosquitto *mosq;
31
/*
 *  One row of the MQTT-to-Prometheus mapping table.
 *
 *  Rows with a help and/or type produce "# HELP" / "# TYPE" comment lines
 *  in the HTTP answer; rows with a topic produce a sample line carrying
 *  the most recently received payload of that topic.
 */
struct attr {
	// Metric name as written to the output; may carry a {label="..."} suffix
	const char *metric;

	// Text of the "# HELP" line, or NULL to emit none
	const char *help;

	// Text of the "# TYPE" line, or NULL to emit none
	const char *type;

	// MQTT topic supplying the value, or NULL for a heading-only row
	const char *topic;
};
38
39 static const struct attr attr_table[] = {
40         {
41                 .metric = "temp_loft",
42                 .help = "Temperature in the loft [degC]",
43                 .type = "gauge",
44                 .topic = "burrow/temp/loft",
45         },
46         {
47                 .metric = "loft_fan",
48                 .help = "Fan speed in the loft (0-3)",
49                 .type = "gauge",
50                 .topic = "burrow/loft/fan"
51         },
52         {
53                 .metric = "loft_circ",
54                 .help = "Warm water circulation (0-1)",
55                 .type = "gauge",
56                 .topic = "burrow/loft/circulation"
57         },
58         {
59                 .metric = "temp_ursarium",
60                 .help = "Temperature in the Ursarium [degC]",
61                 .type = "gauge",
62                 .topic = "burrow/temp/ursarium"
63         },
64         {
65                 .metric = "temp_catarium",
66                 .help = "Temperature in the Catarium [degC]",
67                 .type = "gauge",
68                 .topic = "burrow/temp/catarium"
69         },
70         {
71                 .metric = "temp_garage",
72                 .help = "Temperature in the garage [degC]",
73                 .type = "gauge",
74                 .topic = "burrow/temp/garage"
75         },
76         {
77                 .metric = "temp_kitchen",
78                 .help = "Temperature in the kitchen [degC]",
79                 .type = "gauge",
80                 .topic = "burrow/temp/kitchen"
81         },
82         {
83                 .metric = "rh_ursarium",
84                 .help = "Relative humidity in the Ursarium [%]",
85                 .type = "gauge",
86                 .topic = "burrow/temp/ursarium-rh"
87         },
88         {
89                 .metric = "temp_catarium_clock",
90                 .help = "Temperature on Catarium clock [degC]",
91                 .type = "gauge",
92                 .topic = "burrow/temp/clock"
93         },
94         {
95                 .metric = "press_catarium_clock",
96                 .help = "Pressure on Catarium clock [Pa]",
97                 .type = "gauge",
98                 .topic = "burrow/pressure/clock"
99         },
100         {
101                 .metric = "air_inside_intake",
102                 .help = "Temperature of air intake from inside [degC]",
103                 .type = "gauge",
104                 .topic = "burrow/air/inside-intake",
105         },
106         {
107                 .metric = "air_inside_exhaust",
108                 .help = "Temperature of air exhaust to inside [degC]",
109                 .type = "gauge",
110                 .topic = "burrow/air/inside-exhaust",
111         },
112         {
113                 .metric = "air_outside_intake",
114                 .help = "Temperature of air intake from outside [degC]",
115                 .type = "gauge",
116                 .topic = "burrow/air/outside-intake",
117         },
118         {
119                 .metric = "air_outside_exhaust",
120                 .help = "Temperature of air exhaust to outside [degC]",
121                 .type = "gauge",
122                 .topic = "burrow/air/outside-exhaust",
123         },
124         {
125                 .metric = "air_mixed",
126                 .help = "Temperature of mixed air [degC]",
127                 .type = "gauge",
128                 .topic = "burrow/air/mixed",
129         },
130         {
131                 .metric = "air_inside_intake_avg",
132                 .help = "Average temperature of air intake from inside [degC]",
133                 .type = "gauge",
134                 .topic = "burrow/avg/air/inside-intake",
135         },
136         {
137                 .metric = "air_outside_intake_avg",
138                 .help = "Average temperature of air intake from outside [degC]",
139                 .type = "gauge",
140                 .topic = "burrow/avg/air/outside-intake",
141         },
142         {
143                 .metric = "air_bypass",
144                 .help = "Heat exchanger bypass (0-1)",
145                 .type = "gauge",
146                 .topic = "burrow/air/bypass"
147         },
148         {
149                 .metric = "air_fan_pwm",
150                 .help = "Heat exchanger fan PWM (0-255)",
151                 .type = "gauge",
152                 .topic = "burrow/air/exchanger-fan"
153         },
154         {
155                 // Common heading for all voltages
156                 .metric = "pm_voltage",
157                 .help = "Voltage between phases and neutral [V]",
158                 .type = "gauge",
159         },
160         {
161                 .metric = "pm_voltage{phase=\"L1N\"}",
162                 .topic = "burrow/power/voltage/l1n",
163         },
164         {
165                 .metric = "pm_voltage{phase=\"L2N\"}",
166                 .topic = "burrow/power/voltage/l2n",
167         },
168         {
169                 .metric = "pm_voltage{phase=\"L3N\"}",
170                 .topic = "burrow/power/voltage/l3n",
171         },
172         {
173                 // Common heading for all currents
174                 .metric = "pm_current",
175                 .help = "Current through phases [A]",
176                 .type = "gauge",
177         },
178         {
179                 .metric = "pm_current{phase=\"L1\"}",
180                 .topic = "burrow/power/current/l1",
181         },
182         {
183                 .metric = "pm_current{phase=\"L2\"}",
184                 .topic = "burrow/power/current/l2",
185         },
186         {
187                 .metric = "pm_current{phase=\"L3\"}",
188                 .topic = "burrow/power/current/l3",
189         },
190         {
191                 .metric = "pm_power",
192                 .help = "Total power [W]",
193                 .type = "gauge",
194                 .topic = "burrow/power/power",
195         },
196         {
197                 .metric = "pm_energy",
198                 .help = "Total energy [kWh]",
199                 .type = "gauge",
200                 .topic = "burrow/power/energy",
201         },
202         {
203                 .metric = "pm_reactive_power",
204                 .help = "Total reactive power [VAr]",
205                 .type = "gauge",
206                 .topic = "burrow/power/reactive/power",
207         },
208         {
209                 .metric = "pm_reactive_energy",
210                 .help = "Total reactive energy [kVArh]",
211                 .type = "gauge",
212                 .topic = "burrow/power/reactive/energy",
213         },
214 };
215
// Last payload received for each row of attr_table (same index), stored as a
// heap-allocated string; NULL if nothing was received or the last payload was empty.
static char *attr_values[ARRAY_SIZE(attr_table)];
// Guards attr_values: it is written by the MQTT message callback and read by http_answer().
static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
218
219 static void mqtt_publish(const char *topic, const char *fmt, ...)
220 {
221         va_list args;
222         va_start(args, fmt);
223         char m[256];
224         int l = vsnprintf(m, sizeof(m), fmt, args);
225         if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
226                 msg(L_ERROR, "Mosquitto: publish failed");
227         va_end(args);
228 }
229
230 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
231 {
232         if (!status) {
233                 msg(L_DEBUG, "MQTT: Connection established, subscribing");
234                 if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
235                         die("Mosquitto: subscribe failed");
236
237                 mqtt_publish("status/prometheus", "ok");
238         }
239 }
240
241 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
242 {
243         // msg(L_INFO, "MQTT(%d): %s", level, message);
244 }
245
246 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
247 {
248         char val[256];
249         if (m->payloadlen >= sizeof(val) - 1) {
250                 msg(L_ERROR, "Invalid value for topic %s", m->topic);
251                 return;
252         }
253         memcpy(val, m->payload, m->payloadlen);
254         val[m->payloadlen] = 0;
255         msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
256
257         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
258                 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
259                         pthread_mutex_lock(&attr_mutex);
260                         if (attr_values[i]) {
261                                 xfree(attr_values[i]);
262                                 attr_values[i] = NULL;
263                         }
264                         if (val[0])
265                                 attr_values[i] = xstrdup(val);
266                         pthread_mutex_unlock(&attr_mutex);
267                 }
268         }
269 }
270
271 struct http {
272         struct mempool *mp;
273         int sk;
274         char *iobuf;
275         uint iobuf_pos, iobuf_max;
276         char *linebuf;
277         struct fbpool fbpool;
278 };
279
// Timeout passed to poll() for each socket read or write
#define IO_TIMEOUT 60000        // in ms
// Size of the raw input buffer (struct http, iobuf)
#define IOBUF_SIZE 1024
// Size of the line buffer (struct http, linebuf)
#define LINEBUF_SIZE 1024
283
284 static void http_send(struct http *http)
285 {
286         byte *buf = fbpool_end(&http->fbpool);
287         uint len = mp_size(http->mp, buf);
288
289         while (len) {
290                 struct pollfd pfd = { .fd = http->sk, .events = POLLOUT, .revents = 0 };
291                 int res = poll(&pfd, 1, IO_TIMEOUT);
292                 if (res < 0)
293                         die("Poll failed: %m");
294                 if (!res) {
295                         msg(L_ERROR, "HTTP write timed out");
296                         return;
297                 }
298
299                 res = write(http->sk, buf, len);
300                 if (res < 0) {
301                         msg(L_ERROR, "HTTP write failed: %m");
302                         return;
303                 }
304                 buf += res;
305                 len -= res;
306         }
307 }
308
309 static void http_error(struct http *http, const char *err)
310 {
311         msg(L_INFO, "HTTP error: %s", err);
312         fbpool_start(&http->fbpool, http->mp, 0);
313         bprintf(&http->fbpool.fb, "HTTP/1.1 %s\r\n", err);
314         http_send(http);
315 }
316
317 static int http_get_char(struct http *http) {
318         if (http->iobuf_pos >= http->iobuf_max) {
319                 struct pollfd pfd = { .fd = http->sk, .events = POLLIN, .revents = 0 };
320                 int res = poll(&pfd, 1, IO_TIMEOUT);
321                 if (res < 0)
322                         die("Poll failed: %m");
323                 if (!res) {
324                         msg(L_ERROR, "HTTP read timed out");
325                         return -1;
326                 }
327                 int len = read(http->sk, http->iobuf, IOBUF_SIZE);
328                 if (len < 0) {
329                         msg(L_ERROR, "HTTP read error: %m");
330                         return -1;
331                 }
332                 if (!len) {
333                         msg(L_ERROR, "HTTP connection closed");
334                         return -1;
335                 }
336                 http->iobuf_pos = 0;
337                 http->iobuf_max = len;
338         }
339         return http->iobuf[http->iobuf_pos++];
340 }
341
342 static bool http_get_line(struct http *http)
343 {
344         uint i = 0;
345         for (;;) {
346                 int c = http_get_char(http);
347                 if (c < 0)
348                         return false;
349                 if (c == '\n') {
350                         if (i > 0 && http->linebuf[i-1] == '\r')
351                                 i--;
352                         http->linebuf[i] = 0;
353                         return true;
354                 }
355                 if (i >= IOBUF_SIZE-1) {
356                         http_error(http, "400 Line too long");
357                         return false;
358                 }
359                 http->linebuf[i++] = c;
360         }
361 }
362
363 static void http_answer(struct fastbuf *fb)
364 {
365         char val[256], *w[2];
366         time_t now = time(NULL);
367
368         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
369                 const struct attr *a = &attr_table[i];
370
371                 pthread_mutex_lock(&attr_mutex);
372                 snprintf(val, sizeof(val), "%s", attr_values[i] ? : "");
373                 pthread_mutex_unlock(&attr_mutex);
374
375                 if (a->help)
376                         bprintf(fb, "# HELP %s %s\n", a->metric, a->help);
377                 if (a->type)
378                         bprintf(fb, "# TYPE %s %s\n", a->metric, a->type);
379
380                 if (!val[0])
381                         continue;
382                 int fields = str_wordsplit(val, w, ARRAY_SIZE(w));
383                 if (fields < 1)
384                         continue;
385                 if (fields >= 2) {
386                         time_t t = atoll(w[1]);
387                         if (t < now - MEASUREMENT_TIMEOUT)
388                                 continue;
389                 }
390
391                 if (a->topic) {
392                         bprintf(fb, "%s %s", a->metric, val);
393 #if 0
394                         // Prometheus does not like our timestamps -- why?
395                         if (fields >= 2)
396                                 bprintf(fb, " %s", w[1]);
397 #endif
398                         bputc(fb, '\n');
399                 }
400         }
401 }
402
403 static void http_connection(struct http *http)
404 {
405         http->iobuf = mp_alloc(http->mp, IOBUF_SIZE);
406         http->linebuf = mp_alloc(http->mp, LINEBUF_SIZE);
407         fbpool_init(&http->fbpool);
408
409         if (!http_get_line(http))
410                 return;
411         // msg(L_DEBUG, "Request line: <%s>", http->linebuf);
412         char *words[3];
413         if (str_wordsplit(http->linebuf, words, 3) != 3) {
414                 http_error(http, "400 Invalid request line");
415                 return;
416         }
417         if (strcmp(words[0], "GET")) {
418                 http_error(http, "501 Method not implemented");
419                 return;
420         }
421
422         for (;;) {
423                 if (!http_get_line(http))
424                         return;
425                 if (!http->linebuf[0])
426                         break;
427                 // msg(L_DEBUG, "Header line: <%s>", http->linebuf);
428         }
429
430         fbpool_start(&http->fbpool, http->mp, 0);
431         struct fastbuf *fb = &http->fbpool.fb;
432         bprintf(fb, "HTTP/1.1 200 OK\r\n");
433         bprintf(fb, "Content-type: text/plain; version=0.0.4\r\n");
434         bprintf(fb, "\r\n");
435         http_answer(fb);
436         http_send(http);
437 }
438
// Set by --daemon: main() switches logging to syslog when this is set
static int use_daemon;
// Set by -d / --debug: when clear, main() masks out L_DEBUG messages
static int use_debug;

// Command-line options, parsed by opt_parse() in main()
static struct opt_section options = {
	OPT_ITEMS {
		OPT_HELP("A daemon for transferring MQTT data to Prometheus"),
		OPT_HELP(""),
		OPT_HELP("Options:"),
		OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
		OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
		OPT_HELP_OPTION,
		OPT_CONF_OPTIONS,
		OPT_END
	}
};
454
455 int main(int argc UNUSED, char **argv)
456 {
457         log_init(argv[0]);
458         opt_parse(&options, argv+1);
459
460         if (use_daemon) {
461                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
462                 log_set_default_stream(ls);
463         }
464         if (!use_debug)
465                 log_default_stream()->levels &= ~(1U << L_DEBUG);
466
467         mosquitto_lib_init();
468         mosq = mosquitto_new("prometheus", 1, NULL);
469         if (!mosq)
470                 die("Mosquitto: initialization failed");
471
472         mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
473         mosquitto_log_callback_set(mosq, mqtt_log_callback);
474         mosquitto_message_callback_set(mosq, mqtt_msg_callback);
475
476         if (mosquitto_will_set(mosq, "status/prometheus", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
477                 die("Mosquitto: unable to set will");
478
479         if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
480                 die("Mosquitto: connect failed");
481
482         if (mosquitto_loop_start(mosq))
483                 die("Mosquitto: cannot start service thread");
484
485         int listen_sk = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
486         if (listen_sk < 0)
487                 die("Cannot create listening socket: %m");
488
489         int one = 1;
490         if (setsockopt(listen_sk, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
491                 die("Cannot set SO_REUSEADDR: %m");
492
493         struct sockaddr_in sin = { .sin_family = AF_INET, .sin_port = htons(1422), .sin_addr = INADDR_ANY };
494         if (bind(listen_sk, (const struct sockaddr *) &sin, sizeof(sin)) < 0)
495                 die("Cannot bind listening socket: %m");
496
497         if (listen(listen_sk, 64) < 0)
498                 die("Cannot listen: %m");
499
500         for (;;) {
501                 int sk = accept(listen_sk, NULL, NULL);
502                 if (sk < 0) {
503                         msg(L_ERROR, "HTTP accept failed: %m");
504                         continue;
505                 }
506                 msg(L_DEBUG, "HTTP accepted connection");
507                 struct mempool *mp = mp_new(4096);
508                 struct http *http = mp_alloc_zero(mp, sizeof(*http));
509                 http->mp = mp;
510                 http->sk = sk;
511                 http_connection(http);
512                 mp_delete(mp);
513                 close(sk);
514         }
515 }