]> mj.ucw.cz Git - home-hw.git/blobdiff - prometheus/burrow-prometheus.c
Merge branch 'master' of ssh://git.ucw.cz/home/mj/GIT/home-hw
[home-hw.git] / prometheus / burrow-prometheus.c
index 8f0f560c235b130b1e1d230ef6ab27e1bca68f99..0e760e737728bdf714d14903461499cb0918a53d 100644 (file)
 /*
  *     A gateway between MQTT and Prometheus
  *
 /*
  *     A gateway between MQTT and Prometheus
  *
- *     (c) 2018 Martin Mares <mj@ucw.cz>
+ *     (c) 2018--2019 Martin Mares <mj@ucw.cz>
  */
 
 #include <ucw/lib.h>
  */
 
 #include <ucw/lib.h>
+#include <ucw/fastbuf.h>
 #include <ucw/log.h>
 #include <ucw/log.h>
-#include <ucw/mainloop.h>
+#include <ucw/mempool.h>
 #include <ucw/opt.h>
 #include <ucw/opt.h>
+#include <ucw/string.h>
 
 
+#include <netinet/in.h>
+#include <pthread.h>
 #include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <sys/poll.h>
+#include <sys/socket.h>
 #include <syslog.h>
 #include <time.h>
 #include <syslog.h>
 #include <time.h>
+#include <unistd.h>
 
 #include <mosquitto.h>
 
 
 #include <mosquitto.h>
 
