/*
* Linux Interfece for Arexx Data Loggers
*
- * (c) 2011-2016 Martin Mares <mj@ucw.cz>
+ * (c) 2011-2018 Martin Mares <mj@ucw.cz>
*/
#include <stdio.h>
#include <signal.h>
#include <sys/stat.h>
#include <libusb-1.0/libusb.h>
-#include <rrd.h>
#define DEFAULT_LOG_DIR "/var/log/arexxd"
static char *log_dir = DEFAULT_LOG_DIR;
static int no_fork;
+#define UNUSED __attribute__((unused))
+
+static int data_point_counter; // Since last log message
+static time_t packet_rx_time;
+
static void die(char *fmt, ...)
{
va_list args;
va_end(args);
}
+/*** MQTT interface ***/
+
+#include <mosquitto.h>
+
+static struct mosquitto *mosq;
+
+static void mqtt_publish(const char *topic, const char *fmt, ...)
+{
+ va_list args;
+ va_start(args, fmt);
+ char m[256];
+ int l = vsnprintf(m, sizeof(m), fmt, args);
+ if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
+ log_error("Mosquitto: publish failed");
+ va_end(args);
+}
+
+static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
+{
+ if (!status)
+ mqtt_publish("burrow/arexxd/status", "ok");
+}
+
+static void mqtt_init(void)
+{
+ mosquitto_lib_init();
+ mosq = mosquitto_new("arexxd", 1, NULL);
+ if (!mosq)
+ die("Mosquitto: initialization failed");
+
+ if (mosquitto_will_set(mosq, "burrow/arexxd/status", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: unable to set will");
+
+ mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
+
+ if (mosquitto_connect(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: connect failed");
+
+ if (mosquitto_loop_start(mosq))
+ die("Mosquitto: cannot start service thread");
+}
+
+static void mqtt_point(time_t t, const char *name, double val, char *unit UNUSED)
+{
+ // We do not feed past data to MQTT (so MAX_PAST_TIME is stronger for us)
+ if (t < packet_rx_time - 30)
+ return;
+
+ char topic[64];
+ snprintf(topic, sizeof(topic), "burrow/arexxd/%s", name);
+ mqtt_publish(topic, "%.3f", val);
+ snprintf(topic, sizeof(topic), "burrow/arexxd/%s_timestamp", name);
+ mqtt_publish(topic, "%llu", (unsigned long long) t);
+}
+
/*** RRD interface ***/
+#include <rrd.h>
+
#define MAX_ARGS 20
#define MAX_ARG_SIZE 1024
#define TIME_OFFSET 946681200 // Timestamp of 2000-01-01 00:00:00
-static int data_point_counter; // Since last log message
-static time_t packet_rx_time;
-
static double correct_point(uint id, double val, const char **name)
{
/*
data_point_counter++;
rrd_point(t, name, val2, unit);
+ mqtt_point(t, name, val2, unit);
}
static void raw_point(uint t, uint id, int raw, int q)
static sigset_t term_sigs;
static volatile sig_atomic_t want_shutdown;
-static void sigterm_handler(int sig __attribute__((unused)))
+static void sigterm_handler(int sig UNUSED)
{
want_shutdown = 1;
}
break;
case 'V':
printf("arexxd " AREXXD_VERSION "\n");
- printf("(c) 2011-2012 Martin Mares <mj@ucw.cz>\n");
+ printf("(c) 2011-2018 Martin Mares <mj@ucw.cz>\n");
return 0;
default:
usage();
use_syslog = 1;
}
+ mqtt_init();
+
struct sigaction sa = { .sa_handler = sigterm_handler };
sigaction(SIGTERM, &sa, NULL);
sigaction(SIGINT, &sa, NULL);