]> mj.ucw.cz Git - home-hw.git/blob - prometheus/burrow-prometheus.c
1c6826d080aaa1adb82ccff0ac401be6e0b8a9cf
[home-hw.git] / prometheus / burrow-prometheus.c
1 /*
2  *      A gateway between MQTT and Prometheus
3  *
4  *      (c) 2018 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/fastbuf.h>
9 #include <ucw/log.h>
10 #include <ucw/mempool.h>
11 #include <ucw/opt.h>
12 #include <ucw/string.h>
13
14 #include <netinet/in.h>
15 #include <pthread.h>
16 #include <stdarg.h>
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <sys/poll.h>
21 #include <sys/socket.h>
22 #include <syslog.h>
23 #include <time.h>
24 #include <unistd.h>
25
26 #include <mosquitto.h>
27
/* Measurements carrying a timestamp older than this many seconds are not exported. */
#define MEASUREMENT_TIMEOUT 120

/* The MQTT client handle; created in main() and used by the MQTT callbacks. */
static struct mosquitto *mosq;
31
/* Description of one exported metric and the MQTT topic which feeds it. */
struct attr {
	const char *metric;	/* Metric name printed in the exposition output */
	const char *help;	/* Text of the "# HELP" line, NULL to omit the line */
	const char *type;	/* Text of the "# TYPE" line, NULL to omit the line */
	const char *topic;	/* MQTT topic whose payload becomes the metric value */
};
38
39 static const struct attr attr_table[] = {
40         {
41                 .metric = "temp_loft",
42                 .help = "Temperature in the loft [degC]",
43                 .type = "gauge",
44                 .topic = "burrow/temp/loft",
45         },
46         {
47                 .metric = "loft_fan",
48                 .help = "Fan speed in the loft (0-3)",
49                 .type = "gauge",
50                 .topic = "burrow/loft/fan"
51         },
52         {
53                 .metric = "loft_circ",
54                 .help = "Warm water circulation (0-1)",
55                 .type = "gauge",
56                 .topic = "burrow/loft/circulation"
57         },
58         {
59                 .metric = "temp_ursarium",
60                 .help = "Temperature in the Ursarium [degC]",
61                 .type = "gauge",
62                 .topic = "burrow/temp/ursarium"
63         },
64         {
65                 .metric = "temp_catarium",
66                 .help = "Temperature in the Catarium [degC]",
67                 .type = "gauge",
68                 .topic = "burrow/temp/catarium"
69         },
70         {
71                 .metric = "temp_garage",
72                 .help = "Temperature in the garage [degC]",
73                 .type = "gauge",
74                 .topic = "burrow/temp/garage"
75         },
76         {
77                 .metric = "temp_machinarium",
78                 .help = "Temperature in the Machinarium [degC]",
79                 .type = "gauge",
80                 .topic = "burrow/temp/machinarium"
81         },
82         {
83                 .metric = "rh_garage",
84                 .help = "Relative humidity in the garage [%]",
85                 .type = "gauge",
86                 .topic = "burrow/temp/garage-rh"
87         },
88 };
89
90 static char *attr_values[ARRAY_SIZE(attr_table)];
91 static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
92
93 static void mqtt_publish(const char *topic, const char *fmt, ...)
94 {
95         va_list args;
96         va_start(args, fmt);
97         char m[256];
98         int l = vsnprintf(m, sizeof(m), fmt, args);
99         if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
100                 msg(L_ERROR, "Mosquitto: publish failed");
101         va_end(args);
102 }
103
104 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
105 {
106         if (!status) {
107                 msg(L_DEBUG, "MQTT: Connection established, subscribing");
108                 if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
109                         die("Mosquitto: subscribe failed");
110
111                 mqtt_publish("status/prometheus", "ok");
112         }
113 }
114
115 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
116 {
117         // msg(L_INFO, "MQTT(%d): %s", level, message);
118 }
119
120 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
121 {
122         char val[256];
123         if (m->payloadlen >= sizeof(val) - 1) {
124                 msg(L_ERROR, "Invalid value for topic %s", m->topic);
125                 return;
126         }
127         memcpy(val, m->payload, m->payloadlen);
128         val[m->payloadlen] = 0;
129         msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
130
131         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
132                 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
133                         pthread_mutex_lock(&attr_mutex);
134                         if (attr_values[i]) {
135                                 xfree(attr_values[i]);
136                                 attr_values[i] = NULL;
137                         }
138                         if (val[0])
139                                 attr_values[i] = xstrdup(val);
140                         pthread_mutex_unlock(&attr_mutex);
141                 }
142         }
143 }
144
145 struct http {
146         struct mempool *mp;
147         int sk;
148         char *iobuf;
149         uint iobuf_pos, iobuf_max;
150         char *linebuf;
151         struct fbpool fbpool;
152 };
153
/* Limits of the HTTP server */
#define IO_TIMEOUT 60000	// Timeout of a single poll() on the socket, in ms
#define IOBUF_SIZE 1024		// Size of the raw input buffer
#define LINEBUF_SIZE 1024	// Size of the line buffer, including the terminating zero
157
158 static void http_send(struct http *http)
159 {
160         byte *buf = fbpool_end(&http->fbpool);
161         uint len = mp_size(http->mp, buf);
162
163         while (len) {
164                 struct pollfd pfd = { .fd = http->sk, .events = POLLOUT, .revents = 0 };
165                 int res = poll(&pfd, 1, IO_TIMEOUT);
166                 if (res < 0)
167                         die("Poll failed: %m");
168                 if (!res) {
169                         msg(L_ERROR, "HTTP write timed out");
170                         return;
171                 }
172
173                 res = write(http->sk, buf, len);
174                 if (res < 0) {
175                         msg(L_ERROR, "HTTP write failed: %m");
176                         return;
177                 }
178                 buf += res;
179                 len -= res;
180         }
181 }
182
183 static void http_error(struct http *http, const char *err)
184 {
185         msg(L_INFO, "HTTP error: %s", err);
186         fbpool_start(&http->fbpool, http->mp, 0);
187         bprintf(&http->fbpool.fb, "HTTP/1.1 %s\r\n", err);
188         http_send(http);
189 }
190
191 static int http_get_char(struct http *http) {
192         if (http->iobuf_pos >= http->iobuf_max) {
193                 struct pollfd pfd = { .fd = http->sk, .events = POLLIN, .revents = 0 };
194                 int res = poll(&pfd, 1, IO_TIMEOUT);
195                 if (res < 0)
196                         die("Poll failed: %m");
197                 if (!res) {
198                         msg(L_ERROR, "HTTP read timed out");
199                         return -1;
200                 }
201                 int len = read(http->sk, http->iobuf, IOBUF_SIZE);
202                 if (len < 0) {
203                         msg(L_ERROR, "HTTP read error: %m");
204                         return -1;
205                 }
206                 if (!len) {
207                         msg(L_ERROR, "HTTP connection closed");
208                         return -1;
209                 }
210                 http->iobuf_pos = 0;
211                 http->iobuf_max = len;
212         }
213         return http->iobuf[http->iobuf_pos++];
214 }
215
216 static bool http_get_line(struct http *http)
217 {
218         uint i = 0;
219         for (;;) {
220                 int c = http_get_char(http);
221                 if (c < 0)
222                         return false;
223                 if (c == '\n') {
224                         if (i > 0 && http->linebuf[i-1] == '\r')
225                                 i--;
226                         http->linebuf[i] = 0;
227                         return true;
228                 }
229                 if (i >= IOBUF_SIZE-1) {
230                         http_error(http, "400 Line too long");
231                         return false;
232                 }
233                 http->linebuf[i++] = c;
234         }
235 }
236
237 static void http_answer(struct fastbuf *fb)
238 {
239         char val[256], *w[2];
240         time_t now = time(NULL);
241
242         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
243                 const struct attr *a = &attr_table[i];
244
245                 pthread_mutex_lock(&attr_mutex);
246                 snprintf(val, sizeof(val), "%s", attr_values[i] ? : "");
247                 pthread_mutex_unlock(&attr_mutex);
248
249                 if (!val[0])
250                         continue;
251                 int fields = str_wordsplit(val, w, ARRAY_SIZE(w));
252                 if (fields < 1)
253                         continue;
254                 if (fields >= 2) {
255                         time_t t = atoll(w[1]);
256                         if (t < now - MEASUREMENT_TIMEOUT)
257                                 continue;
258                 }
259
260                 if (a->help)
261                         bprintf(fb, "# HELP %s %s\n", a->metric, a->help);
262                 if (a->type)
263                         bprintf(fb, "# TYPE %s %s\n", a->metric, a->type);
264                 bprintf(fb, "%s %s", a->metric, val);
265                 if (fields >= 2)
266                         bprintf(fb, " %s", w[1]);
267                 bputc(fb, '\n');
268         }
269 }
270
271 static void http_connection(struct http *http)
272 {
273         http->iobuf = mp_alloc(http->mp, IOBUF_SIZE);
274         http->linebuf = mp_alloc(http->mp, LINEBUF_SIZE);
275         fbpool_init(&http->fbpool);
276
277         if (!http_get_line(http))
278                 return;
279         // msg(L_DEBUG, "Request line: <%s>", http->linebuf);
280         char *words[3];
281         if (str_wordsplit(http->linebuf, words, 3) != 3) {
282                 http_error(http, "400 Invalid request line");
283                 return;
284         }
285         if (strcmp(words[0], "GET")) {
286                 http_error(http, "501 Method not implemented");
287                 return;
288         }
289
290         for (;;) {
291                 if (!http_get_line(http))
292                         return;
293                 if (!http->linebuf[0])
294                         break;
295                 // msg(L_DEBUG, "Header line: <%s>", http->linebuf);
296         }
297
298         fbpool_start(&http->fbpool, http->mp, 0);
299         struct fastbuf *fb = &http->fbpool.fb;
300         bprintf(fb, "HTTP/1.1 200 OK\r\n");
301         bprintf(fb, "Content-type: text/plain; version=0.0.4\r\n");
302         bprintf(fb, "\r\n");
303         http_answer(fb);
304         http_send(http);
305 }
306
307 static int use_daemon;
308 static int use_debug;
309
310 static struct opt_section options = {
311         OPT_ITEMS {
312                 OPT_HELP("A daemon for controlling the solid state relay module via MQTT"),
313                 OPT_HELP(""),
314                 OPT_HELP("Options:"),
315                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
316                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
317                 OPT_HELP_OPTION,
318                 OPT_CONF_OPTIONS,
319                 OPT_END
320         }
321 };
322
323 int main(int argc UNUSED, char **argv)
324 {
325         log_init(argv[0]);
326         opt_parse(&options, argv+1);
327
328         if (use_daemon) {
329                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
330                 log_set_default_stream(ls);
331         }
332         if (!use_debug)
333                 log_default_stream()->levels &= ~(1U << L_DEBUG);
334
335         mosquitto_lib_init();
336         mosq = mosquitto_new("prometheus", 1, NULL);
337         if (!mosq)
338                 die("Mosquitto: initialization failed");
339
340         mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
341         mosquitto_log_callback_set(mosq, mqtt_log_callback);
342         mosquitto_message_callback_set(mosq, mqtt_msg_callback);
343
344         if (mosquitto_will_set(mosq, "status/prometheus", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
345                 die("Mosquitto: unable to set will");
346
347         if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
348                 die("Mosquitto: connect failed");
349
350         if (mosquitto_loop_start(mosq))
351                 die("Mosquitto: cannot start service thread");
352
353         int listen_sk = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
354         if (listen_sk < 0)
355                 die("Cannot create listening socket: %m");
356
357         int one = 1;
358         if (setsockopt(listen_sk, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
359                 die("Cannot set SO_REUSEADDR: %m");
360
361         struct sockaddr_in sin = { .sin_family = AF_INET, .sin_port = htons(1422), .sin_addr = INADDR_ANY };
362         if (bind(listen_sk, (const struct sockaddr *) &sin, sizeof(sin)) < 0)
363                 die("Cannot bind listening socket: %m");
364
365         if (listen(listen_sk, 64) < 0)
366                 die("Cannot listen: %m");
367
368         for (;;) {
369                 int sk = accept(listen_sk, NULL, NULL);
370                 if (sk < 0) {
371                         msg(L_ERROR, "HTTP accept failed: %m");
372                         continue;
373                 }
374                 msg(L_DEBUG, "HTTP accepted connection");
375                 struct mempool *mp = mp_new(4096);
376                 struct http *http = mp_alloc_zero(mp, sizeof(*http));
377                 http->mp = mp;
378                 http->sk = sk;
379                 http_connection(http);
380                 mp_delete(mp);
381                 close(sk);
382         }
383 }