2 * Interaction with MQTT
4 * (c) 2022-2023 Martin Mares <mj@ucw.cz>
11 #include <ucw/mainloop.h>
16 #include <mosquitto.h>
20 static struct mosquitto *mosq;
23 static struct main_file mqtt_rx_pipe;
25 struct mqtt_pipe_msg {
30 void mqtt_publish(const char *topic, const char *fmt, ...)
35 int l = vsnprintf(m, sizeof(m), fmt, args);
36 DBG("MQTT > %s %s", topic, m);
37 if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
38 msg(L_ERROR, "Mosquitto: publish failed");
42 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
46 msg(L_DEBUG, "MQTT: Connection established, subscribing");
47 if (mosquitto_subscribe(mosq, NULL, "burrow/lights/catarium/#", 1) != MOSQ_ERR_SUCCESS ||
48 mosquitto_subscribe(mosq, NULL, "burrow/control/catarium-ir", 1) != MOSQ_ERR_SUCCESS)
49 die("Mosquitto: subscribe failed");
51 mqtt_publish("status/ursary", "ok");
55 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level UNUSED, const char *message UNUSED)
57 // msg(L_INFO, "MQTT(%d): %s", level, message);
60 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
62 struct mqtt_pipe_msg pm;
64 if (snprintf(pm.topic, sizeof(pm.topic), m->topic) >= sizeof(pm.topic))
66 msg(L_ERROR, "MQTT: Topic %s too long", m->topic);
70 if (m->payloadlen >= sizeof(pm.val) - 1)
72 msg(L_ERROR, "MQTT: Invalid value for topic %s", m->topic);
75 memcpy(pm.val, m->payload, m->payloadlen);
76 pm.val[m->payloadlen] = 0;
78 careful_write(mqtt_pipes[1], &pm, sizeof(pm));
81 static int mqtt_pipe_read_callback(struct main_file *fi)
83 struct mqtt_pipe_msg pm;
84 if (careful_read(fi->fd, &pm, sizeof(pm)) <= 0)
85 die("MQTT pipe read error: %m");
87 DBG("MQTT < %s %s", pm.topic, pm.val);
88 notify_mqtt(pm.topic, pm.val);
96 mosq = mosquitto_new("ursary", 1, NULL);
98 die("Mosquitto: initialization failed");
100 if (pipe(mqtt_pipes) < 0)
101 die("Cannot create pipes: %m");
103 mqtt_rx_pipe.fd = mqtt_pipes[0];
104 mqtt_rx_pipe.read_handler = mqtt_pipe_read_callback;
105 file_add(&mqtt_rx_pipe);
107 mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
108 mosquitto_log_callback_set(mosq, mqtt_log_callback);
109 mosquitto_message_callback_set(mosq, mqtt_msg_callback);
111 if (mosquitto_will_set(mosq, "status/ursary", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
112 die("Mosquitto: unable to set will");
114 if (mosquitto_tls_set(mosq, "/etc/burrow-mqtt/ca.crt", NULL, "/etc/burrow-mqtt/client.crt", "/etc/burrow-mqtt/client.key", NULL) != MOSQ_ERR_SUCCESS)
115 die("Mosquitto: unable to set TLS parameters");
117 if (mosquitto_connect_async(mosq, "burrow-mqtt", 8883, 60) != MOSQ_ERR_SUCCESS)
118 die("Mosquitto: connect failed");
120 if (mosquitto_loop_start(mosq))
121 die("Mosquitto: cannot start service thread");