From d6eec6a8f706824da16859cfa27fb5d462803956 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Sat, 11 Aug 2018 22:46:06 +0200 Subject: [PATCH] Prometheus: Rudiments --- prometheus/Makefile | 10 ++ prometheus/burrow-prometheus.c | 205 +++++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+) create mode 100644 prometheus/Makefile create mode 100644 prometheus/burrow-prometheus.c diff --git a/prometheus/Makefile b/prometheus/Makefile new file mode 100644 index 0000000..6bd14fe --- /dev/null +++ b/prometheus/Makefile @@ -0,0 +1,10 @@ +PC=pkg-config +UCW_CFLAGS := $(shell $(PC) --cflags libucw) +UCW_LDFLAGS := $(shell $(PC) --libs libucw) + +CFLAGS=$(UCW_CFLAGS) -std=gnu99 +LDFLAGS=$(UCW_LDFLAGS) -lmosquitto + +all: burrow-prometheus + +burrow-prometheus: burrow-prometheus.c diff --git a/prometheus/burrow-prometheus.c b/prometheus/burrow-prometheus.c new file mode 100644 index 0000000..8f0f560 --- /dev/null +++ b/prometheus/burrow-prometheus.c @@ -0,0 +1,205 @@ +/* + * A gateway between MQTT and Prometheus + * + * (c) 2018 Martin Mares + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +static struct mosquitto *mosq; + +struct attr { + const char *pm; + const char *topic; +}; + +static const struct attr attr_table[] = { + { "# HELP loft_temp Temperature in the loft", NULL }, + { "# TYPE loft_temp gauge", NULL }, + { "loft_temp", "burrow/loft/temperature" }, + { "# HELP loft_fan Fan speed in the loft", NULL }, + { "# TYPE loft_fan gauge", NULL }, + { "loft_fan", "burrow/loft/fan" }, + { NULL, NULL }, +}; + +static char *attr_values[ARRAY_SIZE(attr_table)]; + +static void mqtt_publish(const char *topic, const char *fmt, ...) +{ + va_list args; + va_start(args, fmt); + char m[256]; + int l = vsnprintf(m, sizeof(m), fmt, args); + if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS) + msg(L_ERROR, "Mosquitto: publish failed"); + va_end(args); +} + +static void mqtt_setup(void) +{ + if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS) + die("Mosquitto: subscribe failed"); + + // mqtt_publish("burrow/loft/status", "ok"); +} + +static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message) +{ + // msg(L_INFO, "MQTT(%d): %s", level, message); +} + +static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m) +{ + char val[256]; + if (m->payloadlen >= sizeof(val) - 1) { + msg(L_ERROR, "Invalid value for topic %s", m->topic); + return; + } + memcpy(val, m->payload, m->payloadlen); + val[m->payloadlen] = 0; + msg(L_DEBUG, "MQTT < %s %s", m->topic, val); + + for (uint i=0; i < ARRAY_SIZE(attr_table); i++) { + if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) { + if (attr_values[i]) { + xfree(attr_values[i]); + attr_values[i] = NULL; + } + if (val[0]) + attr_values[i] = xstrdup(val); + } + } +} + +static struct main_file mqtt_file; +static struct main_timer mqtt_timer; +#define MQTT_TIMER_FREQ 5000 + +static void mqtt_recalc_main(void) +{ + int fd = mosquitto_socket(mosq); + if (fd != mqtt_file.fd) { + msg(L_DEBUG, "MQTT: Changing socket fd to %d", fd); + if (fd < 0) { + file_del(&mqtt_file); + mqtt_file.fd = -1; + } else { + mqtt_file.fd = fd; + file_add(&mqtt_file); + } + } +} + +static void mqtt_main_timer(struct main_timer *tm) +{ + msg(L_DEBUG, "MQTT: Timer handler"); + mosquitto_loop_misc(mosq); + mqtt_recalc_main(); + timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ); +} + +static int mqtt_main_read(struct main_file *mf UNUSED) +{ + msg(L_DEBUG, "MQTT: Read handler"); + mosquitto_loop_read(mosq, 1); + mqtt_recalc_main(); + return HOOK_IDLE; +} + +static int mqtt_main_write(struct main_file *mf UNUSED) +{ + msg(L_DEBUG, "MQTT: Write handler"); + mosquitto_loop_write(mosq, 1); + mqtt_recalc_main(); + return HOOK_IDLE; +} + +static void mqtt_main_init(void) +{ + mqtt_file.fd = -1; + mqtt_file.read_handler = mqtt_main_read; + mqtt_file.write_handler = mqtt_main_write; + + mqtt_timer.handler = mqtt_main_timer; + timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ); + + mqtt_recalc_main(); +} + +static int use_daemon; +static int use_debug; + +static struct opt_section options = { + OPT_ITEMS { + OPT_HELP("A daemon for controlling the solid state relay module via MQTT"), + OPT_HELP(""), + OPT_HELP("Options:"), + OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"), + OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"), + OPT_HELP_OPTION, + OPT_CONF_OPTIONS, + OPT_END + } +}; + +int main(int argc UNUSED, char **argv) +{ + log_init(argv[0]); + opt_parse(&options, argv+1); + main_init(); + + if (use_daemon) { + struct log_stream *ls = log_new_syslog("daemon", LOG_PID); + log_set_default_stream(ls); + } + if (!use_debug) + log_default_stream()->levels &= ~(1U << L_DEBUG); + + mosquitto_lib_init(); + mosq = mosquitto_new("prometheus", 1, NULL); + if (!mosq) + die("Mosquitto: initialization failed"); + + mosquitto_log_callback_set(mosq, mqtt_log_callback); + mosquitto_message_callback_set(mosq, mqtt_msg_callback); + mqtt_main_init(); + +#if 0 + // FIXME: Publish online/offline status + if (mosquitto_will_set(mosq, "burrow/loft/status", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS) + die("Mosquitto: unable to set will"); +#endif + + if (mosquitto_connect(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS) + die("Mosquitto: connect failed"); + + mqtt_setup(); + + main_loop(); +#if 0 + for (;;) { + int err = mosquitto_loop(mosq, 5000, 1); + if (err == MOSQ_ERR_NO_CONN) { + err = mosquitto_reconnect(mosq); + if (err == MOSQ_ERR_SUCCESS) + mqtt_setup(); + else + msg(L_ERROR, "Mosquitto: cannot reconnect, error %d", err); + } else if (err != MOSQ_ERR_SUCCESS) + msg(L_ERROR, "Mosquitto: loop returned error %d", err); + } +#endif +} -- 2.39.2