2 * A gateway between MQTT and Prometheus
4 * (c) 2018--2019 Martin Mares <mj@ucw.cz>
8 #include <ucw/fastbuf.h>
10 #include <ucw/mempool.h>
12 #include <ucw/string.h>
14 #include <netinet/in.h>
21 #include <sys/socket.h>
26 #include <mosquitto.h>
// Maximum age of a measurement: http_answer() compares a value's timestamp
// against time(NULL) minus this constant (so presumably seconds -- confirm).
28 #define MEASUREMENT_TIMEOUT 120
// Mosquitto client handle; created in main() and used by mqtt_publish().
30 static struct mosquitto *mosq;
// Table of exported metrics.
//   .metric  name printed in the Prometheus output (may carry a {label="..."} part)
//   .help    text printed on the "# HELP" line by http_answer()
//   .topic   MQTT topic whose payload supplies the value (matched exactly in mqtt_msg_callback())
// Entries that have .metric/.help but no .topic act as a shared heading for the
// labelled per-phase entries that follow them.
39 static const struct attr attr_table[] = {
41 .metric = "temp_loft",
42 .help = "Temperature in the loft [degC]",
44 .topic = "burrow/temp/loft",
48 .help = "Fan speed in the loft (0-3)",
50 .topic = "burrow/loft/fan"
53 .metric = "loft_circ",
54 .help = "Warm water circulation (0-1)",
56 .topic = "burrow/loft/circulation"
59 .metric = "temp_ursarium",
60 .help = "Temperature in the Ursarium [degC]",
62 .topic = "burrow/temp/ursarium"
65 .metric = "temp_catarium",
66 .help = "Temperature in the Catarium [degC]",
68 .topic = "burrow/temp/catarium"
71 .metric = "temp_garage",
72 .help = "Temperature in the garage [degC]",
74 .topic = "burrow/temp/garage"
77 .metric = "temp_kitchen",
78 .help = "Temperature in the kitchen [degC]",
80 .topic = "burrow/temp/kitchen"
83 .metric = "rh_ursarium",
84 .help = "Relative humidity in the Ursarium [%]",
86 .topic = "burrow/temp/ursarium-rh"
89 .metric = "temp_catarium_clock",
90 .help = "Temperature on Catarium clock [degC]",
92 .topic = "burrow/temp/clock"
95 .metric = "press_catarium_clock",
96 .help = "Pressure on Catarium clock [Pa]",
98 .topic = "burrow/pressure/clock"
// Air handling / heat exchanger temperatures
101 .metric = "air_inside_intake",
102 .help = "Temperature of air intake from inside [degC]",
104 .topic = "burrow/air/inside-intake",
107 .metric = "air_inside_exhaust",
108 .help = "Temperature of air exhaust to inside [degC]",
110 .topic = "burrow/air/inside-exhaust",
113 .metric = "air_outside_intake",
114 .help = "Temperature of air intake from outside [degC]",
116 .topic = "burrow/air/outside-intake",
119 .metric = "air_outside_exhaust",
120 .help = "Temperature of air exhaust to outside [degC]",
122 .topic = "burrow/air/outside-exhaust",
125 .metric = "air_mixed",
126 .help = "Temperature of mixed air [degC]",
128 .topic = "burrow/air/mixed",
131 .metric = "air_inside_intake_avg",
132 .help = "Average temperature of air intake from inside [degC]",
134 .topic = "burrow/avg/air/inside-intake",
137 .metric = "air_outside_intake_avg",
138 .help = "Average temperature of air intake from outside [degC]",
140 .topic = "burrow/avg/air/outside-intake",
143 .metric = "air_bypass",
144 .help = "Heat exchanger bypass (0-1)",
146 .topic = "burrow/air/bypass"
149 .metric = "air_fan_pwm",
150 .help = "Heat exchanger fan PWM (0-255)",
152 .topic = "burrow/air/exchanger-fan"
// Power meter: one heading entry, then one labelled entry per phase
155 // Common heading for all voltages
156 .metric = "pm_voltage",
157 .help = "Voltage between phases and neutral [V]",
161 .metric = "pm_voltage{phase=\"L1N\"}",
162 .topic = "burrow/power/voltage/l1n",
165 .metric = "pm_voltage{phase=\"L2N\"}",
166 .topic = "burrow/power/voltage/l2n",
169 .metric = "pm_voltage{phase=\"L3N\"}",
170 .topic = "burrow/power/voltage/l3n",
173 // Common heading for all currents
174 .metric = "pm_current",
175 .help = "Current through phases [A]",
179 .metric = "pm_current{phase=\"L1\"}",
180 .topic = "burrow/power/current/l1",
183 .metric = "pm_current{phase=\"L2\"}",
184 .topic = "burrow/power/current/l2",
187 .metric = "pm_current{phase=\"L3\"}",
188 .topic = "burrow/power/current/l3",
191 .metric = "pm_power",
192 .help = "Total power [W]",
194 .topic = "burrow/power/power",
197 .metric = "pm_energy",
198 .help = "Total energy [kWh]",
200 .topic = "burrow/power/energy",
203 .metric = "pm_reactive_power",
204 .help = "Total reactive power [VAr]",
206 .topic = "burrow/power/reactive/power",
209 .metric = "pm_reactive_energy",
210 .help = "Total reactive energy [kVArh]",
212 .topic = "burrow/power/reactive/energy",
// Most recent payload received for each attr_table entry (same index),
// stored as a heap-allocated string; NULL when nothing is stored.
216 static char *attr_values[ARRAY_SIZE(attr_table)];
// Protects attr_values: locked by mqtt_msg_callback() when storing a value
// and by http_answer() when copying one out.
217 static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
219 static void mqtt_publish(const char *topic, const char *fmt, ...)
224 int l = vsnprintf(m, sizeof(m), fmt, args);
225 if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
226 msg(L_ERROR, "Mosquitto: publish failed");
// Connect callback, registered with mosquitto_connect_callback_set() in main().
// Subscribes to all topics under "burrow/" and announces "ok" on status/prometheus.
// NOTE(review): the check of `status` guarding these statements is not visible
// here -- presumably they run only on a successful connection; confirm.
// NOTE(review): the `mosq` parameter is tagged UNUSED but is passed to
// mosquitto_subscribe() below; it also shadows the file-level `mosq`.
230 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
233 msg(L_DEBUG, "MQTT: Connection established, subscribing");
234 if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
// A failed subscription is treated as fatal
235 die("Mosquitto: subscribe failed");
237 mqtt_publish("status/prometheus", "ok");
// Log callback, registered with mosquitto_log_callback_set() in main().
// The only visible statement is commented out, so library log messages
// appear to be discarded.
241 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
243 // msg(L_INFO, "MQTT(%d): %s", level, message);
// Message callback, registered with mosquitto_message_callback_set() in main().
// Copies the payload into a NUL-terminated local buffer and stores it as the
// current value of every attr_table entry whose topic equals the message topic.
246 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
// Payloads that would not fit into val[] are logged and not stored
249 if (m->payloadlen >= sizeof(val) - 1) {
250 msg(L_ERROR, "Invalid value for topic %s", m->topic);
// The payload is copied by length and terminated explicitly
253 memcpy(val, m->payload, m->payloadlen);
254 val[m->payloadlen] = 0;
255 msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
257 for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
// Heading-only entries have no topic and are skipped by the NULL check
258 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
// attr_values is also read by http_answer(), hence the lock
259 pthread_mutex_lock(&attr_mutex);
// Release the previously stored value before replacing it
260 if (attr_values[i]) {
261 xfree(attr_values[i]);
262 attr_values[i] = NULL;
265 attr_values[i] = xstrdup(val);
266 pthread_mutex_unlock(&attr_mutex);
// Read position and fill level of iobuf, as maintained by http_get_char()
275 uint iobuf_pos, iobuf_max;
// Buffer in which responses are composed (see http_error() and http_connection())
277 struct fbpool fbpool;
// Timeout passed to every poll() on the client socket
280 #define IO_TIMEOUT 60000 // in ms
// Size of the socket read buffer (http->iobuf)
281 #define IOBUF_SIZE 1024
// Size of the buffer holding one request line (http->linebuf)
282 #define LINEBUF_SIZE 1024
// Write the response composed in http->fbpool to the client socket.
// Each write is preceded by a poll() for POLLOUT bounded by IO_TIMEOUT.
284 static void http_send(struct http *http)
// NOTE(review): presumably fbpool_end() finishes the buffer and returns its
// start, and mp_size() yields its length -- confirm against libucw.
286 byte *buf = fbpool_end(&http->fbpool);
287 uint len = mp_size(http->mp, buf);
290 struct pollfd pfd = { .fd = http->sk, .events = POLLOUT, .revents = 0 };
291 int res = poll(&pfd, 1, IO_TIMEOUT);
// A failing poll() is fatal for the whole process
293 die("Poll failed: %m");
// Timeouts and write errors are only logged
295 msg(L_ERROR, "HTTP write timed out");
299 res = write(http->sk, buf, len);
301 msg(L_ERROR, "HTTP write failed: %m");
309 static void http_error(struct http *http, const char *err)
311 msg(L_INFO, "HTTP error: %s", err);
312 fbpool_start(&http->fbpool, http->mp, 0);
313 bprintf(&http->fbpool.fb, "HTTP/1.1 %s\r\n", err);
317 static int http_get_char(struct http *http) {
318 if (http->iobuf_pos >= http->iobuf_max) {
319 struct pollfd pfd = { .fd = http->sk, .events = POLLIN, .revents = 0 };
320 int res = poll(&pfd, 1, IO_TIMEOUT);
322 die("Poll failed: %m");
324 msg(L_ERROR, "HTTP read timed out");
327 int len = read(http->sk, http->iobuf, IOBUF_SIZE);
329 msg(L_ERROR, "HTTP read error: %m");
333 msg(L_ERROR, "HTTP connection closed");
337 http->iobuf_max = len;
339 return http->iobuf[http->iobuf_pos++];
342 static bool http_get_line(struct http *http)
346 int c = http_get_char(http);
350 if (i > 0 && http->linebuf[i-1] == '\r')
352 http->linebuf[i] = 0;
355 if (i >= IOBUF_SIZE-1) {
356 http_error(http, "400 Line too long");
359 http->linebuf[i++] = c;
// Write the metrics from attr_table to `fb` in the Prometheus text format:
// "# HELP" / "# TYPE" comment lines followed by "<metric> <value>" lines.
363 static void http_answer(struct fastbuf *fb)
365 char val[256], *w[2];
366 time_t now = time(NULL);
368 for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
369 const struct attr *a = &attr_table[i];
// Take a private copy of the stored value (empty string if there is none)
// so that the lock is not held while formatting the output
371 pthread_mutex_lock(&attr_mutex);
372 snprintf(val, sizeof(val), "%s", attr_values[i] ? : "");
373 pthread_mutex_unlock(&attr_mutex);
376 bprintf(fb, "# HELP %s %s\n", a->metric, a->help);
378 bprintf(fb, "# TYPE %s %s\n", a->metric, a->type);
// Split the value into at most two words; the second one, when present,
// is parsed below as the time of measurement
382 int fields = str_wordsplit(val, w, ARRAY_SIZE(w));
// Compare the measurement time against the allowed maximum age
386 time_t t = atoll(w[1]);
387 if (t < now - MEASUREMENT_TIMEOUT)
392 bprintf(fb, "%s %s", a->metric, val);
// NOTE(review): the Prometheus text format expects timestamps in milliseconds;
// if w[1] is in seconds, that may answer the question below -- confirm.
394 // Prometheus does not like our timestamps -- why?
396 bprintf(fb, " %s", w[1]);
// Serve one HTTP connection: read the request line, accept only GET,
// read the remaining header lines, and start a "200 OK" text/plain response.
403 static void http_connection(struct http *http)
// Per-connection buffers are taken from the connection's memory pool
405 http->iobuf = mp_alloc(http->mp, IOBUF_SIZE);
406 http->linebuf = mp_alloc(http->mp, LINEBUF_SIZE);
407 fbpool_init(&http->fbpool);
// Request line
409 if (!http_get_line(http))
411 // msg(L_DEBUG, "Request line: <%s>", http->linebuf);
// The request line must consist of exactly three words
413 if (str_wordsplit(http->linebuf, words, 3) != 3) {
414 http_error(http, "400 Invalid request line");
// Only the method is inspected; any other method is answered with 501
417 if (strcmp(words[0], "GET")) {
418 http_error(http, "501 Method not implemented");
// Header lines: read and tested for being empty (the end of the header section)
423 if (!http_get_line(http))
425 if (!http->linebuf[0])
427 // msg(L_DEBUG, "Header line: <%s>", http->linebuf);
// Compose the response in the fbpool
430 fbpool_start(&http->fbpool, http->mp, 0);
431 struct fastbuf *fb = &http->fbpool.fb;
432 bprintf(fb, "HTTP/1.1 200 OK\r\n");
433 bprintf(fb, "Content-type: text/plain; version=0.0.4\r\n");
// Flags set from the command line through the option table below
439 static int use_daemon;
440 static int use_debug;
// Command-line option table handed to opt_parse() in main():
// -d / --debug and --daemon
442 static struct opt_section options = {
444 OPT_HELP("A daemon for transferring MQTT data to Prometheus"),
446 OPT_HELP("Options:"),
447 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
448 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
455 int main(int argc UNUSED, char **argv)
458 opt_parse(&options, argv+1);
461 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
462 log_set_default_stream(ls);
465 log_default_stream()->levels &= ~(1U << L_DEBUG);
467 mosquitto_lib_init();
468 mosq = mosquitto_new("prometheus", 1, NULL);
470 die("Mosquitto: initialization failed");
472 mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
473 mosquitto_log_callback_set(mosq, mqtt_log_callback);
474 mosquitto_message_callback_set(mosq, mqtt_msg_callback);
476 if (mosquitto_will_set(mosq, "status/prometheus", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
477 die("Mosquitto: unable to set will");
479 if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
480 die("Mosquitto: connect failed");
482 if (mosquitto_loop_start(mosq))
483 die("Mosquitto: cannot start service thread");
485 int listen_sk = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
487 die("Cannot create listening socket: %m");
490 if (setsockopt(listen_sk, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
491 die("Cannot set SO_REUSEADDR: %m");
493 struct sockaddr_in sin = { .sin_family = AF_INET, .sin_port = htons(1422), .sin_addr = INADDR_ANY };
494 if (bind(listen_sk, (const struct sockaddr *) &sin, sizeof(sin)) < 0)
495 die("Cannot bind listening socket: %m");
497 if (listen(listen_sk, 64) < 0)
498 die("Cannot listen: %m");
501 int sk = accept(listen_sk, NULL, NULL);
503 msg(L_ERROR, "HTTP accept failed: %m");
506 msg(L_DEBUG, "HTTP accepted connection");
507 struct mempool *mp = mp_new(4096);
508 struct http *http = mp_alloc_zero(mp, sizeof(*http));
511 http_connection(http);