]> mj.ucw.cz Git - home-hw.git/blob - prometheus/burrow-prometheus.c
9cab56eaa192e5f7058dbed85ed88c964d13d8c9
[home-hw.git] / prometheus / burrow-prometheus.c
1 /*
2  *      A gateway between MQTT and Prometheus
3  *
4  *      (c) 2018 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/fastbuf.h>
9 #include <ucw/log.h>
10 #include <ucw/mempool.h>
11 #include <ucw/opt.h>
12 #include <ucw/string.h>
13
14 #include <netinet/in.h>
15 #include <pthread.h>
16 #include <stdarg.h>
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <string.h>
20 #include <sys/poll.h>
21 #include <sys/socket.h>
22 #include <syslog.h>
23 #include <time.h>
24 #include <unistd.h>
25
26 #include <mosquitto.h>
27
28 static struct mosquitto *mosq;
29
30 struct attr {
31         const char *pm;
32         const char *topic;
33 };
34
35 static const struct attr attr_table[] = {
36         { "# HELP temp_loft Temperature in the loft [degC]", NULL },
37         { "# TYPE temp_loft gauge", NULL },
38         { "temp_loft", "burrow/loft/temperature" },
39         { "# HELP loft_fan Fan speed in the loft", NULL },
40         { "# TYPE loft_fan gauge", NULL },
41         { "loft_fan", "burrow/loft/fan" },
42         { "# HELP temp_ursarium Temperature in the Ursarium [degC]", NULL },
43         { "# TYPE temp_ursarium gauge", NULL },
44         { "temp_ursarium", "burrow/arexxd/ursarium" },
45         { "# HELP temp_catarium Temperature in the Catarium [degC]", NULL },
46         { "# TYPE temp_catarium gauge", NULL },
47         { "temp_catarium", "burrow/arexxd/catarium" },
48         { "# HELP temp_machinarium Temperature in the Machinarium [degC]", NULL },
49         { "# TYPE temp_machinarium gauge", NULL },
50         { "temp_machinarium", "burrow/arexxd/machinarium" },
51         { "# HELP temp_garage Temperature in the garage [degC]", NULL },
52         { "# TYPE temp_garage gauge", NULL },
53         { "temp_garage", "burrow/arexxd/garage" },
54         { "# HELP rh_garage Relative humidity in the garage [%]", NULL },
55         { "# TYPE rh_garage gauge", NULL },
56         { "rh_garage", "burrow/arexxd/garage-rh" },
57 };
58
59 static char *attr_values[ARRAY_SIZE(attr_table)];
60 static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
61
62 static void mqtt_publish(const char *topic, const char *fmt, ...)
63 {
64         va_list args;
65         va_start(args, fmt);
66         char m[256];
67         int l = vsnprintf(m, sizeof(m), fmt, args);
68         if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
69                 msg(L_ERROR, "Mosquitto: publish failed");
70         va_end(args);
71 }
72
73 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
74 {
75         if (!status) {
76                 msg(L_DEBUG, "MQTT: Connection established, subscribing");
77                 if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
78                         die("Mosquitto: subscribe failed");
79
80                 // mqtt_publish("burrow/loft/status", "ok");
81         }
82 }
83
84 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
85 {
86         // msg(L_INFO, "MQTT(%d): %s", level, message);
87 }
88
89 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
90 {
91         char val[256];
92         if (m->payloadlen >= sizeof(val) - 1) {
93                 msg(L_ERROR, "Invalid value for topic %s", m->topic);
94                 return;
95         }
96         memcpy(val, m->payload, m->payloadlen);
97         val[m->payloadlen] = 0;
98         msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
99
100         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
101                 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
102                         pthread_mutex_lock(&attr_mutex);
103                         if (attr_values[i]) {
104                                 xfree(attr_values[i]);
105                                 attr_values[i] = NULL;
106                         }
107                         if (val[0])
108                                 attr_values[i] = xstrdup(val);
109                         pthread_mutex_unlock(&attr_mutex);
110                 }
111         }
112 }
113
114 struct http {
115         struct mempool *mp;
116         int sk;
117         char *iobuf;
118         uint iobuf_pos, iobuf_max;
119         char *linebuf;
120         struct fbpool fbpool;
121 };
122
123 #define IO_TIMEOUT 60000        // in ms
124 #define IOBUF_SIZE 1024
125 #define LINEBUF_SIZE 1024
126
127 static void http_send(struct http *http)
128 {
129         byte *buf = fbpool_end(&http->fbpool);
130         uint len = mp_size(http->mp, buf);
131
132         while (len) {
133                 struct pollfd pfd = { .fd = http->sk, .events = POLLOUT, .revents = 0 };
134                 int res = poll(&pfd, 1, IO_TIMEOUT);
135                 if (res < 0)
136                         die("Poll failed: %m");
137                 if (!res) {
138                         msg(L_ERROR, "HTTP write timed out");
139                         return;
140                 }
141
142                 res = write(http->sk, buf, len);
143                 if (res < 0) {
144                         msg(L_ERROR, "HTTP write failed: %m");
145                         return;
146                 }
147                 buf += res;
148                 len -= res;
149         }
150 }
151
152 static void http_error(struct http *http, const char *err)
153 {
154         msg(L_INFO, "HTTP error: %s", err);
155         fbpool_start(&http->fbpool, http->mp, 0);
156         bprintf(&http->fbpool.fb, "HTTP/1.1 %s\r\n", err);
157         http_send(http);
158 }
159
160 static int http_get_char(struct http *http) {
161         if (http->iobuf_pos >= http->iobuf_max) {
162                 struct pollfd pfd = { .fd = http->sk, .events = POLLIN, .revents = 0 };
163                 int res = poll(&pfd, 1, IO_TIMEOUT);
164                 if (res < 0)
165                         die("Poll failed: %m");
166                 if (!res) {
167                         msg(L_ERROR, "HTTP read timed out");
168                         return -1;
169                 }
170                 int len = read(http->sk, http->iobuf, IOBUF_SIZE);
171                 if (len < 0) {
172                         msg(L_ERROR, "HTTP read error: %m");
173                         return -1;
174                 }
175                 if (!len) {
176                         msg(L_ERROR, "HTTP connection closed");
177                         return -1;
178                 }
179                 http->iobuf_pos = 0;
180                 http->iobuf_max = len;
181         }
182         return http->iobuf[http->iobuf_pos++];
183 }
184
185 static bool http_get_line(struct http *http)
186 {
187         uint i = 0;
188         for (;;) {
189                 int c = http_get_char(http);
190                 if (c < 0)
191                         return false;
192                 if (c == '\n') {
193                         if (i > 0 && http->linebuf[i-1] == '\r')
194                                 i--;
195                         http->linebuf[i] = 0;
196                         return true;
197                 }
198                 if (i >= IOBUF_SIZE-1) {
199                         http_error(http, "400 Line too long");
200                         return false;
201                 }
202                 http->linebuf[i++] = c;
203         }
204 }
205
206 static void http_connection(struct http *http)
207 {
208         http->iobuf = mp_alloc(http->mp, IOBUF_SIZE);
209         http->linebuf = mp_alloc(http->mp, LINEBUF_SIZE);
210         fbpool_init(&http->fbpool);
211
212         if (!http_get_line(http))
213                 return;
214         // msg(L_DEBUG, "Request line: <%s>", http->linebuf);
215         char *words[3];
216         if (str_wordsplit(http->linebuf, words, 3) != 3) {
217                 http_error(http, "400 Invalid request line");
218                 return;
219         }
220         if (strcmp(words[0], "GET")) {
221                 http_error(http, "501 Method not implemented");
222                 return;
223         }
224
225         for (;;) {
226                 if (!http_get_line(http))
227                         return;
228                 if (!http->linebuf[0])
229                         break;
230                 // msg(L_DEBUG, "Header line: <%s>", http->linebuf);
231         }
232
233         fbpool_start(&http->fbpool, http->mp, 0);
234         struct fastbuf *fb = &http->fbpool.fb;
235         bprintf(fb, "HTTP/1.1 200 OK\r\n");
236         bprintf(fb, "Content-type: text/plain; version=0.0.4\r\n");
237         bprintf(fb, "\r\n");
238         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
239                 if (attr_table[i].topic) {
240                         pthread_mutex_lock(&attr_mutex);
241                         bprintf(fb, "%s %s\n", attr_table[i].pm, attr_values[i] ? : "");
242                         pthread_mutex_unlock(&attr_mutex);
243                 } else {
244                         bprintf(fb, "%s\n", attr_table[i].pm);
245                 }
246         }
247         http_send(http);
248 }
249
250 static int use_daemon;
251 static int use_debug;
252
253 static struct opt_section options = {
254         OPT_ITEMS {
255                 OPT_HELP("A daemon for controlling the solid state relay module via MQTT"),
256                 OPT_HELP(""),
257                 OPT_HELP("Options:"),
258                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
259                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
260                 OPT_HELP_OPTION,
261                 OPT_CONF_OPTIONS,
262                 OPT_END
263         }
264 };
265
266 int main(int argc UNUSED, char **argv)
267 {
268         log_init(argv[0]);
269         opt_parse(&options, argv+1);
270
271         if (use_daemon) {
272                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
273                 log_set_default_stream(ls);
274         }
275         if (!use_debug)
276                 log_default_stream()->levels &= ~(1U << L_DEBUG);
277
278         mosquitto_lib_init();
279         mosq = mosquitto_new("prometheus", 1, NULL);
280         if (!mosq)
281                 die("Mosquitto: initialization failed");
282
283         mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
284         mosquitto_log_callback_set(mosq, mqtt_log_callback);
285         mosquitto_message_callback_set(mosq, mqtt_msg_callback);
286
287 #if 0
288         // FIXME: Publish online/offline status
289         if (mosquitto_will_set(mosq, "burrow/loft/status", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
290                 die("Mosquitto: unable to set will");
291 #endif
292
293         if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
294                 die("Mosquitto: connect failed");
295
296         if (mosquitto_loop_start(mosq))
297                 die("Mosquitto: cannot start service thread");
298
299         int listen_sk = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
300         if (listen_sk < 0)
301                 die("Cannot create listening socket: %m");
302
303         int one = 1;
304         if (setsockopt(listen_sk, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
305                 die("Cannot set SO_REUSEADDR: %m");
306
307         struct sockaddr_in sin = { .sin_family = AF_INET, .sin_port = htons(1422), .sin_addr = INADDR_ANY };
308         if (bind(listen_sk, (const struct sockaddr *) &sin, sizeof(sin)) < 0)
309                 die("Cannot bind listening socket: %m");
310
311         if (listen(listen_sk, 64) < 0)
312                 die("Cannot listen: %m");
313
314         for (;;) {
315                 int sk = accept(listen_sk, NULL, NULL);
316                 if (sk < 0) {
317                         msg(L_ERROR, "HTTP accept failed: %m");
318                         continue;
319                 }
320                 msg(L_DEBUG, "HTTP accepted connection");
321                 struct mempool *mp = mp_new(4096);
322                 struct http *http = mp_alloc_zero(mp, sizeof(*http));
323                 http->mp = mp;
324                 http->sk = sk;
325                 http_connection(http);
326                 mp_delete(mp);
327                 close(sk);
328         }
329 }