2 * A gateway between MQTT and InfluxDB
4 * (c) 2018--2020 Martin Mares <mj@ucw.cz>
8 #include <ucw/fastbuf.h>
11 #include <ucw/string.h>
21 #include <curl/curl.h>
22 #include <mosquitto.h>
// Default maximum age of a measurement, used when a table entry has no
// timeout of its own; compared against time(NULL), i.e. in seconds.
24 #define MEASUREMENT_TIMEOUT 120
// InfluxDB write endpoint passed to libcurl as CURLOPT_URL.
26 #define INFLUX_URL "http://localhost:8086/write?db=burrow&precision=s"
// Passed to libcurl as CURLOPT_TIMEOUT.
27 #define INFLUX_TIMEOUT 30
// Passed to libcurl as CURLOPT_VERBOSE.
28 #define INFLUX_VERBOSE 0
// Argument of sleep() between two consecutive writes in main().
29 #define INFLUX_INTERVAL 60
// The single MQTT client handle; created in main() and used by mqtt_publish().
31 static struct mosquitto *mosq;
// Field name placed before '=' in the line written to InfluxDB
// (see the bprintf() calls in main()).
35 const char *value_name;
// Table mapping MQTT topics to InfluxDB metrics. For each entry, .topic is
// matched against incoming MQTT messages and .metric is the prefix of the
// line sent to InfluxDB.
41 static const struct attr attr_table[] = {
// Temperature, humidity and pressure sensors
43 .metric = "temp,where=loft",
45 .topic = "burrow/temp/loft",
48 .metric = "temp,where=ursarium",
50 .topic = "burrow/temp/ursarium"
53 .metric = "temp,where=catarium",
55 .topic = "burrow/temp/catarium"
58 .metric = "temp,where=garage",
60 .topic = "burrow/temp/garage"
63 .metric = "temp,where=terarium",
65 .topic = "burrow/temp/terarium"
68 .metric = "rh,where=ursarium",
70 .topic = "burrow/temp/ursarium-rh"
73 .metric = "temp,where=catarium_clock",
75 .topic = "burrow/temp/clock"
78 .metric = "pressure,where=catarium_clock",
80 .topic = "burrow/pressure/clock"
// Air topics (burrow/air/... and burrow/avg/air/...)
83 .metric = "air,where=inside_intake",
85 .topic = "burrow/air/inside-intake",
88 .metric = "air,where=inside_exhaust",
90 .topic = "burrow/air/inside-exhaust",
93 .metric = "air,where=outside_intake",
95 .topic = "burrow/air/outside-intake",
98 .metric = "air,where=outside_exhaust",
100 .topic = "burrow/air/outside-exhaust",
103 .metric = "air,where=mixed",
105 .topic = "burrow/air/mixed",
108 .metric = "air,where=inside_intake_avg",
110 .topic = "burrow/avg/air/inside-intake",
113 .metric = "air,where=outside_intake_avg",
115 .topic = "burrow/avg/air/outside-intake",
118 .metric = "air_bypass",
120 .topic = "burrow/air/bypass"
123 .metric = "air_fan_pwm",
125 .topic = "burrow/air/exchanger-fan"
// Loft topics
128 .metric = "loft_fan",
129 .topic = "burrow/loft/fan"
132 .metric = "water_circ",
134 .topic = "burrow/loft/circulation"
// Power meter topics (burrow/power/...)
137 .metric = "pm_voltage,phase=L1N",
139 .topic = "burrow/power/voltage/l1n",
142 .metric = "pm_voltage,phase=L2N",
144 .topic = "burrow/power/voltage/l2n",
147 .metric = "pm_voltage,phase=L3N",
149 .topic = "burrow/power/voltage/l3n",
152 .metric = "pm_current,phase=L1",
154 .topic = "burrow/power/current/l1",
157 .metric = "pm_current,phase=L2",
159 .topic = "burrow/power/current/l2",
162 .metric = "pm_current,phase=L3",
164 .topic = "burrow/power/current/l3",
167 .metric = "pm_power",
169 .topic = "burrow/power/power",
172 .metric = "pm_energy",
174 .topic = "burrow/power/energy",
177 .metric = "pm_reactive_power",
179 .topic = "burrow/power/reactive/power",
182 .metric = "pm_reactive_energy",
183 .value_name = "kVArh",
184 .topic = "burrow/power/reactive/energy",
// Heating topics (burrow/heating/...)
187 .metric = "heating_water_pressure",
189 .topic = "burrow/heating/water-pressure",
193 .metric = "heating_outside_temp",
195 .topic = "burrow/heating/outside-temp",
199 .metric = "heating_room_temp,circuit=1",
201 .topic = "burrow/heating/circuit1/room-temp",
// NOTE(review): the next two metric names contain a hyphen ("mix-temp",
// "mix-valve") while all other metric names use underscores. Renaming would
// change the series stored in InfluxDB -- confirm before touching.
205 .metric = "heating_mix-temp,circuit=1",
207 .topic = "burrow/heating/circuit1/mix-temp",
211 .metric = "heating_mix-valve,circuit=1",
213 .topic = "burrow/heating/circuit1/mix-valve",
217 .metric = "heating_active,circuit=1",
219 .topic = "burrow/heating/circuit1/active",
223 .metric = "heating_pump_active,circuit=1",
225 .topic = "burrow/heating/circuit1/pump",
229 .metric = "heating_room_temp,circuit=2",
231 .topic = "burrow/heating/circuit2/room-temp",
235 .metric = "heating_active,circuit=2",
237 .topic = "burrow/heating/circuit2/active",
241 .metric = "heating_active,circuit=water",
243 .topic = "burrow/heating/water/active",
247 .metric = "heating_error",
249 .topic = "burrow/heating/error",
253 .metric = "heating_clock",
255 .topic = "burrow/heating/clock",
// Last received payload for each attr_table entry (same index), stored as a
// heap-allocated string or NULL. Written by mqtt_msg_callback() and read by
// main(); both take attr_mutex around the access.
263 static char *attr_values[ARRAY_SIZE(attr_table)];
264 static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
// Format a message printf-style and publish it on the given topic with
// QoS 0 and the retain flag set. A publish failure is logged, not fatal.
266 static void mqtt_publish(const char *topic, const char *fmt, ...)
// NOTE(review): vsnprintf() returns the length the full output would have had,
// which can exceed sizeof(m) on truncation; that value is passed unchanged as
// the payload length below -- consider clamping it.
271 int l = vsnprintf(m, sizeof(m), fmt, args);
272 if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
273 msg(L_ERROR, "Mosquitto: publish failed");
// Connect callback registered in main(): subscribes to everything below
// "burrow/" (QoS 1) and announces "ok" on status/influx. A failed subscribe
// terminates the program via die().
// NOTE(review): the `mosq` parameter is marked UNUSED but is used by the
// subscribe call, and it shadows the file-scope `mosq`.
// NOTE(review): the check of `status` is not visible in this excerpt.
277 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
280 msg(L_DEBUG, "MQTT: Connection established, subscribing");
281 if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
282 die("Mosquitto: subscribe failed");
284 mqtt_publish("status/influx", "ok");
// Log callback registered in main(). The only visible statement is commented
// out, so `level` and `message` are not used by any visible code.
288 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
290 // msg(L_INFO, "MQTT(%d): %s", level, message);
// Message callback registered in main(): copies the payload into a local
// NUL-terminated buffer and stores it as the current value of every
// attr_table entry whose topic equals the message topic.
293 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
// Reject payloads that would not fit into val together with the terminator.
// NOTE(review): presumably followed by a return; that line is not visible here.
296 if (m->payloadlen >= sizeof(val) - 1) {
297 msg(L_ERROR, "Invalid value for topic %s", m->topic);
// The payload is copied by length and terminated explicitly.
300 memcpy(val, m->payload, m->payloadlen);
301 val[m->payloadlen] = 0;
302 msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
// Entries without a topic are skipped; the scan does not visibly stop at the
// first match.
304 for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
305 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
// attr_values[] is also read by main(), hence the lock.
306 pthread_mutex_lock(&attr_mutex);
// Release the previous value before storing the new copy.
307 if (attr_values[i]) {
308 xfree(attr_values[i]);
309 attr_values[i] = NULL;
312 attr_values[i] = xstrdup(val);
313 pthread_mutex_unlock(&attr_mutex);
// Growing buffer holding the request body sent to InfluxDB.
322 static struct fastbuf *influx_fb;
// Growing buffer collecting the HTTP response body, used for error reporting.
323 static struct fastbuf *influx_err_fb;
// libcurl write callback (installed via CURLOPT_WRITEFUNCTION in influx_init):
// walks over the received bytes and appends characters to influx_err_fb.
325 static size_t influx_write_callback(char *ptr, size_t size, size_t nmemb, void *userdata UNUSED)
// NOTE(review): the loop counts to nmemb but reads through `ptr` without an
// index; confirm that `ptr` is advanced on a line not visible in this excerpt.
328 for (size_t i=0; i<nmemb; i++) {
329 int c = *(byte *)ptr;
// Bytes outside printable ASCII (0x20..0x7e) get special treatment; the
// replacement itself is not visible here.
335 else if (c < 0x20 || c > 0x7e)
337 bputc(influx_err_fb, c);
// One-time setup of the InfluxDB connection: creates the libcurl easy handle,
// configures it from the INFLUX_* constants and allocates both buffers.
// Terminates the program via die() if libcurl cannot be initialized.
342 static void influx_init(void)
344 curl = curl_easy_init();
346 die("libcurl init failed");
348 curl_easy_setopt(curl, CURLOPT_URL, INFLUX_URL);
349 curl_easy_setopt(curl, CURLOPT_VERBOSE, (long) INFLUX_VERBOSE);
350 curl_easy_setopt(curl, CURLOPT_TIMEOUT, (long) INFLUX_TIMEOUT);
// The response body is routed to influx_write_callback().
351 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, influx_write_callback);
// Growing fastbufs: 4096 for the request body, 256 for the response text.
353 influx_fb = fbgrow_create(4096);
354 influx_err_fb = fbgrow_create(256);
// Begin a new batch: resets the request buffer.
// NOTE(review): presumably returns influx_fb for the caller to print into
// (main() writes to the returned fastbuf); the return line is not visible here.
357 static struct fastbuf *influx_write_start(void)
359 fbgrow_reset(influx_fb);
// Send the accumulated request buffer to InfluxDB with one HTTP POST and log
// the outcome. Transport failures and unexpected HTTP status codes are logged
// only; nothing is reported back to the caller.
363 static void influx_write_flush(void)
// The buffer contents are passed to libcurl as the POST body.
367 fbgrow_get_buf(influx_fb, &buf);
368 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf);
// NOTE(review): strlen() returns size_t but the format uses %u; confirm that
// msg() takes printf-style formats and consider %zu or a cast.
370 msg(L_DEBUG, "InfluxDB: Sending data (%u bytes)", strlen(buf));
372 fprintf(stderr, "%s", buf);
// Discard any response text left over from the previous request.
374 fbgrow_reset(influx_err_fb);
376 CURLcode code = curl_easy_perform(curl);
378 if (code != CURLE_OK) {
379 msg(L_ERROR, "Write to InfluxDB failed: %s", curl_easy_strerror(code));
// Any HTTP status other than 200 and 204 is treated as an error.
384 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &rc);
385 if (rc != 200 && rc != 204) {
// Terminate the collected response text so it can be printed as a string.
386 bputc(influx_err_fb, 0);
388 fbgrow_get_buf(influx_err_fb, &err);
// NOTE(review): logged at L_DEBUG, a level that main() can mask out of the
// default log stream -- confirm this should not be L_ERROR like the
// transport failure above.
389 msg(L_DEBUG, "InfluxDB HTTP error %ld: %s [...]", rc, err);
// Command-line switches filled in by option parsing (--daemon and -d/--debug).
395 static int use_daemon;
396 static int use_debug;
// Command-line option definitions handed to opt_parse() in main().
398 static struct opt_section options = {
400 OPT_HELP("A daemon for transferring MQTT data to InfluxDB"),
402 OPT_HELP("Options:"),
403 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
404 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
// Entry point: parses options, sets up logging, connects to the MQTT broker
// over TLS and then repeatedly writes the latest values from attr_values[]
// to InfluxDB. Setup failures terminate the program via die().
// NOTE(review): several conditions and the loop header are not visible in this
// excerpt; the comments below describe only the visible statements.
411 int main(int argc UNUSED, char **argv)
414 opt_parse(&options, argv+1);
// Route the default log stream to syslog.
417 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
418 log_set_default_stream(ls);
// Mask out debug-level messages on the default log stream.
421 log_default_stream()->levels &= ~(1U << L_DEBUG);
423 mosquitto_lib_init();
424 mosq = mosquitto_new("influx", 1, NULL);
426 die("Mosquitto: initialization failed");
428 mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
429 mosquitto_log_callback_set(mosq, mqtt_log_callback);
430 mosquitto_message_callback_set(mosq, mqtt_msg_callback);
// Last will: a retained 4-byte "dead" on status/influx, the same topic on
// which mqtt_conn_callback() publishes "ok".
432 if (mosquitto_will_set(mosq, "status/influx", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
433 die("Mosquitto: unable to set will");
435 if (mosquitto_tls_set(mosq, "/etc/burrow-mqtt/ca.crt", NULL, "/etc/burrow-mqtt/client.crt", "/etc/burrow-mqtt/client.key", NULL) != MOSQ_ERR_SUCCESS)
436 die("Mosquitto: unable to set TLS parameters");
438 if (mosquitto_connect_async(mosq, "burrow-mqtt", 8883, 60) != MOSQ_ERR_SUCCESS)
439 die("Mosquitto: connect failed");
// MQTT traffic is handled in a service thread from here on, which is why
// attr_values[] is accessed under attr_mutex below.
441 if (mosquitto_loop_start(mosq))
442 die("Mosquitto: cannot start service thread");
// One write cycle: build the request in the fastbuf, then flush and sleep.
447 struct fastbuf *f = influx_write_start();
448 char val[256], *w[2];
449 time_t now = time(NULL);
451 for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
452 const struct attr *a = &attr_table[i];
// Take a private copy of the current value (empty string if none) so the
// lock is held only for the duration of the copy.
454 pthread_mutex_lock(&attr_mutex);
455 snprintf(val, sizeof(val), "%s", attr_values[i] ? : "");
456 pthread_mutex_unlock(&attr_mutex);
// Split the copy into at most two words.
460 int fields = str_wordsplit(val, w, ARRAY_SIZE(w));
// The second word is parsed as a timestamp and compared with the current
// time, using the per-entry timeout or MEASUREMENT_TIMEOUT when that is 0.
464 time_t t = atoll(w[1]);
465 uint timeout = a->timeout ? : MEASUREMENT_TIMEOUT;
466 if (t < now - timeout)
// The value is emitted either bare or in double quotes; the condition
// choosing between the two forms is not visible here.
471 bprintf(f, "%s %s=%s\n", a->metric, a->value_name, val);
473 bprintf(f, "%s %s=\"%s\"\n", a->metric, a->value_name, val);
475 influx_write_flush();
476 sleep(INFLUX_INTERVAL);