/*
* Linux Interfece for Arexx Data Loggers
*
- * (c) 2011-2012 Martin Mares <mj@ucw.cz>
+ * (c) 2011-2020 Martin Mares <mj@ucw.cz>
*/
#include <stdio.h>
#include <signal.h>
#include <sys/stat.h>
#include <libusb-1.0/libusb.h>
-#include <rrd.h>
#define DEFAULT_LOG_DIR "/var/log/arexxd"
+/*
+ * Data points received from the logger are sometimes corrupted by noise.
+ * This effects not only the measured values, but also sensor IDs and timestamps.
+ * Since rrdtool cannot skip back in time, a random timestamp in the future can
+ * cause all further measurements to be dropped. To minimize impact of these
+ * problems, we drop data points which are too far in the past or in the future.
+ *
+ * Furthermore, you can ignore data from unrecognized sensors, i.e., those
+ * which are not handled by correct_point().
+ */
+#define MAX_PAST_TIME 30*86400
+#define MAX_FUTURE_TIME 300
+#define IGNORE_UNKNOWN_SENSORS
+
+#undef LOG_TO_RRD
+#define LOG_TO_MQTT
+
typedef unsigned char byte;
+typedef unsigned int uint;
static libusb_context *usb_ctxt;
static libusb_device_handle *devh;
static int debug_packets;
static int debug_raw_data;
static char *log_dir = DEFAULT_LOG_DIR;
+static int no_fork;
+
+#define UNUSED __attribute__((unused))
+
+static int data_point_counter; // Since last log message
+static time_t packet_rx_time;
static void die(char *fmt, ...)
{
va_end(args);
}
+/*** MQTT interface ***/
+
+#ifdef LOG_TO_MQTT
+
+#include <mosquitto.h>
+
+static struct mosquitto *mosq;
+
+static void mqtt_publish(const char *topic, const char *fmt, ...)
+{
+ va_list args;
+ va_start(args, fmt);
+ char m[256];
+ int l = vsnprintf(m, sizeof(m), fmt, args);
+ if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
+ log_error("Mosquitto: publish failed");
+ va_end(args);
+}
+
+static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
+{
+ if (!status)
+ mqtt_publish("status/arexxd", "ok");
+}
+
+static void mqtt_init(void)
+{
+ mosquitto_lib_init();
+ mosq = mosquitto_new("arexxd", 1, NULL);
+ if (!mosq)
+ die("Mosquitto: initialization failed");
+
+ if (mosquitto_tls_set(mosq, "/etc/burrow-mqtt/ca.crt", NULL, "/etc/burrow-mqtt/client.crt", "/etc/burrow-mqtt/client.key", NULL) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: unable to set TLS parameters");
+
+ if (mosquitto_will_set(mosq, "status/arexxd", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: unable to set will");
+
+ mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
+
+ if (mosquitto_connect(mosq, "burrow-mqtt", 8883, 60) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: connect failed");
+
+ if (mosquitto_loop_start(mosq))
+ die("Mosquitto: cannot start service thread");
+}
+
+static void mqtt_point(time_t t, const char *name, double val, char *unit UNUSED)
+{
+ // We do not feed past data to MQTT (so MAX_PAST_TIME is stronger for us)
+ if (t < packet_rx_time - 30)
+ return;
+
+ char topic[64];
+ snprintf(topic, sizeof(topic), "burrow/temp/%s", name);
+ mqtt_publish(topic, "%.3f %lld", val, (long long) t);
+}
+
+#endif
+
/*** RRD interface ***/
+#ifdef LOG_TO_RRD
+
+#include <rrd.h>
+
#define MAX_ARGS 20
#define MAX_ARG_SIZE 1024
rrd_create(arg_cnt, arg_ptr);
if (rrd_test_error()) {
log_error("rrd_create on %s failed: %s", rr_name, rrd_get_error());
+ rrd_clear_error();
return;
}
}
arg_push(rr_name);
arg_push("%d:%f", t, val);
rrd_update(arg_cnt, arg_ptr);
- if (rrd_test_error())
+ if (rrd_test_error()) {
log_error("rrd_update on %s failed: %s", rr_name, rrd_get_error());
+ rrd_clear_error();
+ }
}
+#endif
+
/*** Transforms ***/
#define TIME_OFFSET 946681200 // Timestamp of 2000-01-01 00:00:00
-static int data_point_counter; // Since last log message
-
-static double correct_point(int id, double val, const char **name)
+static double correct_point(uint id, double val, const char **name)
{
/*
* Manually calculated corrections and renames for my sensors.
*/
switch (id) {
case 10415:
- *name = "ursarium";
+ *name = "terarium";
return val - 0.93;
case 10707:
- *name = "balcony";
+ *name = "catarium";
return val - 0.71;
+ case 11699:
+ *name = "garage";
+ return val;
case 19246:
- *name = "catarium";
+ *name = "ursarium";
return val + 0.49;
case 19247:
- *name = "catarium-rh";
+ *name = "ursarium-rh";
return val;
case 12133:
- *name = "outside";
+ *name = "aquarium";
return val + 0.44;
default:
+#ifdef IGNORE_UNKNOWN_SENSORS
+ *name = NULL;
+#endif
return val;
}
}
-static void cooked_point(time_t t, int id, double val, char *unit, int q)
+static void cooked_point(time_t t, uint id, double val, char *unit, int q)
{
char namebuf[16];
- snprintf(namebuf, sizeof(namebuf), "%d", id);
+ snprintf(namebuf, sizeof(namebuf), "%u", id);
const char *name = namebuf;
double val2 = correct_point(id, val, &name);
printf("== %s id=%d name=%s val=%.3f val2=%.3f unit=%s q=%d\n", tbuf, id, name, val, val2, unit, q);
}
+ if (!name) {
+ log_error("Ignored data from unknown sensor %d", id);
+ return;
+ }
+ if (t < packet_rx_time - MAX_PAST_TIME) {
+ log_error("Data point from sensor %d too far in the past (%d sec)", packet_rx_time - t);
+ return;
+ }
+ if (t > packet_rx_time + MAX_FUTURE_TIME) {
+ log_error("Data point from sensor %d too far in the future (%d sec)", t - packet_rx_time);
+ return;
+ }
+
data_point_counter++;
+#ifdef LOG_TO_RRD
rrd_point(t, name, val2, unit);
+#endif
+#ifdef LOG_TO_MQTT
+ mqtt_point(t, name, val2, unit);
+#endif
}
-static void raw_point(int t, int id, int raw, int q)
+static void raw_point(uint t, uint id, int raw, int q)
{
/*
* The binary blob provided by Arexx contains an embedded XML fragment
double z = raw;
double hi, lo;
char *unit;
- int idhi = id & 0xf000;
+ uint idhi = id & 0xfffff000;
+ uint idext = id & 0xf0000000;
if (idhi == 0x1000) {
z = 0.02*z - 273.15;
lo = -200;
hi = 600;
unit = "C";
- } else if (idhi == 0x2000) {
+ } else if (idhi == 0x2000 || idext == 0x20000000) {
if (raw >= 0x8000)
z -= 0x10000;
z /= 128;
unit = "ppm";
}
} else {
- log_error("Unknown sensor type 0x%04x", id);
+ log_error("Unknown sensor type 0x%08x", id);
return;
}
return;
}
- cooked_point(t + TIME_OFFSET, id, z, unit, q);
+ cooked_point(t + TIME_OFFSET, id & 0x0fffffff, z, unit, q);
}
/*** USB interface ***/
+static int rx_endpoint, tx_endpoint;
+
+static int parse_descriptors(libusb_device *dev)
+{
+ int err;
+ struct libusb_config_descriptor *desc;
+
+ if (err = libusb_get_active_config_descriptor(dev, &desc)) {
+ log_error("libusb_get_config_descriptor failed: error %d", err);
+ return 0;
+ }
+ if (desc->bNumInterfaces != 1) {
+ log_error("Unexpected number of interfaces: %d", desc->bNumInterfaces);
+ goto failed;
+ }
+
+ const struct libusb_interface *iface = &desc->interface[0];
+ if (iface->num_altsetting != 1) {
+ log_error("Unexpected number of alternate interface settings: %d", iface->num_altsetting);
+ goto failed;
+ }
+
+ const struct libusb_interface_descriptor *ifd = &iface->altsetting[0];
+ if (ifd->bNumEndpoints != 2) {
+ log_error("Unexpected number of endpoints: %d", ifd->bNumEndpoints);
+ goto failed;
+ }
+
+ rx_endpoint = tx_endpoint = -1;
+ for (int i=0; i<2; i++) {
+ const struct libusb_endpoint_descriptor *epd = &ifd->endpoint[i];
+ if (epd->bEndpointAddress & 0x80)
+ rx_endpoint = epd->bEndpointAddress;
+ else
+ tx_endpoint = epd->bEndpointAddress;
+ }
+ if (rx_endpoint < 0 || tx_endpoint < 0) {
+ log_error("Failed to identify endpoints");
+ goto failed;
+ }
+
+ log_pkt("Found endpoints: rx==%02x tx=%02x\n", rx_endpoint, tx_endpoint);
+ libusb_free_config_descriptor(desc);
+ return 1;
+
+failed:
+ libusb_free_config_descriptor(desc);
+ return 0;
+}
+
static int find_device(void)
{
libusb_device **devlist;
if (!libusb_get_device_descriptor(dev, &desc)) {
if (desc.idVendor == 0x0451 && desc.idProduct == 0x3211) {
log_info("Arexx data logger found at usb%d.%d", libusb_get_bus_number(dev), libusb_get_device_address(dev));
+ if (!parse_descriptors(dev))
+ continue;
int err;
if (err = libusb_open(dev, &devh)) {
log_error("libusb_open() failed: error %d", err);
static void release_device(void)
{
+ libusb_release_interface(devh, 0);
+ libusb_reset_device(devh);
libusb_close(devh);
devh = NULL;
}
}
}
+static void my_msleep(int ms)
+{
+ struct timespec ts = { .tv_sec = ms/1000, .tv_nsec = (ms%1000) * 1000000 };
+ nanosleep(&ts, NULL);
+}
+
static int send_and_receive(byte *req, byte *reply)
{
if (debug_packets) {
}
int err, transferred;
- if (err = libusb_bulk_transfer(devh, 0x01, req, 64, &transferred, 200)) {
+ if (err = libusb_bulk_transfer(devh, tx_endpoint, req, 64, &transferred, 200)) {
if (err == LIBUSB_ERROR_TIMEOUT) {
log_pkt(">> xmit timed out\n");
return 0;
log_pkt(">> xmit %d bytes\n", transferred);
dump_packet(req);
}
- if (err = libusb_bulk_transfer(devh, 0x81, reply, 64, &transferred, 200)) {
+ my_msleep(1);
+ if (err = libusb_bulk_transfer(devh, rx_endpoint, reply, 64, &transferred, 200)) {
if (err == LIBUSB_ERROR_TIMEOUT) {
log_pkt("<< recv timed out\n");
return 0;
log_error("Receive error: %d", err);
return err;
}
+ packet_rx_time = time(NULL);
if (debug_packets)
log_pkt("<< recv %d bytes\n", transferred);
while (transferred < 64)
int len = p[0];
if (!len || len == 0xff)
break;
- if (len < 9 || len > 10) {
- log_error("Unknown tuple length %02x", len);
- break;
- }
if (pos + len > 64) {
log_error("Tuple truncated");
break;
}
- int id = get_le16(p+1);
- int raw = get_be16(p+3);
- int t = get_le32(p+5);
- int q = (len > 9) ? p[9] : -1;
+
+ uint t, id;
+ int raw, q;
+ switch (len) {
+ case 9:
+ case 10:
+ id = get_le16(p+1);
+ raw = get_be16(p+3);
+ t = get_le32(p+5);
+ q = (len > 9) ? p[9] : -1;
+ break;
+ case 12:
+ id = get_le32(p+1);
+ raw = get_be16(p+5);
+ t = get_le32(p+7);
+ q = p[11];
+ break;
+ default:
+ log_error("Unknown tuple length %02x", len);
+ goto end;
+ }
+
if (debug_raw_data) {
- printf("... %02x: id=%d raw=%d t=%d", len, id, raw, t);
- if (len > 9)
+ printf("... %02x: id=%08x raw=%d t=%u", len, id, raw, t);
+ if (q >= 0)
printf(" q=%d", q);
printf("\n");
}
points++;
}
+end:
return points;
}
static sigset_t term_sigs;
static volatile sig_atomic_t want_shutdown;
-static void sigterm_handler(int sig __attribute__((unused)))
+static void sigterm_handler(int sig UNUSED)
{
want_shutdown = 1;
}
-static void interruptible_sleep(int seconds)
+static void interruptible_msleep(int ms)
{
sigprocmask(SIG_UNBLOCK, &term_sigs, NULL);
- sleep(seconds);
+ my_msleep(ms);
sigprocmask(SIG_BLOCK, &term_sigs, NULL);
}
static const struct option long_options[] = {
{ "debug", 0, NULL, 'd' },
{ "log-dir", 1, NULL, 'l' },
+ { "no-fork", 0, NULL, 'n' },
{ "debug-packets", 0, NULL, 'p' },
{ "debug-raw", 0, NULL, 'r' },
+ { "version", 0, NULL, 'V' },
{ NULL, 0, NULL, 0 },
};
Options:\n\
-d, --debug Debug mode (no chdir, no fork, no syslog)\n\
-l, --log-dir=<dir> Directory where all received data should be stored\n\
+-n, --no-fork Do not fork\n\
-p, --debug-packets Log all packets sent and received\n\
-r, --debug-raw Log conversion from raw values\n\
+-V, --version Show daemon version\n\
");
exit(1);
}
int main(int argc, char **argv)
{
int opt;
- while ((opt = getopt_long(argc, argv, "dl:pr", long_options, NULL)) >= 0)
+ while ((opt = getopt_long(argc, argv, "dl:nprV", long_options, NULL)) >= 0)
switch (opt) {
case 'd':
debug_mode++;
case 'l':
log_dir = optarg;
break;
+ case 'n':
+ no_fork++;
+ break;
case 'p':
debug_packets++;
break;
case 'r':
debug_raw_data++;
break;
+ case 'V':
+ printf("arexxd " AREXXD_VERSION "\n");
+ printf("(c) 2011-2018 Martin Mares <mj@ucw.cz>\n");
+ return 0;
default:
usage();
}
int err;
if (err = libusb_init(&usb_ctxt))
die("Cannot initialize libusb: error %d", err);
- // libusb_set_debug(usb_ctxt, 3);
if (!debug_mode) {
if (chdir(log_dir) < 0)
setlinebuf(stdout);
}
openlog("arexxd", LOG_NDELAY, LOG_DAEMON);
- pid_t pid = fork();
- if (pid < 0)
- die("fork() failed: %m");
- if (pid)
- return 0;
+ if (!no_fork) {
+ pid_t pid = fork();
+ if (pid < 0)
+ die("fork() failed: %m");
+ if (pid)
+ return 0;
+ }
setsid();
use_syslog = 1;
}
+#ifdef LOG_TO_MQTT
+ mqtt_init();
+#endif
+
struct sigaction sa = { .sa_handler = sigterm_handler };
sigaction(SIGTERM, &sa, NULL);
sigaction(SIGINT, &sa, NULL);
inited = 1;
log_error("Data logger not connected, waiting until it appears");
}
- interruptible_sleep(30);
+ interruptible_msleep(30000);
continue;
}
log_info("Listening");
if (err > 0 && parse_packet(reply))
want_sleep = 0;
if (want_sleep) {
- interruptible_sleep(4);
+ interruptible_msleep(4000);
want_stats = 1;
- }
+ } else
+ interruptible_msleep(5);
}
log_info("Disconnecting data logger");
release_device();
inited = 0;
- interruptible_sleep(10);
+ interruptible_msleep(10000);
}
log_info("Terminated");