From cb325627cd4da8b2472d24bd8e94440ec53f09fc Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Fri, 28 Feb 2020 18:27:03 +0100 Subject: [PATCH] BSB: Logger daemon --- bsb/logger/Makefile | 13 +++ bsb/logger/burrow-bsb-logger.c | 177 +++++++++++++++++++++++++++++++++ 2 files changed, 190 insertions(+) create mode 100644 bsb/logger/Makefile create mode 100644 bsb/logger/burrow-bsb-logger.c diff --git a/bsb/logger/Makefile b/bsb/logger/Makefile new file mode 100644 index 0000000..21495aa --- /dev/null +++ b/bsb/logger/Makefile @@ -0,0 +1,13 @@ +PC=pkg-config +UCW_CFLAGS := $(shell $(PC) --cflags libucw) +UCW_LIBS := $(shell $(PC) --libs libucw) + +CFLAGS=$(UCW_CFLAGS) $(CURL_CFLAGS) -std=gnu99 +LDFLAGS=$(UCW_LIBS) $(CURL_LIBS) -lmosquitto + +all: burrow-bsb-logger + +burrow-bsb-logger: burrow-bsb-logger.c + +install: all + install burrow-bsb-logger /usr/local/sbin/ diff --git a/bsb/logger/burrow-bsb-logger.c b/bsb/logger/burrow-bsb-logger.c new file mode 100644 index 0000000..b7a914f --- /dev/null +++ b/bsb/logger/burrow-bsb-logger.c @@ -0,0 +1,177 @@ +/* + * A BSB frame logger + * + * (c) 2020 Martin Mares + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +/*** MQTT ***/ + +static struct mosquitto *mosq; +static bool mqtt_connected; + +static void mqtt_error(const char *operation, int err, bool teardown) +{ + msg(L_ERROR, "Mosquitto: %s failed: error %d", operation, err); + + if (teardown) { + mosquitto_destroy(mosq); + mosq = NULL; + mqtt_connected = false; + } else if (err == MOSQ_ERR_NO_CONN || err == MOSQ_ERR_CONN_REFUSED || err == MOSQ_ERR_CONN_LOST) { + mqtt_connected = false; + } +} + +static void mqtt_publish(const char *topic, const char *fmt, ...) +{ + va_list args; + va_start(args, fmt); + + if (mqtt_connected) { + char m[256]; + int l = vsnprintf(m, sizeof(m), fmt, args); + int err = mosquitto_publish(mosq, NULL, topic, l, m, 0, true); + if (err != MOSQ_ERR_SUCCESS) + mqtt_error("publish", err, false); + } + + va_end(args); +} + +static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message) +{ + // msg(L_INFO, "MQTT(%d): %s", level, message); +} + +static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m); + +static bool mqtt_connect(void) +{ + int err; + + if (mqtt_connected) + return true; + + if (!mosq) { + mosq = mosquitto_new("bsb-logger", 1, NULL); + if (!mosq) + die("Mosquitto: initialization failed"); + + mosquitto_log_callback_set(mosq, mqtt_log_callback); + mosquitto_message_callback_set(mosq, mqtt_msg_callback); + + err = mosquitto_will_set(mosq, "status/bsb-logger", 4, "dead", 0, true); + if (err != MOSQ_ERR_SUCCESS) { + mqtt_error("will_set", err, true); + return false; + } + + err = mosquitto_connect(mosq, "10.32.184.5", 1883, 60); + if (err != MOSQ_ERR_SUCCESS) { + mqtt_error("connect", err, true); + return false; + } + } else { + err = mosquitto_reconnect(mosq); + if (err != MOSQ_ERR_SUCCESS) { + mqtt_error("reconnect", err, false); + return false; + } + } + + err = mosquitto_subscribe(mosq, NULL, "bsb/#", 1); + if (err != MOSQ_ERR_SUCCESS) { + mqtt_error("subscribe", err, false); + return false; + } + + mqtt_connected = true; + + mqtt_publish("status/bsb-logger", "ok"); + + return mqtt_connected; +} + +static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m) +{ + time_t now = time(NULL); + char val[256]; + if (m->payloadlen >= sizeof(val) - 1) { + msg(L_ERROR, "Invalid value for topic %s", m->topic); + return; + } + memcpy(val, m->payload, m->payloadlen); + val[m->payloadlen] = 0; + msg(L_DEBUG, "MQTT < %s %s", m->topic, val); + + static struct fastbuf *logfile; + if (!logfile) + logfile = bopen_file("/var/log/bsb-frames", O_WRONLY | O_CREAT | O_APPEND, NULL); + + if (!strcmp(m->topic, "bsb/frame")) + bprintf(logfile, "%jd %s\n", (uintmax_t) time(NULL), val); + + // FIXME: Log statistics + + bflush(logfile); +} + +/*** Main ***/ + +static int use_daemon; +static int use_debug; + +static struct opt_section options = { + OPT_ITEMS { + OPT_HELP("A daemon for logging BSB frames received via MQTT"), + OPT_HELP(""), + OPT_HELP("Options:"), + OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"), + OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"), + OPT_HELP_OPTION, + OPT_CONF_OPTIONS, + OPT_END + } +}; + +int main(int argc UNUSED, char **argv) +{ + log_init(argv[0]); + opt_parse(&options, argv+1); + + if (use_daemon) { + struct log_stream *ls = log_new_syslog("daemon", LOG_PID); + log_set_default_stream(ls); + } + if (!use_debug) + log_default_stream()->levels &= ~(1U << L_DEBUG); + + mosquitto_lib_init(); + + for (;;) { + if (!mqtt_connect()) { + sleep(5); + continue; + } + + int err = mosquitto_loop(mosq, 1000000, 1); + if (err != MOSQ_ERR_SUCCESS) + mqtt_error("loop", err, false); + } +} -- 2.39.2