X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;ds=sidebyside;f=ucw%2Fmain-rec.c;h=ca313187fdf905490bb498035bff01b7d5e7a1a5;hb=97c8e41bd5fba61fe60c7e8fc20484a58a371583;hp=647bdf5364b5bd5827ee64522fd4ecdabdbcc0c8;hpb=2623d85c7a2cfbbfafc9b9b8a4f5e75ab21179f3;p=libucw.git diff --git a/ucw/main-rec.c b/ucw/main-rec.c index 647bdf53..ca313187 100644 --- a/ucw/main-rec.c +++ b/ucw/main-rec.c @@ -1,7 +1,7 @@ /* * UCW Library -- Main Loop: Record I/O * - * (c) 2011 Martin Mares + * (c) 2011--2012 Martin Mares * * This software may be freely distributed and used according to the terms * of the GNU Lesser General Public License. @@ -9,8 +9,8 @@ #undef LOCAL_DEBUG -#include "ucw/lib.h" -#include "ucw/mainloop.h" +#include +#include #include #include @@ -33,6 +33,8 @@ rec_io_timer_expired(struct main_timer *tm) rio->notify_handler(rio, RIO_ERR_TIMEOUT); } +static int rec_io_deferred_start_read(struct main_hook *ho); + void rec_io_add(struct main_rec_io *rio, int fd) { @@ -40,6 +42,8 @@ rec_io_add(struct main_rec_io *rio, int fd) file_add(&rio->file); rio->timer.handler = rec_io_timer_expired; rio->timer.data = rio; + rio->start_read_hook.handler = rec_io_deferred_start_read; + rio->start_read_hook.data = rio; clist_init(&rio->idle_write_buffers); clist_init(&rio->busy_write_buffers); } @@ -47,7 +51,11 @@ rec_io_add(struct main_rec_io *rio, int fd) void rec_io_del(struct main_rec_io *rio) { + if (!rec_io_is_active(rio)) + return; + timer_del(&rio->timer); + hook_del(&rio->start_read_hook); file_del(&rio->file); if (rio->read_buf) @@ -65,6 +73,29 @@ rec_io_del(struct main_rec_io *rio) } } +static int +rec_io_process_read_buf(struct main_rec_io *rio) +{ + uns got; + while (rio->read_running && (got = rio->read_handler(rio))) + { + DBG("RIO READ: Ate %u bytes", got); + if (got == ~0U) + return HOOK_IDLE; + rio->read_rec_start += got; + rio->read_avail -= got; + rio->read_prev_avail = 0; + if (!rio->read_avail) + { + DBG("RIO READ: Resetting buffer"); + rio->read_rec_start = rio->read_buf; + break; + } + } + DBG("RIO READ: Want more"); + return (rio->read_running ? HOOK_RETRY : HOOK_IDLE); +} + static int rec_io_read_handler(struct main_file *fi) { @@ -73,7 +104,7 @@ rec_io_read_handler(struct main_file *fi) if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max) { rec_io_stop_read(rio); - rio->notify_handler(rio, RIO_ERR_READ_RECORD_TOO_LARGE); + rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE); return HOOK_IDLE; } @@ -84,7 +115,6 @@ restart: ; DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u", rec_start_pos, rio->read_avail, rio->read_prev_avail, free_space, rio->read_buf_size); - // FIXME: Constants? if (free_space <= rio->read_buf_size/8) { if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2) @@ -120,68 +150,99 @@ restart: ; { DBG("RIO READ: Signalling EOF"); rec_io_stop_read(rio); - rio->notify_handler(rio, RIO_ERR_READ_EOF); + rio->notify_handler(rio, RIO_EVENT_EOF); return HOOK_IDLE; } rio->read_prev_avail = rio->read_avail; rio->read_avail += l; DBG("RIO READ: Available: %u bytes", rio->read_avail); - uns got; - while (rio->read_running && (got = rio->read_handler(rio))) - { - DBG("RIO READ: Ate %u bytes", got); - rio->read_rec_start += got; - rio->read_avail -= got; - rio->read_prev_avail = 0; - if (!rio->read_avail) - { - DBG("RIO READ: Resetting buffer"); - rio->read_rec_start = rio->read_buf; - break; - } - } - DBG("RIO READ: Want more"); - return (rio->read_running ? HOOK_RETRY : HOOK_IDLE); + return rec_io_process_read_buf(rio); } -void -rec_io_start_read(struct main_rec_io *rio) +static int +rec_io_deferred_start_read(struct main_hook *ho) { - ASSERT(clist_is_linked(&rio->file.n)); - if (rio->read_running) - return; + struct main_rec_io *rio = ho->data; + + DBG("RIO: Starting reading"); if (!rio->read_buf) { if (!rio->read_buf_size) rio->read_buf_size = 256; rio->read_buf = xmalloc(rio->read_buf_size); - DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size); + DBG("RIO: Created read buffer (%u bytes)", rio->read_buf_size); rio->read_rec_start = rio->read_buf; } + rio->file.read_handler = rec_io_read_handler; file_chg(&rio->file); + hook_del(ho); rio->read_running = 1; - DBG("RIO: Reading started"); + + rio->read_prev_avail = 0; + return rec_io_process_read_buf(rio); +} + +static void +rec_io_recalc_read(struct main_rec_io *rio) +{ + uns flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read; + uns run = rio->read_started && flow; + DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run); + if (run != rio->read_running) + { + if (run) + { + /* + * Since we need to rescan the read buffer for leftover records and we + * can be deep in the call stack at this moment, we better defer most + * of the work to a main_hook, which will be called in the next iteration + * of the main loop. + */ + if (!hook_is_active(&rio->start_read_hook)) + { + DBG("RIO: Scheduling start of reading"); + hook_add(&rio->start_read_hook); + } + } + else + { + if (hook_is_active(&rio->start_read_hook)) + { + DBG("RIO: Descheduling start of reading"); + hook_del(&rio->start_read_hook); + } + rio->file.read_handler = NULL; + file_chg(&rio->file); + DBG("RIO: Reading stopped"); + rio->read_running = 0; + } + } +} + +void +rec_io_start_read(struct main_rec_io *rio) +{ + ASSERT(rec_io_is_active(rio)); + rio->read_started = 1; + rec_io_recalc_read(rio); } void rec_io_stop_read(struct main_rec_io *rio) { - ASSERT(clist_is_linked(&rio->file.n)); - if (!rio->read_running) - return; - rio->file.read_handler = NULL; - file_chg(&rio->file); - rio->read_running = 0; - DBG("RIO: Reading stopped"); + ASSERT(rec_io_is_active(rio)); + rio->read_started = 0; + rec_io_recalc_read(rio); } static void rec_io_stop_write(struct main_rec_io *rio) { DBG("RIO WRITE: Stopping write"); - ASSERT(!rio->write_watermark); + // XXX: When we are called after a write error, there might still + // be some data queued, but we need not care. rio->file.write_handler = NULL; file_chg(&rio->file); } @@ -223,6 +284,7 @@ rec_io_write_handler(struct main_file *fi) ret = HOOK_IDLE; rec_io_stop_write(rio); } + rec_io_recalc_read(rio); // Call the hook, but carefully, because it can delete the RIO structure if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE) @@ -251,7 +313,7 @@ void rec_io_write(struct main_rec_io *rio, void *data, uns len) { byte *bdata = data; - ASSERT(clist_is_linked(&rio->file.n)); + ASSERT(rec_io_is_active(rio)); if (!len) return; @@ -270,6 +332,7 @@ rec_io_write(struct main_rec_io *rio, void *data, uns len) len -= l; rio->write_watermark += l; DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark); + rec_io_recalc_read(rio); } if (!rio->file.write_handler) @@ -308,6 +371,12 @@ static uns rhand(struct main_rec_io *rio) { rio->read_rec_start[r-1] = 0; printf("Read <%s>\n", rio->read_rec_start); + if (rio->read_rec_start[0] == '!') + { + rec_io_del(rio); + main_shut_down(); + return ~0U; + } rec_io_set_timeout(rio, 10000); rio->read_rec_start[r-1] = '\n'; rec_io_write(rio, rio->read_rec_start, r); @@ -317,7 +386,7 @@ static uns rhand(struct main_rec_io *rio) static int ehand(struct main_rec_io *rio, int cause) { - if (cause < 0) + if (cause < 0 || cause == RIO_EVENT_EOF) { msg(L_ERROR, "Error %d", cause); rec_io_del(rio); @@ -343,6 +412,7 @@ main(void) rio.notify_handler = ehand; // rio.read_rec_max = 40; rio.write_buf_size = 4; + rio.write_throttle_read = 6; rec_io_add(&rio, 0); rec_io_start_read(&rio); rec_io_set_timeout(&rio, 10000); @@ -352,7 +422,7 @@ main(void) main_loop(); msg(L_INFO, "Finished."); - if (clist_is_linked(&rio.file.n)) + if (file_is_active(&rio.file)) rec_io_del(&rio); main_cleanup(); return 0;