]> mj.ucw.cz Git - libucw.git/blobdiff - ucw/main-rec.c
xtypes&tableprinter: added parsing of size and tests
[libucw.git] / ucw / main-rec.c
index 647bdf5364b5bd5827ee64522fd4ecdabdbcc0c8..f85aa4aacd935fa85f38159241b361505c13427f 100644 (file)
@@ -1,7 +1,7 @@
 /*
  *     UCW Library -- Main Loop: Record I/O
  *
- *     (c) 2011 Martin Mares <mj@ucw.cz>
+ *     (c) 2011--2012 Martin Mares <mj@ucw.cz>
  *
  *     This software may be freely distributed and used according to the terms
  *     of the GNU Lesser General Public License.
@@ -9,8 +9,8 @@
 
 #undef LOCAL_DEBUG
 
-#include "ucw/lib.h"
-#include "ucw/mainloop.h"
+#include <ucw/lib.h>
+#include <ucw/mainloop.h>
 
 #include <stdio.h>
 #include <string.h>
@@ -20,8 +20,8 @@
 
 struct rio_buffer {
   cnode n;
-  uns full;
-  uns written;
+  uint full;
+  uint written;
   byte buf[];
 };
 
@@ -33,6 +33,8 @@ rec_io_timer_expired(struct main_timer *tm)
   rio->notify_handler(rio, RIO_ERR_TIMEOUT);
 }
 
+static int rec_io_deferred_start_read(struct main_hook *ho);
+
 void
 rec_io_add(struct main_rec_io *rio, int fd)
 {
@@ -40,6 +42,8 @@ rec_io_add(struct main_rec_io *rio, int fd)
   file_add(&rio->file);
   rio->timer.handler = rec_io_timer_expired;
   rio->timer.data = rio;
+  rio->start_read_hook.handler = rec_io_deferred_start_read;
+  rio->start_read_hook.data = rio;
   clist_init(&rio->idle_write_buffers);
   clist_init(&rio->busy_write_buffers);
 }
@@ -47,7 +51,11 @@ rec_io_add(struct main_rec_io *rio, int fd)
 void
 rec_io_del(struct main_rec_io *rio)
 {
+  if (!rec_io_is_active(rio))
+    return;
+
   timer_del(&rio->timer);
+  hook_del(&rio->start_read_hook);
   file_del(&rio->file);
 
   if (rio->read_buf)
@@ -65,6 +73,29 @@ rec_io_del(struct main_rec_io *rio)
     }
 }
 
+static int
+rec_io_process_read_buf(struct main_rec_io *rio)
+{
+  uint got;
+  while (rio->read_running && (got = rio->read_handler(rio)))
+    {
+      DBG("RIO READ: Ate %u bytes", got);
+      if (got == ~0U)
+       return HOOK_IDLE;
+      rio->read_rec_start += got;
+      rio->read_avail -= got;
+      rio->read_prev_avail = 0;
+      if (!rio->read_avail)
+       {
+         DBG("RIO READ: Resetting buffer");
+         rio->read_rec_start = rio->read_buf;
+         break;
+       }
+    }
+  DBG("RIO READ: Want more");
+  return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
+}
+
 static int
 rec_io_read_handler(struct main_file *fi)
 {
@@ -73,18 +104,17 @@ rec_io_read_handler(struct main_file *fi)
   if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
     {
       rec_io_stop_read(rio);
-      rio->notify_handler(rio, RIO_ERR_READ_RECORD_TOO_LARGE);
+      rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE);
       return HOOK_IDLE;
     }
 
 restart: ;
-  uns rec_start_pos = rio->read_rec_start - rio->read_buf;
-  uns rec_end_pos = rec_start_pos + rio->read_avail;
-  uns free_space = rio->read_buf_size - rec_end_pos;
+  uint rec_start_pos = rio->read_rec_start - rio->read_buf;
+  uint rec_end_pos = rec_start_pos + rio->read_avail;
+  uint free_space = rio->read_buf_size - rec_end_pos;
   DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u",
     rec_start_pos, rio->read_avail, rio->read_prev_avail,
     free_space, rio->read_buf_size);
-  // FIXME: Constants?
   if (free_space <= rio->read_buf_size/8)
     {
       if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2)
@@ -120,68 +150,99 @@ restart: ;
     {
       DBG("RIO READ: Signalling EOF");
       rec_io_stop_read(rio);
-      rio->notify_handler(rio, RIO_ERR_READ_EOF);
+      rio->notify_handler(rio, RIO_EVENT_EOF);
       return HOOK_IDLE;
     }
   rio->read_prev_avail = rio->read_avail;
   rio->read_avail += l;
   DBG("RIO READ: Available: %u bytes", rio->read_avail);
 
-  uns got;
-  while (rio->read_running && (got = rio->read_handler(rio)))
-    {
-      DBG("RIO READ: Ate %u bytes", got);
-      rio->read_rec_start += got;
-      rio->read_avail -= got;
-      rio->read_prev_avail = 0;
-      if (!rio->read_avail)
-       {
-         DBG("RIO READ: Resetting buffer");
-         rio->read_rec_start = rio->read_buf;
-         break;
-       }
-    }
-  DBG("RIO READ: Want more");
-  return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
+  return rec_io_process_read_buf(rio);
 }
 
-void
-rec_io_start_read(struct main_rec_io *rio)
+static int
+rec_io_deferred_start_read(struct main_hook *ho)
 {
-  ASSERT(clist_is_linked(&rio->file.n));
-  if (rio->read_running)
-    return;
+  struct main_rec_io *rio = ho->data;
+
+  DBG("RIO: Starting reading");
   if (!rio->read_buf)
     {
       if (!rio->read_buf_size)
        rio->read_buf_size = 256;
       rio->read_buf = xmalloc(rio->read_buf_size);
-      DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
+      DBG("RIO: Created read buffer (%u bytes)", rio->read_buf_size);
       rio->read_rec_start = rio->read_buf;
     }
+
   rio->file.read_handler = rec_io_read_handler;
   file_chg(&rio->file);
+  hook_del(ho);
   rio->read_running = 1;
-  DBG("RIO: Reading started");
+
+  rio->read_prev_avail = 0;
+  return rec_io_process_read_buf(rio);
+}
+
+static void
+rec_io_recalc_read(struct main_rec_io *rio)
+{
+  uint flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read;
+  uint run = rio->read_started && flow;
+  DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run);
+  if (run != rio->read_running)
+    {
+      if (run)
+       {
+         /*
+          * Since we need to rescan the read buffer for leftover records and we
+          * can be deep in the call stack at this moment, we better defer most
+          * of the work to a main_hook, which will be called in the next iteration
+          * of the main loop.
+          */
+         if (!hook_is_active(&rio->start_read_hook))
+           {
+             DBG("RIO: Scheduling start of reading");
+             hook_add(&rio->start_read_hook);
+           }
+       }
+      else
+       {
+         if (hook_is_active(&rio->start_read_hook))
+           {
+             DBG("RIO: Descheduling start of reading");
+             hook_del(&rio->start_read_hook);
+           }
+         rio->file.read_handler = NULL;
+         file_chg(&rio->file);
+         DBG("RIO: Reading stopped");
+         rio->read_running = 0;
+       }
+    }
+}
+
+void
+rec_io_start_read(struct main_rec_io *rio)
+{
+  ASSERT(rec_io_is_active(rio));
+  rio->read_started = 1;
+  rec_io_recalc_read(rio);
 }
 
 void
 rec_io_stop_read(struct main_rec_io *rio)
 {
-  ASSERT(clist_is_linked(&rio->file.n));
-  if (!rio->read_running)
-    return;
-  rio->file.read_handler = NULL;
-  file_chg(&rio->file);
-  rio->read_running = 0;
-  DBG("RIO: Reading stopped");
+  ASSERT(rec_io_is_active(rio));
+  rio->read_started = 0;
+  rec_io_recalc_read(rio);
 }
 
 static void
 rec_io_stop_write(struct main_rec_io *rio)
 {
   DBG("RIO WRITE: Stopping write");
-  ASSERT(!rio->write_watermark);
+  // XXX: When we are called after a write error, there might still
+  // be some data queued, but we need not care.
   rio->file.write_handler = NULL;
   file_chg(&rio->file);
 }
