X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;ds=sidebyside;f=arexxd.c;h=e083c52bce471caf59bca4ddc9f2b324fa4ba60a;hb=HEAD;hp=2a50c6e6face4d0b280b8ec8f1df1c07b38fd9f4;hpb=3d88c827f9b7a870298061e950de2321d246ed96;p=arexx.git diff --git a/arexxd.c b/arexxd.c index 2a50c6e..e083c52 100644 --- a/arexxd.c +++ b/arexxd.c @@ -1,7 +1,7 @@ /* * Linux Interfece for Arexx Data Loggers * - * (c) 2011-2012 Martin Mares + * (c) 2011-2020 Martin Mares */ #include @@ -17,11 +17,28 @@ #include #include #include -#include #define DEFAULT_LOG_DIR "/var/log/arexxd" +/* + * Data points received from the logger are sometimes corrupted by noise. + * This effects not only the measured values, but also sensor IDs and timestamps. + * Since rrdtool cannot skip back in time, a random timestamp in the future can + * cause all further measurements to be dropped. To minimize impact of these + * problems, we drop data points which are too far in the past or in the future. + * + * Furthermore, you can ignore data from unrecognized sensors, i.e., those + * which are not handled by correct_point(). + */ +#define MAX_PAST_TIME 30*86400 +#define MAX_FUTURE_TIME 300 +#define IGNORE_UNKNOWN_SENSORS + +#undef LOG_TO_RRD +#define LOG_TO_MQTT + typedef unsigned char byte; +typedef unsigned int uint; static libusb_context *usb_ctxt; static libusb_device_handle *devh; @@ -30,6 +47,12 @@ static int debug_mode; static int debug_packets; static int debug_raw_data; static char *log_dir = DEFAULT_LOG_DIR; +static int no_fork; + +#define UNUSED __attribute__((unused)) + +static int data_point_counter; // Since last log message +static time_t packet_rx_time; static void die(char *fmt, ...) { @@ -81,8 +104,72 @@ static void log_pkt(char *fmt, ...) va_end(args); } +/*** MQTT interface ***/ + +#ifdef LOG_TO_MQTT + +#include + +static struct mosquitto *mosq; + +static void mqtt_publish(const char *topic, const char *fmt, ...) +{ + va_list args; + va_start(args, fmt); + char m[256]; + int l = vsnprintf(m, sizeof(m), fmt, args); + if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS) + log_error("Mosquitto: publish failed"); + va_end(args); +} + +static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status) +{ + if (!status) + mqtt_publish("status/arexxd", "ok"); +} + +static void mqtt_init(void) +{ + mosquitto_lib_init(); + mosq = mosquitto_new("arexxd", 1, NULL); + if (!mosq) + die("Mosquitto: initialization failed"); + + if (mosquitto_tls_set(mosq, "/etc/burrow-mqtt/ca.crt", NULL, "/etc/burrow-mqtt/client.crt", "/etc/burrow-mqtt/client.key", NULL) != MOSQ_ERR_SUCCESS) + die("Mosquitto: unable to set TLS parameters"); + + if (mosquitto_will_set(mosq, "status/arexxd", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS) + die("Mosquitto: unable to set will"); + + mosquitto_connect_callback_set(mosq, mqtt_conn_callback); + + if (mosquitto_connect(mosq, "burrow-mqtt", 8883, 60) != MOSQ_ERR_SUCCESS) + die("Mosquitto: connect failed"); + + if (mosquitto_loop_start(mosq)) + die("Mosquitto: cannot start service thread"); +} + +static void mqtt_point(time_t t, const char *name, double val, char *unit UNUSED) +{ + // We do not feed past data to MQTT (so MAX_PAST_TIME is stronger for us) + if (t < packet_rx_time - 30) + return; + + char topic[64]; + snprintf(topic, sizeof(topic), "burrow/temp/%s", name); + mqtt_publish(topic, "%.3f %lld", val, (long long) t); +} + +#endif + /*** RRD interface ***/ +#ifdef LOG_TO_RRD + +#include + #define MAX_ARGS 20 #define MAX_ARG_SIZE 1024 @@ -140,6 +227,7 @@ static void rrd_point(time_t t, const char *name, double val, char *unit) rrd_create(arg_cnt, arg_ptr); if (rrd_test_error()) { log_error("rrd_create on %s failed: %s", rr_name, rrd_get_error()); + rrd_clear_error(); return; } } @@ -148,17 +236,19 @@ static void rrd_point(time_t t, const char *name, double val, char *unit) arg_push(rr_name); arg_push("%d:%f", t, val); rrd_update(arg_cnt, arg_ptr); - if (rrd_test_error()) + if (rrd_test_error()) { log_error("rrd_update on %s failed: %s", rr_name, rrd_get_error()); + rrd_clear_error(); + } } +#endif + /*** Transforms ***/ #define TIME_OFFSET 946681200 // Timestamp of 2000-01-01 00:00:00 -static int data_point_counter; // Since last log message - -static double correct_point(int id, double val, const char **name) +static double correct_point(uint id, double val, const char **name) { /* * Manually calculated corrections and renames for my sensors. @@ -166,23 +256,35 @@ static double correct_point(int id, double val, const char **name) */ switch (id) { case 10415: - *name = "ursarium"; + *name = "terarium"; return val - 0.93; - case 19246: + case 10707: *name = "catarium"; + return val - 0.71; + case 11699: + *name = "garage"; + return val; + case 19246: + *name = "ursarium"; return val + 0.49; + case 19247: + *name = "ursarium-rh"; + return val; case 12133: - *name = "balcony"; + *name = "aquarium"; return val + 0.44; default: +#ifdef IGNORE_UNKNOWN_SENSORS + *name = NULL; +#endif return val; } } -static void cooked_point(time_t t, int id, double val, char *unit, int q) +static void cooked_point(time_t t, uint id, double val, char *unit, int q) { char namebuf[16]; - snprintf(namebuf, sizeof(namebuf), "%d", id); + snprintf(namebuf, sizeof(namebuf), "%u", id); const char *name = namebuf; double val2 = correct_point(id, val, &name); @@ -195,11 +297,29 @@ static void cooked_point(time_t t, int id, double val, char *unit, int q) printf("== %s id=%d name=%s val=%.3f val2=%.3f unit=%s q=%d\n", tbuf, id, name, val, val2, unit, q); } + if (!name) { + log_error("Ignored data from unknown sensor %d", id); + return; + } + if (t < packet_rx_time - MAX_PAST_TIME) { + log_error("Data point from sensor %d too far in the past (%d sec)", packet_rx_time - t); + return; + } + if (t > packet_rx_time + MAX_FUTURE_TIME) { + log_error("Data point from sensor %d too far in the future (%d sec)", t - packet_rx_time); + return; + } + data_point_counter++; +#ifdef LOG_TO_RRD rrd_point(t, name, val2, unit); +#endif +#ifdef LOG_TO_MQTT + mqtt_point(t, name, val2, unit); +#endif } -static void raw_point(int t, int id, int raw, int q) +static void raw_point(uint t, uint id, int raw, int q) { /* * The binary blob provided by Arexx contains an embedded XML fragment @@ -234,14 +354,15 @@ static void raw_point(int t, int id, int raw, int q) double z = raw; double hi, lo; char *unit; - int idhi = id & 0xf000; + uint idhi = id & 0xfffff000; + uint idext = id & 0xf0000000; if (idhi == 0x1000) { z = 0.02*z - 273.15; lo = -200; hi = 600; unit = "C"; - } else if (idhi == 0x2000) { + } else if (idhi == 0x2000 || idext == 0x20000000) { if (raw >= 0x8000) z -= 0x10000; z /= 128; @@ -279,7 +400,7 @@ static void raw_point(int t, int id, int raw, int q) unit = "ppm"; } } else { - log_error("Unknown sensor type 0x%04x", id); + log_error("Unknown sensor type 0x%08x", id); return; } @@ -288,11 +409,61 @@ static void raw_point(int t, int id, int raw, int q) return; } - cooked_point(t + TIME_OFFSET, id, z, unit, q); + cooked_point(t + TIME_OFFSET, id & 0x0fffffff, z, unit, q); } /*** USB interface ***/ +static int rx_endpoint, tx_endpoint; + +static int parse_descriptors(libusb_device *dev) +{ + int err; + struct libusb_config_descriptor *desc; + + if (err = libusb_get_active_config_descriptor(dev, &desc)) { + log_error("libusb_get_config_descriptor failed: error %d", err); + return 0; + } + if (desc->bNumInterfaces != 1) { + log_error("Unexpected number of interfaces: %d", desc->bNumInterfaces); + goto failed; + } + + const struct libusb_interface *iface = &desc->interface[0]; + if (iface->num_altsetting != 1) { + log_error("Unexpected number of alternate interface settings: %d", iface->num_altsetting); + goto failed; + } + + const struct libusb_interface_descriptor *ifd = &iface->altsetting[0]; + if (ifd->bNumEndpoints != 2) { + log_error("Unexpected number of endpoints: %d", ifd->bNumEndpoints); + goto failed; + } + + rx_endpoint = tx_endpoint = -1; + for (int i=0; i<2; i++) { + const struct libusb_endpoint_descriptor *epd = &ifd->endpoint[i]; + if (epd->bEndpointAddress & 0x80) + rx_endpoint = epd->bEndpointAddress; + else + tx_endpoint = epd->bEndpointAddress; + } + if (rx_endpoint < 0 || tx_endpoint < 0) { + log_error("Failed to identify endpoints"); + goto failed; + } + + log_pkt("Found endpoints: rx==%02x tx=%02x\n", rx_endpoint, tx_endpoint); + libusb_free_config_descriptor(desc); + return 1; + +failed: + libusb_free_config_descriptor(desc); + return 0; +} + static int find_device(void) { libusb_device **devlist; @@ -308,6 +479,8 @@ static int find_device(void) if (!libusb_get_device_descriptor(dev, &desc)) { if (desc.idVendor == 0x0451 && desc.idProduct == 0x3211) { log_info("Arexx data logger found at usb%d.%d", libusb_get_bus_number(dev), libusb_get_device_address(dev)); + if (!parse_descriptors(dev)) + continue; int err; if (err = libusb_open(dev, &devh)) { log_error("libusb_open() failed: error %d", err); @@ -331,6 +504,8 @@ failed: static void release_device(void) { + libusb_release_interface(devh, 0); + libusb_reset_device(devh); libusb_close(devh); devh = NULL; } @@ -346,6 +521,12 @@ static void dump_packet(byte *pkt) } } +static void my_msleep(int ms) +{ + struct timespec ts = { .tv_sec = ms/1000, .tv_nsec = (ms%1000) * 1000000 }; + nanosleep(&ts, NULL); +} + static int send_and_receive(byte *req, byte *reply) { if (debug_packets) { @@ -359,7 +540,7 @@ static int send_and_receive(byte *req, byte *reply) } int err, transferred; - if (err = libusb_bulk_transfer(devh, 0x01, req, 64, &transferred, 200)) { + if (err = libusb_bulk_transfer(devh, tx_endpoint, req, 64, &transferred, 200)) { if (err == LIBUSB_ERROR_TIMEOUT) { log_pkt(">> xmit timed out\n"); return 0; @@ -372,7 +553,8 @@ static int send_and_receive(byte *req, byte *reply) log_pkt(">> xmit %d bytes\n", transferred); dump_packet(req); } - if (err = libusb_bulk_transfer(devh, 0x81, reply, 64, &transferred, 200)) { + my_msleep(1); + if (err = libusb_bulk_transfer(devh, rx_endpoint, reply, 64, &transferred, 200)) { if (err == LIBUSB_ERROR_TIMEOUT) { log_pkt("<< recv timed out\n"); return 0; @@ -381,6 +563,7 @@ static int send_and_receive(byte *req, byte *reply) log_error("Receive error: %d", err); return err; } + packet_rx_time = time(NULL); if (debug_packets) log_pkt("<< recv %d bytes\n", transferred); while (transferred < 64) @@ -431,21 +614,35 @@ static int parse_packet(byte *reply) int len = p[0]; if (!len || len == 0xff) break; - if (len < 9 || len > 10) { - log_error("Unknown tuple length %02x", len); - break; - } if (pos + len > 64) { log_error("Tuple truncated"); break; } - int id = get_le16(p+1); - int raw = get_be16(p+3); - int t = get_le32(p+5); - int q = (len > 9) ? p[9] : -1; + + uint t, id; + int raw, q; + switch (len) { + case 9: + case 10: + id = get_le16(p+1); + raw = get_be16(p+3); + t = get_le32(p+5); + q = (len > 9) ? p[9] : -1; + break; + case 12: + id = get_le32(p+1); + raw = get_be16(p+5); + t = get_le32(p+7); + q = p[11]; + break; + default: + log_error("Unknown tuple length %02x", len); + goto end; + } + if (debug_raw_data) { - printf("... %02x: id=%d raw=%d t=%d", len, id, raw, t); - if (len > 9) + printf("... %02x: id=%08x raw=%d t=%u", len, id, raw, t); + if (q >= 0) printf(" q=%d", q); printf("\n"); } @@ -454,6 +651,7 @@ static int parse_packet(byte *reply) points++; } +end: return points; } @@ -480,18 +678,28 @@ static void set_clock(void) /*** Main ***/ +static sigset_t term_sigs; static volatile sig_atomic_t want_shutdown; -static void sigterm_handler(int sig __attribute__((unused))) +static void sigterm_handler(int sig UNUSED) { want_shutdown = 1; } +static void interruptible_msleep(int ms) +{ + sigprocmask(SIG_UNBLOCK, &term_sigs, NULL); + my_msleep(ms); + sigprocmask(SIG_BLOCK, &term_sigs, NULL); +} + static const struct option long_options[] = { { "debug", 0, NULL, 'd' }, { "log-dir", 1, NULL, 'l' }, + { "no-fork", 0, NULL, 'n' }, { "debug-packets", 0, NULL, 'p' }, { "debug-raw", 0, NULL, 'r' }, + { "version", 0, NULL, 'V' }, { NULL, 0, NULL, 0 }, }; @@ -503,8 +711,10 @@ Usage: arexxd \n\ Options:\n\ -d, --debug Debug mode (no chdir, no fork, no syslog)\n\ -l, --log-dir= Directory where all received data should be stored\n\ +-n, --no-fork Do not fork\n\ -p, --debug-packets Log all packets sent and received\n\ -r, --debug-raw Log conversion from raw values\n\ +-V, --version Show daemon version\n\ "); exit(1); } @@ -512,7 +722,7 @@ Options:\n\ int main(int argc, char **argv) { int opt; - while ((opt = getopt_long(argc, argv, "dl:pr", long_options, NULL)) >= 0) + while ((opt = getopt_long(argc, argv, "dl:nprV", long_options, NULL)) >= 0) switch (opt) { case 'd': debug_mode++; @@ -520,12 +730,19 @@ int main(int argc, char **argv) case 'l': log_dir = optarg; break; + case 'n': + no_fork++; + break; case 'p': debug_packets++; break; case 'r': debug_raw_data++; break; + case 'V': + printf("arexxd " AREXXD_VERSION "\n"); + printf("(c) 2011-2018 Martin Mares \n"); + return 0; default: usage(); } @@ -535,7 +752,6 @@ int main(int argc, char **argv) int err; if (err = libusb_init(&usb_ctxt)) die("Cannot initialize libusb: error %d", err); - // libusb_set_debug(usb_ctxt, 3); if (!debug_mode) { if (chdir(log_dir) < 0) @@ -547,20 +763,25 @@ int main(int argc, char **argv) setlinebuf(stdout); } openlog("arexxd", LOG_NDELAY, LOG_DAEMON); - pid_t pid = fork(); - if (pid < 0) - die("fork() failed: %m"); - if (pid) - return 0; + if (!no_fork) { + pid_t pid = fork(); + if (pid < 0) + die("fork() failed: %m"); + if (pid) + return 0; + } setsid(); use_syslog = 1; } +#ifdef LOG_TO_MQTT + mqtt_init(); +#endif + struct sigaction sa = { .sa_handler = sigterm_handler }; sigaction(SIGTERM, &sa, NULL); sigaction(SIGINT, &sa, NULL); - sigset_t term_sigs; sigemptyset(&term_sigs); sigaddset(&term_sigs, SIGTERM); sigaddset(&term_sigs, SIGINT); @@ -573,7 +794,7 @@ int main(int argc, char **argv) inited = 1; log_error("Data logger not connected, waiting until it appears"); } - sleep(30); + interruptible_msleep(30000); continue; } log_info("Listening"); @@ -605,17 +826,17 @@ int main(int argc, char **argv) want_sleep = 1; if (err > 0 && parse_packet(reply)) want_sleep = 0; - sigprocmask(SIG_UNBLOCK, &term_sigs, NULL); if (want_sleep) { - sleep(4); + interruptible_msleep(4000); want_stats = 1; - } - sigprocmask(SIG_BLOCK, &term_sigs, NULL); + } else + interruptible_msleep(5); } log_info("Disconnecting data logger"); release_device(); inited = 0; + interruptible_msleep(10000); } log_info("Terminated");