X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=prometheus%2Fburrow-prometheus.c;h=0e760e737728bdf714d14903461499cb0918a53d;hb=fef910220f1ee04d6a572996bcc11ce7715b04d5;hp=8f0f560c235b130b1e1d230ef6ab27e1bca68f99;hpb=d6eec6a8f706824da16859cfa27fb5d462803956;p=home-hw.git diff --git a/prometheus/burrow-prometheus.c b/prometheus/burrow-prometheus.c index 8f0f560..0e760e7 100644 --- a/prometheus/burrow-prometheus.c +++ b/prometheus/burrow-prometheus.c @@ -1,41 +1,220 @@ /* * A gateway between MQTT and Prometheus * - * (c) 2018 Martin Mares + * (c) 2018--2019 Martin Mares */ #include +#include #include -#include +#include #include +#include +#include +#include #include #include #include #include +#include +#include #include #include +#include #include +#define MEASUREMENT_TIMEOUT 120 + static struct mosquitto *mosq; struct attr { - const char *pm; + const char *metric; + const char *help; + const char *type; const char *topic; }; static const struct attr attr_table[] = { - { "# HELP loft_temp Temperature in the loft", NULL }, - { "# TYPE loft_temp gauge", NULL }, - { "loft_temp", "burrow/loft/temperature" }, - { "# HELP loft_fan Fan speed in the loft", NULL }, - { "# TYPE loft_fan gauge", NULL }, - { "loft_fan", "burrow/loft/fan" }, - { NULL, NULL }, + { + .metric = "temp_loft", + .help = "Temperature in the loft [degC]", + .type = "gauge", + .topic = "burrow/temp/loft", + }, + { + .metric = "loft_fan", + .help = "Fan speed in the loft (0-3)", + .type = "gauge", + .topic = "burrow/loft/fan" + }, + { + .metric = "loft_circ", + .help = "Warm water circulation (0-1)", + .type = "gauge", + .topic = "burrow/loft/circulation" + }, + { + .metric = "temp_ursarium", + .help = "Temperature in the Ursarium [degC]", + .type = "gauge", + .topic = "burrow/temp/ursarium" + }, + { + .metric = "temp_catarium", + .help = "Temperature in the Catarium [degC]", + .type = "gauge", + .topic = "burrow/temp/catarium" + }, + { + .metric = "temp_garage", + .help = "Temperature in the garage [degC]", + .type = "gauge", + .topic = "burrow/temp/garage" + }, + { + .metric = "temp_kitchen", + .help = "Temperature in the kitchen [degC]", + .type = "gauge", + .topic = "burrow/temp/kitchen" + }, + { + .metric = "rh_ursarium", + .help = "Relative humidity in the Ursarium [%]", + .type = "gauge", + .topic = "burrow/temp/ursarium-rh" + }, + { + .metric = "temp_catarium_clock", + .help = "Temperature on Catarium clock [degC]", + .type = "gauge", + .topic = "burrow/temp/clock" + }, + { + .metric = "press_catarium_clock", + .help = "Pressure on Catarium clock [Pa]", + .type = "gauge", + .topic = "burrow/pressure/clock" + }, + { + .metric = "air_inside_intake", + .help = "Temperature of air intake from inside [degC]", + .type = "gauge", + .topic = "burrow/air/inside-intake", + }, + { + .metric = "air_inside_exhaust", + .help = "Temperature of air exhaust to inside [degC]", + .type = "gauge", + .topic = "burrow/air/inside-exhaust", + }, + { + .metric = "air_outside_intake", + .help = "Temperature of air intake from outside [degC]", + .type = "gauge", + .topic = "burrow/air/outside-intake", + }, + { + .metric = "air_outside_exhaust", + .help = "Temperature of air exhaust to outside [degC]", + .type = "gauge", + .topic = "burrow/air/outside-exhaust", + }, + { + .metric = "air_mixed", + .help = "Temperature of mixed air [degC]", + .type = "gauge", + .topic = "burrow/air/mixed", + }, + { + .metric = "air_inside_intake_avg", + .help = "Average temperature of air intake from inside [degC]", + .type = "gauge", + .topic = "burrow/avg/air/inside-intake", + }, + { + .metric = "air_outside_intake_avg", + .help = "Average temperature of air intake from outside [degC]", + .type = "gauge", + .topic = "burrow/avg/air/outside-intake", + }, + { + .metric = "air_bypass", + .help = "Heat exchanger bypass (0-1)", + .type = "gauge", + .topic = "burrow/air/bypass" + }, + { + .metric = "air_fan_pwm", + .help = "Heat exchanger fan PWM (0-255)", + .type = "gauge", + .topic = "burrow/air/exchanger-fan" + }, + { + // Common heading for all voltages + .metric = "pm_voltage", + .help = "Voltage between phases and neutral [V]", + .type = "gauge", + }, + { + .metric = "pm_voltage{phase=\"L1N\"}", + .topic = "burrow/power/voltage/l1n", + }, + { + .metric = "pm_voltage{phase=\"L2N\"}", + .topic = "burrow/power/voltage/l2n", + }, + { + .metric = "pm_voltage{phase=\"L3N\"}", + .topic = "burrow/power/voltage/l3n", + }, + { + // Common heading for all currents + .metric = "pm_current", + .help = "Current through phases [A]", + .type = "gauge", + }, + { + .metric = "pm_current{phase=\"L1\"}", + .topic = "burrow/power/current/l1", + }, + { + .metric = "pm_current{phase=\"L2\"}", + .topic = "burrow/power/current/l2", + }, + { + .metric = "pm_current{phase=\"L3\"}", + .topic = "burrow/power/current/l3", + }, + { + .metric = "pm_power", + .help = "Total power [W]", + .type = "gauge", + .topic = "burrow/power/power", + }, + { + .metric = "pm_energy", + .help = "Total energy [kWh]", + .type = "gauge", + .topic = "burrow/power/energy", + }, + { + .metric = "pm_reactive_power", + .help = "Total reactive power [VAr]", + .type = "gauge", + .topic = "burrow/power/reactive/power", + }, + { + .metric = "pm_reactive_energy", + .help = "Total reactive energy [kVArh]", + .type = "gauge", + .topic = "burrow/power/reactive/energy", + }, }; static char *attr_values[ARRAY_SIZE(attr_table)]; +static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER; static void mqtt_publish(const char *topic, const char *fmt, ...) { @@ -48,12 +227,15 @@ static void mqtt_publish(const char *topic, const char *fmt, ...) va_end(args); } -static void mqtt_setup(void) +static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status) { - if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS) - die("Mosquitto: subscribe failed"); + if (!status) { + msg(L_DEBUG, "MQTT: Connection established, subscribing"); + if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS) + die("Mosquitto: subscribe failed"); - // mqtt_publish("burrow/loft/status", "ok"); + mqtt_publish("status/prometheus", "ok"); + } } static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message) @@ -74,69 +256,184 @@ static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, c for (uint i=0; i < ARRAY_SIZE(attr_table); i++) { if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) { + pthread_mutex_lock(&attr_mutex); if (attr_values[i]) { xfree(attr_values[i]); attr_values[i] = NULL; } if (val[0]) attr_values[i] = xstrdup(val); + pthread_mutex_unlock(&attr_mutex); } } } -static struct main_file mqtt_file; -static struct main_timer mqtt_timer; -#define MQTT_TIMER_FREQ 5000 +struct http { + struct mempool *mp; + int sk; + char *iobuf; + uint iobuf_pos, iobuf_max; + char *linebuf; + struct fbpool fbpool; +}; + +#define IO_TIMEOUT 60000 // in ms +#define IOBUF_SIZE 1024 +#define LINEBUF_SIZE 1024 -static void mqtt_recalc_main(void) +static void http_send(struct http *http) { - int fd = mosquitto_socket(mosq); - if (fd != mqtt_file.fd) { - msg(L_DEBUG, "MQTT: Changing socket fd to %d", fd); - if (fd < 0) { - file_del(&mqtt_file); - mqtt_file.fd = -1; - } else { - mqtt_file.fd = fd; - file_add(&mqtt_file); + byte *buf = fbpool_end(&http->fbpool); + uint len = mp_size(http->mp, buf); + + while (len) { + struct pollfd pfd = { .fd = http->sk, .events = POLLOUT, .revents = 0 }; + int res = poll(&pfd, 1, IO_TIMEOUT); + if (res < 0) + die("Poll failed: %m"); + if (!res) { + msg(L_ERROR, "HTTP write timed out"); + return; + } + + res = write(http->sk, buf, len); + if (res < 0) { + msg(L_ERROR, "HTTP write failed: %m"); + return; } + buf += res; + len -= res; } } -static void mqtt_main_timer(struct main_timer *tm) +static void http_error(struct http *http, const char *err) { - msg(L_DEBUG, "MQTT: Timer handler"); - mosquitto_loop_misc(mosq); - mqtt_recalc_main(); - timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ); + msg(L_INFO, "HTTP error: %s", err); + fbpool_start(&http->fbpool, http->mp, 0); + bprintf(&http->fbpool.fb, "HTTP/1.1 %s\r\n", err); + http_send(http); } -static int mqtt_main_read(struct main_file *mf UNUSED) +static int http_get_char(struct http *http) { + if (http->iobuf_pos >= http->iobuf_max) { + struct pollfd pfd = { .fd = http->sk, .events = POLLIN, .revents = 0 }; + int res = poll(&pfd, 1, IO_TIMEOUT); + if (res < 0) + die("Poll failed: %m"); + if (!res) { + msg(L_ERROR, "HTTP read timed out"); + return -1; + } + int len = read(http->sk, http->iobuf, IOBUF_SIZE); + if (len < 0) { + msg(L_ERROR, "HTTP read error: %m"); + return -1; + } + if (!len) { + msg(L_ERROR, "HTTP connection closed"); + return -1; + } + http->iobuf_pos = 0; + http->iobuf_max = len; + } + return http->iobuf[http->iobuf_pos++]; +} + +static bool http_get_line(struct http *http) { - msg(L_DEBUG, "MQTT: Read handler"); - mosquitto_loop_read(mosq, 1); - mqtt_recalc_main(); - return HOOK_IDLE; + uint i = 0; + for (;;) { + int c = http_get_char(http); + if (c < 0) + return false; + if (c == '\n') { + if (i > 0 && http->linebuf[i-1] == '\r') + i--; + http->linebuf[i] = 0; + return true; + } + if (i >= IOBUF_SIZE-1) { + http_error(http, "400 Line too long"); + return false; + } + http->linebuf[i++] = c; + } } -static int mqtt_main_write(struct main_file *mf UNUSED) +static void http_answer(struct fastbuf *fb) { - msg(L_DEBUG, "MQTT: Write handler"); - mosquitto_loop_write(mosq, 1); - mqtt_recalc_main(); - return HOOK_IDLE; + char val[256], *w[2]; + time_t now = time(NULL); + + for (uint i=0; i < ARRAY_SIZE(attr_table); i++) { + const struct attr *a = &attr_table[i]; + + pthread_mutex_lock(&attr_mutex); + snprintf(val, sizeof(val), "%s", attr_values[i] ? : ""); + pthread_mutex_unlock(&attr_mutex); + + if (a->help) + bprintf(fb, "# HELP %s %s\n", a->metric, a->help); + if (a->type) + bprintf(fb, "# TYPE %s %s\n", a->metric, a->type); + + if (!val[0]) + continue; + int fields = str_wordsplit(val, w, ARRAY_SIZE(w)); + if (fields < 1) + continue; + if (fields >= 2) { + time_t t = atoll(w[1]); + if (t < now - MEASUREMENT_TIMEOUT) + continue; + } + + if (a->topic) { + bprintf(fb, "%s %s", a->metric, val); +#if 0 + // Prometheus does not like our timestamps -- why? + if (fields >= 2) + bprintf(fb, " %s", w[1]); +#endif + bputc(fb, '\n'); + } + } } -static void mqtt_main_init(void) +static void http_connection(struct http *http) { - mqtt_file.fd = -1; - mqtt_file.read_handler = mqtt_main_read; - mqtt_file.write_handler = mqtt_main_write; + http->iobuf = mp_alloc(http->mp, IOBUF_SIZE); + http->linebuf = mp_alloc(http->mp, LINEBUF_SIZE); + fbpool_init(&http->fbpool); + + if (!http_get_line(http)) + return; + // msg(L_DEBUG, "Request line: <%s>", http->linebuf); + char *words[3]; + if (str_wordsplit(http->linebuf, words, 3) != 3) { + http_error(http, "400 Invalid request line"); + return; + } + if (strcmp(words[0], "GET")) { + http_error(http, "501 Method not implemented"); + return; + } - mqtt_timer.handler = mqtt_main_timer; - timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ); + for (;;) { + if (!http_get_line(http)) + return; + if (!http->linebuf[0]) + break; + // msg(L_DEBUG, "Header line: <%s>", http->linebuf); + } - mqtt_recalc_main(); + fbpool_start(&http->fbpool, http->mp, 0); + struct fastbuf *fb = &http->fbpool.fb; + bprintf(fb, "HTTP/1.1 200 OK\r\n"); + bprintf(fb, "Content-type: text/plain; version=0.0.4\r\n"); + bprintf(fb, "\r\n"); + http_answer(fb); + http_send(http); } static int use_daemon; @@ -144,7 +441,7 @@ static int use_debug; static struct opt_section options = { OPT_ITEMS { - OPT_HELP("A daemon for controlling the solid state relay module via MQTT"), + OPT_HELP("A daemon for transferring MQTT data to Prometheus"), OPT_HELP(""), OPT_HELP("Options:"), OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"), @@ -159,7 +456,6 @@ int main(int argc UNUSED, char **argv) { log_init(argv[0]); opt_parse(&options, argv+1); - main_init(); if (use_daemon) { struct log_stream *ls = log_new_syslog("daemon", LOG_PID); @@ -173,33 +469,47 @@ int main(int argc UNUSED, char **argv) if (!mosq) die("Mosquitto: initialization failed"); + mosquitto_connect_callback_set(mosq, mqtt_conn_callback); mosquitto_log_callback_set(mosq, mqtt_log_callback); mosquitto_message_callback_set(mosq, mqtt_msg_callback); - mqtt_main_init(); -#if 0 - // FIXME: Publish online/offline status - if (mosquitto_will_set(mosq, "burrow/loft/status", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS) + if (mosquitto_will_set(mosq, "status/prometheus", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS) die("Mosquitto: unable to set will"); -#endif - if (mosquitto_connect(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS) + if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS) die("Mosquitto: connect failed"); - mqtt_setup(); + if (mosquitto_loop_start(mosq)) + die("Mosquitto: cannot start service thread"); + + int listen_sk = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + if (listen_sk < 0) + die("Cannot create listening socket: %m"); + + int one = 1; + if (setsockopt(listen_sk, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) + die("Cannot set SO_REUSEADDR: %m"); + + struct sockaddr_in sin = { .sin_family = AF_INET, .sin_port = htons(1422), .sin_addr = INADDR_ANY }; + if (bind(listen_sk, (const struct sockaddr *) &sin, sizeof(sin)) < 0) + die("Cannot bind listening socket: %m"); + + if (listen(listen_sk, 64) < 0) + die("Cannot listen: %m"); - main_loop(); -#if 0 for (;;) { - int err = mosquitto_loop(mosq, 5000, 1); - if (err == MOSQ_ERR_NO_CONN) { - err = mosquitto_reconnect(mosq); - if (err == MOSQ_ERR_SUCCESS) - mqtt_setup(); - else - msg(L_ERROR, "Mosquitto: cannot reconnect, error %d", err); - } else if (err != MOSQ_ERR_SUCCESS) - msg(L_ERROR, "Mosquitto: loop returned error %d", err); + int sk = accept(listen_sk, NULL, NULL); + if (sk < 0) { + msg(L_ERROR, "HTTP accept failed: %m"); + continue; + } + msg(L_DEBUG, "HTTP accepted connection"); + struct mempool *mp = mp_new(4096); + struct http *http = mp_alloc_zero(mp, sizeof(*http)); + http->mp = mp; + http->sk = sk; + http_connection(http); + mp_delete(mp); + close(sk); } -#endif }