@@ -223,6 +284,7 @@ rec_io_write_handler(struct main_file *fi)
       ret = HOOK_IDLE;
       rec_io_stop_write(rio);
     }
+  rec_io_recalc_read(rio);
 
   // Call the hook, but carefully, because it can delete the RIO structure
   if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
@@ -248,10 +310,10 @@ rec_io_get_buffer(struct main_rec_io *rio)
 }
 
 void
-rec_io_write(struct main_rec_io *rio, void *data, uns len)
+rec_io_write(struct main_rec_io *rio, void *data, uint len)
 {
   byte *bdata = data;
-  ASSERT(clist_is_linked(&rio->file.n));
+  ASSERT(rec_io_is_active(rio));
   if (!len)
     return;
 
@@ -263,13 +325,14 @@ rec_io_write(struct main_rec_io *rio, void *data, uns len)
          b = rec_io_get_buffer(rio);
          clist_add_tail(&rio->busy_write_buffers, &b->n);
        }
-      uns l = MIN(len, rio->write_buf_size - b->full);
+      uint l = MIN(len, rio->write_buf_size - b->full);
       memcpy(b->buf + b->full, bdata, l);
       b->full += l;
       bdata += l;
       len -= l;
       rio->write_watermark += l;
       DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
+      rec_io_recalc_read(rio);
     }
 
   if (!rio->file.write_handler)
