]> mj.ucw.cz Git - home-hw.git/blob - bsb/logger/burrow-bsb-logger.c
8a2d4129c10061a8459558ab09060012a1b98358
[home-hw.git] / bsb / logger / burrow-bsb-logger.c
1 /*
2  *      A BSB frame logger
3  *
4  *      (c) 2020--2022 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/fastbuf.h>
9 #include <ucw/log.h>
10 #include <ucw/opt.h>
11 #include <ucw/string.h>
12
13 #include <fcntl.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <syslog.h>
18 #include <time.h>
19 #include <unistd.h>
20
21 #include <curl/curl.h>
22 #include <mosquitto.h>
23
24 #define LOG_DIR "/var/log"
25
26 /*** MQTT ***/
27
28 static struct mosquitto *mosq;
29
30 static void mqtt_error(const char *operation, int err, bool teardown)
31 {
32         msg(L_ERROR, "Mosquitto: %s failed: error %d", operation, err);
33 }
34
35 static void mqtt_publish(const char *topic, const char *fmt, ...)
36 {
37         va_list args;
38         va_start(args, fmt);
39
40         char m[256];
41         int l = vsnprintf(m, sizeof(m), fmt, args);
42         int err = mosquitto_publish(mosq, NULL, topic, l, m, 0, true);
43         if (err != MOSQ_ERR_SUCCESS)
44                 mqtt_error("publish", err, false);
45
46         va_end(args);
47 }
48
49 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
50 {
51         // msg(L_INFO, "MQTT(%d): %s", level, message);
52 }
53
54 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
55 {
56         if (!status) {
57                 msg(L_DEBUG, "MQTT: Connection established, subscribing");
58                 int err = mosquitto_subscribe(mosq, NULL, "bsb/#", 1);
59                 if (err != MOSQ_ERR_SUCCESS) {
60                         mqtt_error("subscribe", err, false);
61                         return;
62                 }
63                 mqtt_publish("status/bsb-logger", "ok");
64         } else {
65                 msg(L_DEBUG, "MQTT: Cannot connect");
66         }
67 }
68
69 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m);
70
71 static void mqtt_connect(void)
72 {
73         int err;
74
75         mosquitto_lib_init();
76
77         mosq = mosquitto_new("bsb-logger", 1, NULL);
78         if (!mosq)
79                 die("Mosquitto: initialization failed");
80
81         mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
82         mosquitto_log_callback_set(mosq, mqtt_log_callback);
83         mosquitto_message_callback_set(mosq, mqtt_msg_callback);
84
85         if (mosquitto_will_set(mosq, "status/bsb-logger", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
86                 die("Mosquitto: unable to set will");
87
88         if (mosquitto_tls_set(mosq, "/etc/burrow-mqtt/ca.crt", NULL, "/etc/burrow-mqtt/client.crt", "/etc/burrow-mqtt/client.key", NULL) != MOSQ_ERR_SUCCESS)
89                 die("Mosquitto: unable to set TLS parameters");
90
91         if (mosquitto_connect_async(mosq, "burrow-mqtt", 8883, 60) != MOSQ_ERR_SUCCESS)
92                 die("Mosquitto: connect failed");
93 }
94
95 static struct log_stream *packet_log, *stat_log;
96
97 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
98 {
99         time_t now = time(NULL);
100         char val[256];
101         if (m->payloadlen >= sizeof(val) - 1) {
102                 msg(L_ERROR, "Invalid value for topic %s", m->topic);
103                 return;
104         }
105         memcpy(val, m->payload, m->payloadlen);
106         val[m->payloadlen] = 0;
107         msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
108
109         if (!strcmp(m->topic, "bsb/frame"))
110                 msg(L_INFO | packet_log->regnum, "%s", val);
111         else if (str_has_prefix(m->topic, "bsb/stats/"))
112                 msg(L_INFO | stat_log->regnum, "%s:%s", m->topic + 10, val);
113 }
114
115 /*** Main ***/
116
117 static int use_daemon;
118 static int use_debug;
119
120 static struct opt_section options = {
121         OPT_ITEMS {
122                 OPT_HELP("A daemon for logging BSB frames received via MQTT"),
123                 OPT_HELP(""),
124                 OPT_HELP("Options:"),
125                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
126                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
127                 OPT_HELP_OPTION,
128                 OPT_CONF_OPTIONS,
129                 OPT_END
130         }
131 };
132
133 int main(int argc UNUSED, char **argv)
134 {
135         log_init(argv[0]);
136         opt_parse(&options, argv+1);
137
138         if (use_daemon) {
139                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
140                 log_set_default_stream(ls);
141         }
142         if (!use_debug)
143                 log_default_stream()->levels &= ~(1U << L_DEBUG);
144
145         packet_log = log_new_file(LOG_DIR "/frames-%Y%m%d", 0);
146         packet_log->msgfmt = LSFMT_TIME;
147         stat_log = log_new_file(LOG_DIR "/stats-%Y%m%d", 0);
148         stat_log->msgfmt = LSFMT_TIME;
149
150         mqtt_connect();
151
152         int err = mosquitto_loop_forever(mosq, 10000, 1);
153         if (err != MOSQ_ERR_SUCCESS)
154                 mqtt_error("loop", err, false);
155
156         return 1;
157 }