From: Martin Mares Date: Sat, 2 Nov 2019 14:59:31 +0000 (+0100) Subject: burrow-influx X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=a18b65c71b6c8b9868b3fd394d1ce5092dee9428;p=home-hw.git burrow-influx --- diff --git a/influx/Makefile b/influx/Makefile new file mode 100644 index 0000000..b08120e --- /dev/null +++ b/influx/Makefile @@ -0,0 +1,16 @@ +PC=pkg-config +UCW_CFLAGS := $(shell $(PC) --cflags libucw) +UCW_LIBS := $(shell $(PC) --libs libucw) + +CURL_CFLAGS := $(shell curl-config --cflags) +CURL_LIBS := $(shell curl-config --libs) + +CFLAGS=$(UCW_CFLAGS) $(CURL_CFLAGS) -std=gnu99 +LDFLAGS=$(UCW_LIBS) $(CURL_LIBS) -lmosquitto + +all: burrow-influx + +burrow-influx: burrow-influx.c + +install: all + install burrow-influx /usr/local/sbin/ diff --git a/influx/burrow-influx.c b/influx/burrow-influx.c new file mode 100644 index 0000000..9521300 --- /dev/null +++ b/influx/burrow-influx.c @@ -0,0 +1,398 @@ +/* + * A gateway between MQTT and InfluxDB + * + * (c) 2018--2019 Martin Mares + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#define MEASUREMENT_TIMEOUT 120 + +#define INFLUX_URL "http://localhost:8086/write?db=burrow&precision=s" +#define INFLUX_TIMEOUT 30 +#define INFLUX_VERBOSE 0 +#define INFLUX_INTERVAL 60 + +static struct mosquitto *mosq; + +struct attr { + const char *metric; + const char *value_name; + const char *topic; +}; + +static const struct attr attr_table[] = { + { + .metric = "temp,where=loft", + .value_name = "t", + .topic = "burrow/temp/loft", + }, + { + .metric = "temp,where=ursarium", + .value_name = "t", + .topic = "burrow/temp/ursarium" + }, + { + .metric = "temp,where=catarium", + .value_name = "t", + .topic = "burrow/temp/catarium" + }, + { + .metric = "temp,where=garage", + .value_name = "t", + .topic = "burrow/temp/garage" + }, + { + .metric = "temp,where=kitchen", + .value_name = "t", + .topic = "burrow/temp/kitchen" + }, + { + .metric = "rh,where=ursarium", + .value_name = "rh", + .topic = "burrow/temp/ursarium-rh" + }, + { + .metric = "temp,where=catarium_clock", + .value_name = "t", + .topic = "burrow/temp/clock" + }, + { + .metric = "pressure,where=catarium_clock", + .value_name = "pa", + .topic = "burrow/pressure/clock" + }, + { + .metric = "air,where=inside_intake", + .value_name = "t", + .topic = "burrow/air/inside-intake", + }, + { + .metric = "air,where=inside_exhaust", + .value_name = "t", + .topic = "burrow/air/inside-exhaust", + }, + { + .metric = "air,where=outside_intake", + .value_name = "t", + .topic = "burrow/air/outside-intake", + }, + { + .metric = "air,where=outside_exhaust", + .value_name = "t", + .topic = "burrow/air/outside-exhaust", + }, + { + .metric = "air,where=mixed", + .value_name = "t", + .topic = "burrow/air/mixed", + }, + { + .metric = "air,where=inside_intake_avg", + .value_name = "t", + .topic = "burrow/avg/air/inside-intake", + }, + { + .metric = "air,where=outside_intake_avg", + .value_name = "t", + .topic = "burrow/avg/air/outside-intake", + }, + { + .metric = "air_bypass", + .value_name = "on", + .topic = "burrow/air/bypass" + }, + { + .metric = "air_fan_pwm", + .value_name = "pwm", + .topic = "burrow/air/exchanger-fan" + }, +#if 0 + { + .metric = "loft_fan", + .topic = "burrow/loft/fan" + }, +#endif + { + .metric = "water_circ", + .value_name = "on", + .topic = "burrow/loft/circulation" + }, + { + .metric = "pm_voltage,phase=L1N", + .value_name = "V", + .topic = "burrow/power/voltage/l1n", + }, + { + .metric = "pm_voltage,phase=L2N", + .value_name = "V", + .topic = "burrow/power/voltage/l2n", + }, + { + .metric = "pm_voltage,phase=L3N", + .value_name = "V", + .topic = "burrow/power/voltage/l3n", + }, + { + .metric = "pm_current,phase=L1", + .value_name = "A", + .topic = "burrow/power/current/l1", + }, + { + .metric = "pm_current,phase=L2", + .value_name = "A", + .topic = "burrow/power/current/l2", + }, + { + .metric = "pm_current,phase=L3", + .value_name = "A", + .topic = "burrow/power/current/l3", + }, + { + .metric = "pm_power", + .value_name = "W", + .topic = "burrow/power/power", + }, + { + .metric = "pm_energy", + .value_name = "kWh", + .topic = "burrow/power/energy", + }, + { + .metric = "pm_reactive_power", + .value_name = "VAr", + .topic = "burrow/power/reactive/power", + }, + { + .metric = "pm_reactive_energy", + .value_name = "kVArh", + .topic = "burrow/power/reactive/energy", + }, +}; + +/*** MQTT ***/ + +static char *attr_values[ARRAY_SIZE(attr_table)]; +static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER; + +static void mqtt_publish(const char *topic, const char *fmt, ...) +{ + va_list args; + va_start(args, fmt); + char m[256]; + int l = vsnprintf(m, sizeof(m), fmt, args); + if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS) + msg(L_ERROR, "Mosquitto: publish failed"); + va_end(args); +} + +static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status) +{ + if (!status) { + msg(L_DEBUG, "MQTT: Connection established, subscribing"); + if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS) + die("Mosquitto: subscribe failed"); + + mqtt_publish("status/influx", "ok"); + } +} + +static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message) +{ + // msg(L_INFO, "MQTT(%d): %s", level, message); +} + +static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m) +{ + char val[256]; + if (m->payloadlen >= sizeof(val) - 1) { + msg(L_ERROR, "Invalid value for topic %s", m->topic); + return; + } + memcpy(val, m->payload, m->payloadlen); + val[m->payloadlen] = 0; + msg(L_DEBUG, "MQTT < %s %s", m->topic, val); + + for (uint i=0; i < ARRAY_SIZE(attr_table); i++) { + if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) { + pthread_mutex_lock(&attr_mutex); + if (attr_values[i]) { + xfree(attr_values[i]); + attr_values[i] = NULL; + } + if (val[0]) + attr_values[i] = xstrdup(val); + pthread_mutex_unlock(&attr_mutex); + } + } +} + +/*** InfluxDB ***/ + +static CURL *curl; + +static struct fastbuf *influx_fb; +static struct fastbuf *influx_err_fb; + +static size_t influx_write_callback(char *ptr, size_t size, size_t nmemb, void *userdata UNUSED) +{ + ASSERT(size == 1); + for (size_t i=0; i 0x7e) + c = '?'; + bputc(influx_err_fb, c); + } + return nmemb; +} + +static void influx_init(void) +{ + curl = curl_easy_init(); + if (!curl) + die("libcurl init failed"); + + curl_easy_setopt(curl, CURLOPT_URL, INFLUX_URL); + curl_easy_setopt(curl, CURLOPT_VERBOSE, (long) INFLUX_VERBOSE); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, (long) INFLUX_TIMEOUT); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, influx_write_callback); + + influx_fb = fbgrow_create(4096); + influx_err_fb = fbgrow_create(256); +} + +static struct fastbuf *influx_write_start(void) +{ + fbgrow_reset(influx_fb); + return influx_fb; +} + +static void influx_write_flush(void) +{ + bputc(influx_fb, 0); + byte *buf; + fbgrow_get_buf(influx_fb, &buf); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf); + + msg(L_DEBUG, "InfluxDB: Sending data (%u bytes)", strlen(buf)); + if (INFLUX_VERBOSE) + fprintf(stderr, "%s", buf); + + fbgrow_reset(influx_err_fb); + + CURLcode code = curl_easy_perform(curl); + + if (code != CURLE_OK) { + msg(L_ERROR, "Write to InfluxDB failed: %s", curl_easy_strerror(code)); + return; + } + + long rc = 0; + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &rc); + if (rc != 200 && rc != 204) { + bputc(influx_err_fb, 0); + byte *err; + fbgrow_get_buf(influx_err_fb, &err); + msg(L_DEBUG, "InfluxDB HTTP error %ld: %s [...]", rc, err); + } +} + +/*** Main ***/ + +static int use_daemon; +static int use_debug; + +static struct opt_section options = { + OPT_ITEMS { + OPT_HELP("A daemon for transferring MQTT data to InfluxDB"), + OPT_HELP(""), + OPT_HELP("Options:"), + OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"), + OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"), + OPT_HELP_OPTION, + OPT_CONF_OPTIONS, + OPT_END + } +}; + +int main(int argc UNUSED, char **argv) +{ + log_init(argv[0]); + opt_parse(&options, argv+1); + + if (use_daemon) { + struct log_stream *ls = log_new_syslog("daemon", LOG_PID); + log_set_default_stream(ls); + } + if (!use_debug) + log_default_stream()->levels &= ~(1U << L_DEBUG); + + mosquitto_lib_init(); + mosq = mosquitto_new("influx", 1, NULL); + if (!mosq) + die("Mosquitto: initialization failed"); + + mosquitto_connect_callback_set(mosq, mqtt_conn_callback); + mosquitto_log_callback_set(mosq, mqtt_log_callback); + mosquitto_message_callback_set(mosq, mqtt_msg_callback); + + if (mosquitto_will_set(mosq, "status/influx", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS) + die("Mosquitto: unable to set will"); + + if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS) + die("Mosquitto: connect failed"); + + if (mosquitto_loop_start(mosq)) + die("Mosquitto: cannot start service thread"); + + influx_init(); + + for (;;) { + struct fastbuf *f = influx_write_start(); + char val[256], *w[2]; + time_t now = time(NULL); + + for (uint i=0; i < ARRAY_SIZE(attr_table); i++) { + const struct attr *a = &attr_table[i]; + + pthread_mutex_lock(&attr_mutex); + snprintf(val, sizeof(val), "%s", attr_values[i] ? : ""); + pthread_mutex_unlock(&attr_mutex); + + if (!val[0]) + continue; + int fields = str_wordsplit(val, w, ARRAY_SIZE(w)); + if (fields < 1) + continue; + if (fields >= 2) { + time_t t = atoll(w[1]); + if (t < now - MEASUREMENT_TIMEOUT) + continue; + } + + bprintf(f, "%s %s=%s\n", a->metric, a->value_name, val); + } + influx_write_flush(); + sleep(INFLUX_INTERVAL); + } +}