]> mj.ucw.cz Git - home-hw.git/blob - prometheus/burrow-prometheus.c
8f0f560c235b130b1e1d230ef6ab27e1bca68f99
[home-hw.git] / prometheus / burrow-prometheus.c
1 /*
2  *      A gateway between MQTT and Prometheus
3  *
4  *      (c) 2018 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/log.h>
9 #include <ucw/mainloop.h>
10 #include <ucw/opt.h>
11
12 #include <stdarg.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <syslog.h>
17 #include <time.h>
18
19 #include <mosquitto.h>
20
21 static struct mosquitto *mosq;
22
23 struct attr {
24         const char *pm;
25         const char *topic;
26 };
27
28 static const struct attr attr_table[] = {
29         { "# HELP loft_temp Temperature in the loft", NULL },
30         { "# TYPE loft_temp gauge", NULL },
31         { "loft_temp", "burrow/loft/temperature" },
32         { "# HELP loft_fan Fan speed in the loft", NULL },
33         { "# TYPE loft_fan gauge", NULL },
34         { "loft_fan", "burrow/loft/fan" },
35         { NULL, NULL },
36 };
37
38 static char *attr_values[ARRAY_SIZE(attr_table)];
39
40 static void mqtt_publish(const char *topic, const char *fmt, ...)
41 {
42         va_list args;
43         va_start(args, fmt);
44         char m[256];
45         int l = vsnprintf(m, sizeof(m), fmt, args);
46         if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
47                 msg(L_ERROR, "Mosquitto: publish failed");
48         va_end(args);
49 }
50
51 static void mqtt_setup(void)
52 {
53         if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
54                 die("Mosquitto: subscribe failed");
55
56         // mqtt_publish("burrow/loft/status", "ok");
57 }
58
59 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
60 {
61         // msg(L_INFO, "MQTT(%d): %s", level, message);
62 }
63
64 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
65 {
66         char val[256];
67         if (m->payloadlen >= sizeof(val) - 1) {
68                 msg(L_ERROR, "Invalid value for topic %s", m->topic);
69                 return;
70         }
71         memcpy(val, m->payload, m->payloadlen);
72         val[m->payloadlen] = 0;
73         msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
74
75         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
76                 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
77                         if (attr_values[i]) {
78                                 xfree(attr_values[i]);
79                                 attr_values[i] = NULL;
80                         }
81                         if (val[0])
82                                 attr_values[i] = xstrdup(val);
83                 }
84         }
85 }
86
87 static struct main_file mqtt_file;
88 static struct main_timer mqtt_timer;
89 #define MQTT_TIMER_FREQ 5000
90
91 static void mqtt_recalc_main(void)
92 {
93         int fd = mosquitto_socket(mosq);
94         if (fd != mqtt_file.fd) {
95                 msg(L_DEBUG, "MQTT: Changing socket fd to %d", fd);
96                 if (fd < 0) {
97                         file_del(&mqtt_file);
98                         mqtt_file.fd = -1;
99                 } else {
100                         mqtt_file.fd = fd;
101                         file_add(&mqtt_file);
102                 }
103         }
104 }
105
106 static void mqtt_main_timer(struct main_timer *tm)
107 {
108         msg(L_DEBUG, "MQTT: Timer handler");
109         mosquitto_loop_misc(mosq);
110         mqtt_recalc_main();
111         timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ);
112 }
113
114 static int mqtt_main_read(struct main_file *mf UNUSED)
115 {
116         msg(L_DEBUG, "MQTT: Read handler");
117         mosquitto_loop_read(mosq, 1);
118         mqtt_recalc_main();
119         return HOOK_IDLE;
120 }
121
122 static int mqtt_main_write(struct main_file *mf UNUSED)
123 {
124         msg(L_DEBUG, "MQTT: Write handler");
125         mosquitto_loop_write(mosq, 1);
126         mqtt_recalc_main();
127         return HOOK_IDLE;
128 }
129
130 static void mqtt_main_init(void)
131 {
132         mqtt_file.fd = -1;
133         mqtt_file.read_handler = mqtt_main_read;
134         mqtt_file.write_handler = mqtt_main_write;
135
136         mqtt_timer.handler = mqtt_main_timer;
137         timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ);
138
139         mqtt_recalc_main();
140 }
141
142 static int use_daemon;
143 static int use_debug;
144
145 static struct opt_section options = {
146         OPT_ITEMS {
147                 OPT_HELP("A daemon for controlling the solid state relay module via MQTT"),
148                 OPT_HELP(""),
149                 OPT_HELP("Options:"),
150                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
151                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
152                 OPT_HELP_OPTION,
153                 OPT_CONF_OPTIONS,
154                 OPT_END
155         }
156 };
157
158 int main(int argc UNUSED, char **argv)
159 {
160         log_init(argv[0]);
161         opt_parse(&options, argv+1);
162         main_init();
163
164         if (use_daemon) {
165                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
166                 log_set_default_stream(ls);
167         }
168         if (!use_debug)
169                 log_default_stream()->levels &= ~(1U << L_DEBUG);
170
171         mosquitto_lib_init();
172         mosq = mosquitto_new("prometheus", 1, NULL);
173         if (!mosq)
174                 die("Mosquitto: initialization failed");
175
176         mosquitto_log_callback_set(mosq, mqtt_log_callback);
177         mosquitto_message_callback_set(mosq, mqtt_msg_callback);
178         mqtt_main_init();
179
180 #if 0
181         // FIXME: Publish online/offline status
182         if (mosquitto_will_set(mosq, "burrow/loft/status", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
183                 die("Mosquitto: unable to set will");
184 #endif
185
186         if (mosquitto_connect(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
187                 die("Mosquitto: connect failed");
188
189         mqtt_setup();
190
191         main_loop();
192 #if 0
193         for (;;) {
194                 int err = mosquitto_loop(mosq, 5000, 1);
195                 if (err == MOSQ_ERR_NO_CONN) {
196                         err = mosquitto_reconnect(mosq);
197                         if (err == MOSQ_ERR_SUCCESS)
198                                 mqtt_setup();
199                         else
200                                 msg(L_ERROR, "Mosquitto: cannot reconnect, error %d", err);
201                 } else if (err != MOSQ_ERR_SUCCESS)
202                         msg(L_ERROR, "Mosquitto: loop returned error %d", err);
203         }
204 #endif
205 }