2 * Interaction with MQTT
4 * (c) 2022 Martin Mares <mj@ucw.cz>
11 #include <ucw/mainloop.h>
16 #include <mosquitto.h>
20 static struct mosquitto *mosq;
23 static struct main_file mqtt_rx_pipe;
25 struct mqtt_pipe_msg {
30 void mqtt_publish(const char *topic, const char *fmt, ...)
35 int l = vsnprintf(m, sizeof(m), fmt, args);
36 DBG("MQTT > %s %s", topic, m);
37 if (mosquitto_publish(mosq, NULL, topic, l, m, 0, true) != MOSQ_ERR_SUCCESS)
38 msg(L_ERROR, "Mosquitto: publish failed");
42 static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
46 msg(L_DEBUG, "MQTT: Connection established, subscribing");
47 if (mosquitto_subscribe(mosq, NULL, "burrow/lights/catarium/#", 1) != MOSQ_ERR_SUCCESS)
48 die("Mosquitto: subscribe failed");
50 mqtt_publish("status/ursary", "ok");
54 static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level UNUSED, const char *message UNUSED)
56 // msg(L_INFO, "MQTT(%d): %s", level, message);
59 static void mqtt_msg_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, const struct mosquitto_message *m)
61 struct mqtt_pipe_msg pm;
63 if (snprintf(pm.topic, sizeof(pm.topic), m->topic) >= sizeof(pm.topic))
65 msg(L_ERROR, "MQTT: Topic %s too long", m->topic);
69 if (m->payloadlen >= sizeof(pm.val) - 1)
71 msg(L_ERROR, "MQTT: Invalid value for topic %s", m->topic);
74 memcpy(pm.val, m->payload, m->payloadlen);
75 pm.val[m->payloadlen] = 0;
77 careful_write(mqtt_pipes[1], &pm, sizeof(pm));
80 static int mqtt_pipe_read_callback(struct main_file *fi)
82 struct mqtt_pipe_msg pm;
83 if (careful_read(fi->fd, &pm, sizeof(pm)) <= 0)
84 die("MQTT pipe read error: %m");
86 DBG("MQTT < %s %s", pm.topic, pm.val);
87 notify_mqtt(pm.topic, pm.val);
95 mosq = mosquitto_new("ursary", 1, NULL);
97 die("Mosquitto: initialization failed");
99 if (pipe(mqtt_pipes) < 0)
100 die("Cannot create pipes: %m");
102 mqtt_rx_pipe.fd = mqtt_pipes[0];
103 mqtt_rx_pipe.read_handler = mqtt_pipe_read_callback;
104 file_add(&mqtt_rx_pipe);
106 mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
107 mosquitto_log_callback_set(mosq, mqtt_log_callback);
108 mosquitto_message_callback_set(mosq, mqtt_msg_callback);
110 if (mosquitto_will_set(mosq, "status/ursary", 4, "dead", 0, true) != MOSQ_ERR_SUCCESS)
111 die("Mosquitto: unable to set will");
113 if (mosquitto_tls_set(mosq, "/etc/burrow-mqtt/ca.crt", NULL, "/etc/burrow-mqtt/client.crt", "/etc/burrow-mqtt/client.key", NULL) != MOSQ_ERR_SUCCESS)
114 die("Mosquitto: unable to set TLS parameters");
116 if (mosquitto_connect_async(mosq, "burrow-mqtt", 8883, 60) != MOSQ_ERR_SUCCESS)
117 die("Mosquitto: connect failed");
119 if (mosquitto_loop_start(mosq))
120 die("Mosquitto: cannot start service thread");