]> mj.ucw.cz Git - home-hw.git/blob - prometheus/burrow-prometheus.c
fddc806c5d5caf5fcadf45adaa75402c5e4957bf
[home-hw.git] / prometheus / burrow-prometheus.c
1 /*
2  *      A gateway between MQTT and Prometheus
3  *
4  *      (c) 2018 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/fastbuf.h>
9 #include <ucw/log.h>
10 #include <ucw/mempool.h>
11 #include <ucw/opt.h>
12 #include <ucw/string.h>
13
14 #include <netinet/in.h>
15 #include <pthread.h>
16 #include <stdarg.h>
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <sys/poll.h>
21 #include <sys/socket.h>
22 #include <syslog.h>
23 #include <time.h>
24 #include <unistd.h>
25
26 #include <mosquitto.h>
27
28 static struct mosquitto *mosq;
29
30 struct attr {
31         const char *metric;
32         const char *help;
33         const char *type;
34         const char *topic;
35 };
36
37 static const struct attr attr_table[] = {
38         {
39                 .metric = "temp_loft",
40                 .help = "Temperature in the loft [degC]",
41                 .type = "gauge",
42                 .topic = "burrow/loft/temperature"
43         },
44         {
45                 .metric = "loft_fan",
46                 .help = "Fan speed in the loft (0-3)",
47                 .type = "gauge",
48                 .topic = "burrow/loft/fan"
49         },
50         {
51                 .metric = "loft_circ",
52                 .help = "Warm water circulation (0-1)",
53                 .type = "gauge",
54                 .topic = "burrow/loft/circulation"
55         },
56         {
57                 .metric = "temp_ursarium",
58                 .help = "Temperature in the Ursarium [degC]",
59                 .type = "gauge",
60                 .topic = "burrow/arexxd/ursarium"
61         },
62         {
63                 .metric = "temp_catarium",
64                 .help = "Temperature in the Catarium [degC]",
65                 .type = "gauge",
66                 .topic = "burrow/arexxd/catarium"
67         },
68         {
69                 .metric = "temp_garage",
70                 .help = "Temperature in the garage [degC]",
71                 .type = "gauge",
72                 .topic = "burrow/arexxd/garage"
73         },
74         {
75                 .metric = "temp_machinarium",
76                 .help = "Temperature in the Machinarium [degC]",
77                 .type = "gauge",
78                 .topic = "burrow/arexxd/machinarium"
79         },
80         {
81                 .metric = "rh_garage",
82                 .help = "Relative humidity in the garage [%]",
83                 .type = "gauge",
84                 .topic = "burrow/arexxd/garage-rh"
85         },
86 };
87
88 static char *attr_values[ARRAY_SIZE(attr_table)];
89 static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
90
91 static void mqtt_publish(const char *topic, const char *fmt, ...)
92 {
93         va_list args;
94         va_start(args, fmt);
95         char m[256];
96         int l = vsnprintf(m, sizeof(m), fmt, args);
97         if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
98                 msg(L_ERROR, "Mosquitto: publish failed");
99         va_end(args);
100 }
101
102 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
103 {
104         if (!status) {
105                 msg(L_DEBUG, "MQTT: Connection established, subscribing");
106                 if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
107                         die("Mosquitto: subscribe failed");
108
109                 // mqtt_publish("burrow/loft/status", "ok");
110         }
111 }
112
113 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
114 {
115         // msg(L_INFO, "MQTT(%d): %s", level, message);
116 }
117
118 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
119 {
120         char val[256];
121         if (m->payloadlen >= sizeof(val) - 1) {
122                 msg(L_ERROR, "Invalid value for topic %s", m->topic);
123                 return;
124         }
125         memcpy(val, m->payload, m->payloadlen);
126         val[m->payloadlen] = 0;
127         msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
128
129         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
130                 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
131                         pthread_mutex_lock(&attr_mutex);
132                         if (attr_values[i]) {
133                                 xfree(attr_values[i]);
134                                 attr_values[i] = NULL;
135                         }
136                         if (val[0])
137                                 attr_values[i] = xstrdup(val);
138                         pthread_mutex_unlock(&attr_mutex);
139                 }
140         }
141 }
142
143 struct http {
144         struct mempool *mp;
145         int sk;
146         char *iobuf;
147         uint iobuf_pos, iobuf_max;
148         char *linebuf;
149         struct fbpool fbpool;
150 };
151
152 #define IO_TIMEOUT 60000        // in ms
153 #define IOBUF_SIZE 1024
154 #define LINEBUF_SIZE 1024
155
156 static void http_send(struct http *http)
157 {
158         byte *buf = fbpool_end(&http->fbpool);
159         uint len = mp_size(http->mp, buf);
160
161         while (len) {
162                 struct pollfd pfd = { .fd = http->sk, .events = POLLOUT, .revents = 0 };
163                 int res = poll(&pfd, 1, IO_TIMEOUT);
164                 if (res < 0)
165                         die("Poll failed: %m");
166                 if (!res) {
167                         msg(L_ERROR, "HTTP write timed out");
168                         return;
169                 }
170
171                 res = write(http->sk, buf, len);
172                 if (res < 0) {
173                         msg(L_ERROR, "HTTP write failed: %m");
174                         return;
175                 }
176                 buf += res;
177                 len -= res;
178         }
179 }
180
181 static void http_error(struct http *http, const char *err)
182 {
183         msg(L_INFO, "HTTP error: %s", err);
184         fbpool_start(&http->fbpool, http->mp, 0);
185         bprintf(&http->fbpool.fb, "HTTP/1.1 %s\r\n", err);
186         http_send(http);
187 }
188
189 static int http_get_char(struct http *http) {
190         if (http->iobuf_pos >= http->iobuf_max) {
191                 struct pollfd pfd = { .fd = http->sk, .events = POLLIN, .revents = 0 };
192                 int res = poll(&pfd, 1, IO_TIMEOUT);
193                 if (res < 0)
194                         die("Poll failed: %m");
195                 if (!res) {
196                         msg(L_ERROR, "HTTP read timed out");
197                         return -1;
198                 }
199                 int len = read(http->sk, http->iobuf, IOBUF_SIZE);
200                 if (len < 0) {
201                         msg(L_ERROR, "HTTP read error: %m");
202                         return -1;
203                 }
204                 if (!len) {
205                         msg(L_ERROR, "HTTP connection closed");
206                         return -1;
207                 }
208                 http->iobuf_pos = 0;
209                 http->iobuf_max = len;
210         }
211         return http->iobuf[http->iobuf_pos++];
212 }
213
214 static bool http_get_line(struct http *http)
215 {
216         uint i = 0;
217         for (;;) {
218                 int c = http_get_char(http);
219                 if (c < 0)
220                         return false;
221                 if (c == '\n') {
222                         if (i > 0 && http->linebuf[i-1] == '\r')
223                                 i--;
224                         http->linebuf[i] = 0;
225                         return true;
226                 }
227                 if (i >= IOBUF_SIZE-1) {
228                         http_error(http, "400 Line too long");
229                         return false;
230                 }
231                 http->linebuf[i++] = c;
232         }
233 }
234
235 static void http_answer(struct fastbuf *fb)
236 {
237         char val[256];
238
239         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
240                 pthread_mutex_lock(&attr_mutex);
241                 snprintf(val, sizeof(val), "%s", attr_values[i] ? : "");
242                 pthread_mutex_unlock(&attr_mutex);
243                 if (val[0]) {
244                         const struct attr *a = &attr_table[i];
245                         if (a->help)
246                                 bprintf(fb, "# HELP %s %s\n", a->metric, a->help);
247                         if (a->type)
248                                 bprintf(fb, "# TYPE %s %s\n", a->metric, a->type);
249                         // FIXME: Add timestamp if known
250                         bprintf(fb, "%s %s\n", a->metric, val);
251                 }
252         }
253 }
254
255 static void http_connection(struct http *http)
256 {
257         http->iobuf = mp_alloc(http->mp, IOBUF_SIZE);
258         http->linebuf = mp_alloc(http->mp, LINEBUF_SIZE);
259         fbpool_init(&http->fbpool);
260
261         if (!http_get_line(http))
262                 return;
263         // msg(L_DEBUG, "Request line: <%s>", http->linebuf);
264         char *words[3];
265         if (str_wordsplit(http->linebuf, words, 3) != 3) {
266                 http_error(http, "400 Invalid request line");
267                 return;
268         }
269         if (strcmp(words[0], "GET")) {
270                 http_error(http, "501 Method not implemented");
271                 return;
272         }
273
274         for (;;) {
275                 if (!http_get_line(http))
276                         return;
277                 if (!http->linebuf[0])
278                         break;
279                 // msg(L_DEBUG, "Header line: <%s>", http->linebuf);
280         }
281
282         fbpool_start(&http->fbpool, http->mp, 0);
283         struct fastbuf *fb = &http->fbpool.fb;
284         bprintf(fb, "HTTP/1.1 200 OK\r\n");
285         bprintf(fb, "Content-type: text/plain; version=0.0.4\r\n");
286         bprintf(fb, "\r\n");
287         http_answer(fb);
288         http_send(http);
289 }
290
291 static int use_daemon;
292 static int use_debug;
293
294 static struct opt_section options = {
295         OPT_ITEMS {
296                 OPT_HELP("A daemon for controlling the solid state relay module via MQTT"),
297                 OPT_HELP(""),
298                 OPT_HELP("Options:"),
299                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
300                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
301                 OPT_HELP_OPTION,
302                 OPT_CONF_OPTIONS,
303                 OPT_END
304         }
305 };
306
307 int main(int argc UNUSED, char **argv)
308 {
309         log_init(argv[0]);
310         opt_parse(&options, argv+1);
311
312         if (use_daemon) {
313                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
314                 log_set_default_stream(ls);
315         }
316         if (!use_debug)
317                 log_default_stream()->levels &= ~(1U << L_DEBUG);
318
319         mosquitto_lib_init();
320         mosq = mosquitto_new("prometheus", 1, NULL);
321         if (!mosq)
322                 die("Mosquitto: initialization failed");
323
324         mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
325         mosquitto_log_callback_set(mosq, mqtt_log_callback);
326         mosquitto_message_callback_set(mosq, mqtt_msg_callback);
327
328 #if 0
329         // FIXME: Publish online/offline status
330         if (mosquitto_will_set(mosq, "burrow/loft/status", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
331                 die("Mosquitto: unable to set will");
332 #endif
333
334         if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
335                 die("Mosquitto: connect failed");
336
337         if (mosquitto_loop_start(mosq))
338                 die("Mosquitto: cannot start service thread");
339
340         int listen_sk = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
341         if (listen_sk < 0)
342                 die("Cannot create listening socket: %m");
343
344         int one = 1;
345         if (setsockopt(listen_sk, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
346                 die("Cannot set SO_REUSEADDR: %m");
347
348         struct sockaddr_in sin = { .sin_family = AF_INET, .sin_port = htons(1422), .sin_addr = INADDR_ANY };
349         if (bind(listen_sk, (const struct sockaddr *) &sin, sizeof(sin)) < 0)
350                 die("Cannot bind listening socket: %m");
351
352         if (listen(listen_sk, 64) < 0)
353                 die("Cannot listen: %m");
354
355         for (;;) {
356                 int sk = accept(listen_sk, NULL, NULL);
357                 if (sk < 0) {
358                         msg(L_ERROR, "HTTP accept failed: %m");
359                         continue;
360                 }
361                 msg(L_DEBUG, "HTTP accepted connection");
362                 struct mempool *mp = mp_new(4096);
363                 struct http *http = mp_alloc_zero(mp, sizeof(*http));
364                 http->mp = mp;
365                 http->sk = sk;
366                 http_connection(http);
367                 mp_delete(mp);
368                 close(sk);
369         }
370 }