]> mj.ucw.cz Git - libucw.git/commitdiff
Main recio: Allow rec_io_(start|stop)_read from the read_handler
authorMartin Mares <mj@ucw.cz>
Thu, 9 Jun 2011 16:04:26 +0000 (18:04 +0200)
committerMartin Mares <mj@ucw.cz>
Thu, 9 Jun 2011 16:04:26 +0000 (18:04 +0200)
This involves rescanning the read buffer on rec_io_start_read()
as there might be leftover data from previous invokations of the
read_handler.

However, we have to be careful to avoid arbitrarily deep recursion
of various handlers. We do that by deferring the actual start of
reading to a main_hook.

ucw/main-rec.c
ucw/mainloop.h

index 2def9c8971f6ecf4cb36d7df77425e76dcc675a9..06c574944144d4b6b547323ae8c961df93d0e92f 100644 (file)
@@ -33,6 +33,8 @@ rec_io_timer_expired(struct main_timer *tm)
   rio->notify_handler(rio, RIO_ERR_TIMEOUT);
 }
 
+static int rec_io_deferred_start_read(struct main_hook *ho);
+
 void
 rec_io_add(struct main_rec_io *rio, int fd)
 {
@@ -40,6 +42,8 @@ rec_io_add(struct main_rec_io *rio, int fd)
   file_add(&rio->file);
   rio->timer.handler = rec_io_timer_expired;
   rio->timer.data = rio;
+  rio->start_read_hook.handler = rec_io_deferred_start_read;
+  rio->start_read_hook.data = rio;
   clist_init(&rio->idle_write_buffers);
   clist_init(&rio->busy_write_buffers);
 }
@@ -48,6 +52,8 @@ void
 rec_io_del(struct main_rec_io *rio)
 {
   timer_del(&rio->timer);
+  if (clist_is_linked(&rio->start_read_hook.n))
+    hook_del(&rio->start_read_hook);
   file_del(&rio->file);
 
   if (rio->read_buf)
@@ -65,6 +71,29 @@ rec_io_del(struct main_rec_io *rio)
     }
 }
 
+static int
+rec_io_process_read_buf(struct main_rec_io *rio)
+{
+  uns got;
+  while (rio->read_running && (got = rio->read_handler(rio)))
+    {
+      DBG("RIO READ: Ate %u bytes", got);
+      if (got == ~0U)
+       return HOOK_IDLE;
+      rio->read_rec_start += got;
+      rio->read_avail -= got;
+      rio->read_prev_avail = 0;
+      if (!rio->read_avail)
+       {
+         DBG("RIO READ: Resetting buffer");
+         rio->read_rec_start = rio->read_buf;
+         break;
+       }
+    }
+  DBG("RIO READ: Want more");
+  return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
+}
+
 static int
 rec_io_read_handler(struct main_file *fi)
 {
@@ -127,24 +156,31 @@ restart: ;
   rio->read_avail += l;
   DBG("RIO READ: Available: %u bytes", rio->read_avail);
 
-  uns got;
-  while (rio->read_running && (got = rio->read_handler(rio)))
+  return rec_io_process_read_buf(rio);
+}
+
+static int
+rec_io_deferred_start_read(struct main_hook *ho)
+{
+  struct main_rec_io *rio = ho->data;
+
+  DBG("RIO: Starting reading");
+  if (!rio->read_buf)
     {
-      DBG("RIO READ: Ate %u bytes", got);
-      if (got == ~0U)
-       return HOOK_IDLE;
-      rio->read_rec_start += got;
-      rio->read_avail -= got;
-      rio->read_prev_avail = 0;
-      if (!rio->read_avail)
-       {
-         DBG("RIO READ: Resetting buffer");
-         rio->read_rec_start = rio->read_buf;
-         break;
-       }
+      if (!rio->read_buf_size)
+       rio->read_buf_size = 256;
+      rio->read_buf = xmalloc(rio->read_buf_size);
+      DBG("RIO: Created read buffer (%u bytes)", rio->read_buf_size);
+      rio->read_rec_start = rio->read_buf;
     }
-  DBG("RIO READ: Want more");
-  return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
+
+  rio->file.read_handler = rec_io_read_handler;
+  file_chg(&rio->file);
+  hook_del(ho);
+  rio->read_running = 1;
+
+  rio->read_prev_avail = 0;
+  return rec_io_process_read_buf(rio);
 }
 
 static void
@@ -157,25 +193,30 @@ rec_io_recalc_read(struct main_rec_io *rio)
     {
       if (run)
        {
-         if (!rio->read_buf)
+         /*
+          * Since we need to rescan the read buffer for leftover records and we
+          * can be deep in the call stack at this moment, we better defer most
+          * of the work to a main_hook, which will be called in the next iteration
+          * of the main loop.
+          */
+         if (!clist_is_linked(&rio->start_read_hook.n))
            {
-             if (!rio->read_buf_size)
-               rio->read_buf_size = 256;
-             rio->read_buf = xmalloc(rio->read_buf_size);
-             DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
-             rio->read_rec_start = rio->read_buf;
+             DBG("RIO: Scheduling start of reading");
+             hook_add(&rio->start_read_hook);
            }
-         rio->file.read_handler = rec_io_read_handler;
-         file_chg(&rio->file);
-         DBG("RIO: Reading started");
        }
       else
        {
+         if (clist_is_linked(&rio->start_read_hook.n))
+           {
+             DBG("RIO: Descheduling start of reading");
+             hook_del(&rio->start_read_hook);
+           }
          rio->file.read_handler = NULL;
          file_chg(&rio->file);
          DBG("RIO: Reading stopped");
+         rio->read_running = 0;
        }
-      rio->read_running = run;
     }
 }
 
index 19303f49d81911a6ae8a761d727a8ab8965b9def..bf5cfcaef4f5737104be2c840d3a93ffe6b6f1e1 100644 (file)
@@ -487,6 +487,7 @@ struct main_rec_io {
   int (*notify_handler)(struct main_rec_io *rio, int status);  /* [*] Called to notify about errors and other events */
                                                /* Returns either HOOK_RETRY or HOOK_IDLE. */
   struct main_timer timer;
+  struct main_hook start_read_hook;            /* Used internally to defer rec_io_start_read() */
   void *data;                                  /* [*] Data for use by the handlers */
 };
 
@@ -496,7 +497,14 @@ void rec_io_add(struct main_rec_io *rio, int fd);
 /** Deactivate a record I/O structure. **/
 void rec_io_del(struct main_rec_io *rio);
 
-/** Start reading. **/
+/**
+ * Start reading.
+ *
+ * When there were some data in the buffer (e.g., because @rec_io_stop_read()
+ * was called from the `read_handler`), it is processed as if it were read
+ * from the file once again. That is, `read_prev_avail` is reset to 0 and
+ * the `read_handler` is called to process all buffered data.
+ ***/
 void rec_io_start_read(struct main_rec_io *rio);
 
 /** Stop reading. **/