--- /dev/null
+/*
+ * A gateway between MQTT and InfluxDB
+ *
+ * (c) 2018--2019 Martin Mares <mj@ucw.cz>
+ */
+
+#include <ucw/lib.h>
+#include <ucw/fastbuf.h>
+#include <ucw/log.h>
+#include <ucw/opt.h>
+#include <ucw/string.h>
+
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <syslog.h>
+#include <time.h>
+#include <unistd.h>
+
+#include <curl/curl.h>
+#include <mosquitto.h>
+
+#define MEASUREMENT_TIMEOUT 120
+
+#define INFLUX_URL "http://localhost:8086/write?db=burrow&precision=s"
+#define INFLUX_TIMEOUT 30
+#define INFLUX_VERBOSE 0
+#define INFLUX_INTERVAL 60
+
+static struct mosquitto *mosq;
+
+struct attr {
+ const char *metric;
+ const char *value_name;
+ const char *topic;
+};
+
+static const struct attr attr_table[] = {
+ {
+ .metric = "temp,where=loft",
+ .value_name = "t",
+ .topic = "burrow/temp/loft",
+ },
+ {
+ .metric = "temp,where=ursarium",
+ .value_name = "t",
+ .topic = "burrow/temp/ursarium"
+ },
+ {
+ .metric = "temp,where=catarium",
+ .value_name = "t",
+ .topic = "burrow/temp/catarium"
+ },
+ {
+ .metric = "temp,where=garage",
+ .value_name = "t",
+ .topic = "burrow/temp/garage"
+ },
+ {
+ .metric = "temp,where=kitchen",
+ .value_name = "t",
+ .topic = "burrow/temp/kitchen"
+ },
+ {
+ .metric = "rh,where=ursarium",
+ .value_name = "rh",
+ .topic = "burrow/temp/ursarium-rh"
+ },
+ {
+ .metric = "temp,where=catarium_clock",
+ .value_name = "t",
+ .topic = "burrow/temp/clock"
+ },
+ {
+ .metric = "pressure,where=catarium_clock",
+ .value_name = "pa",
+ .topic = "burrow/pressure/clock"
+ },
+ {
+ .metric = "air,where=inside_intake",
+ .value_name = "t",
+ .topic = "burrow/air/inside-intake",
+ },
+ {
+ .metric = "air,where=inside_exhaust",
+ .value_name = "t",
+ .topic = "burrow/air/inside-exhaust",
+ },
+ {
+ .metric = "air,where=outside_intake",
+ .value_name = "t",
+ .topic = "burrow/air/outside-intake",
+ },
+ {
+ .metric = "air,where=outside_exhaust",
+ .value_name = "t",
+ .topic = "burrow/air/outside-exhaust",
+ },
+ {
+ .metric = "air,where=mixed",
+ .value_name = "t",
+ .topic = "burrow/air/mixed",
+ },
+ {
+ .metric = "air,where=inside_intake_avg",
+ .value_name = "t",
+ .topic = "burrow/avg/air/inside-intake",
+ },
+ {
+ .metric = "air,where=outside_intake_avg",
+ .value_name = "t",
+ .topic = "burrow/avg/air/outside-intake",
+ },
+ {
+ .metric = "air_bypass",
+ .value_name = "on",
+ .topic = "burrow/air/bypass"
+ },
+ {
+ .metric = "air_fan_pwm",
+ .value_name = "pwm",
+ .topic = "burrow/air/exchanger-fan"
+ },
+#if 0
+ {
+ .metric = "loft_fan",
+ .topic = "burrow/loft/fan"
+ },
+#endif
+ {
+ .metric = "water_circ",
+ .value_name = "on",
+ .topic = "burrow/loft/circulation"
+ },
+ {
+ .metric = "pm_voltage,phase=L1N",
+ .value_name = "V",
+ .topic = "burrow/power/voltage/l1n",
+ },
+ {
+ .metric = "pm_voltage,phase=L2N",
+ .value_name = "V",
+ .topic = "burrow/power/voltage/l2n",
+ },
+ {
+ .metric = "pm_voltage,phase=L3N",
+ .value_name = "V",
+ .topic = "burrow/power/voltage/l3n",
+ },
+ {
+ .metric = "pm_current,phase=L1",
+ .value_name = "A",
+ .topic = "burrow/power/current/l1",
+ },
+ {
+ .metric = "pm_current,phase=L2",
+ .value_name = "A",
+ .topic = "burrow/power/current/l2",
+ },
+ {
+ .metric = "pm_current,phase=L3",
+ .value_name = "A",
+ .topic = "burrow/power/current/l3",
+ },
+ {
+ .metric = "pm_power",
+ .value_name = "W",
+ .topic = "burrow/power/power",
+ },
+ {
+ .metric = "pm_energy",
+ .value_name = "kWh",
+ .topic = "burrow/power/energy",
+ },
+ {
+ .metric = "pm_reactive_power",
+ .value_name = "VAr",
+ .topic = "burrow/power/reactive/power",
+ },
+ {
+ .metric = "pm_reactive_energy",
+ .value_name = "kVArh",
+ .topic = "burrow/power/reactive/energy",
+ },
+};
+
+/*** MQTT ***/
+
+static char *attr_values[ARRAY_SIZE(attr_table)];
+static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static void mqtt_publish(const char *topic, const char *fmt, ...)
+{
+ va_list args;
+ va_start(args, fmt);
+ char m[256];
+ int l = vsnprintf(m, sizeof(m), fmt, args);
+ if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
+ msg(L_ERROR, "Mosquitto: publish failed");
+ va_end(args);
+}
+
+static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
+{
+ if (!status) {
+ msg(L_DEBUG, "MQTT: Connection established, subscribing");
+ if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: subscribe failed");
+
+ mqtt_publish("status/influx", "ok");
+ }
+}
+
+static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
+{
+ // msg(L_INFO, "MQTT(%d): %s", level, message);
+}
+
+static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
+{
+ char val[256];
+ if (m->payloadlen >= sizeof(val) - 1) {
+ msg(L_ERROR, "Invalid value for topic %s", m->topic);
+ return;
+ }
+ memcpy(val, m->payload, m->payloadlen);
+ val[m->payloadlen] = 0;
+ msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
+
+ for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
+ if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
+ pthread_mutex_lock(&attr_mutex);
+ if (attr_values[i]) {
+ xfree(attr_values[i]);
+ attr_values[i] = NULL;
+ }
+ if (val[0])
+ attr_values[i] = xstrdup(val);
+ pthread_mutex_unlock(&attr_mutex);
+ }
+ }
+}
+
+/*** InfluxDB ***/
+
+static CURL *curl;
+
+static struct fastbuf *influx_fb;
+static struct fastbuf *influx_err_fb;
+
+static size_t influx_write_callback(char *ptr, size_t size, size_t nmemb, void *userdata UNUSED)
+{
+ ASSERT(size == 1);
+ for (size_t i=0; i<nmemb; i++) {
+ int c = *(byte *)ptr;
+ ptr++;
+ if (c == '\r')
+ continue;
+ if (c == '\n')
+ c = ' ';
+ else if (c < 0x20 || c > 0x7e)
+ c = '?';
+ bputc(influx_err_fb, c);
+ }
+ return nmemb;
+}
+
+static void influx_init(void)
+{
+ curl = curl_easy_init();
+ if (!curl)
+ die("libcurl init failed");
+
+ curl_easy_setopt(curl, CURLOPT_URL, INFLUX_URL);
+ curl_easy_setopt(curl, CURLOPT_VERBOSE, (long) INFLUX_VERBOSE);
+ curl_easy_setopt(curl, CURLOPT_TIMEOUT, (long) INFLUX_TIMEOUT);
+ curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, influx_write_callback);
+
+ influx_fb = fbgrow_create(4096);
+ influx_err_fb = fbgrow_create(256);
+}
+
+static struct fastbuf *influx_write_start(void)
+{
+ fbgrow_reset(influx_fb);
+ return influx_fb;
+}
+
+static void influx_write_flush(void)
+{
+ bputc(influx_fb, 0);
+ byte *buf;
+ fbgrow_get_buf(influx_fb, &buf);
+ curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf);
+
+ msg(L_DEBUG, "InfluxDB: Sending data (%u bytes)", strlen(buf));
+ if (INFLUX_VERBOSE)
+ fprintf(stderr, "%s", buf);
+
+ fbgrow_reset(influx_err_fb);
+
+ CURLcode code = curl_easy_perform(curl);
+
+ if (code != CURLE_OK) {
+ msg(L_ERROR, "Write to InfluxDB failed: %s", curl_easy_strerror(code));
+ return;
+ }
+
+ long rc = 0;
+ curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &rc);
+ if (rc != 200 && rc != 204) {
+ bputc(influx_err_fb, 0);
+ byte *err;
+ fbgrow_get_buf(influx_err_fb, &err);
+ msg(L_DEBUG, "InfluxDB HTTP error %ld: %s [...]", rc, err);
+ }
+}
+
+/*** Main ***/
+
+static int use_daemon;
+static int use_debug;
+
+static struct opt_section options = {
+ OPT_ITEMS {
+ OPT_HELP("A daemon for transferring MQTT data to InfluxDB"),
+ OPT_HELP(""),
+ OPT_HELP("Options:"),
+ OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
+ OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
+ OPT_HELP_OPTION,
+ OPT_CONF_OPTIONS,
+ OPT_END
+ }
+};
+
+int main(int argc UNUSED, char **argv)
+{
+ log_init(argv[0]);
+ opt_parse(&options, argv+1);
+
+ if (use_daemon) {
+ struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
+ log_set_default_stream(ls);
+ }
+ if (!use_debug)
+ log_default_stream()->levels &= ~(1U << L_DEBUG);
+
+ mosquitto_lib_init();
+ mosq = mosquitto_new("influx", 1, NULL);
+ if (!mosq)
+ die("Mosquitto: initialization failed");
+
+ mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
+ mosquitto_log_callback_set(mosq, mqtt_log_callback);
+ mosquitto_message_callback_set(mosq, mqtt_msg_callback);
+
+ if (mosquitto_will_set(mosq, "status/influx", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: unable to set will");
+
+ if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: connect failed");
+
+ if (mosquitto_loop_start(mosq))
+ die("Mosquitto: cannot start service thread");
+
+ influx_init();
+
+ for (;;) {
+ struct fastbuf *f = influx_write_start();
+ char val[256], *w[2];
+ time_t now = time(NULL);
+
+ for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
+ const struct attr *a = &attr_table[i];
+
+ pthread_mutex_lock(&attr_mutex);
+ snprintf(val, sizeof(val), "%s", attr_values[i] ? : "");
+ pthread_mutex_unlock(&attr_mutex);
+
+ if (!val[0])
+ continue;
+ int fields = str_wordsplit(val, w, ARRAY_SIZE(w));
+ if (fields < 1)
+ continue;
+ if (fields >= 2) {
+ time_t t = atoll(w[1]);
+ if (t < now - MEASUREMENT_TIMEOUT)
+ continue;
+ }
+
+ bprintf(f, "%s %s=%s\n", a->metric, a->value_name, val);
+ }
+ influx_write_flush();
+ sleep(INFLUX_INTERVAL);
+ }
+}