]> mj.ucw.cz Git - arexx.git/blobdiff - arexxd.c
Home: New MQTT server
[arexx.git] / arexxd.c
index 00a36f8febef9ec25964fe89991f5082a91312ad..e083c52bce471caf59bca4ddc9f2b324fa4ba60a 100644 (file)
--- a/arexxd.c
+++ b/arexxd.c
@@ -1,7 +1,7 @@
 /*
  *     Linux Interfece for Arexx Data Loggers
  *
- *     (c) 2011-2012 Martin Mares <mj@ucw.cz>
+ *     (c) 2011-2020 Martin Mares <mj@ucw.cz>
  */
 
 #include <stdio.h>
 #include <signal.h>
 #include <sys/stat.h>
 #include <libusb-1.0/libusb.h>
-#include <rrd.h>
 
 #define DEFAULT_LOG_DIR "/var/log/arexxd"
 
+/*
+ *  Data points received from the logger are sometimes corrupted by noise.
+ *  This effects not only the measured values, but also sensor IDs and timestamps.
+ *  Since rrdtool cannot skip back in time, a random timestamp in the future can
+ *  cause all further measurements to be dropped. To minimize impact of these
+ *  problems, we drop data points which are too far in the past or in the future.
+ *
+ *  Furthermore, you can ignore data from unrecognized sensors, i.e., those
+ *  which are not handled by correct_point().
+ */
+#define MAX_PAST_TIME 30*86400
+#define MAX_FUTURE_TIME 300
+#define IGNORE_UNKNOWN_SENSORS
+
+#undef LOG_TO_RRD
+#define LOG_TO_MQTT
+
 typedef unsigned char byte;
+typedef unsigned int uint;
 static libusb_context *usb_ctxt;
 static libusb_device_handle *devh;
 
@@ -30,6 +47,12 @@ static int debug_mode;
 static int debug_packets;
 static int debug_raw_data;
 static char *log_dir = DEFAULT_LOG_DIR;
+static int no_fork;
+
+#define UNUSED __attribute__((unused))
+
+static int data_point_counter;         // Since last log message
+static time_t packet_rx_time;
 
 static void die(char *fmt, ...)
 {
@@ -81,8 +104,72 @@ static void log_pkt(char *fmt, ...)
        va_end(args);
 }
 
+/*** MQTT interface ***/
+
+#ifdef LOG_TO_MQTT
+
+#include <mosquitto.h>
+
+static struct mosquitto *mosq;
+
+static void mqtt_publish(const char *topic, const char *fmt, ...)
+{
+       va_list args;
+       va_start(args, fmt);
+       char m[256];
+       int l = vsnprintf(m, sizeof(m), fmt, args);
+       if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
+               log_error("Mosquitto: publish failed");
+       va_end(args);
+}
+
+static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
+{
+       if (!status)
+               mqtt_publish("status/arexxd", "ok");
+}
+
+static void mqtt_init(void)
+{
+       mosquitto_lib_init();
+       mosq = mosquitto_new("arexxd", 1, NULL);
+       if (!mosq)
+               die("Mosquitto: initialization failed");
+
+       if (mosquitto_tls_set(mosq, "/etc/burrow-mqtt/ca.crt", NULL, "/etc/burrow-mqtt/client.crt", "/etc/burrow-mqtt/client.key", NULL) != MOSQ_ERR_SUCCESS)
+               die("Mosquitto: unable to set TLS parameters");
+
+       if (mosquitto_will_set(mosq, "status/arexxd", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
+               die("Mosquitto: unable to set will");
+
+       mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
+
+       if (mosquitto_connect(mosq, "burrow-mqtt", 8883, 60) != MOSQ_ERR_SUCCESS)
+               die("Mosquitto: connect failed");
+
+       if (mosquitto_loop_start(mosq))
+               die("Mosquitto: cannot start service thread");
+}
+
+static void mqtt_point(time_t t, const char *name, double val, char *unit UNUSED)
+{
+       // We do not feed past data to MQTT (so MAX_PAST_TIME is stronger for us)
+       if (t < packet_rx_time - 30)
+               return;
+
+       char topic[64];
+       snprintf(topic, sizeof(topic), "burrow/temp/%s", name);
+       mqtt_publish(topic, "%.3f %lld", val, (long long) t);
+}
+
+#endif
+
 /*** RRD interface ***/
 
+#ifdef LOG_TO_RRD
+
+#include <rrd.h>
+
 #define MAX_ARGS 20
 #define MAX_ARG_SIZE 1024
 
@@ -140,6 +227,7 @@ static void rrd_point(time_t t, const char *name, double val, char *unit)
                rrd_create(arg_cnt, arg_ptr);
                if (rrd_test_error()) {
                        log_error("rrd_create on %s failed: %s", rr_name, rrd_get_error());
+                       rrd_clear_error();
                        return;
                }
        }
@@ -148,17 +236,19 @@ static void rrd_point(time_t t, const char *name, double val, char *unit)
        arg_push(rr_name);
        arg_push("%d:%f", t, val);
        rrd_update(arg_cnt, arg_ptr);
-       if (rrd_test_error())
+       if (rrd_test_error()) {
                log_error("rrd_update on %s failed: %s", rr_name, rrd_get_error());
+               rrd_clear_error();
+       }
 }
 
