From 2623d85c7a2cfbbfafc9b9b8a4f5e75ab21179f3 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Mon, 2 May 2011 01:23:42 +0200 Subject: [PATCH] Main: Preliminary implementation of record-based I/O --- ucw/Makefile | 2 +- ucw/doc/mainloop.txt | 1 + ucw/main-rec.c | 361 +++++++++++++++++++++++++++++++++++++++++++ ucw/mainloop.h | 55 +++++++ 4 files changed, 418 insertions(+), 1 deletion(-) create mode 100644 ucw/main-rec.c diff --git a/ucw/Makefile b/ucw/Makefile index 174e51b2..df8030ea 100644 --- a/ucw/Makefile +++ b/ucw/Makefile @@ -23,7 +23,7 @@ LIBUCW_MODS= \ prime primetable random timer randomkey \ bit-ffs bit-fls \ url \ - mainloop main-block \ + mainloop main-block main-rec \ exitstatus runcmd sighandler \ lizard lizard-safe adler32 \ md5 sha1 sha1-hmac \ diff --git a/ucw/doc/mainloop.txt b/ucw/doc/mainloop.txt index c747beca..c0e32f22 100644 --- a/ucw/doc/mainloop.txt +++ b/ucw/doc/mainloop.txt @@ -29,6 +29,7 @@ function. The handler is then registered with the main loop. - <> - <> - <> +- <> - <> - <> - <> diff --git a/ucw/main-rec.c b/ucw/main-rec.c new file mode 100644 index 00000000..647bdf53 --- /dev/null +++ b/ucw/main-rec.c @@ -0,0 +1,361 @@ +/* + * UCW Library -- Main Loop: Record I/O + * + * (c) 2011 Martin Mares + * + * This software may be freely distributed and used according to the terms + * of the GNU Lesser General Public License. + */ + +#undef LOCAL_DEBUG + +#include "ucw/lib.h" +#include "ucw/mainloop.h" + +#include +#include +#include +#include +#include + +struct rio_buffer { + cnode n; + uns full; + uns written; + byte buf[]; +}; + +static void +rec_io_timer_expired(struct main_timer *tm) +{ + struct main_rec_io *rio = tm->data; + timer_del(&rio->timer); + rio->notify_handler(rio, RIO_ERR_TIMEOUT); +} + +void +rec_io_add(struct main_rec_io *rio, int fd) +{ + rio->file.fd = fd; + file_add(&rio->file); + rio->timer.handler = rec_io_timer_expired; + rio->timer.data = rio; + clist_init(&rio->idle_write_buffers); + clist_init(&rio->busy_write_buffers); +} + +void +rec_io_del(struct main_rec_io *rio) +{ + timer_del(&rio->timer); + file_del(&rio->file); + + if (rio->read_buf) + { + DBG("RIO: Freeing read buffer"); + xfree(rio->read_buf); + rio->read_buf = NULL; + } + + struct rio_buffer *b; + while ((b = clist_remove_head(&rio->idle_write_buffers)) || (b = clist_remove_head(&rio->busy_write_buffers))) + { + DBG("RIO: Freeing write buffer"); + xfree(b); + } +} + +static int +rec_io_read_handler(struct main_file *fi) +{ + struct main_rec_io *rio = (struct main_rec_io *) fi; + + if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max) + { + rec_io_stop_read(rio); + rio->notify_handler(rio, RIO_ERR_READ_RECORD_TOO_LARGE); + return HOOK_IDLE; + } + +restart: ; + uns rec_start_pos = rio->read_rec_start - rio->read_buf; + uns rec_end_pos = rec_start_pos + rio->read_avail; + uns free_space = rio->read_buf_size - rec_end_pos; + DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u", + rec_start_pos, rio->read_avail, rio->read_prev_avail, + free_space, rio->read_buf_size); + // FIXME: Constants? + if (free_space <= rio->read_buf_size/8) + { + if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2) + { + // Moving the partial record to the start of the buffer + DBG("RIO READ: Moving partial record to start"); + memmove(rio->read_buf, rio->read_rec_start, rio->read_avail); + rio->read_rec_start = rio->read_buf; + } + else + { + DBG("RIO READ: Resizing buffer"); + rio->read_buf_size *= 2; + rio->read_buf = xrealloc(rio->read_buf, rio->read_buf_size); + rio->read_rec_start = rio->read_buf + rec_start_pos; + } + goto restart; + } + + int l = read(fi->fd, rio->read_buf + rec_end_pos, free_space); + DBG("RIO READ: Read %d bytes", l); + if (l < 0) + { + if (errno != EINTR && errno != EAGAIN) + { + DBG("RIO READ: Signalling error"); + rec_io_stop_read(rio); + rio->notify_handler(rio, RIO_ERR_READ); + } + return HOOK_IDLE; + } + if (!l) + { + DBG("RIO READ: Signalling EOF"); + rec_io_stop_read(rio); + rio->notify_handler(rio, RIO_ERR_READ_EOF); + return HOOK_IDLE; + } + rio->read_prev_avail = rio->read_avail; + rio->read_avail += l; + DBG("RIO READ: Available: %u bytes", rio->read_avail); + + uns got; + while (rio->read_running && (got = rio->read_handler(rio))) + { + DBG("RIO READ: Ate %u bytes", got); + rio->read_rec_start += got; + rio->read_avail -= got; + rio->read_prev_avail = 0; + if (!rio->read_avail) + { + DBG("RIO READ: Resetting buffer"); + rio->read_rec_start = rio->read_buf; + break; + } + } + DBG("RIO READ: Want more"); + return (rio->read_running ? HOOK_RETRY : HOOK_IDLE); +} + +void +rec_io_start_read(struct main_rec_io *rio) +{ + ASSERT(clist_is_linked(&rio->file.n)); + if (rio->read_running) + return; + if (!rio->read_buf) + { + if (!rio->read_buf_size) + rio->read_buf_size = 256; + rio->read_buf = xmalloc(rio->read_buf_size); + DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size); + rio->read_rec_start = rio->read_buf; + } + rio->file.read_handler = rec_io_read_handler; + file_chg(&rio->file); + rio->read_running = 1; + DBG("RIO: Reading started"); +} + +void +rec_io_stop_read(struct main_rec_io *rio) +{ + ASSERT(clist_is_linked(&rio->file.n)); + if (!rio->read_running) + return; + rio->file.read_handler = NULL; + file_chg(&rio->file); + rio->read_running = 0; + DBG("RIO: Reading stopped"); +} + +static void +rec_io_stop_write(struct main_rec_io *rio) +{ + DBG("RIO WRITE: Stopping write"); + ASSERT(!rio->write_watermark); + rio->file.write_handler = NULL; + file_chg(&rio->file); +} + +static int +rec_io_write_handler(struct main_file *fi) +{ + struct main_rec_io *rio = (struct main_rec_io *) fi; + struct rio_buffer *b = clist_head(&rio->busy_write_buffers); + if (!b) + { + rec_io_stop_write(rio); + return HOOK_IDLE; + } + + int l = write(fi->fd, b->buf + b->written, b->full - b->written); + DBG("RIO WRITE: Written %d bytes", l); + if (l < 0) + { + if (errno != EINTR && errno != EAGAIN) + { + rec_io_stop_write(rio); + rio->notify_handler(rio, RIO_ERR_WRITE); + } + return HOOK_IDLE; + } + b->written += l; + if (b->written == b->full) + { + DBG("RIO WRITE: Written full buffer"); + clist_remove(&b->n); + clist_add_tail(&rio->idle_write_buffers, &b->n); + } + + rio->write_watermark -= l; + int ret = HOOK_RETRY; + if (!rio->write_watermark) + { + ret = HOOK_IDLE; + rec_io_stop_write(rio); + } + + // Call the hook, but carefully, because it can delete the RIO structure + if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE) + ret = HOOK_IDLE; + return ret; +} + +static struct rio_buffer * +rec_io_get_buffer(struct main_rec_io *rio) +{ + struct rio_buffer *b = clist_remove_tail(&rio->idle_write_buffers); + if (b) + DBG("RIO WRITE: Recycled old buffer"); + else + { + if (!rio->write_buf_size) + rio->write_buf_size = 1024; + b = xmalloc(sizeof(struct rio_buffer) + rio->write_buf_size); + DBG("RIO WRITE: Allocated new buffer"); + } + b->full = b->written = 0; + return b; +} + +void +rec_io_write(struct main_rec_io *rio, void *data, uns len) +{ + byte *bdata = data; + ASSERT(clist_is_linked(&rio->file.n)); + if (!len) + return; + + while (len) + { + struct rio_buffer *b = clist_tail(&rio->busy_write_buffers); + if (!b || b->full >= rio->write_buf_size) + { + b = rec_io_get_buffer(rio); + clist_add_tail(&rio->busy_write_buffers, &b->n); + } + uns l = MIN(len, rio->write_buf_size - b->full); + memcpy(b->buf + b->full, bdata, l); + b->full += l; + bdata += l; + len -= l; + rio->write_watermark += l; + DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark); + } + + if (!rio->file.write_handler) + { + DBG("RIO WRITE: Starting write"); + rio->file.write_handler = rec_io_write_handler; + file_chg(&rio->file); + } +} + +void +rec_io_set_timeout(struct main_rec_io *rio, timestamp_t expires_delta) +{ + DBG("RIO: Setting timeout %u", (uns) expires_delta); + if (!expires_delta) + timer_del(&rio->timer); + else + timer_add_rel(&rio->timer, expires_delta); +} + +uns +rec_io_parse_line(struct main_rec_io *rio) +{ + for (uns i = rio->read_prev_avail; i < rio->read_avail; i++) + if (rio->read_rec_start[i] == '\n') + return i+1; + return 0; +} + +#ifdef TEST + +static uns rhand(struct main_rec_io *rio) +{ + uns r = rec_io_parse_line(rio); + if (r) + { + rio->read_rec_start[r-1] = 0; + printf("Read <%s>\n", rio->read_rec_start); + rec_io_set_timeout(rio, 10000); + rio->read_rec_start[r-1] = '\n'; + rec_io_write(rio, rio->read_rec_start, r); + } + return r; +} + +static int ehand(struct main_rec_io *rio, int cause) +{ + if (cause < 0) + { + msg(L_ERROR, "Error %d", cause); + rec_io_del(rio); + main_shut_down(); + return HOOK_IDLE; + } + else + { + msg(L_INFO, "Event %d", cause); + return HOOK_RETRY; + } +} + +int +main(void) +{ + log_init(NULL); + main_init(); + + struct main_rec_io rio = {}; + rio.read_buf_size = 4; + rio.read_handler = rhand; + rio.notify_handler = ehand; + // rio.read_rec_max = 40; + rio.write_buf_size = 4; + rec_io_add(&rio, 0); + rec_io_start_read(&rio); + rec_io_set_timeout(&rio, 10000); + + main_debug(); + + main_loop(); + msg(L_INFO, "Finished."); + + if (clist_is_linked(&rio.file.n)) + rec_io_del(&rio); + main_cleanup(); + return 0; +} + +#endif diff --git a/ucw/mainloop.h b/ucw/mainloop.h index ae1c1e4a..e7b2deb5 100644 --- a/ucw/mainloop.h +++ b/ucw/mainloop.h @@ -371,6 +371,61 @@ void block_io_write(struct main_block_io *bio, void *buf, uns len); **/ void block_io_set_timeout(struct main_block_io *bio, timestamp_t expires_delta); +/*** + * [[recordio]] + * Asynchronous record I/O + * ----------------------- + * + * FIXME + ***/ + +/** The record I/O structure. **/ +struct main_rec_io { + struct main_file file; + byte *read_buf; /* Reading half */ + byte *read_rec_start; /* [*] Start of current record */ + uns read_avail; /* [*] How much data is available */ + uns read_prev_avail; /* [*] How much data was available in previous read_done */ + uns read_buf_size; /* [*] Buffer size allocated (can set before rec_io_add()) */ + uns read_running; /* Reading requested */ + uns read_rec_max; /* [*] Maximum record size (0=unlimited) */ + clist busy_write_buffers; + clist idle_write_buffers; + uns write_buf_size; + uns write_watermark; + uns write_throttle; + uns (*read_handler)(struct main_rec_io *rio); /* [*] FIXME; describe EOF */ + // FIXME: returns... + int (*notify_handler)(struct main_rec_io *rio, int status); /* [*] Handler to call on errors */ + struct main_timer timer; + void *data; /* [*] Data for use by the handlers */ +}; + +/** Activate a record I/O structure. **/ +void rec_io_add(struct main_rec_io *rio, int fd); + +/** Deactivate a record I/O structure. **/ +void rec_io_del(struct main_rec_io *rio); + +void rec_io_start_read(struct main_rec_io *rio); +void rec_io_stop_read(struct main_rec_io *rio); +void rec_io_set_timeout(struct main_rec_io *bio, timestamp_t expires_delta); + +uns rec_io_parse_line(struct main_rec_io *rio); + +void rec_io_write(struct main_rec_io *rio, void *data, uns len); + +// All errors except timeout are fatal +enum rec_io_notify_status { + RIO_ERR_READ = -1, + RIO_ERR_WRITE = -2, + RIO_ERR_TIMEOUT = -3, + RIO_ERR_READ_RECORD_TOO_LARGE = -4, + RIO_ERR_READ_EOF = -5, + RIO_EVENT_ALL_WRITTEN = 1, + RIO_EVENT_PART_WRITTEN = 2, +}; + /*** * [[hooks]] * Loop hooks -- 2.39.2