]> mj.ucw.cz Git - home-hw.git/blob - bsb/logger/burrow-bsb-logger.c
BSB logger: Different way to use libmosquitto
[home-hw.git] / bsb / logger / burrow-bsb-logger.c
1 /*
2  *      A BSB frame logger
3  *
4  *      (c) 2020 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/fastbuf.h>
9 #include <ucw/log.h>
10 #include <ucw/opt.h>
11
12 #include <fcntl.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <syslog.h>
17 #include <time.h>
18 #include <unistd.h>
19
20 #include <curl/curl.h>
21 #include <mosquitto.h>
22
23 /*** MQTT ***/
24
25 static struct mosquitto *mosq;
26
27 static void mqtt_error(const char *operation, int err, bool teardown)
28 {
29         msg(L_ERROR, "Mosquitto: %s failed: error %d", operation, err);
30 }
31
32 static void mqtt_publish(const char *topic, const char *fmt, ...)
33 {
34         va_list args;
35         va_start(args, fmt);
36
37         if (1) {
38                 char m[256];
39                 int l = vsnprintf(m, sizeof(m), fmt, args);
40                 int err = mosquitto_publish(mosq, NULL, topic, l, m, 0, true);
41                 if (err != MOSQ_ERR_SUCCESS)
42                         mqtt_error("publish", err, false);
43         }
44
45         va_end(args);
46 }
47
48 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
49 {
50         msg(L_INFO, "MQTT(%d): %s", level, message);
51 }
52
53 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
54 {
55         if (!status) {
56                 msg(L_DEBUG, "MQTT: Connection established, subscribing");
57                 int err = mosquitto_subscribe(mosq, NULL, "bsb/#", 1);
58                 if (err != MOSQ_ERR_SUCCESS) {
59                         mqtt_error("subscribe", err, false);
60                         return;
61                 }
62                 mqtt_publish("status/bsb-logger", "ok");
63         } else {
64                 msg(L_DEBUG, "MQTT: Cannot connect");
65         }
66 }
67
68 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m);
69
70 static void mqtt_connect(void)
71 {
72         int err;
73
74         mosquitto_lib_init();
75
76         mosq = mosquitto_new("bsb-logger", 1, NULL);
77         if (!mosq)
78                 die("Mosquitto: initialization failed");
79
80         mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
81         mosquitto_log_callback_set(mosq, mqtt_log_callback);
82         mosquitto_message_callback_set(mosq, mqtt_msg_callback);
83
84         if (mosquitto_will_set(mosq, "status/bsb-logger", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
85                 die("Mosquitto: unable to set will");
86
87         if (mosquitto_tls_set(mosq, "/etc/burrow-mqtt/ca.crt", NULL, "/etc/burrow-mqtt/client.crt", "/etc/burrow-mqtt/client.key", NULL) != MOSQ_ERR_SUCCESS)
88                 die("Mosquitto: unable to set TLS parameters");
89
90         if (mosquitto_connect_async(mosq, "burrow-mqtt", 8883, 60) != MOSQ_ERR_SUCCESS)
91                 die("Mosquitto: connect failed");
92 }
93
94 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
95 {
96         time_t now = time(NULL);
97         char val[256];
98         if (m->payloadlen >= sizeof(val) - 1) {
99                 msg(L_ERROR, "Invalid value for topic %s", m->topic);
100                 return;
101         }
102         memcpy(val, m->payload, m->payloadlen);
103         val[m->payloadlen] = 0;
104         msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
105
106         static struct fastbuf *logfile;
107         if (!logfile)
108                 logfile = bopen_file("/var/log/bsb-frames", O_WRONLY | O_CREAT | O_APPEND, NULL);
109
110         if (!strcmp(m->topic, "bsb/frame"))
111                 bprintf(logfile, "%jd %s\n", (uintmax_t) time(NULL), val);
112
113         // FIXME: Log statistics
114
115         bflush(logfile);
116 }
117
118 /*** Main ***/
119
120 static int use_daemon;
121 static int use_debug;
122
123 static struct opt_section options = {
124         OPT_ITEMS {
125                 OPT_HELP("A daemon for logging BSB frames received via MQTT"),
126                 OPT_HELP(""),
127                 OPT_HELP("Options:"),
128                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
129                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
130                 OPT_HELP_OPTION,
131                 OPT_CONF_OPTIONS,
132                 OPT_END
133         }
134 };
135
136 int main(int argc UNUSED, char **argv)
137 {
138         log_init(argv[0]);
139         opt_parse(&options, argv+1);
140
141         if (use_daemon) {
142                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
143                 log_set_default_stream(ls);
144         }
145         if (!use_debug)
146                 log_default_stream()->levels &= ~(1U << L_DEBUG);
147
148         mqtt_connect();
149
150         int err = mosquitto_loop_forever(mosq, 10000, 1);
151         if (err != MOSQ_ERR_SUCCESS)
152                 mqtt_error("loop", err, false);
153
154         return 1;
155 }