+#define MEASUREMENT_TIMEOUT 120
+
 static struct mosquitto *mosq;
 
 struct attr {
 static struct mosquitto *mosq;
 
 struct attr {
-       const char *pm;
+       const char *metric;
+       const char *help;
+       const char *type;
        const char *topic;
 };
 
 static const struct attr attr_table[] = {
        const char *topic;
 };
 
 static const struct attr attr_table[] = {
-       { "# HELP loft_temp Temperature in the loft", NULL },
-       { "# TYPE loft_temp gauge", NULL },
-       { "loft_temp", "burrow/loft/temperature" },
-       { "# HELP loft_fan Fan speed in the loft", NULL },
-       { "# TYPE loft_fan gauge", NULL },
-       { "loft_fan", "burrow/loft/fan" },
-       { NULL, NULL },
+       {
+               .metric = "temp_loft",
+               .help = "Temperature in the loft [degC]",
+               .type = "gauge",
+               .topic = "burrow/temp/loft",
+       },
+       {
+               .metric = "loft_fan",
+               .help = "Fan speed in the loft (0-3)",
+               .type = "gauge",
+               .topic = "burrow/loft/fan"
+       },
+       {
+               .metric = "loft_circ",
+               .help = "Warm water circulation (0-1)",
+               .type = "gauge",
+               .topic = "burrow/loft/circulation"
+       },
+       {
+               .metric = "temp_ursarium",
+               .help = "Temperature in the Ursarium [degC]",
+               .type = "gauge",
+               .topic = "burrow/temp/ursarium"
+       },
+       {
+               .metric = "temp_catarium",
+               .help = "Temperature in the Catarium [degC]",
+               .type = "gauge",
+               .topic = "burrow/temp/catarium"
+       },
+       {
+               .metric = "temp_garage",
+               .help = "Temperature in the garage [degC]",
+               .type = "gauge",
+               .topic = "burrow/temp/garage"
+       },
+       {
+               .metric = "temp_kitchen",
+               .help = "Temperature in the kitchen [degC]",
+               .type = "gauge",
+               .topic = "burrow/temp/kitchen"
+       },
+       {
+               .metric = "rh_ursarium",
+               .help = "Relative humidity in the Ursarium [%]",
+               .type = "gauge",
+               .topic = "burrow/temp/ursarium-rh"
+       },
+       {
+               .metric = "temp_catarium_clock",
+               .help = "Temperature on Catarium clock [degC]",
+               .type = "gauge",
+               .topic = "burrow/temp/clock"
+       },
+       {
+               .metric = "press_catarium_clock",
+               .help = "Pressure on Catarium clock [Pa]",
+               .type = "gauge",
+               .topic = "burrow/pressure/clock"
+       },
+       {
+               .metric = "air_inside_intake",
+               .help = "Temperature of air intake from inside [degC]",
+               .type = "gauge",
+               .topic = "burrow/air/inside-intake",
+       },
+       {
+               .metric = "air_inside_exhaust",
+               .help = "Temperature of air exhaust to inside [degC]",
+               .type = "gauge",
+               .topic = "burrow/air/inside-exhaust",
+       },
+       {
+               .metric = "air_outside_intake",
+               .help = "Temperature of air intake from outside [degC]",
+               .type = "gauge",
+               .topic = "burrow/air/outside-intake",
+       },
+       {
+               .metric = "air_outside_exhaust",
+               .help = "Temperature of air exhaust to outside [degC]",
+               .type = "gauge",
+               .topic = "burrow/air/outside-exhaust",
+       },
+       {
+               .metric = "air_mixed",
+               .help = "Temperature of mixed air [degC]",
+               .type = "gauge",
+               .topic = "burrow/air/mixed",
+       },
+       {
+               .metric = "air_inside_intake_avg",
+               .help = "Average temperature of air intake from inside [degC]",
+               .type = "gauge",
+               .topic = "burrow/avg/air/inside-intake",
+       },
+       {
+               .metric = "air_outside_intake_avg",
+               .help = "Average temperature of air intake from outside [degC]",
+               .type = "gauge",
+               .topic = "burrow/avg/air/outside-intake",
+       },
+       {
+               .metric = "air_bypass",
+               .help = "Heat exchanger bypass (0-1)",
+               .type = "gauge",
+               .topic = "burrow/air/bypass"
+       },
+       {
+               .metric = "air_fan_pwm",
+               .help = "Heat exchanger fan PWM (0-255)",
+               .type = "gauge",
+               .topic = "burrow/air/exchanger-fan"
+       },
+       {
+               // Common heading for all voltages
+               .metric = "pm_voltage",
+               .help = "Voltage between phases and neutral [V]",
+               .type = "gauge",
+       },
+       {
+               .metric = "pm_voltage{phase=\"L1N\"}",
+               .topic = "burrow/power/voltage/l1n",
+       },
+       {
+               .metric = "pm_voltage{phase=\"L2N\"}",
+               .topic = "burrow/power/voltage/l2n",
+       },
+       {
+               .metric = "pm_voltage{phase=\"L3N\"}",
+               .topic = "burrow/power/voltage/l3n",
+       },
+       {
+               // Common heading for all currents
+               .metric = "pm_current",
+               .help = "Current through phases [A]",
+               .type = "gauge",
+       },
+       {
+               .metric = "pm_current{phase=\"L1\"}",
+               .topic = "burrow/power/current/l1",
+       },
+       {
+               .metric = "pm_current{phase=\"L2\"}",
+               .topic = "burrow/power/current/l2",
+       },
+       {
+               .metric = "pm_current{phase=\"L3\"}",
+               .topic = "burrow/power/current/l3",
+       },
+       {
+               .metric = "pm_power",
+               .help = "Total power [W]",
+               .type = "gauge",
+               .topic = "burrow/power/power",
+       },
+       {
+               .metric = "pm_energy",
+               .help = "Total energy [kWh]",
+               .type = "gauge",
+               .topic = "burrow/power/energy",
+       },
+       {
+               .metric = "pm_reactive_power",
+               .help = "Total reactive power [VAr]",
+               .type = "gauge",
+               .topic = "burrow/power/reactive/power",
+       },
+       {
+               .metric = "pm_reactive_energy",
+               .help = "Total reactive energy [kVArh]",
+               .type = "gauge",
+               .topic = "burrow/power/reactive/energy",
+       },
 };
 
 static char *attr_values[ARRAY_SIZE(attr_table)];
 };
 
 static char *attr_values[ARRAY_SIZE(attr_table)];
+static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 static void mqtt_publish(const char *topic, const char *fmt, ...)
 {
 
 static void mqtt_publish(const char *topic, const char *fmt, ...)
 {
@@ -48,12 +227,15 @@ static void mqtt_publish(const char *topic, const char *fmt, ...)
        va_end(args);
 }
 
        va_end(args);
 }
 