@@ -283,17 +346,17 @@ rec_io_write(struct main_rec_io *rio, void *data, uns len)
 void
 rec_io_set_timeout(struct main_rec_io *rio, timestamp_t expires_delta)
 {
-  DBG("RIO: Setting timeout %u", (uns) expires_delta);
+  DBG("RIO: Setting timeout %u", (uint) expires_delta);
   if (!expires_delta)
     timer_del(&rio->timer);
   else
     timer_add_rel(&rio->timer, expires_delta);
 }
 
-uns
+uint
 rec_io_parse_line(struct main_rec_io *rio)
 {
-  for (uns i = rio->read_prev_avail; i < rio->read_avail; i++)
+  for (uint i = rio->read_prev_avail; i < rio->read_avail; i++)
     if (rio->read_rec_start[i] == '\n')
       return i+1;
   return 0;
@@ -301,13 +364,19 @@ rec_io_parse_line(struct main_rec_io *rio)
 
 #ifdef TEST
 
-static uns rhand(struct main_rec_io *rio)
+static uint rhand(struct main_rec_io *rio)
 {
-  uns r = rec_io_parse_line(rio);
+  uint r = rec_io_parse_line(rio);
   if (r)
     {
       rio->read_rec_start[r-1] = 0;
       printf("Read <%s>\n", rio->read_rec_start);
+      if (rio->read_rec_start[0] == '!')
+       {
+         rec_io_del(rio);
+         main_shut_down();
+         return ~0U;
+       }
       rec_io_set_timeout(rio, 10000);
       rio->read_rec_start[r-1] = '\n';
       rec_io_write(rio, rio->read_rec_start, r);
@@ -317,7 +386,7 @@ static uns rhand(struct main_rec_io *rio)
 
 static int ehand(struct main_rec_io *rio, int cause)
 {
-  if (cause < 0)
+  if (cause < 0 || cause == RIO_EVENT_EOF)
     {
       msg(L_ERROR, "Error %d", cause);
       rec_io_del(rio);
@@ -343,6 +412,7 @@ main(void)
   rio.notify_handler = ehand;
   // rio.read_rec_max = 40;
   rio.write_buf_size = 4;
+  rio.write_throttle_read = 6;
   rec_io_add(&rio, 0);
   rec_io_start_read(&rio);
   rec_io_set_timeout(&rio, 10000);
@@ -352,7 +422,7 @@ main(void)
   main_loop();
   msg(L_INFO, "Finished.");
 
-  if (clist_is_linked(&rio.file.n))
+  if (file_is_active(&rio.file))
     rec_io_del(&rio);
   main_cleanup();
   return 0;