From: Martin Mares Date: Sun, 12 Aug 2018 09:57:03 +0000 (+0200) Subject: Exporting data points to MQTT X-Git-Tag: v2.0~7 X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=1091e2ad748c18e54a5734e4627d4007cb6413ef;p=arexx.git Exporting data points to MQTT --- diff --git a/Makefile b/Makefile index f1d35e8..8a2a8b4 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ -VERSION=1.6 +VERSION=1.99 ARCHIVE=arexxd-$(VERSION).tar.gz CC=gcc LD=gcc CFLAGS=-O2 -Wall -W -Wno-parentheses -Wstrict-prototypes -Wmissing-prototypes -Wundef -Wredundant-decls -std=gnu99 -DAREXXD_VERSION='"$(VERSION)"' -LDLIBS=-lusb-1.0 -lm -lrrd +LDLIBS=-lusb-1.0 -lm -lrrd -lmosquitto all: arexxd diff --git a/arexxd.c b/arexxd.c index b5f43da..c48efd1 100644 --- a/arexxd.c +++ b/arexxd.c @@ -1,7 +1,7 @@ /* * Linux Interfece for Arexx Data Loggers * - * (c) 2011-2016 Martin Mares + * (c) 2011-2018 Martin Mares */ #include @@ -17,7 +17,6 @@ #include #include #include -#include #define DEFAULT_LOG_DIR "/var/log/arexxd" @@ -48,6 +47,11 @@ static int debug_usb; static char *log_dir = DEFAULT_LOG_DIR; static int no_fork; +#define UNUSED __attribute__((unused)) + +static int data_point_counter; // Since last log message +static time_t packet_rx_time; + static void die(char *fmt, ...) { va_list args; @@ -98,8 +102,65 @@ static void log_pkt(char *fmt, ...) va_end(args); } +/*** MQTT interface ***/ + +#include + +static struct mosquitto *mosq; + +static void mqtt_publish(const char *topic, const char *fmt, ...) +{ + va_list args; + va_start(args, fmt); + char m[256]; + int l = vsnprintf(m, sizeof(m), fmt, args); + if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS) + log_error("Mosquitto: publish failed"); + va_end(args); +} + +static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status) +{ + if (!status) + mqtt_publish("burrow/arexxd/status", "ok"); +} + +static void mqtt_init(void) +{ + mosquitto_lib_init(); + mosq = mosquitto_new("arexxd", 1, NULL); + if (!mosq) + die("Mosquitto: initialization failed"); + + if (mosquitto_will_set(mosq, "burrow/arexxd/status", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS) + die("Mosquitto: unable to set will"); + + mosquitto_connect_callback_set(mosq, mqtt_conn_callback); + + if (mosquitto_connect(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS) + die("Mosquitto: connect failed"); + + if (mosquitto_loop_start(mosq)) + die("Mosquitto: cannot start service thread"); +} + +static void mqtt_point(time_t t, const char *name, double val, char *unit UNUSED) +{ + // We do not feed past data to MQTT (so MAX_PAST_TIME is stronger for us) + if (t < packet_rx_time - 30) + return; + + char topic[64]; + snprintf(topic, sizeof(topic), "burrow/arexxd/%s", name); + mqtt_publish(topic, "%.3f", val); + snprintf(topic, sizeof(topic), "burrow/arexxd/%s_timestamp", name); + mqtt_publish(topic, "%llu", (unsigned long long) t); +} + /*** RRD interface ***/ +#include + #define MAX_ARGS 20 #define MAX_ARG_SIZE 1024 @@ -176,9 +237,6 @@ static void rrd_point(time_t t, const char *name, double val, char *unit) #define TIME_OFFSET 946681200 // Timestamp of 2000-01-01 00:00:00 -static int data_point_counter; // Since last log message -static time_t packet_rx_time; - static double correct_point(uint id, double val, const char **name) { /* @@ -243,6 +301,7 @@ static void cooked_point(time_t t, uint id, double val, char *unit, int q) data_point_counter++; rrd_point(t, name, val2, unit); + mqtt_point(t, name, val2, unit); } static void raw_point(uint t, uint id, int raw, int q) @@ -607,7 +666,7 @@ static void set_clock(void) static sigset_t term_sigs; static volatile sig_atomic_t want_shutdown; -static void sigterm_handler(int sig __attribute__((unused))) +static void sigterm_handler(int sig UNUSED) { want_shutdown = 1; } @@ -671,7 +730,7 @@ int main(int argc, char **argv) break; case 'V': printf("arexxd " AREXXD_VERSION "\n"); - printf("(c) 2011-2012 Martin Mares \n"); + printf("(c) 2011-2018 Martin Mares \n"); return 0; default: usage(); @@ -706,6 +765,8 @@ int main(int argc, char **argv) use_syslog = 1; } + mqtt_init(); + struct sigaction sa = { .sa_handler = sigterm_handler }; sigaction(SIGTERM, &sa, NULL); sigaction(SIGINT, &sa, NULL);