]> mj.ucw.cz Git - libucw.git/commitdiff
Main: Preliminary implementation of record-based I/O
authorMartin Mares <mj@ucw.cz>
Sun, 1 May 2011 23:23:42 +0000 (01:23 +0200)
committerMartin Mares <mj@ucw.cz>
Sun, 1 May 2011 23:23:42 +0000 (01:23 +0200)
ucw/Makefile
ucw/doc/mainloop.txt
ucw/main-rec.c [new file with mode: 0644]
ucw/mainloop.h

index 174e51b27d1bc44d947bd550986f77d8031fb2d7..df8030ea452976673548963769f853a45da8f9ce 100644 (file)
@@ -23,7 +23,7 @@ LIBUCW_MODS= \
        prime primetable random timer randomkey \
        bit-ffs bit-fls \
        url \
-       mainloop main-block \
+       mainloop main-block main-rec \
        exitstatus runcmd sighandler \
        lizard lizard-safe adler32 \
        md5 sha1 sha1-hmac \
index c747beca615e5d468eec9204c32aaca459c63312..c0e32f223608f00f4017a8703a134eaec49f17d6 100644 (file)
@@ -29,6 +29,7 @@ function. The handler is then registered with the main loop.
 - <<time,Time and timers>>
 - <<file,Activity on file descriptors>>
 - <<blockio,Asynchronous block I/O>>
+- <<recordio,Asynchronous record I/O>>
 - <<hooks,Loop hooks>>
 - <<process,Child processes>>
 - <<signal,Synchronous delivery of signals>>