+#endif
+
 /*** Transforms ***/
 
 #define TIME_OFFSET 946681200          // Timestamp of 2000-01-01 00:00:00
 
-static int data_point_counter;         // Since last log message
-
-static double correct_point(int id, double val, const char **name)
+static double correct_point(uint id, double val, const char **name)
 {
        /*
         *  Manually calculated corrections and renames for my sensors.
@@ -166,29 +256,35 @@ static double correct_point(int id, double val, const char **name)
         */
        switch (id) {
                case 10415:
-                       *name = "ursarium";
+                       *name = "terarium";
                        return val - 0.93;
                case 10707:
-                       *name = "balcony";
+                       *name = "catarium";
                        return val - 0.71;
+               case 11699:
+                       *name = "garage";
+                       return val;
                case 19246:
-                       *name = "catarium";
+                       *name = "ursarium";
                        return val + 0.49;
                case 19247:
-                       *name = "catarium-rh";
+                       *name = "ursarium-rh";
                        return val;
                case 12133:
-                       *name = "outside";
+                       *name = "aquarium";
                        return val + 0.44;
                default:
+#ifdef IGNORE_UNKNOWN_SENSORS
+                       *name = NULL;
+#endif
                        return val;
        }
 }
 
-static void cooked_point(time_t t, int id, double val, char *unit, int q)
+static void cooked_point(time_t t, uint id, double val, char *unit, int q)
 {
        char namebuf[16];
-       snprintf(namebuf, sizeof(namebuf), "%d", id);
+       snprintf(namebuf, sizeof(namebuf), "%u", id);
        const char *name = namebuf;
 
        double val2 = correct_point(id, val, &name);
@@ -201,11 +297,29 @@ static void cooked_point(time_t t, int id, double val, char *unit, int q)
                printf("== %s id=%d name=%s val=%.3f val2=%.3f unit=%s q=%d\n", tbuf, id, name, val, val2, unit, q);
        }
 
+       if (!name) {
+               log_error("Ignored data from unknown sensor %d", id);
+               return;
+       }
+       if (t < packet_rx_time - MAX_PAST_TIME) {
+               log_error("Data point from sensor %d too far in the past (%d sec)", packet_rx_time - t);
+               return;
+       }
+       if (t > packet_rx_time + MAX_FUTURE_TIME) {
+               log_error("Data point from sensor %d too far in the future (%d sec)", t - packet_rx_time);
+               return;
+       }
+
        data_point_counter++;
+#ifdef LOG_TO_RRD
        rrd_point(t, name, val2, unit);
+#endif
+#ifdef LOG_TO_MQTT
+       mqtt_point(t, name, val2, unit);
+#endif
 }
 
