]> mj.ucw.cz Git - home-hw.git/blob - logger/burrow-bsb-logger.c
Auto: Meditation mode turned off
[home-hw.git] / logger / burrow-bsb-logger.c
1 /*
2  *      A BSB frame logger
3  *
4  *      (c) 2020 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/fastbuf.h>
9 #include <ucw/log.h>
10 #include <ucw/opt.h>
11
12 #include <fcntl.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <syslog.h>
17 #include <time.h>
18 #include <unistd.h>
19
20 #include <curl/curl.h>
21 #include <mosquitto.h>
22
23 /*** MQTT ***/
24
25 static struct mosquitto *mosq;
26 static bool mqtt_connected;
27
28 static void mqtt_error(const char *operation, int err, bool teardown)
29 {
30         msg(L_ERROR, "Mosquitto: %s failed: error %d", operation, err);
31
32         if (teardown) {
33                 mosquitto_destroy(mosq);
34                 mosq = NULL;
35                 mqtt_connected = false;
36         } else if (err == MOSQ_ERR_NO_CONN || err == MOSQ_ERR_CONN_REFUSED || err == MOSQ_ERR_CONN_LOST) {
37                 mqtt_connected = false;
38         }
39 }
40
41 static void mqtt_publish(const char *topic, const char *fmt, ...)
42 {
43         va_list args;
44         va_start(args, fmt);
45
46         if (mqtt_connected) {
47                 char m[256];
48                 int l = vsnprintf(m, sizeof(m), fmt, args);
49                 int err = mosquitto_publish(mosq, NULL, topic, l, m, 0, true);
50                 if (err != MOSQ_ERR_SUCCESS)
51                         mqtt_error("publish", err, false);
52         }
53
54         va_end(args);
55 }
56
57 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
58 {
59         // msg(L_INFO, "MQTT(%d): %s", level, message);
60 }
61
62 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m);
63
64 static bool mqtt_connect(void)
65 {
66         int err;
67
68         if (mqtt_connected)
69                 return true;
70
71         if (!mosq) {
72                 mosq = mosquitto_new("bsb-logger", 1, NULL);
73                 if (!mosq)
74                         die("Mosquitto: initialization failed");
75
76                 mosquitto_log_callback_set(mosq, mqtt_log_callback);
77                 mosquitto_message_callback_set(mosq, mqtt_msg_callback);
78
79                 err = mosquitto_will_set(mosq, "status/bsb-logger", 4, "dead", 0, true);
80                 if (err != MOSQ_ERR_SUCCESS) {
81                         mqtt_error("will_set", err, true);
82                         return false;
83                 }
84
85                 err = mosquitto_connect(mosq, "10.32.184.5", 1883, 60);
86                 if (err != MOSQ_ERR_SUCCESS) {
87                         mqtt_error("connect", err, true);
88                         return false;
89                 }
90         } else {
91                 err = mosquitto_reconnect(mosq);
92                 if (err != MOSQ_ERR_SUCCESS) {
93                         mqtt_error("reconnect", err, false);
94                         return false;
95                 }
96         }
97
98         err = mosquitto_subscribe(mosq, NULL, "bsb/#", 1);
99         if (err != MOSQ_ERR_SUCCESS) {
100                 mqtt_error("subscribe", err, false);
101                 return false;
102         }
103
104         mqtt_connected = true;
105
106         mqtt_publish("status/bsb-logger", "ok");
107
108         return mqtt_connected;
109 }
110
111 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
112 {
113         time_t now = time(NULL);
114         char val[256];
115         if (m->payloadlen >= sizeof(val) - 1) {
116                 msg(L_ERROR, "Invalid value for topic %s", m->topic);
117                 return;
118         }
119         memcpy(val, m->payload, m->payloadlen);
120         val[m->payloadlen] = 0;
121         msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
122
123         static struct fastbuf *logfile;
124         if (!logfile)
125                 logfile = bopen_file("/var/log/bsb-frames", O_WRONLY | O_CREAT | O_APPEND, NULL);
126
127         if (!strcmp(m->topic, "bsb/frame"))
128                 bprintf(logfile, "%jd %s\n", (uintmax_t) time(NULL), val);
129
130         // FIXME: Log statistics
131
132         bflush(logfile);
133 }
134
135 /*** Main ***/
136
137 static int use_daemon;
138 static int use_debug;
139
140 static struct opt_section options = {
141         OPT_ITEMS {
142                 OPT_HELP("A daemon for logging BSB frames received via MQTT"),
143                 OPT_HELP(""),
144                 OPT_HELP("Options:"),
145                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
146                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
147                 OPT_HELP_OPTION,
148                 OPT_CONF_OPTIONS,
149                 OPT_END
150         }
151 };
152
153 int main(int argc UNUSED, char **argv)
154 {
155         log_init(argv[0]);
156         opt_parse(&options, argv+1);
157
158         if (use_daemon) {
159                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
160                 log_set_default_stream(ls);
161         }
162         if (!use_debug)
163                 log_default_stream()->levels &= ~(1U << L_DEBUG);
164
165         mosquitto_lib_init();
166
167         for (;;) {
168                 if (!mqtt_connect()) {
169                         sleep(5);
170                         continue;
171                 }
172
173                 int err = mosquitto_loop(mosq, 1000000, 1);
174                 if (err != MOSQ_ERR_SUCCESS)
175                         mqtt_error("loop", err, false);
176         }
177 }