diff --git a/ucw/main-rec.c b/ucw/main-rec.c
new file mode 100644 (file)
index 0000000..647bdf5
--- /dev/null
@@ -0,0 +1,361 @@
+/*
+ *     UCW Library -- Main Loop: Record I/O
+ *
+ *     (c) 2011 Martin Mares <mj@ucw.cz>
+ *
+ *     This software may be freely distributed and used according to the terms
+ *     of the GNU Lesser General Public License.
+ */
+
+#undef LOCAL_DEBUG
+
+#include "ucw/lib.h"
+#include "ucw/mainloop.h"
+
+#include <stdio.h>
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+
+struct rio_buffer {
+  cnode n;
+  uns full;
+  uns written;
+  byte buf[];
+};
+
+static void
+rec_io_timer_expired(struct main_timer *tm)
+{
+  struct main_rec_io *rio = tm->data;
+  timer_del(&rio->timer);
+  rio->notify_handler(rio, RIO_ERR_TIMEOUT);
+}
+
+void
+rec_io_add(struct main_rec_io *rio, int fd)
+{
+  rio->file.fd = fd;
+  file_add(&rio->file);
+  rio->timer.handler = rec_io_timer_expired;
+  rio->timer.data = rio;
+  clist_init(&rio->idle_write_buffers);
+  clist_init(&rio->busy_write_buffers);
+}
+
+void
+rec_io_del(struct main_rec_io *rio)
+{
+  timer_del(&rio->timer);
+  file_del(&rio->file);
+
+  if (rio->read_buf)
+    {
+      DBG("RIO: Freeing read buffer");
+      xfree(rio->read_buf);
+      rio->read_buf = NULL;
+    }
+
+  struct rio_buffer *b;
+  while ((b = clist_remove_head(&rio->idle_write_buffers)) || (b = clist_remove_head(&rio->busy_write_buffers)))
+    {
+      DBG("RIO: Freeing write buffer");
+      xfree(b);
+    }
+}
+
+static int
+rec_io_read_handler(struct main_file *fi)
+{
+  struct main_rec_io *rio = (struct main_rec_io *) fi;
+
+  if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
+    {
+      rec_io_stop_read(rio);
+      rio->notify_handler(rio, RIO_ERR_READ_RECORD_TOO_LARGE);
+      return HOOK_IDLE;
+    }
+
+restart: ;
+  uns rec_start_pos = rio->read_rec_start - rio->read_buf;
+  uns rec_end_pos = rec_start_pos + rio->read_avail;
+  uns free_space = rio->read_buf_size - rec_end_pos;
+  DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u",
+    rec_start_pos, rio->read_avail, rio->read_prev_avail,
+    free_space, rio->read_buf_size);
+  // FIXME: Constants?
+  if (free_space <= rio->read_buf_size/8)
+    {
+      if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2)
+       {
+         // Moving the partial record to the start of the buffer
+         DBG("RIO READ: Moving partial record to start");
+         memmove(rio->read_buf, rio->read_rec_start, rio->read_avail);
+         rio->read_rec_start = rio->read_buf;
+       }
+      else
+       {
+         DBG("RIO READ: Resizing buffer");
+         rio->read_buf_size *= 2;
+         rio->read_buf = xrealloc(rio->read_buf, rio->read_buf_size);
+         rio->read_rec_start = rio->read_buf + rec_start_pos;
+       }
+      goto restart;
+    }
+
+  int l = read(fi->fd, rio->read_buf + rec_end_pos, free_space);
+  DBG("RIO READ: Read %d bytes", l);
+  if (l < 0)
+    {
+      if (errno != EINTR && errno != EAGAIN)
+       {
+         DBG("RIO READ: Signalling error");
+         rec_io_stop_read(rio);
+         rio->notify_handler(rio, RIO_ERR_READ);
+       }
+      return HOOK_IDLE;
+    }
+  if (!l)
+    {
+      DBG("RIO READ: Signalling EOF");
+      rec_io_stop_read(rio);
+      rio->notify_handler(rio, RIO_ERR_READ_EOF);
+      return HOOK_IDLE;
+    }
+  rio->read_prev_avail = rio->read_avail;
+  rio->read_avail += l;
+  DBG("RIO READ: Available: %u bytes", rio->read_avail);
+
+  uns got;
+  while (rio->read_running && (got = rio->read_handler(rio)))
+    {
+      DBG("RIO READ: Ate %u bytes", got);
+      rio->read_rec_start += got;
+      rio->read_avail -= got;
+      rio->read_prev_avail = 0;
+      if (!rio->read_avail)
+       {
+         DBG("RIO READ: Resetting buffer");
+         rio->read_rec_start = rio->read_buf;
+         break;
+       }
+    }
+  DBG("RIO READ: Want more");
+  return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
+}
+
+void
+rec_io_start_read(struct main_rec_io *rio)
+{
+  ASSERT(clist_is_linked(&rio->file.n));
+  if (rio->read_running)
+    return;
+  if (!rio->read_buf)
+    {
+      if (!rio->read_buf_size)
+       rio->read_buf_size = 256;
+      rio->read_buf = xmalloc(rio->read_buf_size);
+      DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
+      rio->read_rec_start = rio->read_buf;
+    }
+  rio->file.read_handler = rec_io_read_handler;
+  file_chg(&rio->file);
+  rio->read_running = 1;
+  DBG("RIO: Reading started");
+}
+
+void
+rec_io_stop_read(struct main_rec_io *rio)
+{
+  ASSERT(clist_is_linked(&rio->file.n));
+  if (!rio->read_running)
+    return;
+  rio->file.read_handler = NULL;
+  file_chg(&rio->file);
+  rio->read_running = 0;
+  DBG("RIO: Reading stopped");
+}
+
+static void
+rec_io_stop_write(struct main_rec_io *rio)
+{
+  DBG("RIO WRITE: Stopping write");
+  ASSERT(!rio->write_watermark);
+  rio->file.write_handler = NULL;
+  file_chg(&rio->file);
+}
+
+static int
+rec_io_write_handler(struct main_file *fi)
+{
+  struct main_rec_io *rio = (struct main_rec_io *) fi;
+  struct rio_buffer *b = clist_head(&rio->busy_write_buffers);
+  if (!b)
+    {
+      rec_io_stop_write(rio);
+      return HOOK_IDLE;
+    }
+
+  int l = write(fi->fd, b->buf + b->written, b->full - b->written);
+  DBG("RIO WRITE: Written %d bytes", l);
+  if (l < 0)
+    {
+      if (errno != EINTR && errno != EAGAIN)
+       {
+         rec_io_stop_write(rio);
+         rio->notify_handler(rio, RIO_ERR_WRITE);
+       }
+      return HOOK_IDLE;
+    }
+  b->written += l;
+  if (b->written == b->full)
+    {
+      DBG("RIO WRITE: Written full buffer");
+      clist_remove(&b->n);
+      clist_add_tail(&rio->idle_write_buffers, &b->n);
+    }
+
+  rio->write_watermark -= l;
+  int ret = HOOK_RETRY;
+  if (!rio->write_watermark)
+    {
+      ret = HOOK_IDLE;
+      rec_io_stop_write(rio);
+    }
+
+  // Call the hook, but carefully, because it can delete the RIO structure
+  if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
+    ret = HOOK_IDLE;
+  return ret;
+}
+
+static struct rio_buffer *
+rec_io_get_buffer(struct main_rec_io *rio)
+{
+  struct rio_buffer *b = clist_remove_tail(&rio->idle_write_buffers);
+  if (b)
+    DBG("RIO WRITE: Recycled old buffer");
+  else
+    {
+      if (!rio->write_buf_size)
+       rio->write_buf_size = 1024;
+      b = xmalloc(sizeof(struct rio_buffer) + rio->write_buf_size);
+      DBG("RIO WRITE: Allocated new buffer");
+    }
+  b->full = b->written = 0;
+  return b;
+}
+
+void
+rec_io_write(struct main_rec_io *rio, void *data, uns len)
+{
+  byte *bdata = data;
+  ASSERT(clist_is_linked(&rio->file.n));
+  if (!len)
+    return;
+
+  while (len)
+    {
+      struct rio_buffer *b = clist_tail(&rio->busy_write_buffers);
+      if (!b || b->full >= rio->write_buf_size)
+       {
+         b = rec_io_get_buffer(rio);
+         clist_add_tail(&rio->busy_write_buffers, &b->n);
+       }
+      uns l = MIN(len, rio->write_buf_size - b->full);
+      memcpy(b->buf + b->full, bdata, l);
+      b->full += l;
+      bdata += l;
+      len -= l;
+      rio->write_watermark += l;
+      DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
+    }
+
+  if (!rio->file.write_handler)
+    {
+      DBG("RIO WRITE: Starting write");
+      rio->file.write_handler = rec_io_write_handler;
+      file_chg(&rio->file);
+    }
+}
+
+void
+rec_io_set_timeout(struct main_rec_io *rio, timestamp_t expires_delta)
+{
+  DBG("RIO: Setting timeout %u", (uns) expires_delta);
+  if (!expires_delta)
+    timer_del(&rio->timer);
+  else
+    timer_add_rel(&rio->timer, expires_delta);
+}
+
+uns
+rec_io_parse_line(struct main_rec_io *rio)
+{
+  for (uns i = rio->read_prev_avail; i < rio->read_avail; i++)
+    if (rio->read_rec_start[i] == '\n')
+      return i+1;
+  return 0;
+}
+
+#ifdef TEST
+
+static uns rhand(struct main_rec_io *rio)
+{
+  uns r = rec_io_parse_line(rio);
+  if (r)
+    {
+      rio->read_rec_start[r-1] = 0;
+      printf("Read <%s>\n", rio->read_rec_start);
+      rec_io_set_timeout(rio, 10000);
+      rio->read_rec_start[r-1] = '\n';
+      rec_io_write(rio, rio->read_rec_start, r);
+    }
+  return r;
+}
+
+static int ehand(struct main_rec_io *rio, int cause)
+{
+  if (cause < 0)
+    {
+      msg(L_ERROR, "Error %d", cause);
+      rec_io_del(rio);
+      main_shut_down();
+      return HOOK_IDLE;
+    }
+  else
+    {
+      msg(L_INFO, "Event %d", cause);
+      return HOOK_RETRY;
+    }
+}
+
+int
+main(void)
+{
+  log_init(NULL);
+  main_init();
+
+  struct main_rec_io rio = {};
+  rio.read_buf_size = 4;
+  rio.read_handler = rhand;
+  rio.notify_handler = ehand;
+  // rio.read_rec_max = 40;
+  rio.write_buf_size = 4;
+  rec_io_add(&rio, 0);
+  rec_io_start_read(&rio);
+  rec_io_set_timeout(&rio, 10000);
+
+  main_debug();
+
+  main_loop();
+  msg(L_INFO, "Finished.");
+
+  if (clist_is_linked(&rio.file.n))
+    rec_io_del(&rio);
+  main_cleanup();
+  return 0;
+}
+
+#endif
index ae1c1e4a65600f44c4ad01d8c28c8f2c9f58fa15..e7b2deb5841530ad85aa80ed17aefdd841e30422 100644 (file)
@@ -371,6 +371,61 @@ void block_io_write(struct main_block_io *bio, void *buf, uns len);
  **/
 void block_io_set_timeout(struct main_block_io *bio, timestamp_t expires_delta);
 