-static void raw_point(int t, int id, int raw, int q)
+static void raw_point(uint t, uint id, int raw, int q)
 {
        /*
         *  The binary blob provided by Arexx contains an embedded XML fragment
@@ -240,14 +354,15 @@ static void raw_point(int t, int id, int raw, int q)
        double z = raw;
        double hi, lo;
        char *unit;
-       int idhi = id & 0xf000;
+       uint idhi = id & 0xfffff000;
+       uint idext = id & 0xf0000000;
 
        if (idhi == 0x1000) {
                z = 0.02*z - 273.15;
                lo = -200;
                hi = 600;
                unit = "C";
-       } else if (idhi == 0x2000) {
+       } else if (idhi == 0x2000 || idext == 0x20000000) {
                if (raw >= 0x8000)
                        z -= 0x10000;
                z /= 128;
@@ -285,7 +400,7 @@ static void raw_point(int t, int id, int raw, int q)
                        unit = "ppm";
                }
        } else {
-               log_error("Unknown sensor type 0x%04x", id);
+               log_error("Unknown sensor type 0x%08x", id);
                return;
        }
 
@@ -294,11 +409,61 @@ static void raw_point(int t, int id, int raw, int q)
                return;
        }
 
-       cooked_point(t + TIME_OFFSET, id, z, unit, q);
+       cooked_point(t + TIME_OFFSET, id & 0x0fffffff, z, unit, q);
 }
 
 /*** USB interface ***/
 
+static int rx_endpoint, tx_endpoint;
+
+static int parse_descriptors(libusb_device *dev)
+{
+       int err;
+       struct libusb_config_descriptor *desc;
+
+       if (err = libusb_get_active_config_descriptor(dev, &desc)) {
+               log_error("libusb_get_config_descriptor failed: error %d", err);
+               return 0;
+       }
+       if (desc->bNumInterfaces != 1) {
+               log_error("Unexpected number of interfaces: %d", desc->bNumInterfaces);
+               goto failed;
+       }
+
+       const struct libusb_interface *iface = &desc->interface[0];
+       if (iface->num_altsetting != 1) {
+               log_error("Unexpected number of alternate interface settings: %d", iface->num_altsetting);
+               goto failed;
+       }
+
+       const struct libusb_interface_descriptor *ifd = &iface->altsetting[0];
+       if (ifd->bNumEndpoints != 2) {
+               log_error("Unexpected number of endpoints: %d", ifd->bNumEndpoints);
+               goto failed;
+       }
+
+       rx_endpoint = tx_endpoint = -1;
+       for (int i=0; i<2; i++) {
+               const struct libusb_endpoint_descriptor *epd = &ifd->endpoint[i];
+               if (epd->bEndpointAddress & 0x80)
+                       rx_endpoint = epd->bEndpointAddress;
+               else
+                       tx_endpoint = epd->bEndpointAddress;
+       }
+       if (rx_endpoint < 0 || tx_endpoint < 0) {
+               log_error("Failed to identify endpoints");
+               goto failed;
+       }
+
+       log_pkt("Found endpoints: rx==%02x tx=%02x\n", rx_endpoint, tx_endpoint);
+       libusb_free_config_descriptor(desc);
+       return 1;
+
+failed:
+       libusb_free_config_descriptor(desc);
+       return 0;
+}
+
 static int find_device(void)
 {
        libusb_device **devlist;
@@ -314,6 +479,8 @@ static int find_device(void)
                if (!libusb_get_device_descriptor(dev, &desc)) {
                        if (desc.idVendor == 0x0451 && desc.idProduct == 0x3211) {
                                log_info("Arexx data logger found at usb%d.%d", libusb_get_bus_number(dev), libusb_get_device_address(dev));
+                               if (!parse_descriptors(dev))
+                                       continue;
                                int err;
                                if (err = libusb_open(dev, &devh)) {
                                        log_error("libusb_open() failed: error %d", err);
@@ -354,6 +521,12 @@ static void dump_packet(byte *pkt)
        }
 }
 
+static void my_msleep(int ms)
+{
+       struct timespec ts = { .tv_sec = ms/1000, .tv_nsec = (ms%1000) * 1000000 };
+       nanosleep(&ts, NULL);
+}
+
 static int send_and_receive(byte *req, byte *reply)
 {
        if (debug_packets) {
@@ -367,7 +540,7 @@ static int send_and_receive(byte *req, byte *reply)
        }
 
        int err, transferred;
-       if (err = libusb_bulk_transfer(devh, 0x01, req, 64, &transferred, 200)) {
+       if (err = libusb_bulk_transfer(devh, tx_endpoint, req, 64, &transferred, 200)) {
                if (err == LIBUSB_ERROR_TIMEOUT) {
                        log_pkt(">> xmit timed out\n");
                        return 0;
@@ -380,7 +553,8 @@ static int send_and_receive(byte *req, byte *reply)
                log_pkt(">> xmit %d bytes\n", transferred);
                dump_packet(req);
        }
-       if (err = libusb_bulk_transfer(devh, 0x81, reply, 64, &transferred, 200)) {
+       my_msleep(1);
+       if (err = libusb_bulk_transfer(devh, rx_endpoint, reply, 64, &transferred, 200)) {
                if (err == LIBUSB_ERROR_TIMEOUT) {
                        log_pkt("<< recv timed out\n");
                        return 0;
@@ -389,6 +563,7 @@ static int send_and_receive(byte *req, byte *reply)
                log_error("Receive error: %d", err);
                return err;
        }
+       packet_rx_time = time(NULL);
        if (debug_packets)
                log_pkt("<< recv %d bytes\n", transferred);
        while (transferred < 64)
@@ -439,21 +614,35 @@ static int parse_packet(byte *reply)
                int len = p[0];
                if (!len || len == 0xff)
                        break;
-               if (len < 9 || len > 10) {
-                       log_error("Unknown tuple length %02x", len);
-                       break;
-               }
                if (pos + len > 64) {
                        log_error("Tuple truncated");
                        break;
                }
-               int id = get_le16(p+1);
-               int raw = get_be16(p+3);
-               int t = get_le32(p+5);
-               int q = (len > 9) ? p[9] : -1;
+
+               uint t, id;
+               int raw, q;
+               switch (len) {
+                       case 9:
+                       case 10:
+                               id = get_le16(p+1);
+                               raw = get_be16(p+3);
+                               t = get_le32(p+5);
+                               q = (len > 9) ? p[9] : -1;
+                               break;
+                       case 12:
+                               id = get_le32(p+1);
+                               raw = get_be16(p+5);
+                               t = get_le32(p+7);
+                               q = p[11];
+                               break;
+                       default:
+                               log_error("Unknown tuple length %02x", len);
+                               goto end;
+               }
+
                if (debug_raw_data) {
-                       printf("... %02x: id=%d raw=%d t=%d", len, id, raw, t);
-                       if (len > 9)
+                       printf("... %02x: id=%08x raw=%d t=%u", len, id, raw, t);
+                       if (q >= 0)
                                printf(" q=%d", q);
                        printf("\n");
                }
@@ -462,6 +651,7 @@ static int parse_packet(byte *reply)
                points++;
        }
 
+end:
        return points;
 }
 
@@ -491,7 +681,7 @@ static void set_clock(void)
 static sigset_t term_sigs;
 static volatile sig_atomic_t want_shutdown;
 
-static void sigterm_handler(int sig __attribute__((unused)))
+static void sigterm_handler(int sig UNUSED)
 {
        want_shutdown = 1;
 }
@@ -499,16 +689,17 @@ static void sigterm_handler(int sig __attribute__((unused)))
 static void interruptible_msleep(int ms)
 {
        sigprocmask(SIG_UNBLOCK, &term_sigs, NULL);
-       struct timespec ts = { .tv_sec = ms/1000, .tv_nsec = (ms%1000) * 1000000 };
-       nanosleep(&ts, NULL);
+       my_msleep(ms);
        sigprocmask(SIG_BLOCK, &term_sigs, NULL);
 }
 
 static const struct option long_options[] = {
        { "debug",              0, NULL, 'd' },
        { "log-dir",            1, NULL, 'l' },
+       { "no-fork",            0, NULL, 'n' },
        { "debug-packets",      0, NULL, 'p' },
        { "debug-raw",          0, NULL, 'r' },
+       { "version",            0, NULL, 'V' },
        { NULL,                 0, NULL, 0 },
 };
 
@@ -520,8 +711,10 @@ Usage: arexxd <options>\n\
 Options:\n\
 -d, --debug            Debug mode (no chdir, no fork, no syslog)\n\
 -l, --log-dir=<dir>    Directory where all received data should be stored\n\
+-n, --no-fork          Do not fork\n\
 -p, --debug-packets    Log all packets sent and received\n\
 -r, --debug-raw                Log conversion from raw values\n\
+-V, --version          Show daemon version\n\
 ");
        exit(1);
 }
@@ -529,7 +722,7 @@ Options:\n\
 int main(int argc, char **argv)
 {
        int opt;
-       while ((opt = getopt_long(argc, argv, "dl:pr", long_options, NULL)) >= 0)
+       while ((opt = getopt_long(argc, argv, "dl:nprV", long_options, NULL)) >= 0)
                switch (opt) {
                        case 'd':
                                debug_mode++;
@@ -537,12 +730,19 @@ int main(int argc, char **argv)
                        case 'l':
                                log_dir = optarg;
                                break;
+                       case 'n':
+                               no_fork++;
+                               break;
                        case 'p':
                                debug_packets++;
                                break;
                        case 'r':
                                debug_raw_data++;
                                break;
+                       case 'V':
+                               printf("arexxd " AREXXD_VERSION "\n");
+                               printf("(c) 2011-2018 Martin Mares <mj@ucw.cz>\n");
+                               return 0;
                        default:
                                usage();
                }
@@ -552,7 +752,6 @@ int main(int argc, char **argv)
        int err;
        if (err = libusb_init(&usb_ctxt))
                die("Cannot initialize libusb: error %d", err);
-       // libusb_set_debug(usb_ctxt, 3);
 
        if (!debug_mode) {
                if (chdir(log_dir) < 0)
@@ -564,15 +763,21 @@ int main(int argc, char **argv)
                        setlinebuf(stdout);
                }
                openlog("arexxd", LOG_NDELAY, LOG_DAEMON);
-               pid_t pid = fork();
-               if (pid < 0)
-                       die("fork() failed: %m");
-               if (pid)
-                       return 0;
+               if (!no_fork) {
+                       pid_t pid = fork();
+                       if (pid < 0)
+                               die("fork() failed: %m");
+                       if (pid)
+                               return 0;
+               }
                setsid();
                use_syslog = 1;
        }
 
+#ifdef LOG_TO_MQTT
+       mqtt_init();
+#endif
+
        struct sigaction sa = { .sa_handler = sigterm_handler };
        sigaction(SIGTERM, &sa, NULL);
        sigaction(SIGINT, &sa, NULL);