/*
* A BSB frame logger
*
- * (c) 2020 Martin Mares <mj@ucw.cz>
+ * (c) 2020--2022 Martin Mares <mj@ucw.cz>
*/
#include <ucw/lib.h>
#include <ucw/fastbuf.h>
#include <ucw/log.h>
#include <ucw/opt.h>
+#include <ucw/string.h>
#include <fcntl.h>
#include <stdio.h>
#include <curl/curl.h>
#include <mosquitto.h>
+#define LOG_DIR "/var/log/bsb"
+
/*** MQTT ***/
static struct mosquitto *mosq;
-static bool mqtt_connected;
static void mqtt_error(const char *operation, int err, bool teardown)
{
msg(L_ERROR, "Mosquitto: %s failed: error %d", operation, err);
-
- if (teardown) {
- mosquitto_destroy(mosq);
- mosq = NULL;
- mqtt_connected = false;
- } else if (err == MOSQ_ERR_NO_CONN || err == MOSQ_ERR_CONN_REFUSED || err == MOSQ_ERR_CONN_LOST) {
- mqtt_connected = false;
- }
}
static void mqtt_publish(const char *topic, const char *fmt, ...)
va_list args;
va_start(args, fmt);
- if (mqtt_connected) {
- char m[256];
- int l = vsnprintf(m, sizeof(m), fmt, args);
- int err = mosquitto_publish(mosq, NULL, topic, l, m, 0, true);
- if (err != MOSQ_ERR_SUCCESS)
- mqtt_error("publish", err, false);
- }
+ char m[256];
+ int l = vsnprintf(m, sizeof(m), fmt, args);
+ int err = mosquitto_publish(mosq, NULL, topic, l, m, 0, true);
+ if (err != MOSQ_ERR_SUCCESS)
+ mqtt_error("publish", err, false);
va_end(args);
}
// msg(L_INFO, "MQTT(%d): %s", level, message);
}
+static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
+{
+ if (!status) {
+ msg(L_DEBUG, "MQTT: Connection established, subscribing");
+ int err = mosquitto_subscribe(mosq, NULL, "bsb/#", 1);
+ if (err != MOSQ_ERR_SUCCESS) {
+ mqtt_error("subscribe", err, false);
+ return;
+ }
+ mqtt_publish("status/bsb-logger", "ok");
+ } else {
+ msg(L_DEBUG, "MQTT: Cannot connect");
+ }
+}
+
static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m);
-static bool mqtt_connect(void)
+static void mqtt_connect(void)
{
int err;
- if (mqtt_connected)
- return true;
-
- if (!mosq) {
- mosq = mosquitto_new("bsb-logger", 1, NULL);
- if (!mosq)
- die("Mosquitto: initialization failed");
-
- mosquitto_log_callback_set(mosq, mqtt_log_callback);
- mosquitto_message_callback_set(mosq, mqtt_msg_callback);
-
- err = mosquitto_will_set(mosq, "status/bsb-logger", 4, "dead", 0, true);
- if (err != MOSQ_ERR_SUCCESS) {
- mqtt_error("will_set", err, true);
- return false;
- }
+ mosquitto_lib_init();
- err = mosquitto_connect(mosq, "10.32.184.5", 1883, 60);
- if (err != MOSQ_ERR_SUCCESS) {
- mqtt_error("connect", err, true);
- return false;
- }
- } else {
- err = mosquitto_reconnect(mosq);
- if (err != MOSQ_ERR_SUCCESS) {
- mqtt_error("reconnect", err, false);
- return false;
- }
- }
+ mosq = mosquitto_new("bsb-logger", 1, NULL);
+ if (!mosq)
+ die("Mosquitto: initialization failed");
- err = mosquitto_subscribe(mosq, NULL, "bsb/#", 1);
- if (err != MOSQ_ERR_SUCCESS) {
- mqtt_error("subscribe", err, false);
- return false;
- }
+ mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
+ mosquitto_log_callback_set(mosq, mqtt_log_callback);
+ mosquitto_message_callback_set(mosq, mqtt_msg_callback);
- mqtt_connected = true;
+ if (mosquitto_will_set(mosq, "status/bsb-logger", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: unable to set will");
- mqtt_publish("status/bsb-logger", "ok");
+ if (mosquitto_tls_set(mosq, "/etc/burrow-mqtt/ca.crt", NULL, "/etc/burrow-mqtt/client.crt", "/etc/burrow-mqtt/client.key", NULL) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: unable to set TLS parameters");
- return mqtt_connected;
+ if (mosquitto_connect_async(mosq, "burrow-mqtt", 8883, 60) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: connect failed");
}
+static struct log_stream *packet_log, *stat_log;
+
static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
{
time_t now = time(NULL);
val[m->payloadlen] = 0;
msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
- static struct fastbuf *logfile;
- if (!logfile)
- logfile = bopen_file("/var/log/bsb-frames", O_WRONLY | O_CREAT | O_APPEND, NULL);
-
if (!strcmp(m->topic, "bsb/frame"))
- bprintf(logfile, "%jd %s\n", (uintmax_t) time(NULL), val);
-
- // FIXME: Log statistics
-
- bflush(logfile);
+ msg(L_INFO | packet_log->regnum, "%s", val);
+ else if (str_has_prefix(m->topic, "bsb/stats/"))
+ msg(L_INFO | stat_log->regnum, "%s:%s", m->topic + 10, val);
}
/*** Main ***/
if (!use_debug)
log_default_stream()->levels &= ~(1U << L_DEBUG);
- mosquitto_lib_init();
+ packet_log = log_new_file(LOG_DIR "/frames-%Y%m%d", 0);
+ packet_log->msgfmt = LSFMT_TIME;
+ stat_log = log_new_file(LOG_DIR "/stats-%Y%m%d", 0);
+ stat_log->msgfmt = LSFMT_TIME;
- for (;;) {
- if (!mqtt_connect()) {
- sleep(5);
- continue;
- }
+ mqtt_connect();
- int err = mosquitto_loop(mosq, 1000000, 1);
- if (err != MOSQ_ERR_SUCCESS)
- mqtt_error("loop", err, false);
- }
+ int err = mosquitto_loop_forever(mosq, 10000, 1);
+ if (err != MOSQ_ERR_SUCCESS)
+ mqtt_error("loop", err, false);
+
+ return 1;
}