4 * (c) 2020 Martin Mares <mj@ucw.cz>
8 #include <ucw/fastbuf.h>
20 #include <curl/curl.h>
21 #include <mosquitto.h>
// Mosquitto client handle shared by all MQTT helpers in this file:
// created in mqtt_connect(), destroyed in mqtt_error().
25 static struct mosquitto *mosq;
// Connection-state flag: set in mqtt_connect() once the subscription has
// succeeded, cleared by mqtt_error() on teardown or on connection-type errors.
26 static bool mqtt_connected;
// Report a failed Mosquitto call and update the connection state.
//   operation - name of the failed operation, used only in the log message
//   err       - error code returned by the library call
//   teardown  - NOTE(review): presumably selects the branch that destroys the
//               client; the condition itself is on a line not visible here
28 static void mqtt_error(const char *operation, int err, bool teardown)
30 msg(L_ERROR, "Mosquitto: %s failed: error %d", operation, err);
// First branch: drop the client instance and mark the connection as down.
33 mosquitto_destroy(mosq);
35 mqtt_connected = false;
// Otherwise, connection-type errors only clear the flag; the client handle is kept.
36 } else if (err == MOSQ_ERR_NO_CONN || err == MOSQ_ERR_CONN_REFUSED || err == MOSQ_ERR_CONN_LOST) {
37 mqtt_connected = false;
// Format a printf-style message and publish it on `topic`.
//   topic - MQTT topic to publish to
//   fmt   - printf-style format string, followed by its arguments
// The message is published with QoS 0 and the retain flag set. A failed
// publish is reported through mqtt_error() without tearing the client down.
41 static void mqtt_publish(const char *topic, const char *fmt, ...)
// NOTE(review): vsnprintf() returns the length the full output would have had
// (or a negative value on error), not the number of bytes stored in m. Unless
// a clamp exists on a line not visible here, an over-long message makes `l`
// exceed sizeof(m) in the publish call below -- confirm and clamp if needed.
48 int l = vsnprintf(m, sizeof(m), fmt, args);
49 int err = mosquitto_publish(mosq, NULL, topic, l, m, 0, true);
50 if (err != MOSQ_ERR_SUCCESS)
51 mqtt_error("publish", err, false);
// Log callback registered with the Mosquitto client in mqtt_connect().
// The only statement visible here is commented out, so library log messages
// appear to be discarded.
57 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
59 // msg(L_INFO, "MQTT(%d): %s", level, message);
// Forward declaration: mqtt_connect() registers this callback before the
// function is defined further down in the file.
62 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m);
// Establish (or re-establish) the MQTT session and subscribe to the BSB topics.
// Returns the value of mqtt_connected at the end of the attempt.
// NOTE(review): the conditions choosing between the fresh-connect and the
// reconnect path, and the early returns after each error, are on lines not
// visible here.
64 static bool mqtt_connect(void)
// Create a client with the fixed id "bsb-logger"; failing to allocate it is fatal.
72 mosq = mosquitto_new("bsb-logger", 1, NULL);
74 die("Mosquitto: initialization failed");
76 mosquitto_log_callback_set(mosq, mqtt_log_callback);
77 mosquitto_message_callback_set(mosq, mqtt_msg_callback);
// Last-will message: the 4-byte payload "dead" on the status topic, QoS 0, retained.
79 err = mosquitto_will_set(mosq, "status/bsb-logger", 4, "dead", 0, true);
80 if (err != MOSQ_ERR_SUCCESS) {
81 mqtt_error("will_set", err, true);
// NOTE(review): broker address, port and keepalive (60) are hard-coded.
85 err = mosquitto_connect(mosq, "10.32.184.5", 1883, 60);
86 if (err != MOSQ_ERR_SUCCESS) {
87 mqtt_error("connect", err, true);
// Reconnect failures keep the client instance (teardown is false).
91 err = mosquitto_reconnect(mosq);
92 if (err != MOSQ_ERR_SUCCESS) {
93 mqtt_error("reconnect", err, false);
// Subscribe to everything below "bsb/" with QoS 1.
98 err = mosquitto_subscribe(mosq, NULL, "bsb/#", 1);
99 if (err != MOSQ_ERR_SUCCESS) {
100 mqtt_error("subscribe", err, false);
104 mqtt_connected = true;
// Announce liveness on the same topic that carries the last-will message.
106 mqtt_publish("status/bsb-logger", "ok");
// The flag is returned rather than a constant because the publish above can
// clear it again through mqtt_error() on a connection-type error.
108 return mqtt_connected;
// Message callback: copies the payload into a NUL-terminated local buffer,
// logs it at debug level, and appends "bsb/frame" messages to the frame log.
//   m - received message; its topic, payload and payloadlen fields are read
111 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
// NOTE(review): `now` has no visible use below; the timestamp is taken again
// with time(NULL) when the frame is written.
113 time_t now = time(NULL);
// Reject payloads that would not fit into val together with the terminator.
// NOTE(review): `>=` also rejects a payload of exactly sizeof(val) - 1 bytes,
// which would still fit -- confirm whether that is intended.
115 if (m->payloadlen >= sizeof(val) - 1) {
116 msg(L_ERROR, "Invalid value for topic %s", m->topic);
// Copy the payload and terminate it so it can be handled as a C string.
119 memcpy(val, m->payload, m->payloadlen);
120 val[m->payloadlen] = 0;
121 msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
// The log handle is static, so it persists across callback invocations.
// NOTE(review): presumably opened only once; the guard is on a line not visible here.
123 static struct fastbuf *logfile;
125 logfile = bopen_file("/var/log/bsb-frames", O_WRONLY | O_CREAT | O_APPEND, NULL);
// Only raw frames are logged, one "<unix time> <payload>" record per line.
// NOTE(review): "%jd" expects intmax_t but the argument is cast to uintmax_t;
// use "%ju" or cast to intmax_t.
127 if (!strcmp(m->topic, "bsb/frame"))
128 bprintf(logfile, "%jd %s\n", (uintmax_t) time(NULL), val);
130 // FIXME: Log statistics
// Command-line switches filled in by the option parser.
137 static int use_daemon;
138 static int use_debug;
// Option table passed to opt_parse() in main(); only part of the initializer
// is visible here.
140 static struct opt_section options = {
142 OPT_HELP("A daemon for logging BSB frames received via MQTT"),
144 OPT_HELP("Options:"),
// -d / --debug sets use_debug.
145 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
// --daemon sets use_daemon; presumably the leading 0 means "no short option" -- TODO confirm.
146 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
// Entry point: parses options, sets up logging, connects to the broker and
// drives the Mosquitto network loop.
// NOTE(review): the conditions guarding the logging setup, the main loop
// construct and the handling of a failed connect are on lines not visible here.
153 int main(int argc UNUSED, char **argv)
// argv+1 skips the program name.
156 opt_parse(&options, argv+1);
// Send log output to syslog under the name "daemon", tagged with the PID.
// NOTE(review): presumably only when use_daemon is set -- confirm.
159 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
160 log_set_default_stream(ls);
// Mask out L_DEBUG messages on the default stream.
// NOTE(review): presumably only when use_debug is not set -- confirm.
163 log_default_stream()->levels &= ~(1U << L_DEBUG);
165 mosquitto_lib_init();
168 if (!mqtt_connect()) {
// Process network traffic: timeout argument 1000000, at most 1 packet per call.
// Loop errors are reported without destroying the client.
173 int err = mosquitto_loop(mosq, 1000000, 1);
174 if (err != MOSQ_ERR_SUCCESS)
175 mqtt_error("loop", err, false);