-static void mqtt_setup(void)
+static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
 {
 {
-       if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
-               die("Mosquitto: subscribe failed");
+       if (!status) {
+               msg(L_DEBUG, "MQTT: Connection established, subscribing");
+               if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
+                       die("Mosquitto: subscribe failed");
 
 
-       // mqtt_publish("burrow/loft/status", "ok");
+               mqtt_publish("status/prometheus", "ok");
+       }
 }
 
 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
 }
 
 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
@@ -74,69 +256,184 @@ static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, c
 
        for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
                if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
 
        for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
                if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
+                       pthread_mutex_lock(&attr_mutex);
                        if (attr_values[i]) {
                                xfree(attr_values[i]);
                                attr_values[i] = NULL;
                        }
                        if (val[0])
                                attr_values[i] = xstrdup(val);
                        if (attr_values[i]) {
                                xfree(attr_values[i]);
                                attr_values[i] = NULL;
                        }
                        if (val[0])
                                attr_values[i] = xstrdup(val);
+                       pthread_mutex_unlock(&attr_mutex);
                }
        }
 }
 
                }
        }
 }
 
-static struct main_file mqtt_file;
-static struct main_timer mqtt_timer;
-#define MQTT_TIMER_FREQ 5000
+struct http {
+       struct mempool *mp;
+       int sk;
+       char *iobuf;
+       uint iobuf_pos, iobuf_max;
+       char *linebuf;
+       struct fbpool fbpool;
+};
+
+#define IO_TIMEOUT 60000       // in ms
+#define IOBUF_SIZE 1024
+#define LINEBUF_SIZE 1024
 
 
-static void mqtt_recalc_main(void)
+static void http_send(struct http *http)
 {
 {
-       int fd = mosquitto_socket(mosq);
-       if (fd != mqtt_file.fd) {
-               msg(L_DEBUG, "MQTT: Changing socket fd to %d", fd);
-               if (fd < 0) {
-                       file_del(&mqtt_file);
-                       mqtt_file.fd = -1;
-               } else {
-                       mqtt_file.fd = fd;
-                       file_add(&mqtt_file);
+       byte *buf = fbpool_end(&http->fbpool);
+       uint len = mp_size(http->mp, buf);
+
+       while (len) {
+               struct pollfd pfd = { .fd = http->sk, .events = POLLOUT, .revents = 0 };
+               int res = poll(&pfd, 1, IO_TIMEOUT);
+               if (res < 0)
+                       die("Poll failed: %m");
+               if (!res) {
+                       msg(L_ERROR, "HTTP write timed out");
+                       return;
+               }
+
+               res = write(http->sk, buf, len);
+               if (res < 0) {
+                       msg(L_ERROR, "HTTP write failed: %m");
+                       return;
                }
                }
+               buf += res;
+               len -= res;
        }
 }
 
        }
 }
 
-static void mqtt_main_timer(struct main_timer *tm)
+static void http_error(struct http *http, const char *err)
 {
 {
-       msg(L_DEBUG, "MQTT: Timer handler");
-       mosquitto_loop_misc(mosq);
-       mqtt_recalc_main();
-       timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ);
+       msg(L_INFO, "HTTP error: %s", err);
+       fbpool_start(&http->fbpool, http->mp, 0);
+       bprintf(&http->fbpool.fb, "HTTP/1.1 %s\r\n", err);
+       http_send(http);
 }
 
 }
 
