]> mj.ucw.cz Git - home-hw.git/blob - influx/burrow-influx.c
95213003b4ca213ec04c15c8a83054d9bfa6588d
[home-hw.git] / influx / burrow-influx.c
1 /*
2  *      A gateway between MQTT and InfluxDB
3  *
4  *      (c) 2018--2019 Martin Mares <mj@ucw.cz>
5  */
6
7 #include <ucw/lib.h>
8 #include <ucw/fastbuf.h>
9 #include <ucw/log.h>
10 #include <ucw/opt.h>
11 #include <ucw/string.h>
12
13 #include <pthread.h>
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <syslog.h>
18 #include <time.h>
19 #include <unistd.h>
20
21 #include <curl/curl.h>
22 #include <mosquitto.h>
23
24 #define MEASUREMENT_TIMEOUT 120
25
26 #define INFLUX_URL "http://localhost:8086/write?db=burrow&precision=s"
27 #define INFLUX_TIMEOUT 30
28 #define INFLUX_VERBOSE 0
29 #define INFLUX_INTERVAL 60
30
31 static struct mosquitto *mosq;
32
33 struct attr {
34         const char *metric;
35         const char *value_name;
36         const char *topic;
37 };
38
39 static const struct attr attr_table[] = {
40         {
41                 .metric = "temp,where=loft",
42                 .value_name = "t",
43                 .topic = "burrow/temp/loft",
44         },
45         {
46                 .metric = "temp,where=ursarium",
47                 .value_name = "t",
48                 .topic = "burrow/temp/ursarium"
49         },
50         {
51                 .metric = "temp,where=catarium",
52                 .value_name = "t",
53                 .topic = "burrow/temp/catarium"
54         },
55         {
56                 .metric = "temp,where=garage",
57                 .value_name = "t",
58                 .topic = "burrow/temp/garage"
59         },
60         {
61                 .metric = "temp,where=kitchen",
62                 .value_name = "t",
63                 .topic = "burrow/temp/kitchen"
64         },
65         {
66                 .metric = "rh,where=ursarium",
67                 .value_name = "rh",
68                 .topic = "burrow/temp/ursarium-rh"
69         },
70         {
71                 .metric = "temp,where=catarium_clock",
72                 .value_name = "t",
73                 .topic = "burrow/temp/clock"
74         },
75         {
76                 .metric = "pressure,where=catarium_clock",
77                 .value_name = "pa",
78                 .topic = "burrow/pressure/clock"
79         },
80         {
81                 .metric = "air,where=inside_intake",
82                 .value_name = "t",
83                 .topic = "burrow/air/inside-intake",
84         },
85         {
86                 .metric = "air,where=inside_exhaust",
87                 .value_name = "t",
88                 .topic = "burrow/air/inside-exhaust",
89         },
90         {
91                 .metric = "air,where=outside_intake",
92                 .value_name = "t",
93                 .topic = "burrow/air/outside-intake",
94         },
95         {
96                 .metric = "air,where=outside_exhaust",
97                 .value_name = "t",
98                 .topic = "burrow/air/outside-exhaust",
99         },
100         {
101                 .metric = "air,where=mixed",
102                 .value_name = "t",
103                 .topic = "burrow/air/mixed",
104         },
105         {
106                 .metric = "air,where=inside_intake_avg",
107                 .value_name = "t",
108                 .topic = "burrow/avg/air/inside-intake",
109         },
110         {
111                 .metric = "air,where=outside_intake_avg",
112                 .value_name = "t",
113                 .topic = "burrow/avg/air/outside-intake",
114         },
115         {
116                 .metric = "air_bypass",
117                 .value_name = "on",
118                 .topic = "burrow/air/bypass"
119         },
120         {
121                 .metric = "air_fan_pwm",
122                 .value_name = "pwm",
123                 .topic = "burrow/air/exchanger-fan"
124         },
125 #if 0
126         {
127                 .metric = "loft_fan",
128                 .topic = "burrow/loft/fan"
129         },
130 #endif
131         {
132                 .metric = "water_circ",
133                 .value_name = "on",
134                 .topic = "burrow/loft/circulation"
135         },
136         {
137                 .metric = "pm_voltage,phase=L1N",
138                 .value_name = "V",
139                 .topic = "burrow/power/voltage/l1n",
140         },
141         {
142                 .metric = "pm_voltage,phase=L2N",
143                 .value_name = "V",
144                 .topic = "burrow/power/voltage/l2n",
145         },
146         {
147                 .metric = "pm_voltage,phase=L3N",
148                 .value_name = "V",
149                 .topic = "burrow/power/voltage/l3n",
150         },
151         {
152                 .metric = "pm_current,phase=L1",
153                 .value_name = "A",
154                 .topic = "burrow/power/current/l1",
155         },
156         {
157                 .metric = "pm_current,phase=L2",
158                 .value_name = "A",
159                 .topic = "burrow/power/current/l2",
160         },
161         {
162                 .metric = "pm_current,phase=L3",
163                 .value_name = "A",
164                 .topic = "burrow/power/current/l3",
165         },
166         {
167                 .metric = "pm_power",
168                 .value_name = "W",
169                 .topic = "burrow/power/power",
170         },
171         {
172                 .metric = "pm_energy",
173                 .value_name = "kWh",
174                 .topic = "burrow/power/energy",
175         },
176         {
177                 .metric = "pm_reactive_power",
178                 .value_name = "VAr",
179                 .topic = "burrow/power/reactive/power",
180         },
181         {
182                 .metric = "pm_reactive_energy",
183                 .value_name = "kVArh",
184                 .topic = "burrow/power/reactive/energy",
185         },
186 };
187
188 /*** MQTT ***/
189
190 static char *attr_values[ARRAY_SIZE(attr_table)];
191 static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
192
193 static void mqtt_publish(const char *topic, const char *fmt, ...)
194 {
195         va_list args;
196         va_start(args, fmt);
197         char m[256];
198         int l = vsnprintf(m, sizeof(m), fmt, args);
199         if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
200                 msg(L_ERROR, "Mosquitto: publish failed");
201         va_end(args);
202 }
203
204 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
205 {
206         if (!status) {
207                 msg(L_DEBUG, "MQTT: Connection established, subscribing");
208                 if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
209                         die("Mosquitto: subscribe failed");
210
211                 mqtt_publish("status/influx", "ok");
212         }
213 }
214
215 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
216 {
217         // msg(L_INFO, "MQTT(%d): %s", level, message);
218 }
219
220 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
221 {
222         char val[256];
223         if (m->payloadlen >= sizeof(val) - 1) {
224                 msg(L_ERROR, "Invalid value for topic %s", m->topic);
225                 return;
226         }
227         memcpy(val, m->payload, m->payloadlen);
228         val[m->payloadlen] = 0;
229         msg(L_DEBUG, "MQTT < %s %s", m->topic, val);
230
231         for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
232                 if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
233                         pthread_mutex_lock(&attr_mutex);
234                         if (attr_values[i]) {
235                                 xfree(attr_values[i]);
236                                 attr_values[i] = NULL;
237                         }
238                         if (val[0])
239                                 attr_values[i] = xstrdup(val);
240                         pthread_mutex_unlock(&attr_mutex);
241                 }
242         }
243 }
244
245 /*** InfluxDB ***/
246
247 static CURL *curl;
248
249 static struct fastbuf *influx_fb;
250 static struct fastbuf *influx_err_fb;
251
252 static size_t influx_write_callback(char *ptr, size_t size, size_t nmemb, void *userdata UNUSED)
253 {
254         ASSERT(size == 1);
255         for (size_t i=0; i<nmemb; i++) {
256                 int c = *(byte *)ptr;
257                 ptr++;
258                 if (c == '\r')
259                         continue;
260                 if (c == '\n')
261                         c = ' ';
262                 else if (c < 0x20 || c > 0x7e)
263                         c = '?';
264                 bputc(influx_err_fb, c);
265         }
266         return nmemb;
267 }
268
269 static void influx_init(void)
270 {
271         curl = curl_easy_init();
272         if (!curl)
273                 die("libcurl init failed");
274
275         curl_easy_setopt(curl, CURLOPT_URL, INFLUX_URL);
276         curl_easy_setopt(curl, CURLOPT_VERBOSE, (long) INFLUX_VERBOSE);
277         curl_easy_setopt(curl, CURLOPT_TIMEOUT, (long) INFLUX_TIMEOUT);
278         curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, influx_write_callback);
279
280         influx_fb = fbgrow_create(4096);
281         influx_err_fb = fbgrow_create(256);
282 }
283
284 static struct fastbuf *influx_write_start(void)
285 {
286         fbgrow_reset(influx_fb);
287         return influx_fb;
288 }
289
290 static void influx_write_flush(void)
291 {
292         bputc(influx_fb, 0);
293         byte *buf;
294         fbgrow_get_buf(influx_fb, &buf);
295         curl_easy_setopt(curl, CURLOPT_POSTFIELDS, buf);
296
297         msg(L_DEBUG, "InfluxDB: Sending data (%u bytes)", strlen(buf));
298         if (INFLUX_VERBOSE)
299                 fprintf(stderr, "%s", buf);
300
301         fbgrow_reset(influx_err_fb);
302
303         CURLcode code = curl_easy_perform(curl);
304
305         if (code != CURLE_OK) {
306                 msg(L_ERROR, "Write to InfluxDB failed: %s", curl_easy_strerror(code));
307                 return;
308         }
309
310         long rc = 0;
311         curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &rc);
312         if (rc != 200 && rc != 204) {
313                 bputc(influx_err_fb, 0);
314                 byte *err;
315                 fbgrow_get_buf(influx_err_fb, &err);
316                 msg(L_DEBUG, "InfluxDB HTTP error %ld: %s [...]", rc, err);
317         }
318 }
319
320 /*** Main ***/
321
322 static int use_daemon;
323 static int use_debug;
324
325 static struct opt_section options = {
326         OPT_ITEMS {
327                 OPT_HELP("A daemon for transferring MQTT data to InfluxDB"),
328                 OPT_HELP(""),
329                 OPT_HELP("Options:"),
330                 OPT_BOOL('d', "debug", use_debug, 0, "\tLog debugging messages"),
331                 OPT_BOOL(0, "daemon", use_daemon, 0, "\tDaemonize"),
332                 OPT_HELP_OPTION,
333                 OPT_CONF_OPTIONS,
334                 OPT_END
335         }
336 };
337
338 int main(int argc UNUSED, char **argv)
339 {
340         log_init(argv[0]);
341         opt_parse(&options, argv+1);
342
343         if (use_daemon) {
344                 struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
345                 log_set_default_stream(ls);
346         }
347         if (!use_debug)
348                 log_default_stream()->levels &= ~(1U << L_DEBUG);
349
350         mosquitto_lib_init();
351         mosq = mosquitto_new("influx", 1, NULL);
352         if (!mosq)
353                 die("Mosquitto: initialization failed");
354
355         mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
356         mosquitto_log_callback_set(mosq, mqtt_log_callback);
357         mosquitto_message_callback_set(mosq, mqtt_msg_callback);
358
359         if (mosquitto_will_set(mosq, "status/influx", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
360                 die("Mosquitto: unable to set will");
361
362         if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
363                 die("Mosquitto: connect failed");
364
365         if (mosquitto_loop_start(mosq))
366                 die("Mosquitto: cannot start service thread");
367
368         influx_init();
369
370         for (;;) {
371                 struct fastbuf *f = influx_write_start();
372                 char val[256], *w[2];
373                 time_t now = time(NULL);
374
375                 for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
376                         const struct attr *a = &attr_table[i];
377
378                         pthread_mutex_lock(&attr_mutex);
379                         snprintf(val, sizeof(val), "%s", attr_values[i] ? : "");
380                         pthread_mutex_unlock(&attr_mutex);
381
382                         if (!val[0])
383                                 continue;
384                         int fields = str_wordsplit(val, w, ARRAY_SIZE(w));
385                         if (fields < 1)
386                                 continue;
387                         if (fields >= 2) {
388                                 time_t t = atoll(w[1]);
389                                 if (t < now - MEASUREMENT_TIMEOUT)
390                                         continue;
391                         }
392
393                         bprintf(f, "%s %s=%s\n", a->metric, a->value_name, val);
394                 }
395                 influx_write_flush();
396                 sleep(INFLUX_INTERVAL);
397         }
398 }