X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=ucw%2Fmain-rec.c;h=ca313187fdf905490bb498035bff01b7d5e7a1a5;hb=ae2b00416589dfe798fc40f0575f62a0c664798f;hp=3c4c0678b73fa9f783f771e970c0aaf70ca0b2f6;hpb=49160a46b98fed232d0de854ee224199a11df0e0;p=libucw.git diff --git a/ucw/main-rec.c b/ucw/main-rec.c index 3c4c0678..ca313187 100644 --- a/ucw/main-rec.c +++ b/ucw/main-rec.c @@ -1,7 +1,7 @@ /* * UCW Library -- Main Loop: Record I/O * - * (c) 2011 Martin Mares + * (c) 2011--2012 Martin Mares * * This software may be freely distributed and used according to the terms * of the GNU Lesser General Public License. @@ -9,8 +9,8 @@ #undef LOCAL_DEBUG -#include "ucw/lib.h" -#include "ucw/mainloop.h" +#include +#include #include #include @@ -33,6 +33,8 @@ rec_io_timer_expired(struct main_timer *tm) rio->notify_handler(rio, RIO_ERR_TIMEOUT); } +static int rec_io_deferred_start_read(struct main_hook *ho); + void rec_io_add(struct main_rec_io *rio, int fd) { @@ -40,6 +42,8 @@ rec_io_add(struct main_rec_io *rio, int fd) file_add(&rio->file); rio->timer.handler = rec_io_timer_expired; rio->timer.data = rio; + rio->start_read_hook.handler = rec_io_deferred_start_read; + rio->start_read_hook.data = rio; clist_init(&rio->idle_write_buffers); clist_init(&rio->busy_write_buffers); } @@ -47,7 +51,11 @@ rec_io_add(struct main_rec_io *rio, int fd) void rec_io_del(struct main_rec_io *rio) { + if (!rec_io_is_active(rio)) + return; + timer_del(&rio->timer); + hook_del(&rio->start_read_hook); file_del(&rio->file); if (rio->read_buf) @@ -65,6 +73,29 @@ rec_io_del(struct main_rec_io *rio) } } +static int +rec_io_process_read_buf(struct main_rec_io *rio) +{ + uns got; + while (rio->read_running && (got = rio->read_handler(rio))) + { + DBG("RIO READ: Ate %u bytes", got); + if (got == ~0U) + return HOOK_IDLE; + rio->read_rec_start += got; + rio->read_avail -= got; + rio->read_prev_avail = 0; + if (!rio->read_avail) + { + DBG("RIO READ: Resetting buffer"); + rio->read_rec_start = rio->read_buf; + break; + } + } + DBG("RIO READ: Want more"); + return (rio->read_running ? HOOK_RETRY : HOOK_IDLE); +} + static int rec_io_read_handler(struct main_file *fi) { @@ -84,7 +115,6 @@ restart: ; DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u", rec_start_pos, rio->read_avail, rio->read_prev_avail, free_space, rio->read_buf_size); - // FIXME: Constants? if (free_space <= rio->read_buf_size/8) { if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2) @@ -127,22 +157,31 @@ restart: ; rio->read_avail += l; DBG("RIO READ: Available: %u bytes", rio->read_avail); - uns got; - while (rio->read_running && (got = rio->read_handler(rio))) + return rec_io_process_read_buf(rio); +} + +static int +rec_io_deferred_start_read(struct main_hook *ho) +{ + struct main_rec_io *rio = ho->data; + + DBG("RIO: Starting reading"); + if (!rio->read_buf) { - DBG("RIO READ: Ate %u bytes", got); - rio->read_rec_start += got; - rio->read_avail -= got; - rio->read_prev_avail = 0; - if (!rio->read_avail) - { - DBG("RIO READ: Resetting buffer"); - rio->read_rec_start = rio->read_buf; - break; - } + if (!rio->read_buf_size) + rio->read_buf_size = 256; + rio->read_buf = xmalloc(rio->read_buf_size); + DBG("RIO: Created read buffer (%u bytes)", rio->read_buf_size); + rio->read_rec_start = rio->read_buf; } - DBG("RIO READ: Want more"); - return (rio->read_running ? HOOK_RETRY : HOOK_IDLE); + + rio->file.read_handler = rec_io_read_handler; + file_chg(&rio->file); + hook_del(ho); + rio->read_running = 1; + + rio->read_prev_avail = 0; + return rec_io_process_read_buf(rio); } static void @@ -155,32 +194,37 @@ rec_io_recalc_read(struct main_rec_io *rio) { if (run) { - if (!rio->read_buf) + /* + * Since we need to rescan the read buffer for leftover records and we + * can be deep in the call stack at this moment, we better defer most + * of the work to a main_hook, which will be called in the next iteration + * of the main loop. + */ + if (!hook_is_active(&rio->start_read_hook)) { - if (!rio->read_buf_size) - rio->read_buf_size = 256; - rio->read_buf = xmalloc(rio->read_buf_size); - DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size); - rio->read_rec_start = rio->read_buf; + DBG("RIO: Scheduling start of reading"); + hook_add(&rio->start_read_hook); } - rio->file.read_handler = rec_io_read_handler; - file_chg(&rio->file); - DBG("RIO: Reading started"); } else { + if (hook_is_active(&rio->start_read_hook)) + { + DBG("RIO: Descheduling start of reading"); + hook_del(&rio->start_read_hook); + } rio->file.read_handler = NULL; file_chg(&rio->file); DBG("RIO: Reading stopped"); + rio->read_running = 0; } - rio->read_running = run; } } void rec_io_start_read(struct main_rec_io *rio) { - ASSERT(clist_is_linked(&rio->file.n)); + ASSERT(rec_io_is_active(rio)); rio->read_started = 1; rec_io_recalc_read(rio); } @@ -188,7 +232,7 @@ rec_io_start_read(struct main_rec_io *rio) void rec_io_stop_read(struct main_rec_io *rio) { - ASSERT(clist_is_linked(&rio->file.n)); + ASSERT(rec_io_is_active(rio)); rio->read_started = 0; rec_io_recalc_read(rio); } @@ -197,7 +241,8 @@ static void rec_io_stop_write(struct main_rec_io *rio) { DBG("RIO WRITE: Stopping write"); - ASSERT(!rio->write_watermark); + // XXX: When we are called after a write error, there might still + // be some data queued, but we need not care. rio->file.write_handler = NULL; file_chg(&rio->file); } @@ -268,7 +313,7 @@ void rec_io_write(struct main_rec_io *rio, void *data, uns len) { byte *bdata = data; - ASSERT(clist_is_linked(&rio->file.n)); + ASSERT(rec_io_is_active(rio)); if (!len) return; @@ -326,6 +371,12 @@ static uns rhand(struct main_rec_io *rio) { rio->read_rec_start[r-1] = 0; printf("Read <%s>\n", rio->read_rec_start); + if (rio->read_rec_start[0] == '!') + { + rec_io_del(rio); + main_shut_down(); + return ~0U; + } rec_io_set_timeout(rio, 10000); rio->read_rec_start[r-1] = '\n'; rec_io_write(rio, rio->read_rec_start, r); @@ -371,7 +422,7 @@ main(void) main_loop(); msg(L_INFO, "Finished."); - if (clist_is_linked(&rio.file.n)) + if (file_is_active(&rio.file)) rec_io_del(&rio); main_cleanup(); return 0;