*/
#include <ucw/lib.h>
+#include <ucw/fastbuf.h>
#include <ucw/log.h>
-#include <ucw/mainloop.h>
+#include <ucw/mempool.h>
#include <ucw/opt.h>
+#include <ucw/string.h>
+#include <netinet/in.h>
+#include <pthread.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <sys/poll.h>
+#include <sys/socket.h>
#include <syslog.h>
#include <time.h>
+#include <unistd.h>
#include <mosquitto.h>
};
static const struct attr attr_table[] = {
- { "# HELP loft_temp Temperature in the loft", NULL },
+ { "# HELP loft_temp Temperature in the loft [degC]", NULL },
{ "# TYPE loft_temp gauge", NULL },
{ "loft_temp", "burrow/loft/temperature" },
{ "# HELP loft_fan Fan speed in the loft", NULL },
{ "# TYPE loft_fan gauge", NULL },
{ "loft_fan", "burrow/loft/fan" },
- { NULL, NULL },
};
static char *attr_values[ARRAY_SIZE(attr_table)];
+static pthread_mutex_t attr_mutex = PTHREAD_MUTEX_INITIALIZER;
static void mqtt_publish(const char *topic, const char *fmt, ...)
{
va_end(args);
}
-static void mqtt_setup(void)
+static void mqtt_conn_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int status)
{
- if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
- die("Mosquitto: subscribe failed");
+ if (!status) {
+ msg(L_DEBUG, "MQTT: Connection established, subscribing");
+ if (mosquitto_subscribe(mosq, NULL, "burrow/#", 1) != MOSQ_ERR_SUCCESS)
+ die("Mosquitto: subscribe failed");
- // mqtt_publish("burrow/loft/status", "ok");
+ // mqtt_publish("burrow/loft/status", "ok");
+ }
}
static void mqtt_log_callback(struct mosquitto *mosq UNUSED, void *obj UNUSED, int level, const char *message)
for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
if (attr_table[i].topic && !strcmp(attr_table[i].topic, m->topic)) {
+ pthread_mutex_lock(&attr_mutex);
if (attr_values[i]) {
xfree(attr_values[i]);
attr_values[i] = NULL;
}
if (val[0])
attr_values[i] = xstrdup(val);
+ pthread_mutex_unlock(&attr_mutex);
}
}
}
-static struct main_file mqtt_file;
-static struct main_timer mqtt_timer;
-#define MQTT_TIMER_FREQ 5000
+struct http {
+ struct mempool *mp;
+ int sk;
+ char *iobuf;
+ uint iobuf_pos, iobuf_max;
+ char *linebuf;
+ struct fbpool fbpool;
+};
+
+#define IO_TIMEOUT 60000 // in ms
+#define IOBUF_SIZE 1024
+#define LINEBUF_SIZE 1024
-static void mqtt_recalc_main(void)
+static void http_send(struct http *http)
{
- int fd = mosquitto_socket(mosq);
- if (fd != mqtt_file.fd) {
- msg(L_DEBUG, "MQTT: Changing socket fd to %d", fd);
- if (fd < 0) {
- file_del(&mqtt_file);
- mqtt_file.fd = -1;
- } else {
- mqtt_file.fd = fd;
- file_add(&mqtt_file);
+ byte *buf = fbpool_end(&http->fbpool);
+ uint len = mp_size(http->mp, buf);
+
+ while (len) {
+ struct pollfd pfd = { .fd = http->sk, .events = POLLOUT, .revents = 0 };
+ int res = poll(&pfd, 1, IO_TIMEOUT);
+ if (res < 0)
+ die("Poll failed: %m");
+ if (!res) {
+ msg(L_ERROR, "HTTP write timed out");
+ return;
}
+
+ res = write(http->sk, buf, len);
+ if (res < 0) {
+ msg(L_ERROR, "HTTP write failed: %m");
+ return;
+ }
+ buf += res;
+ len -= res;
}
}
-static void mqtt_main_timer(struct main_timer *tm)
+static void http_error(struct http *http, const char *err)
{
- msg(L_DEBUG, "MQTT: Timer handler");
- mosquitto_loop_misc(mosq);
- mqtt_recalc_main();
- timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ);
+ msg(L_INFO, "HTTP error: %s", err);
+ fbpool_start(&http->fbpool, http->mp, 0);
+ bprintf(&http->fbpool.fb, "HTTP/1.1 %s\r\n", err);
+ http_send(http);
}
-static int mqtt_main_read(struct main_file *mf UNUSED)
-{
- msg(L_DEBUG, "MQTT: Read handler");
- mosquitto_loop_read(mosq, 1);
- mqtt_recalc_main();
- return HOOK_IDLE;
+static int http_get_char(struct http *http) {
+ if (http->iobuf_pos >= http->iobuf_max) {
+ struct pollfd pfd = { .fd = http->sk, .events = POLLIN, .revents = 0 };
+ int res = poll(&pfd, 1, IO_TIMEOUT);
+ if (res < 0)
+ die("Poll failed: %m");
+ if (!res) {
+ msg(L_ERROR, "HTTP read timed out");
+ return -1;
+ }
+ int len = read(http->sk, http->iobuf, IOBUF_SIZE);
+ if (len < 0) {
+ msg(L_ERROR, "HTTP read error: %m");
+ return -1;
+ }
+ if (!len) {
+ msg(L_ERROR, "HTTP connection closed");
+ return -1;
+ }
+ http->iobuf_pos = 0;
+ http->iobuf_max = len;
+ }
+ return http->iobuf[http->iobuf_pos++];
}
-static int mqtt_main_write(struct main_file *mf UNUSED)
+static bool http_get_line(struct http *http)
{
- msg(L_DEBUG, "MQTT: Write handler");
- mosquitto_loop_write(mosq, 1);
- mqtt_recalc_main();
- return HOOK_IDLE;
+ uint i = 0;
+ for (;;) {
+ int c = http_get_char(http);
+ if (c < 0)
+ return false;
+ if (c == '\n') {
+ if (i > 0 && http->linebuf[i-1] == '\r')
+ i--;
+ http->linebuf[i] = 0;
+ return true;
+ }
+ if (i >= IOBUF_SIZE-1) {
+ http_error(http, "400 Line too long");
+ return false;
+ }
+ http->linebuf[i++] = c;
+ }
}
-static void mqtt_main_init(void)
+static void http_connection(struct http *http)
{
- mqtt_file.fd = -1;
- mqtt_file.read_handler = mqtt_main_read;
- mqtt_file.write_handler = mqtt_main_write;
+ http->iobuf = mp_alloc(http->mp, IOBUF_SIZE);
+ http->linebuf = mp_alloc(http->mp, LINEBUF_SIZE);
+ fbpool_init(&http->fbpool);
- mqtt_timer.handler = mqtt_main_timer;
- timer_add_rel(&mqtt_timer, MQTT_TIMER_FREQ);
+ if (!http_get_line(http))
+ return;
+ // msg(L_DEBUG, "Request line: <%s>", http->linebuf);
+ char *words[3];
+ if (str_wordsplit(http->linebuf, words, 3) != 3) {
+ http_error(http, "400 Invalid request line");
+ return;
+ }
+ if (strcmp(words[0], "GET")) {
+ http_error(http, "501 Method not implemented");
+ return;
+ }
- mqtt_recalc_main();
+ for (;;) {
+ if (!http_get_line(http))
+ return;
+ if (!http->linebuf[0])
+ break;
+ // msg(L_DEBUG, "Header line: <%s>", http->linebuf);
+ }
+
+ fbpool_start(&http->fbpool, http->mp, 0);
+ struct fastbuf *fb = &http->fbpool.fb;
+ bprintf(fb, "HTTP/1.1 200 OK\r\n");
+ bprintf(fb, "Content-type: text/plain; version=0.0.4\r\n");
+ bprintf(fb, "\r\n");
+ for (uint i=0; i < ARRAY_SIZE(attr_table); i++) {
+ if (attr_table[i].topic) {
+ pthread_mutex_lock(&attr_mutex);
+ bprintf(fb, "%s %s\n", attr_table[i].pm, attr_values[i] ? : "");
+ pthread_mutex_unlock(&attr_mutex);
+ } else {
+ bprintf(fb, "%s\n", attr_table[i].pm);
+ }
+ }
+ http_send(http);
}
static int use_daemon;
{
log_init(argv[0]);
opt_parse(&options, argv+1);
- main_init();
if (use_daemon) {
struct log_stream *ls = log_new_syslog("daemon", LOG_PID);
if (!mosq)
die("Mosquitto: initialization failed");
+ mosquitto_connect_callback_set(mosq, mqtt_conn_callback);
mosquitto_log_callback_set(mosq, mqtt_log_callback);
mosquitto_message_callback_set(mosq, mqtt_msg_callback);
- mqtt_main_init();
#if 0
// FIXME: Publish online/offline status
die("Mosquitto: unable to set will");
#endif
- if (mosquitto_connect(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
+ if (mosquitto_connect_async(mosq, "10.32.184.5", 1883, 60) != MOSQ_ERR_SUCCESS)
die("Mosquitto: connect failed");
- mqtt_setup();
+ if (mosquitto_loop_start(mosq))
+ die("Mosquitto: cannot start service thread");
+
+ int listen_sk = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+ if (listen_sk < 0)
+ die("Cannot create listening socket: %m");
+
+ int one = 1;
+ if (setsockopt(listen_sk, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0)
+ die("Cannot set SO_REUSEADDR: %m");
+
+ struct sockaddr_in sin = { .sin_family = AF_INET, .sin_port = htons(1422), .sin_addr = INADDR_ANY };
+ if (bind(listen_sk, (const struct sockaddr *) &sin, sizeof(sin)) < 0)
+ die("Cannot bind listening socket: %m");
+
+ if (listen(listen_sk, 64) < 0)
+ die("Cannot listen: %m");
- main_loop();
-#if 0
for (;;) {
- int err = mosquitto_loop(mosq, 5000, 1);
- if (err == MOSQ_ERR_NO_CONN) {
- err = mosquitto_reconnect(mosq);
- if (err == MOSQ_ERR_SUCCESS)
- mqtt_setup();
- else
- msg(L_ERROR, "Mosquitto: cannot reconnect, error %d", err);
- } else if (err != MOSQ_ERR_SUCCESS)
- msg(L_ERROR, "Mosquitto: loop returned error %d", err);
+ int sk = accept(listen_sk, NULL, NULL);
+ if (sk < 0) {
+ msg(L_ERROR, "HTTP accept failed: %m");
+ continue;
+ }
+ msg(L_DEBUG, "HTTP accepted connection");
+ struct mempool *mp = mp_new(4096);
+ struct http *http = mp_alloc_zero(mp, sizeof(*http));
+ http->mp = mp;
+ http->sk = sk;
+ http_connection(http);
+ mp_delete(mp);
+ close(sk);
}
-#endif
}