From 49160a46b98fed232d0de854ee224199a11df0e0 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Mon, 2 May 2011 01:49:59 +0200 Subject: [PATCH] Main record I/O: Flow control and a few bits of documentation --- ucw/main-rec.c | 65 ++++++++++++++++++++++++++++++++------------------ ucw/mainloop.h | 61 ++++++++++++++++++++++++++++++---------------- 2 files changed, 83 insertions(+), 43 deletions(-) diff --git a/ucw/main-rec.c b/ucw/main-rec.c index 647bdf53..3c4c0678 100644 --- a/ucw/main-rec.c +++ b/ucw/main-rec.c @@ -73,7 +73,7 @@ rec_io_read_handler(struct main_file *fi) if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max) { rec_io_stop_read(rio); - rio->notify_handler(rio, RIO_ERR_READ_RECORD_TOO_LARGE); + rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE); return HOOK_IDLE; } @@ -120,7 +120,7 @@ restart: ; { DBG("RIO READ: Signalling EOF"); rec_io_stop_read(rio); - rio->notify_handler(rio, RIO_ERR_READ_EOF); + rio->notify_handler(rio, RIO_EVENT_EOF); return HOOK_IDLE; } rio->read_prev_avail = rio->read_avail; @@ -145,36 +145,52 @@ restart: ; return (rio->read_running ? HOOK_RETRY : HOOK_IDLE); } +static void +rec_io_recalc_read(struct main_rec_io *rio) +{ + uns flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read; + uns run = rio->read_started && flow; + DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run); + if (run != rio->read_running) + { + if (run) + { + if (!rio->read_buf) + { + if (!rio->read_buf_size) + rio->read_buf_size = 256; + rio->read_buf = xmalloc(rio->read_buf_size); + DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size); + rio->read_rec_start = rio->read_buf; + } + rio->file.read_handler = rec_io_read_handler; + file_chg(&rio->file); + DBG("RIO: Reading started"); + } + else + { + rio->file.read_handler = NULL; + file_chg(&rio->file); + DBG("RIO: Reading stopped"); + } + rio->read_running = run; + } +} + void rec_io_start_read(struct main_rec_io *rio) { ASSERT(clist_is_linked(&rio->file.n)); - if (rio->read_running) - return; - if (!rio->read_buf) - { - if (!rio->read_buf_size) - rio->read_buf_size = 256; - rio->read_buf = xmalloc(rio->read_buf_size); - DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size); - rio->read_rec_start = rio->read_buf; - } - rio->file.read_handler = rec_io_read_handler; - file_chg(&rio->file); - rio->read_running = 1; - DBG("RIO: Reading started"); + rio->read_started = 1; + rec_io_recalc_read(rio); } void rec_io_stop_read(struct main_rec_io *rio) { ASSERT(clist_is_linked(&rio->file.n)); - if (!rio->read_running) - return; - rio->file.read_handler = NULL; - file_chg(&rio->file); - rio->read_running = 0; - DBG("RIO: Reading stopped"); + rio->read_started = 0; + rec_io_recalc_read(rio); } static void @@ -223,6 +239,7 @@ rec_io_write_handler(struct main_file *fi) ret = HOOK_IDLE; rec_io_stop_write(rio); } + rec_io_recalc_read(rio); // Call the hook, but carefully, because it can delete the RIO structure if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE) @@ -270,6 +287,7 @@ rec_io_write(struct main_rec_io *rio, void *data, uns len) len -= l; rio->write_watermark += l; DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark); + rec_io_recalc_read(rio); } if (!rio->file.write_handler) @@ -317,7 +335,7 @@ static uns rhand(struct main_rec_io *rio) static int ehand(struct main_rec_io *rio, int cause) { - if (cause < 0) + if (cause < 0 || cause == RIO_EVENT_EOF) { msg(L_ERROR, "Error %d", cause); rec_io_del(rio); @@ -343,6 +361,7 @@ main(void) rio.notify_handler = ehand; // rio.read_rec_max = 40; rio.write_buf_size = 4; + rio.write_throttle_read = 6; rec_io_add(&rio, 0); rec_io_start_read(&rio); rec_io_set_timeout(&rio, 10000); diff --git a/ucw/mainloop.h b/ucw/mainloop.h index e7b2deb5..322ee8b2 100644 --- a/ucw/mainloop.h +++ b/ucw/mainloop.h @@ -382,21 +382,22 @@ void block_io_set_timeout(struct main_block_io *bio, timestamp_t expires_delta); /** The record I/O structure. **/ struct main_rec_io { struct main_file file; - byte *read_buf; /* Reading half */ + byte *read_buf; byte *read_rec_start; /* [*] Start of current record */ uns read_avail; /* [*] How much data is available */ - uns read_prev_avail; /* [*] How much data was available in previous read_done */ - uns read_buf_size; /* [*] Buffer size allocated (can set before rec_io_add()) */ - uns read_running; /* Reading requested */ + uns read_prev_avail; /* [*] How much data was available in previous read_handler */ + uns read_buf_size; /* [*] Read buffer size allocated (can be set before rec_io_add()) */ + uns read_started; /* Reading requested by user */ + uns read_running; /* Reading really runs (read_started && not stopped by write_throttle_read) */ uns read_rec_max; /* [*] Maximum record size (0=unlimited) */ clist busy_write_buffers; clist idle_write_buffers; - uns write_buf_size; - uns write_watermark; - uns write_throttle; - uns (*read_handler)(struct main_rec_io *rio); /* [*] FIXME; describe EOF */ - // FIXME: returns... - int (*notify_handler)(struct main_rec_io *rio, int status); /* [*] Handler to call on errors */ + uns write_buf_size; /* [*] Write buffer size allocated (can be set before rec_io_add()) */ + uns write_watermark; /* [*] How many data are waiting to be written */ + uns write_throttle_read; /* [*] If more than write_throttle_read bytes are buffered, stop reading; 0=no stopping */ + uns (*read_handler)(struct main_rec_io *rio); /* [*] Called whenever more bytes are read; returns 0 (want more) or number of bytes eaten */ + int (*notify_handler)(struct main_rec_io *rio, int status); /* [*] Called to notify about errors and other events */ + /* Returns either HOOK_RETRY or HOOK_IDLE. */ struct main_timer timer; void *data; /* [*] Data for use by the handlers */ }; @@ -407,23 +408,43 @@ void rec_io_add(struct main_rec_io *rio, int fd); /** Deactivate a record I/O structure. **/ void rec_io_del(struct main_rec_io *rio); +/** Start reading. **/ void rec_io_start_read(struct main_rec_io *rio); + +/** Stop reading. **/ void rec_io_stop_read(struct main_rec_io *rio); -void rec_io_set_timeout(struct main_rec_io *bio, timestamp_t expires_delta); -uns rec_io_parse_line(struct main_rec_io *rio); +/** Analogous to @block_io_set_timeout(). **/ +void rec_io_set_timeout(struct main_rec_io *bio, timestamp_t expires_delta); void rec_io_write(struct main_rec_io *rio, void *data, uns len); -// All errors except timeout are fatal +/** + * An auxiliary function used for parsing of lines. When called in the @read_handler, + * it searches for the end of line character. When a complete line is found, the length + * of the line (including the end of line character) is returned. Otherwise, it returns zero. + **/ +uns rec_io_parse_line(struct main_rec_io *rio); + +/** + * Specifies what kind of error or other event happened, when the @notify_handler + * is called. In case of I/O errors, `errno` is still set. + * + * Upon @RIO_ERR_READ, @RIO_ERR_RECORD_TOO_LARGE and @RIO_EVENT_EOF, reading is stopped + * automatically. Upon @RIO_ERR_WRITE, writing is stopped. Upon @RIO_ERR_TIMEOUT, only the + * timer is deactivated. + * + * In all cases, the notification handler is allowed to call @rec_io_del(), but it + * must return @HOOK_IDLE in such cases. + **/ enum rec_io_notify_status { - RIO_ERR_READ = -1, - RIO_ERR_WRITE = -2, - RIO_ERR_TIMEOUT = -3, - RIO_ERR_READ_RECORD_TOO_LARGE = -4, - RIO_ERR_READ_EOF = -5, - RIO_EVENT_ALL_WRITTEN = 1, - RIO_EVENT_PART_WRITTEN = 2, + RIO_ERR_READ = -1, /* read() returned an error, errno set */ + RIO_ERR_WRITE = -2, /* write() returned an error, errno set */ + RIO_ERR_TIMEOUT = -3, /* A timeout has occurred */ + RIO_ERR_RECORD_TOO_LARGE = -4, /* Read: read_rec_max has been exceeded */ + RIO_EVENT_ALL_WRITTEN = 1, /* All buffered data has been written */ + RIO_EVENT_PART_WRITTEN = 2, /* Some buffered data has been written, but more remains */ + RIO_EVENT_EOF = 3, /* Read: EOF seen */ }; /*** -- 2.39.2