2 * A gateway between MQTT and Prometheus
4 * (c) 2018 Martin Mares <mj@ucw.cz>
9 #include <ucw/mainloop.h>
19 #include <mosquitto.h>
// Mosquitto client handle used by the functions in this file; main() assigns it from mosquitto_new().
21 static struct mosquitto *mosq;
// Table pairing a text string with an MQTT topic.
// Entries whose topic is NULL carry "# HELP" / "# TYPE" text; the remaining entries name a
// value ("loft_temp", "loft_fan") and the topic that mqtt_msg_callback() matches against.
// Presumably these are the lines of the Prometheus output — confirm against the code that prints them.
// NOTE(review): the definition of struct attr and the end of this initializer are not visible in this excerpt.
28 static const struct attr attr_table[] = {
29 { "# HELP loft_temp Temperature in the loft", NULL },
30 { "# TYPE loft_temp gauge", NULL },
31 { "loft_temp", "burrow/loft/temperature" },
32 { "# HELP loft_fan Fan speed in the loft", NULL },
33 { "# TYPE loft_fan gauge", NULL },
34 { "loft_fan", "burrow/loft/fan" },
// One slot per attr_table entry (same index). mqtt_msg_callback() stores an xstrdup()ed copy
// of the received payload here and releases old contents with xfree().
38 static char *attr_values[ARRAY_SIZE(attr_table)];
// Format a message printf-style and publish it on an MQTT topic.
//   topic    - topic to publish to
//   fmt, ... - printf-style format string and its arguments
// The message is sent with QoS 0 and the retain flag set. A failed publish is only logged.
// NOTE(review): the declarations of m and args are not visible in this excerpt.
40 static void mqtt_publish(const char *topic, const char *fmt, ...)
// NOTE(review): vsnprintf() returns the length the complete output would have had. If the text
// does not fit into m, l is larger than the buffer contents and is still used as the payload
// length below — confirm whether truncation can occur and clamp l if so.
45 int l = vsnprintf(m, sizeof(m), fmt, args);
46 if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
47 msg(L_ERROR, "Mosquitto: publish failed");
// Subscribe to all topics below "burrow/" ('#' is the MQTT multi-level wildcard) with QoS 1.
// A failed subscription terminates the program via die().
51 static void mqtt_setup(void)
53 if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
54 die("Mosquitto: subscribe failed");
// Publishing an "ok" status is currently disabled.
56 // mqtt_publish("burrow/loft/status", "ok");
// Log callback registered in main() via mosquitto_log_callback_set().
//   level, message - severity and text supplied by libmosquitto
// The only visible statement is commented out, so library log messages appear to be discarded.
59 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
61 // msg(L_INFO, "MQTT(%d): %s", level, message);
// Message callback registered in main() via mosquitto_message_callback_set().
// Copies the payload of message m into the local buffer val as a NUL-terminated string and
// stores a copy in attr_values[] for every attr_table entry whose topic equals m->topic.
// NOTE(review): the declaration of val and several lines of the body are not visible in this excerpt.
64 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
// Payloads that do not fit into val are reported as an error.
// NOTE(review): payloadlen is an int compared against a size_t expression, so it is converted to unsigned here.
67 if (m->payloadlen >= sizeof(val) - 1) {
68 msg(L_ERROR, "Invalid value for topic %s", m->topic);
// Terminate the copy so it can be used as a C string below.
71 memcpy(val, m->payload, m->payloadlen);
72 val[m->payloadlen] = 0;
73 msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
// Entries with a NULL topic (the "# HELP" / "# TYPE" lines) are skipped by the first test.
75 for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
76 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
// NOTE(review): the conditions around the release of the old value and the storing of the
// new one are missing from this excerpt; verify them in the full file.
78 xfree(attr_values[i]);
79 attr_values[i] = NULL;
82 attr_values[i] = xstrdup(val);
// Main-loop file structure for the Mosquitto socket; its fd is compared against
// mosquitto_socket() in mqtt_recalc_main().
87 static struct main_file mqtt_file;
// Main-loop timer whose handler is mqtt_main_timer().
88 static struct main_timer mqtt_timer;
// Relative delay passed to timer_add_rel() when arming mqtt_timer; presumably milliseconds — confirm.
89 #define MQTT_TIMER_FREQ 5000
// Bring mqtt_file in line with the socket currently used by the Mosquitto client.
// A descriptor differing from mqtt_file.fd is logged, and file_add() is called on mqtt_file.
// NOTE(review): the lines between the log message and file_add() are not visible in this
// excerpt, so the exact conditions for re-registering the file cannot be confirmed here.
91 static void mqtt_recalc_main(void)
93 int fd = mosquitto_socket(mosq);
94 if (fd != mqtt_file.fd) {
95 msg(L_DEBUG, "MQTT: Changing socket fd to %d", fd);
101 file_add(&mqtt_file);
// Handler of mqtt_timer: calls mosquitto_loop_misc() and re-arms the timer for another
// MQTT_TIMER_FREQ.
//   tm - the timer that fired; not referenced in the visible lines
106 static void mqtt_main_timer(struct main_timer *tm)
108 msg(L_DEBUG, "MQTT: Timer handler");
109 mosquitto_loop_misc(mosq);
111 timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ);
// Read handler installed on mqtt_file: hands incoming data to libmosquitto through
// mosquitto_loop_read(), passing 1 as max_packets.
// NOTE(review): the return statement is not visible in this excerpt.
114 static int mqtt_main_read(struct main_file *mf UNUSED)
116 msg(L_DEBUG, "MQTT: Read handler");
117 mosquitto_loop_read(mosq, 1);
// Write handler installed on mqtt_file: lets libmosquitto send pending data through
// mosquitto_loop_write(), passing 1 as max_packets.
// NOTE(review): the return statement is not visible in this excerpt.
122 static int mqtt_main_write(struct main_file *mf UNUSED)
124 msg(L_DEBUG, "MQTT: Write handler");
125 mosquitto_loop_write(mosq, 1);
// Connect the Mosquitto client to the main loop: install the read and write handlers on
// mqtt_file, set the handler of mqtt_timer and arm the timer for the first time.
130 static void mqtt_main_init(void)
133 mqtt_file.read_handler = mqtt_main_read;
134 mqtt_file.write_handler = mqtt_main_write;
136 mqtt_timer.handler = mqtt_main_timer;
137 timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ);
// Set by the --daemon command-line option.
142 static int use_daemon;
// Set by the -d / --debug command-line option.
143 static int use_debug;
// Command-line option definitions passed to opt_parse() in main().
// NOTE(review): parts of this initializer are not visible in this excerpt.
145 static struct opt_section options = {
// NOTE(review): this help text describes a solid state relay daemon, while the file header
// calls the program a gateway between MQTT and Prometheus — possibly left over from another
// program; confirm and update the string if so.
147 OPT_HELP("A daemon for controlling the solid state relay module via MQTT"),
149 OPT_HELP("Options:"),
150 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
151 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
// Program entry point: parses the command line, configures logging, creates and connects the
// Mosquitto client, and services it with mosquitto_loop().
// NOTE(review): several lines are missing from this excerpt, including the conditions that
// guard the logging setup and the loop construct around mosquitto_loop(); verify in the full file.
158 int main(int argc UNUSED, char **argv)
// argv[0] is skipped; only the remaining arguments are handed to the option parser.
161 opt_parse(&options, argv+1);
// Make a syslog stream the default log stream.
165 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
166 log_set_default_stream(ls);
// Clear the L_DEBUG bit so the default stream drops debug messages.
169 log_default_stream()->levels &= ~(1U << L_DEBUG);
171 mosquitto_lib_init();
// Client id "prometheus"; the second argument (1) requests a clean session.
172 mosq = mosquitto_new("prometheus", 1, NULL);
174 die("Mosquitto: initialization failed");
176 mosquitto_log_callback_set(mosq, mqtt_log_callback);
177 mosquitto_message_callback_set(mosq, mqtt_msg_callback);
181 // FIXME: Publish online/offline status
// Last will: the 4-byte payload "dead" on burrow/loft/status, QoS 0, retained.
182 if (mosquitto_will_set(mosq, "burrow/loft/status", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
183 die("Mosquitto: unable to set will");
// Broker address and port are hard-coded; the last argument is the keepalive value (60).
186 if (mosquitto_connect(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
187 die("Mosquitto: connect failed");
// One iteration of the client's network loop with a timeout of 5000 and max_packets 1.
194 int err = mosquitto_loop(mosq, 5000, 1);
// A lost connection triggers a reconnect attempt; failures and all other errors are only logged.
195 if (err == MOSQ_ERR_NO_CONN) {
196 err = mosquitto_reconnect(mosq);
197 if (err == MOSQ_ERR_SUCCESS)
200 msg(L_ERROR, "Mosquitto: cannot reconnect, error %d", err);
201 } else if (err != MOSQ_ERR_SUCCESS)
202 msg(L_ERROR, "Mosquitto: loop returned error %d", err);