-static int mqtt_main_read(struct main_file *mf UNUSED)
+static int http_get_char(struct http *http) {
+       if (http->iobuf_pos >= http->iobuf_max) {
+               struct pollfd pfd = { .fd = http->sk, .events = POLLIN, .revents = 0 };
+               int res = poll(&pfd, 1, IO_TIMEOUT);
+               if (res < 0)
+                       die("Poll failed: %m");
+               if (!res) {
+                       msg(L_ERROR, "HTTP read timed out");
+                       return -1;
+               }
+               int len = read(http->sk, http->iobuf, IOBUF_SIZE);
+               if (len < 0) {
+                       msg(L_ERROR, "HTTP read error: %m");
+                       return -1;
+               }
+               if (!len) {
+                       msg(L_ERROR, "HTTP connection closed");
+                       return -1;
+               }
+               http->iobuf_pos = 0;
+               http->iobuf_max = len;
+       }
+       return http->iobuf[http->iobuf_pos++];
+}
+
+static bool http_get_line(struct http *http)
 {
 {
-       msg(L_DEBUG, "MQTT: Read handler");
-       mosquitto_loop_read(mosq, 1);
-       mqtt_recalc_main();
-       return HOOK_IDLE;
+       uint i = 0;
+       for (;;) {
+               int c = http_get_char(http);
+               if (c < 0)
+                       return false;
+               if (c == '\n') {
+                       if (i > 0 && http->linebuf[i-1] == '\r')
+                               i--;
+                       http->linebuf[i] = 0;
+                       return true;
+               }
+               if (i >= IOBUF_SIZE-1) {
+                       http_error(http, "400 Line too long");
+                       return false;
+               }
+               http->linebuf[i++] = c;
+       }
 }
 
 }
 
-static int mqtt_main_write(struct main_file *mf UNUSED)
+static void http_answer(struct fastbuf *fb)
 {
 {
-       msg(L_DEBUG, "MQTT: Write handler");
-       mosquitto_loop_write(mosq, 1);
-       mqtt_recalc_main();
-       return HOOK_IDLE;
+       char val[256], *w[2];
+       time_t now = time(NULL);
+
+       for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
+               const struct attr *a = &attr_table[i];
+
+               pthread_mutex_lock(&attr_mutex);
+               snprintf(val, sizeof(val), "%s", attr_values[i] ? : "");
+               pthread_mutex_unlock(&attr_mutex);
+
+               if (a->help)
+                       bprintf(fb, "# HELP %s %s\n", a->metric, a->help);
+               if (a->type)
+                       bprintf(fb, "# TYPE %s %s\n", a->metric, a->type);
+
+               if (!val[0])
+                       continue;
+               int fields = str_wordsplit(val, w, ARRAY_SIZE(w));
+               if (fields < 1)
+                       continue;
+               if (fields >= 2) {
+                       time_t t = atoll(w[1]);
+                       if (t < now - MEASUREMENT_TIMEOUT)
+                               continue;
+               }
+
+               if (a->topic) {
+                       bprintf(fb, "%s %s", a->metric, val);
+#if 0
+                       // Prometheus does not like our timestamps -- why?
+                       if (fields >= 2)
+                               bprintf(fb, " %s", w[1]);
+#endif
+                       bputc(fb, '\n');
+               }
+       }
 }
 
 }
 
-static void mqtt_main_init(void)
+static void http_connection(struct http *http)
 {
 {
-       mqtt_file.fd = -1;
-       mqtt_file.read_handler = mqtt_main_read;
-       mqtt_file.write_handler = mqtt_main_write;
+       http->iobuf = mp_alloc(http->mp, IOBUF_SIZE);
+       http->linebuf = mp_alloc(http->mp, LINEBUF_SIZE);
+       fbpool_init(&http->fbpool);
+
+       if (!http_get_line(http))
+               return;
+       // msg(L_DEBUG, "Request line: <%s>", http->linebuf);
+       char *words[3];
+       if (str_wordsplit(http->linebuf, words, 3) != 3) {
+               http_error(http, "400 Invalid request line");
+               return;
+       }
+       if (strcmp(words[0], "GET")) {
+               http_error(http, "501 Method not implemented");
+               return;
+       }
 
 
-       mqtt_timer.handler = mqtt_main_timer;
-       timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ);
+       for (;;) {
+               if (!http_get_line(http))
+                       return;
+               if (!http->linebuf[0])
+                       break;
+               // msg(L_DEBUG, "Header line: <%s>", http->linebuf);
+       }
 
 
-       mqtt_recalc_main();
+       fbpool_start(&http->fbpool, http->mp, 0);
+       struct fastbuf *fb = &http->fbpool.fb;
+       bprintf(fb, "HTTP/1.1 200 OK\r\n");
+       bprintf(fb, "Content-type: text/plain; version=0.0.4\r\n");
+       bprintf(fb, "\r\n");
+       http_answer(fb);
+       http_send(http);
 }
 
 static int use_daemon;
 }
 
 static int use_daemon;
