]> mj.ucw.cz Git - home-hw.git/commitdiff
burrow-influx
authorMartin Mares <mj@ucw.cz>
Sat, 2 Nov 2019 14:59:31 +0000 (15:59 +0100)
committerMartin Mares <mj@ucw.cz>
Sat, 2 Nov 2019 14:59:31 +0000 (15:59 +0100)
influx/Makefile [new file with mode: 0644]
influx/burrow-influx.c [new file with mode: 0644]

diff --git a/influx/Makefile b/influx/Makefile
new file mode 100644 (file)
index 0000000..b08120e
--- /dev/null
@@ -0,0 +1,16 @@
+PC=pkg-config
+UCW_CFLAGS := $(shell $(PC) --cflags libucw)
+UCW_LIBS := $(shell $(PC) --libs libucw)
+
+CURL_CFLAGS := $(shell curl-config --cflags)
+CURL_LIBS := $(shell curl-config --libs)
+
+CFLAGS=$(UCW_CFLAGS) $(CURL_CFLAGS) -std=gnu99
+LDFLAGS=$(UCW_LIBS) $(CURL_LIBS) -lmosquitto
+
+all: burrow-influx
+
+burrow-influx: burrow-influx.c
+
+install: all
+       install burrow-influx /usr/local/sbin/
diff --git a/influx/burrow-influx.c b/influx/burrow-influx.c
new file mode 100644 (file)
index 0000000..9521300
--- /dev/null
@@ -0,0 +1,398 @@
+/*
+ *     A gateway between MQTT and InfluxDB
+ *
+ *     (c) 2018--2019 Martin Mares <mj@ucw.cz>
+ */
+
+#include <ucw/lib.h>
+#include <ucw/fastbuf.h>
+#include <ucw/log.h>
+#include <ucw/opt.h>
+#include <ucw/string.h>
+
+#include <pthread.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <syslog.h>
+#include <time.h>
+#include <unistd.h>
+
+#include <curl/curl.h>
+#include <mosquitto.h>
+
+#define MEASUREMENT_TIMEOUT 120
+
+#define INFLUX_URL "http://localhost:8086/write?db=burrow&precision=s"
+#define INFLUX_TIMEOUT 30
+#define INFLUX_VERBOSE 0
+#define INFLUX_INTERVAL 60
+
+static struct mosquitto *mosq;
+
+struct attr {
+       const char *metric;
+       const char *value_name;
+       const char *topic;
+};
+
+static const struct attr attr_table[] = {
+       {
+               .metric = "temp,where=loft",
+               .value_name = "t",
+               .topic = "burrow/temp/loft",
+       },
+       {
+               .metric = "temp,where=ursarium",
+               .value_name = "t",
+               .topic = "burrow/temp/ursarium"
+       },
+       {
+               .metric = "temp,where=catarium",
+               .value_name = "t",
+               .topic = "burrow/temp/catarium"
+       },
+       {
+               .metric = "temp,where=garage",
+               .value_name = "t",
+               .topic = "burrow/temp/garage"
+       },
+       {
+               .metric = "temp,where=kitchen",
+               .value_name = "t",
+               .topic = "burrow/temp/kitchen"
+       },
+       {
+               .metric = "rh,where=ursarium",
+               .value_name = "rh",
+               .topic = "burrow/temp/ursarium-rh"
+       },
+       {
+               .metric = "temp,where=catarium_clock",
+               .value_name = "t",
+               .topic = "burrow/temp/clock"
+       },
+       {
+               .metric = "pressure,where=catarium_clock",
+               .value_name = "pa",
+               .topic = "burrow/pressure/clock"
+       },
+       {
+               .metric = "air,where=inside_intake",
+               .value_name = "t",
+               .topic = "burrow/air/inside-intake",
+       },
+       {
+               .metric = "air,where=inside_exhaust",
+               .value_name = "t",
+               .topic = "burrow/air/inside-exhaust",
+       },
+       {
+               .metric = "air,where=outside_intake",
+               .value_name = "t",
+               .topic = "burrow/air/outside-intake",
+       },
+       {
+               .metric = "air,where=outside_exhaust",
+               .value_name = "t",
+               .topic = "burrow/air/outside-exhaust",
+       },
+       {
+               .metric = "air,where=mixed",
+               .value_name = "t",
+               .topic = "burrow/air/mixed",
+       },
+       {
+               .metric = "air,where=inside_intake_avg",
+               .value_name = "t",
+               .topic = "burrow/avg/air/inside-intake",
+       },
+       {
+               .metric = "air,where=outside_intake_avg",
+               .value_name = "t",
+               .topic = "burrow/avg/air/outside-intake",
+       },
+       {
+               .metric = "air_bypass",
+               .value_name = "on",
+               .topic = "burrow/air/bypass"
+       },
+       {
+               .metric = "air_fan_pwm",
+               .value_name = "pwm",
+               .topic = "burrow/air/exchanger-fan"
+       },
+#if 0
+       {
+               .metric = "loft_fan",
+               .topic = "burrow/loft/fan"
+       },
+#endif
+       {
+               .metric = "water_circ",
+               .value_name = "on",
+               .topic = "burrow/loft/circulation"
+       },
+       {
+               .metric = "pm_voltage,phase=L1N",
+               .value_name = "V",
+               .topic = "burrow/power/voltage/l1n",
+       },
+       {
+               .metric = "pm_voltage,phase=L2N",
+               .value_name = "V",
+               .topic = "burrow/power/voltage/l2n",
+       },
+       {
+               .metric = "pm_voltage,phase=L3N",
+               .value_name = "V",
+               .topic = "burrow/power/voltage/l3n",
+       },
+       {
+               .metric = "pm_current,phase=L1",
+               .value_name = "A",
+               .topic = "burrow/power/current/l1",
+       },
+       {
+               .metric = "pm_current,phase=L2",
+               .value_name = "A",
+               .topic = "burrow/power/current/l2",
+       },
+       {
+               .metric = "pm_current,phase=L3",
+               .value_name = "A",
+               .topic = "burrow/power/current/l3",
+       },
+       {
+               .metric = "pm_power",
+               .value_name = "W",
+               .topic = "burrow/power/power",
+       },
+       {
+               .metric = "pm_energy",
+               .value_name = "kWh",
+               .topic = "burrow/power/energy",
+       },
+       {
+               .metric = "pm_reactive_power",
+               .value_name = "VAr",
+               .topic = "burrow/power/reactive/power",
+       },
+       {
+               .metric = "pm_reactive_energy",
+               .value_name = "kVArh",
+               .topic = "burrow/power/reactive/energy",
+       },
+};
+
+/*** MQTT ***/
+
+static char *attr_values[ARRAY_SIZE(attr_table)];
+static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static void mqtt_publish(const char *topic, const char *fmt, ...)
+{
+       va_list args;
+       va_start(args, fmt);
+       char m[256];
+       int l = vsnprintf(m, sizeof(m), fmt, args);
+       if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
+               msg(L_ERROR, "Mosquitto: publish failed");
+       va_end(args);
+}
+
+static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
+{
+       if (!status) {
+               msg(L_DEBUG, "MQTT: Connection established, subscribing");
+               if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
+                       die("Mosquitto: subscribe failed");
+
+               mqtt_publish("status/influx", "ok");
+       }
+}
+
+static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
+{
+       // msg(L_INFO, "MQTT(%d): %s", level, message);
+}
+
+static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
+{
+       char val[256];
+       if (m->payloadlen >= sizeof(val) - 1) {
+               msg(L_ERROR, "Invalid value for topic %s", m->topic);
+               return;
+       }
+       memcpy(val, m->payload, m->payloadlen);
+       val[m->payloadlen] = 0;
+       msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
+
+       for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
+               if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
+                       pthread_mutex_lock(&attr_mutex);
+                       if (attr_values[i]) {
+                               xfree(attr_values[i]);
+                               attr_values[i] = NULL;
+                       }
+                       if (val[0])
+                               attr_values[i] = xstrdup(val);
+                       pthread_mutex_unlock(&attr_mutex);
+               }
+       }
+}
+
+/*** InfluxDB ***/
+
+static CURL *curl;
+
+static struct fastbuf *influx_fb;
+static struct fastbuf *influx_err_fb;
+
+static size_t influx_write_callback(char *ptr, size_t size, size_t nmemb, void *userdata UNUSED)
+{
+       ASSERT(size == 1);
+       for (size_t i=0; i<nmemb; i++) {
+               int c = *(byte *)ptr;
+               ptr++;
+               if (c == '\r')
+                       continue;
+               if (c == '\n')
+                       c = ' ';
+               else if (c < 0x20 || c > 0x7e)
+                       c = '?';
+               bputc(influx_err_fb, c);
+       }
+       return nmemb;
+}
+
+static void influx_init(void)
+{
+       curl = curl_easy_init();
+       if (!curl)
+               die("libcurl init failed");
+
+       curl_easy_setopt(curl, CURLOPT_URL, INFLUX_URL);
+       curl_easy_setopt(curl, CURLOPT_VERBOSE, (long) INFLUX_VERBOSE);
+       curl_easy_setopt(curl, CURLOPT_TIMEOUT, (long) INFLUX_TIMEOUT);
+       curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, influx_write_callback);
+
+       influx_fb = fbgrow_create(4096);
+       influx_err_fb = fbgrow_create(256);
+}
+
+static struct fastbuf *influx_write_start(void)
+{
+       fbgrow_reset(influx_fb);
+       return influx_fb;
+}
+
+static void influx_write_flush(void)
+{
+       bputc(influx_fb, 0);
+       byte *buf;
+       fbgrow_get_buf(influx_fb, &buf);
+       curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf);
+
+       msg(L_DEBUG, "InfluxDB: Sending data (%u bytes)", strlen(buf));
+       if (INFLUX_VERBOSE)
+               fprintf(stderr, "%s", buf);
+
+       fbgrow_reset(influx_err_fb);
+
+       CURLcode code = curl_easy_perform(curl);
+
+       if (code != CURLE_OK) {
+               msg(L_ERROR, "Write to InfluxDB failed: %s", curl_easy_strerror(code));
+               return;
+       }
+
+       long rc = 0;
+       curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &rc);
+       if (rc != 200 && rc != 204) {
+               bputc(influx_err_fb, 0);
+               byte *err;
+               fbgrow_get_buf(influx_err_fb, &err);
+               msg(L_DEBUG, "InfluxDB HTTP error %ld: %s [...]", rc, err);
+       }
+}
+
+/*** Main ***/
+
+static int use_daemon;
+static int use_debug;
+
+static struct opt_section options = {
+       OPT_ITEMS {
+               OPT_HELP("A daemon for transferring MQTT data to InfluxDB"),
+               OPT_HELP(""),
+               OPT_HELP("Options:"),
+               OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
+               OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
+               OPT_HELP_OPTION,
+               OPT_CONF_OPTIONS,
+               OPT_END
+       }
+};
+
+int main(int argc UNUSED, char **argv)
+{
+       log_init(argv[0]);
+       opt_parse(&options, argv+1);
+
+       if (use_daemon) {
+               struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
+               log_set_default_stream(ls);
+       }
+       if (!use_debug)
+               log_default_stream()->levels &= ~(1U << L_DEBUG);
+
+       mosquitto_lib_init();
+       mosq = mosquitto_new("influx", 1, NULL);
+       if (!mosq)
+               die("Mosquitto: initialization failed");
+
+       mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
+       mosquitto_log_callback_set(mosq, mqtt_log_callback);
+       mosquitto_message_callback_set(mosq, mqtt_msg_callback);
+
+       if (mosquitto_will_set(mosq, "status/influx", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
+               die("Mosquitto: unable to set will");
+
+       if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
+               die("Mosquitto: connect failed");
+
+       if (mosquitto_loop_start(mosq))
+               die("Mosquitto: cannot start service thread");
+
+       influx_init();
+
+       for (;;) {
+               struct fastbuf *f = influx_write_start();
+               char val[256], *w[2];
+               time_t now = time(NULL);
+
+               for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
+                       const struct attr *a = &attr_table[i];
+
+                       pthread_mutex_lock(&attr_mutex);
+                       snprintf(val, sizeof(val), "%s", attr_values[i] ? : "");
+                       pthread_mutex_unlock(&attr_mutex);
+
+                       if (!val[0])
+                               continue;
+                       int fields = str_wordsplit(val, w, ARRAY_SIZE(w));
+                       if (fields < 1)
+                               continue;
+                       if (fields >= 2) {
+                               time_t t = atoll(w[1]);
+                               if (t < now - MEASUREMENT_TIMEOUT)
+                                       continue;
+                       }
+
+                       bprintf(f, "%s %s=%s\n", a->metric, a->value_name, val);
+               }
+               influx_write_flush();
+               sleep(INFLUX_INTERVAL);
+       }
+}