X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=arexxd.c;h=e083c52bce471caf59bca4ddc9f2b324fa4ba60a;hb=HEAD;hp=52cb17c71401e5cff4332e328790b54737a102a0;hpb=ab67ccef6257c019d751b9055c30549abaa7568c;p=arexx.git diff --git a/arexxd.c b/arexxd.c index 52cb17c..e083c52 100644 --- a/arexxd.c +++ b/arexxd.c @@ -1,7 +1,7 @@ /* * Linux Interfece for Arexx Data Loggers * - * (c) 2011-2012 Martin Mares + * (c) 2011-2020 Martin Mares */ #include @@ -17,7 +17,6 @@ #include #include #include -#include #define DEFAULT_LOG_DIR "/var/log/arexxd" @@ -35,7 +34,11 @@ #define MAX_FUTURE_TIME 300 #define IGNORE_UNKNOWN_SENSORS +#undef LOG_TO_RRD +#define LOG_TO_MQTT + typedef unsigned char byte; +typedef unsigned int uint; static libusb_context *usb_ctxt; static libusb_device_handle *devh; @@ -43,8 +46,13 @@ static int use_syslog; static int debug_mode; static int debug_packets; static int debug_raw_data; -static int debug_usb; static char *log_dir = DEFAULT_LOG_DIR; +static int no_fork; + +#define UNUSED __attribute__((unused)) + +static int data_point_counter; // Since last log message +static time_t packet_rx_time; static void die(char *fmt, ...) { @@ -96,8 +104,72 @@ static void log_pkt(char *fmt, ...) va_end(args); } +/*** MQTT interface ***/ + +#ifdef LOG_TO_MQTT + +#include + +static struct mosquitto *mosq; + +static void mqtt_publish(const char *topic, const char *fmt, ...) +{ + va_list args; + va_start(args, fmt); + char m[256]; + int l = vsnprintf(m, sizeof(m), fmt, args); + if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS) + log_error("Mosquitto: publish failed"); + va_end(args); +} + +static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status) +{ + if (!status) + mqtt_publish("status/arexxd", "ok"); +} + +static void mqtt_init(void) +{ + mosquitto_lib_init(); + mosq = mosquitto_new("arexxd", 1, NULL); + if (!mosq) + die("Mosquitto: initialization failed"); + + if (mosquitto_tls_set(mosq, "/etc/burrow-mqtt/ca.crt", NULL, "/etc/burrow-mqtt/client.crt", "/etc/burrow-mqtt/client.key", NULL) != MOSQ_ERR_SUCCESS) + die("Mosquitto: unable to set TLS parameters"); + + if (mosquitto_will_set(mosq, "status/arexxd", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS) + die("Mosquitto: unable to set will"); + + mosquitto_connect_callback_set(mosq, mqtt_conn_callback); + + if (mosquitto_connect(mosq, "burrow-mqtt", 8883, 60) != MOSQ_ERR_SUCCESS) + die("Mosquitto: connect failed"); + + if (mosquitto_loop_start(mosq)) + die("Mosquitto: cannot start service thread"); +} + +static void mqtt_point(time_t t, const char *name, double val, char *unit UNUSED) +{ + // We do not feed past data to MQTT (so MAX_PAST_TIME is stronger for us) + if (t < packet_rx_time - 30) + return; + + char topic[64]; + snprintf(topic, sizeof(topic), "burrow/temp/%s", name); + mqtt_publish(topic, "%.3f %lld", val, (long long) t); +} + +#endif + /*** RRD interface ***/ +#ifdef LOG_TO_RRD + +#include + #define MAX_ARGS 20 #define MAX_ARG_SIZE 1024 @@ -170,14 +242,13 @@ static void rrd_point(time_t t, const char *name, double val, char *unit) } } +#endif + /*** Transforms ***/ #define TIME_OFFSET 946681200 // Timestamp of 2000-01-01 00:00:00 -static int data_point_counter; // Since last log message -static time_t packet_rx_time; - -static double correct_point(int id, double val, const char **name) +static double correct_point(uint id, double val, const char **name) { /* * Manually calculated corrections and renames for my sensors. @@ -185,19 +256,19 @@ static double correct_point(int id, double val, const char **name) */ switch (id) { case 10415: - *name = "ursarium"; + *name = "terarium"; return val - 0.93; case 10707: - *name = "balcony"; + *name = "catarium"; return val - 0.71; case 11699: - *name = "outside"; + *name = "garage"; return val; case 19246: - *name = "catarium"; + *name = "ursarium"; return val + 0.49; case 19247: - *name = "catarium-rh"; + *name = "ursarium-rh"; return val; case 12133: *name = "aquarium"; @@ -210,10 +281,10 @@ static double correct_point(int id, double val, const char **name) } } -static void cooked_point(time_t t, int id, double val, char *unit, int q) +static void cooked_point(time_t t, uint id, double val, char *unit, int q) { char namebuf[16]; - snprintf(namebuf, sizeof(namebuf), "%d", id); + snprintf(namebuf, sizeof(namebuf), "%u", id); const char *name = namebuf; double val2 = correct_point(id, val, &name); @@ -240,10 +311,15 @@ static void cooked_point(time_t t, int id, double val, char *unit, int q) } data_point_counter++; +#ifdef LOG_TO_RRD rrd_point(t, name, val2, unit); +#endif +#ifdef LOG_TO_MQTT + mqtt_point(t, name, val2, unit); +#endif } -static void raw_point(int t, int id, int raw, int q) +static void raw_point(uint t, uint id, int raw, int q) { /* * The binary blob provided by Arexx contains an embedded XML fragment @@ -278,14 +354,15 @@ static void raw_point(int t, int id, int raw, int q) double z = raw; double hi, lo; char *unit; - int idhi = id & 0xf000; + uint idhi = id & 0xfffff000; + uint idext = id & 0xf0000000; if (idhi == 0x1000) { z = 0.02*z - 273.15; lo = -200; hi = 600; unit = "C"; - } else if (idhi == 0x2000) { + } else if (idhi == 0x2000 || idext == 0x20000000) { if (raw >= 0x8000) z -= 0x10000; z /= 128; @@ -323,7 +400,7 @@ static void raw_point(int t, int id, int raw, int q) unit = "ppm"; } } else { - log_error("Unknown sensor type 0x%04x", id); + log_error("Unknown sensor type 0x%08x", id); return; } @@ -332,7 +409,7 @@ static void raw_point(int t, int id, int raw, int q) return; } - cooked_point(t + TIME_OFFSET, id, z, unit, q); + cooked_point(t + TIME_OFFSET, id & 0x0fffffff, z, unit, q); } /*** USB interface ***/ @@ -537,21 +614,35 @@ static int parse_packet(byte *reply) int len = p[0]; if (!len || len == 0xff) break; - if (len < 9 || len > 10) { - log_error("Unknown tuple length %02x", len); - break; - } if (pos + len > 64) { log_error("Tuple truncated"); break; } - int id = get_le16(p+1); - int raw = get_be16(p+3); - int t = get_le32(p+5); - int q = (len > 9) ? p[9] : -1; + + uint t, id; + int raw, q; + switch (len) { + case 9: + case 10: + id = get_le16(p+1); + raw = get_be16(p+3); + t = get_le32(p+5); + q = (len > 9) ? p[9] : -1; + break; + case 12: + id = get_le32(p+1); + raw = get_be16(p+5); + t = get_le32(p+7); + q = p[11]; + break; + default: + log_error("Unknown tuple length %02x", len); + goto end; + } + if (debug_raw_data) { - printf("... %02x: id=%d raw=%d t=%d", len, id, raw, t); - if (len > 9) + printf("... %02x: id=%08x raw=%d t=%u", len, id, raw, t); + if (q >= 0) printf(" q=%d", q); printf("\n"); } @@ -560,6 +651,7 @@ static int parse_packet(byte *reply) points++; } +end: return points; } @@ -589,7 +681,7 @@ static void set_clock(void) static sigset_t term_sigs; static volatile sig_atomic_t want_shutdown; -static void sigterm_handler(int sig __attribute__((unused))) +static void sigterm_handler(int sig UNUSED) { want_shutdown = 1; } @@ -604,6 +696,7 @@ static void interruptible_msleep(int ms) static const struct option long_options[] = { { "debug", 0, NULL, 'd' }, { "log-dir", 1, NULL, 'l' }, + { "no-fork", 0, NULL, 'n' }, { "debug-packets", 0, NULL, 'p' }, { "debug-raw", 0, NULL, 'r' }, { "version", 0, NULL, 'V' }, @@ -618,9 +711,9 @@ Usage: arexxd \n\ Options:\n\ -d, --debug Debug mode (no chdir, no fork, no syslog)\n\ -l, --log-dir= Directory where all received data should be stored\n\ +-n, --no-fork Do not fork\n\ -p, --debug-packets Log all packets sent and received\n\ -r, --debug-raw Log conversion from raw values\n\ --u, --debug-usb Enable libusb debug messages (to stdout/stderr)\n\ -V, --version Show daemon version\n\ "); exit(1); @@ -629,7 +722,7 @@ Options:\n\ int main(int argc, char **argv) { int opt; - while ((opt = getopt_long(argc, argv, "dl:pruV", long_options, NULL)) >= 0) + while ((opt = getopt_long(argc, argv, "dl:nprV", long_options, NULL)) >= 0) switch (opt) { case 'd': debug_mode++; @@ -637,18 +730,18 @@ int main(int argc, char **argv) case 'l': log_dir = optarg; break; + case 'n': + no_fork++; + break; case 'p': debug_packets++; break; case 'r': debug_raw_data++; break; - case 'u': - debug_usb++; - break; case 'V': printf("arexxd " AREXXD_VERSION "\n"); - printf("(c) 2011-2012 Martin Mares \n"); + printf("(c) 2011-2018 Martin Mares \n"); return 0; default: usage(); @@ -659,8 +752,6 @@ int main(int argc, char **argv) int err; if (err = libusb_init(&usb_ctxt)) die("Cannot initialize libusb: error %d", err); - if (debug_usb) - libusb_set_debug(usb_ctxt, 3); if (!debug_mode) { if (chdir(log_dir) < 0) @@ -672,15 +763,21 @@ int main(int argc, char **argv) setlinebuf(stdout); } openlog("arexxd", LOG_NDELAY, LOG_DAEMON); - pid_t pid = fork(); - if (pid < 0) - die("fork() failed: %m"); - if (pid) - return 0; + if (!no_fork) { + pid_t pid = fork(); + if (pid < 0) + die("fork() failed: %m"); + if (pid) + return 0; + } setsid(); use_syslog = 1; } +#ifdef LOG_TO_MQTT + mqtt_init(); +#endif + struct sigaction sa = { .sa_handler = sigterm_handler }; sigaction(SIGTERM, &sa, NULL); sigaction(SIGINT, &sa, NULL);