@@ -144,7 +441,7 @@ static int use_debug;
 
 static struct opt_section options = {
        OPT_ITEMS {
 
 static struct opt_section options = {
        OPT_ITEMS {
-               OPT_HELP("A daemon for controlling the solid state relay module via MQTT"),
+               OPT_HELP("A daemon for transferring MQTT data to Prometheus"),
                OPT_HELP(""),
                OPT_HELP("Options:"),
                OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
                OPT_HELP(""),
                OPT_HELP("Options:"),
                OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
@@ -159,7 +456,6 @@ int main(int argc UNUSED, char **argv)
 {
        log_init(argv[0]);
        opt_parse(&options, argv+1);
 {
        log_init(argv[0]);
        opt_parse(&options, argv+1);
-       main_init();
 
        if (use_daemon) {
                struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
 
        if (use_daemon) {
                struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
@@ -173,33 +469,47 @@ int main(int argc UNUSED, char **argv)
        if (!mosq)
                die("Mosquitto: initialization failed");
 
        if (!mosq)
                die("Mosquitto: initialization failed");
 
+       mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
        mosquitto_log_callback_set(mosq, mqtt_log_callback);
        mosquitto_message_callback_set(mosq, mqtt_msg_callback);
        mosquitto_log_callback_set(mosq, mqtt_log_callback);
        mosquitto_message_callback_set(mosq, mqtt_msg_callback);
-       mqtt_main_init();
 
 
-#if 0
-       // FIXME: Publish online/offline status
-       if (mosquitto_will_set(mosq, "burrow/loft/status", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
+       if (mosquitto_will_set(mosq, "status/prometheus", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
                die("Mosquitto: unable to set will");
                die("Mosquitto: unable to set will");
-#endif
 
 
-       if (mosquitto_connect(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
+       if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
                die("Mosquitto: connect failed");
 
                die("Mosquitto: connect failed");
 
-       mqtt_setup();
+       if (mosquitto_loop_start(mosq))
+               die("Mosquitto: cannot start service thread");
+
+       int listen_sk = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+       if (listen_sk < 0)
+               die("Cannot create listening socket: %m");
+
+       int one = 1;
+       if (setsockopt(listen_sk, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
+               die("Cannot set SO_REUSEADDR: %m");
+
+       struct sockaddr_in sin = { .sin_family = AF_INET, .sin_port = htons(1422), .sin_addr = INADDR_ANY };
+       if (bind(listen_sk, (const struct sockaddr *) &sin, sizeof(sin)) < 0)
+               die("Cannot bind listening socket: %m");
+
+       if (listen(listen_sk, 64) < 0)
+               die("Cannot listen: %m");
 
 
-       main_loop();
-#if 0
        for (;;) {
        for (;;) {
-               int err = mosquitto_loop(mosq, 5000, 1);
-               if (err == MOSQ_ERR_NO_CONN) {
-                       err = mosquitto_reconnect(mosq);
-                       if (err == MOSQ_ERR_SUCCESS)
-                               mqtt_setup();
-                       else
-                               msg(L_ERROR, "Mosquitto: cannot reconnect, error %d", err);
-               } else if (err != MOSQ_ERR_SUCCESS)
-                       msg(L_ERROR, "Mosquitto: loop returned error %d", err);
+               int sk = accept(listen_sk, NULL, NULL);
+               if (sk < 0) {
+                       msg(L_ERROR, "HTTP accept failed: %m");
+                       continue;
+               }
+               msg(L_DEBUG, "HTTP accepted connection");
+               struct mempool *mp = mp_new(4096);
+               struct http *http = mp_alloc_zero(mp, sizeof(*http));
+               http->mp = mp;
+               http->sk = sk;
+               http_connection(http);
+               mp_delete(mp);
+               close(sk);
        }
        }
-#endif
 }
 }