]> mj.ucw.cz Git - libucw.git/commitdiff
Main record I/O: Flow control and a few bits of documentation
authorMartin Mares <mj@ucw.cz>
Sun, 1 May 2011 23:49:59 +0000 (01:49 +0200)
committerMartin Mares <mj@ucw.cz>
Sun, 1 May 2011 23:49:59 +0000 (01:49 +0200)
ucw/main-rec.c
ucw/mainloop.h

index 647bdf5364b5bd5827ee64522fd4ecdabdbcc0c8..3c4c0678b73fa9f783f771e970c0aaf70ca0b2f6 100644 (file)
@@ -73,7 +73,7 @@ rec_io_read_handler(struct main_file *fi)
   if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
     {
       rec_io_stop_read(rio);
-      rio->notify_handler(rio, RIO_ERR_READ_RECORD_TOO_LARGE);
+      rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE);
       return HOOK_IDLE;
     }
 
@@ -120,7 +120,7 @@ restart: ;
     {
       DBG("RIO READ: Signalling EOF");
       rec_io_stop_read(rio);
-      rio->notify_handler(rio, RIO_ERR_READ_EOF);
+      rio->notify_handler(rio, RIO_EVENT_EOF);
       return HOOK_IDLE;
     }
   rio->read_prev_avail = rio->read_avail;
@@ -145,36 +145,52 @@ restart: ;
   return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
 }
 
+static void
+rec_io_recalc_read(struct main_rec_io *rio)
+{
+  uns flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read;
+  uns run = rio->read_started && flow;
+  DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run);
+  if (run != rio->read_running)
+    {
+      if (run)
+       {
+         if (!rio->read_buf)
+           {
+             if (!rio->read_buf_size)
+               rio->read_buf_size = 256;
+             rio->read_buf = xmalloc(rio->read_buf_size);
+             DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
+             rio->read_rec_start = rio->read_buf;
+           }
+         rio->file.read_handler = rec_io_read_handler;
+         file_chg(&rio->file);
+         DBG("RIO: Reading started");
+       }
+      else
+       {
+         rio->file.read_handler = NULL;
+         file_chg(&rio->file);
+         DBG("RIO: Reading stopped");
+       }
+      rio->read_running = run;
+    }
+}
+
 void
 rec_io_start_read(struct main_rec_io *rio)
 {
   ASSERT(clist_is_linked(&rio->file.n));
-  if (rio->read_running)
-    return;
-  if (!rio->read_buf)
-    {
-      if (!rio->read_buf_size)
-       rio->read_buf_size = 256;
-      rio->read_buf = xmalloc(rio->read_buf_size);
-      DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
-      rio->read_rec_start = rio->read_buf;
-    }
-  rio->file.read_handler = rec_io_read_handler;
-  file_chg(&rio->file);
-  rio->read_running = 1;
-  DBG("RIO: Reading started");
+  rio->read_started = 1;
+  rec_io_recalc_read(rio);
 }
 
 void
 rec_io_stop_read(struct main_rec_io *rio)
 {
   ASSERT(clist_is_linked(&rio->file.n));
-  if (!rio->read_running)
-    return;
-  rio->file.read_handler = NULL;
-  file_chg(&rio->file);
-  rio->read_running = 0;
-  DBG("RIO: Reading stopped");
+  rio->read_started = 0;
+  rec_io_recalc_read(rio);
 }
 
 static void
@@ -223,6 +239,7 @@ rec_io_write_handler(struct main_file *fi)
       ret = HOOK_IDLE;
       rec_io_stop_write(rio);
     }
+  rec_io_recalc_read(rio);
 
   // Call the hook, but carefully, because it can delete the RIO structure
   if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
@@ -270,6 +287,7 @@ rec_io_write(struct main_rec_io *rio, void *data, uns len)
       len -= l;
       rio->write_watermark += l;
       DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
+      rec_io_recalc_read(rio);
     }
 
   if (!rio->file.write_handler)
