2 * A gateway between MQTT and Prometheus
4 * (c) 2018 Martin Mares <mj@ucw.cz>
8 #include <ucw/fastbuf.h>
10 #include <ucw/mempool.h>
12 #include <ucw/string.h>
14 #include <netinet/in.h>
21 #include <sys/socket.h>
26 #include <mosquitto.h>
28 static struct mosquitto *mosq;
35 static const struct attr attr_table[] = {
36 { "# HELP temp_loft Temperature in the loft [degC]", NULL },
37 { "# TYPE temp_loft gauge", NULL },
38 { "temp_loft", "burrow/loft/temperature" },
39 { "# HELP loft_fan Fan speed in the loft", NULL },
40 { "# TYPE loft_fan gauge", NULL },
41 { "loft_fan", "burrow/loft/fan" },
42 { "# HELP temp_ursarium Temperature in the Ursarium [degC]", NULL },
43 { "# TYPE temp_ursarium gauge", NULL },
44 { "temp_ursarium", "burrow/arexxd/ursarium" },
45 { "# HELP temp_catarium Temperature in the Catarium [degC]", NULL },
46 { "# TYPE temp_catarium gauge", NULL },
47 { "temp_catarium", "burrow/arexxd/catarium" },
48 { "# HELP temp_machinarium Temperature in the Machinarium [degC]", NULL },
49 { "# TYPE temp_machinarium gauge", NULL },
50 { "temp_machinarium", "burrow/arexxd/machinarium" },
51 { "# HELP temp_garage Temperature in the garage [degC]", NULL },
52 { "# TYPE temp_garage gauge", NULL },
53 { "temp_garage", "burrow/arexxd/garage" },
54 { "# HELP rh_garage Relative humidity in the garage [%]", NULL },
55 { "# TYPE rh_garage gauge", NULL },
56 { "rh_garage", "burrow/arexxd/garage-rh" },
59 static char *attr_values[ARRAY_SIZE(attr_table)];
60 static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
62 static void mqtt_publish(const char *topic, const char *fmt, ...)
67 int l = vsnprintf(m, sizeof(m), fmt, args);
68 if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
69 msg(L_ERROR, "Mosquitto: publish failed");
73 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
76 msg(L_DEBUG, "MQTT: Connection established, subscribing");
77 if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
78 die("Mosquitto: subscribe failed");
80 // mqtt_publish("burrow/loft/status", "ok");
84 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
86 // msg(L_INFO, "MQTT(%d): %s", level, message);
89 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
92 if (m->payloadlen >= sizeof(val) - 1) {
93 msg(L_ERROR, "Invalid value for topic %s", m->topic);
96 memcpy(val, m->payload, m->payloadlen);
97 val[m->payloadlen] = 0;
98 msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
100 for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
101 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
102 pthread_mutex_lock(&attr_mutex);
103 if (attr_values[i]) {
104 xfree(attr_values[i]);
105 attr_values[i] = NULL;
108 attr_values[i] = xstrdup(val);
109 pthread_mutex_unlock(&attr_mutex);
118 uint iobuf_pos, iobuf_max;
120 struct fbpool fbpool;
123 #define IO_TIMEOUT 60000 // in ms
124 #define IOBUF_SIZE 1024
125 #define LINEBUF_SIZE 1024
127 static void http_send(struct http *http)
129 byte *buf = fbpool_end(&http->fbpool);
130 uint len = mp_size(http->mp, buf);
133 struct pollfd pfd = { .fd = http->sk, .events = POLLOUT, .revents = 0 };
134 int res = poll(&pfd, 1, IO_TIMEOUT);
136 die("Poll failed: %m");
138 msg(L_ERROR, "HTTP write timed out");
142 res = write(http->sk, buf, len);
144 msg(L_ERROR, "HTTP write failed: %m");
152 static void http_error(struct http *http, const char *err)
154 msg(L_INFO, "HTTP error: %s", err);
155 fbpool_start(&http->fbpool, http->mp, 0);
156 bprintf(&http->fbpool.fb, "HTTP/1.1 %s\r\n", err);
160 static int http_get_char(struct http *http) {
161 if (http->iobuf_pos >= http->iobuf_max) {
162 struct pollfd pfd = { .fd = http->sk, .events = POLLIN, .revents = 0 };
163 int res = poll(&pfd, 1, IO_TIMEOUT);
165 die("Poll failed: %m");
167 msg(L_ERROR, "HTTP read timed out");
170 int len = read(http->sk, http->iobuf, IOBUF_SIZE);
172 msg(L_ERROR, "HTTP read error: %m");
176 msg(L_ERROR, "HTTP connection closed");
180 http->iobuf_max = len;
182 return http->iobuf[http->iobuf_pos++];
185 static bool http_get_line(struct http *http)
189 int c = http_get_char(http);
193 if (i > 0 && http->linebuf[i-1] == '\r')
195 http->linebuf[i] = 0;
198 if (i >= IOBUF_SIZE-1) {
199 http_error(http, "400 Line too long");
202 http->linebuf[i++] = c;
206 static void http_connection(struct http *http)
208 http->iobuf = mp_alloc(http->mp, IOBUF_SIZE);
209 http->linebuf = mp_alloc(http->mp, LINEBUF_SIZE);
210 fbpool_init(&http->fbpool);
212 if (!http_get_line(http))
214 // msg(L_DEBUG, "Request line: <%s>", http->linebuf);
216 if (str_wordsplit(http->linebuf, words, 3) != 3) {
217 http_error(http, "400 Invalid request line");
220 if (strcmp(words[0], "GET")) {
221 http_error(http, "501 Method not implemented");
226 if (!http_get_line(http))
228 if (!http->linebuf[0])
230 // msg(L_DEBUG, "Header line: <%s>", http->linebuf);
233 fbpool_start(&http->fbpool, http->mp, 0);
234 struct fastbuf *fb = &http->fbpool.fb;
235 bprintf(fb, "HTTP/1.1 200 OK\r\n");
236 bprintf(fb, "Content-type: text/plain; version=0.0.4\r\n");
238 for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
239 if (attr_table[i].topic) {
240 pthread_mutex_lock(&attr_mutex);
241 bprintf(fb, "%s %s\n", attr_table[i].pm, attr_values[i] ? : "");
242 pthread_mutex_unlock(&attr_mutex);
244 bprintf(fb, "%s\n", attr_table[i].pm);
250 static int use_daemon;
251 static int use_debug;
253 static struct opt_section options = {
255 OPT_HELP("A daemon for controlling the solid state relay module via MQTT"),
257 OPT_HELP("Options:"),
258 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
259 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
266 int main(int argc UNUSED, char **argv)
269 opt_parse(&options, argv+1);
272 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
273 log_set_default_stream(ls);
276 log_default_stream()->levels &= ~(1U << L_DEBUG);
278 mosquitto_lib_init();
279 mosq = mosquitto_new("prometheus", 1, NULL);
281 die("Mosquitto: initialization failed");
283 mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
284 mosquitto_log_callback_set(mosq, mqtt_log_callback);
285 mosquitto_message_callback_set(mosq, mqtt_msg_callback);
288 // FIXME: Publish online/offline status
289 if (mosquitto_will_set(mosq, "burrow/loft/status", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
290 die("Mosquitto: unable to set will");
293 if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
294 die("Mosquitto: connect failed");
296 if (mosquitto_loop_start(mosq))
297 die("Mosquitto: cannot start service thread");
299 int listen_sk = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
301 die("Cannot create listening socket: %m");
304 if (setsockopt(listen_sk, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
305 die("Cannot set SO_REUSEADDR: %m");
307 struct sockaddr_in sin = { .sin_family = AF_INET, .sin_port = htons(1422), .sin_addr = INADDR_ANY };
308 if (bind(listen_sk, (const struct sockaddr *) &sin, sizeof(sin)) < 0)
309 die("Cannot bind listening socket: %m");
311 if (listen(listen_sk, 64) < 0)
312 die("Cannot listen: %m");
315 int sk = accept(listen_sk, NULL, NULL);
317 msg(L_ERROR, "HTTP accept failed: %m");
320 msg(L_DEBUG, "HTTP accepted connection");
321 struct mempool *mp = mp_new(4096);
322 struct http *http = mp_alloc_zero(mp, sizeof(*http));
325 http_connection(http);