+/***
+ * [[recordio]]
+ * Asynchronous record I/O
+ * -----------------------
+ *
+ * FIXME
+ ***/
+
+/** The record I/O structure. **/
+struct main_rec_io {
+  struct main_file file;
+  byte *read_buf;                              /* Reading half */
+  byte *read_rec_start;                                /* [*] Start of current record */
+  uns read_avail;                              /* [*] How much data is available */
+  uns read_prev_avail;                         /* [*] How much data was available in previous read_done */
+  uns read_buf_size;                           /* [*] Buffer size allocated (can set before rec_io_add()) */
+  uns read_running;                            /* Reading requested */
+  uns read_rec_max;                            /* [*] Maximum record size (0=unlimited) */
+  clist busy_write_buffers;
+  clist idle_write_buffers;
+  uns write_buf_size;
+  uns write_watermark;
+  uns write_throttle;
+  uns (*read_handler)(struct main_rec_io *rio);        /* [*] FIXME; describe EOF */
+  // FIXME: returns...
+  int (*notify_handler)(struct main_rec_io *rio, int status);  /* [*] Handler to call on errors */
+  struct main_timer timer;
+  void *data;                                  /* [*] Data for use by the handlers */
+};
+
+/** Activate a record I/O structure. **/
+void rec_io_add(struct main_rec_io *rio, int fd);
+
+/** Deactivate a record I/O structure. **/
+void rec_io_del(struct main_rec_io *rio);
+
+void rec_io_start_read(struct main_rec_io *rio);
+void rec_io_stop_read(struct main_rec_io *rio);
+void rec_io_set_timeout(struct main_rec_io *bio, timestamp_t expires_delta);
+
+uns rec_io_parse_line(struct main_rec_io *rio);
+
+void rec_io_write(struct main_rec_io *rio, void *data, uns len);
+
+// All errors except timeout are fatal
+enum rec_io_notify_status {
+  RIO_ERR_READ = -1,
+  RIO_ERR_WRITE = -2,
+  RIO_ERR_TIMEOUT = -3,
+  RIO_ERR_READ_RECORD_TOO_LARGE = -4,
+  RIO_ERR_READ_EOF = -5,
+  RIO_EVENT_ALL_WRITTEN = 1,
+  RIO_EVENT_PART_WRITTEN = 2,
+};
+
 /***
  * [[hooks]]
  * Loop hooks