@@ -317,7 +335,7 @@ static uns rhand(struct main_rec_io *rio)
 
 static int ehand(struct main_rec_io *rio, int cause)
 {
-  if (cause < 0)
+  if (cause < 0 || cause == RIO_EVENT_EOF)
     {
       msg(L_ERROR, "Error %d", cause);
       rec_io_del(rio);
@@ -343,6 +361,7 @@ main(void)
   rio.notify_handler = ehand;
   // rio.read_rec_max = 40;
   rio.write_buf_size = 4;
+  rio.write_throttle_read = 6;
   rec_io_add(&rio, 0);
   rec_io_start_read(&rio);
   rec_io_set_timeout(&rio, 10000);
index e7b2deb5841530ad85aa80ed17aefdd841e30422..322ee8b2e737a720b4594811ab14f450498a1298 100644 (file)
@@ -382,21 +382,22 @@ void block_io_set_timeout(struct main_block_io *bio, timestamp_t expires_delta);
 /** The record I/O structure. **/
 struct main_rec_io {
   struct main_file file;
-  byte *read_buf;                              /* Reading half */
+  byte *read_buf;
   byte *read_rec_start;                                /* [*] Start of current record */
   uns read_avail;                              /* [*] How much data is available */
-  uns read_prev_avail;                         /* [*] How much data was available in previous read_done */
-  uns read_buf_size;                           /* [*] Buffer size allocated (can set before rec_io_add()) */
-  uns read_running;                            /* Reading requested */
+  uns read_prev_avail;                         /* [*] How much data was available in previous read_handler */
+  uns read_buf_size;                           /* [*] Read buffer size allocated (can be set before rec_io_add()) */
+  uns read_started;                            /* Reading requested by user */
+  uns read_running;                            /* Reading really runs (read_started && not stopped by write_throttle_read) */
   uns read_rec_max;                            /* [*] Maximum record size (0=unlimited) */
   clist busy_write_buffers;
   clist idle_write_buffers;
-  uns write_buf_size;
-  uns write_watermark;
-  uns write_throttle;
-  uns (*read_handler)(struct main_rec_io *rio);        /* [*] FIXME; describe EOF */
-  // FIXME: returns...
-  int (*notify_handler)(struct main_rec_io *rio, int status);  /* [*] Handler to call on errors */
+  uns write_buf_size;                          /* [*] Write buffer size allocated (can be set before rec_io_add()) */
+  uns write_watermark;                         /* [*] How many data are waiting to be written */
+  uns write_throttle_read;                     /* [*] If more than write_throttle_read bytes are buffered, stop reading; 0=no stopping */
+  uns (*read_handler)(struct main_rec_io *rio);        /* [*] Called whenever more bytes are read; returns 0 (want more) or number of bytes eaten */
+  int (*notify_handler)(struct main_rec_io *rio, int status);  /* [*] Called to notify about errors and other events */
+                                               /* Returns either HOOK_RETRY or HOOK_IDLE. */
   struct main_timer timer;
   void *data;                                  /* [*] Data for use by the handlers */
 };
@@ -407,23 +408,43 @@ void rec_io_add(struct main_rec_io *rio, int fd);
 /** Deactivate a record I/O structure. **/
 void rec_io_del(struct main_rec_io *rio);
 
+/** Start reading. **/
 void rec_io_start_read(struct main_rec_io *rio);
+
+/** Stop reading. **/
 void rec_io_stop_read(struct main_rec_io *rio);
-void rec_io_set_timeout(struct main_rec_io *bio, timestamp_t expires_delta);
 
-uns rec_io_parse_line(struct main_rec_io *rio);
+/** Analogous to @block_io_set_timeout(). **/
+void rec_io_set_timeout(struct main_rec_io *bio, timestamp_t expires_delta);
 
 void rec_io_write(struct main_rec_io *rio, void *data, uns len);
 
-// All errors except timeout are fatal
+/**
+ * An auxiliary function used for parsing of lines. When called in the @read_handler,
+ * it searches for the end of line character. When a complete line is found, the length
+ * of the line (including the end of line character) is returned. Otherwise, it returns zero.
+ **/
+uns rec_io_parse_line(struct main_rec_io *rio);
+
+/**
+ * Specifies what kind of error or other event happened, when the @notify_handler
+ * is called. In case of I/O errors, `errno` is still set.
+ *
+ * Upon @RIO_ERR_READ, @RIO_ERR_RECORD_TOO_LARGE and @RIO_EVENT_EOF, reading is stopped
+ * automatically. Upon @RIO_ERR_WRITE, writing is stopped. Upon @RIO_ERR_TIMEOUT, only the
+ * timer is deactivated.
+ *
+ * In all cases, the notification handler is allowed to call @rec_io_del(), but it
+ * must return @HOOK_IDLE in such cases.
+ **/
 enum rec_io_notify_status {
-  RIO_ERR_READ = -1,
-  RIO_ERR_WRITE = -2,
-  RIO_ERR_TIMEOUT = -3,
-  RIO_ERR_READ_RECORD_TOO_LARGE = -4,
-  RIO_ERR_READ_EOF = -5,
-  RIO_EVENT_ALL_WRITTEN = 1,
-  RIO_EVENT_PART_WRITTEN = 2,
+  RIO_ERR_READ = -1,                   /* read() returned an error, errno set */
+  RIO_ERR_WRITE = -2,                  /* write() returned an error, errno set */
+  RIO_ERR_TIMEOUT = -3,                        /* A timeout has occurred */
+  RIO_ERR_RECORD_TOO_LARGE = -4,       /* Read: read_rec_max has been exceeded */
+  RIO_EVENT_ALL_WRITTEN = 1,           /* All buffered data has been written */
+  RIO_EVENT_PART_WRITTEN = 2,          /* Some buffered data has been written, but more remains */
+  RIO_EVENT_EOF = 3,                   /* Read: EOF seen */
 };
 
 /***