]> mj.ucw.cz Git - arexx.git/commitdiff
Exporting data points to MQTT
authorMartin Mares <mj@ucw.cz>
Sun, 12 Aug 2018 09:57:03 +0000 (11:57 +0200)
committerMartin Mares <mj@ucw.cz>
Sun, 12 Aug 2018 09:57:03 +0000 (11:57 +0200)
Makefile
arexxd.c

index f1d35e8d6e3cb1127455ff6b6ae32a59fe5d1244..8a2a8b488d9ebae60ba83d1fe269332f01ecf598 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -1,10 +1,10 @@
-VERSION=1.6
+VERSION=1.99
 ARCHIVE=arexxd-$(VERSION).tar.gz
 
 CC=gcc
 LD=gcc
 CFLAGS=-O2 -Wall -W -Wno-parentheses -Wstrict-prototypes -Wmissing-prototypes -Wundef -Wredundant-decls -std=gnu99 -DAREXXD_VERSION='"$(VERSION)"'
-LDLIBS=-lusb-1.0 -lm -lrrd
+LDLIBS=-lusb-1.0 -lm -lrrd -lmosquitto
 
 all: arexxd
 
index b5f43da516979971e2da513cb6298f355be6b9bd..c48efd176fd9e461bab6e449eea355596afb134d 100644 (file)
--- a/arexxd.c
+++ b/arexxd.c
@@ -1,7 +1,7 @@
 /*
  *     Linux Interfece for Arexx Data Loggers
  *
- *     (c) 2011-2016 Martin Mares <mj@ucw.cz>
+ *     (c) 2011-2018 Martin Mares <mj@ucw.cz>
  */
 
 #include <stdio.h>
@@ -17,7 +17,6 @@
 #include <signal.h>
 #include <sys/stat.h>
 #include <libusb-1.0/libusb.h>
-#include <rrd.h>
 
 #define DEFAULT_LOG_DIR "/var/log/arexxd"
 
@@ -48,6 +47,11 @@ static int debug_usb;
 static char *log_dir = DEFAULT_LOG_DIR;
 static int no_fork;
 
+#define UNUSED __attribute__((unused))
+
+static int data_point_counter;         // Since last log message
+static time_t packet_rx_time;
+
 static void die(char *fmt, ...)
 {
        va_list args;
@@ -98,8 +102,65 @@ static void log_pkt(char *fmt, ...)
        va_end(args);
 }
 
+/*** MQTT interface ***/
+
+#include <mosquitto.h>
+
+static struct mosquitto *mosq;
+
+static void mqtt_publish(const char *topic, const char *fmt, ...)
+{
+       va_list args;
+       va_start(args, fmt);
+       char m[256];
+       int l = vsnprintf(m, sizeof(m), fmt, args);
+       if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
+               log_error("Mosquitto: publish failed");
+       va_end(args);
+}
+
+static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
+{
+       if (!status)
+               mqtt_publish("burrow/arexxd/status", "ok");
+}
+
+static void mqtt_init(void)
+{
+       mosquitto_lib_init();
+       mosq = mosquitto_new("arexxd", 1, NULL);
+       if (!mosq)
+               die("Mosquitto: initialization failed");
+
+       if (mosquitto_will_set(mosq, "burrow/arexxd/status", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
+               die("Mosquitto: unable to set will");
+
+       mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
+
+       if (mosquitto_connect(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
+               die("Mosquitto: connect failed");
+
+       if (mosquitto_loop_start(mosq))
+               die("Mosquitto: cannot start service thread");
+}
+
+static void mqtt_point(time_t t, const char *name, double val, char *unit UNUSED)
+{
+       // We do not feed past data to MQTT (so MAX_PAST_TIME is stronger for us)
+       if (t < packet_rx_time - 30)
+               return;
+
+       char topic[64];
+       snprintf(topic, sizeof(topic), "burrow/arexxd/%s", name);
+       mqtt_publish(topic, "%.3f", val);
+       snprintf(topic, sizeof(topic), "burrow/arexxd/%s_timestamp", name);
+       mqtt_publish(topic, "%llu", (unsigned long long) t);
+}
+
 /*** RRD interface ***/
 
+#include <rrd.h>
+
 #define MAX_ARGS 20
 #define MAX_ARG_SIZE 1024
 
@@ -176,9 +237,6 @@ static void rrd_point(time_t t, const char *name, double val, char *unit)
 
 #define TIME_OFFSET 946681200          // Timestamp of 2000-01-01 00:00:00
 
-static int data_point_counter;         // Since last log message
-static time_t packet_rx_time;
-
 static double correct_point(uint id, double val, const char **name)
 {
        /*
@@ -243,6 +301,7 @@ static void cooked_point(time_t t, uint id, double val, char *unit, int q)
 
        data_point_counter++;
        rrd_point(t, name, val2, unit);
+       mqtt_point(t, name, val2, unit);
 }
 
 static void raw_point(uint t, uint id, int raw, int q)
@@ -607,7 +666,7 @@ static void set_clock(void)
 static sigset_t term_sigs;
 static volatile sig_atomic_t want_shutdown;
 
-static void sigterm_handler(int sig __attribute__((unused)))
+static void sigterm_handler(int sig UNUSED)
 {
        want_shutdown = 1;
 }
@@ -671,7 +730,7 @@ int main(int argc, char **argv)
                                break;
                        case 'V':
                                printf("arexxd " AREXXD_VERSION "\n");
-                               printf("(c) 2011-2012 Martin Mares <mj@ucw.cz>\n");
+                               printf("(c) 2011-2018 Martin Mares <mj@ucw.cz>\n");
                                return 0;
                        default:
                                usage();
@@ -706,6 +765,8 @@ int main(int argc, char **argv)
                use_syslog = 1;
        }
 
+       mqtt_init();
+
        struct sigaction sa = { .sa_handler = sigterm_handler };
        sigaction(SIGTERM, &sa, NULL);
        sigaction(SIGINT, &sa, NULL);