/*
* UCW Library -- Main Loop: Record I/O
*
- * (c) 2011 Martin Mares <mj@ucw.cz>
+ * (c) 2011--2012 Martin Mares <mj@ucw.cz>
*
* This software may be freely distributed and used according to the terms
* of the GNU Lesser General Public License.
#undef LOCAL_DEBUG
-#include "ucw/lib.h"
-#include "ucw/mainloop.h"
+#include <ucw/lib.h>
+#include <ucw/mainloop.h>
#include <stdio.h>
#include <string.h>
rio->notify_handler(rio, RIO_ERR_TIMEOUT);
}
+static int rec_io_deferred_start_read(struct main_hook *ho);
+
void
rec_io_add(struct main_rec_io *rio, int fd)
{
file_add(&rio->file);
rio->timer.handler = rec_io_timer_expired;
rio->timer.data = rio;
+ rio->start_read_hook.handler = rec_io_deferred_start_read;
+ rio->start_read_hook.data = rio;
clist_init(&rio->idle_write_buffers);
clist_init(&rio->busy_write_buffers);
}
void
rec_io_del(struct main_rec_io *rio)
{
+ if (!rec_io_is_active(rio))
+ return;
+
timer_del(&rio->timer);
+ hook_del(&rio->start_read_hook);
file_del(&rio->file);
if (rio->read_buf)
}
}
+static int
+rec_io_process_read_buf(struct main_rec_io *rio)
+{
+ uns got;
+ while (rio->read_running && (got = rio->read_handler(rio)))
+ {
+ DBG("RIO READ: Ate %u bytes", got);
+ if (got == ~0U)
+ return HOOK_IDLE;
+ rio->read_rec_start += got;
+ rio->read_avail -= got;
+ rio->read_prev_avail = 0;
+ if (!rio->read_avail)
+ {
+ DBG("RIO READ: Resetting buffer");
+ rio->read_rec_start = rio->read_buf;
+ break;
+ }
+ }
+ DBG("RIO READ: Want more");
+ return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
+}
+
static int
rec_io_read_handler(struct main_file *fi)
{
if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
{
rec_io_stop_read(rio);
- rio->notify_handler(rio, RIO_ERR_READ_RECORD_TOO_LARGE);
+ rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE);
return HOOK_IDLE;
}
DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u",
rec_start_pos, rio->read_avail, rio->read_prev_avail,
free_space, rio->read_buf_size);
- // FIXME: Constants?
if (free_space <= rio->read_buf_size/8)
{
if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2)
{
DBG("RIO READ: Signalling EOF");
rec_io_stop_read(rio);
- rio->notify_handler(rio, RIO_ERR_READ_EOF);
+ rio->notify_handler(rio, RIO_EVENT_EOF);
return HOOK_IDLE;
}
rio->read_prev_avail = rio->read_avail;
rio->read_avail += l;
DBG("RIO READ: Available: %u bytes", rio->read_avail);
- uns got;
- while (rio->read_running && (got = rio->read_handler(rio)))
- {
- DBG("RIO READ: Ate %u bytes", got);
- rio->read_rec_start += got;
- rio->read_avail -= got;
- rio->read_prev_avail = 0;
- if (!rio->read_avail)
- {
- DBG("RIO READ: Resetting buffer");
- rio->read_rec_start = rio->read_buf;
- break;
- }
- }
- DBG("RIO READ: Want more");
- return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
+ return rec_io_process_read_buf(rio);
}
-void
-rec_io_start_read(struct main_rec_io *rio)
+static int
+rec_io_deferred_start_read(struct main_hook *ho)
{
- ASSERT(clist_is_linked(&rio->file.n));
- if (rio->read_running)
- return;
+ struct main_rec_io *rio = ho->data;
+
+ DBG("RIO: Starting reading");
if (!rio->read_buf)
{
if (!rio->read_buf_size)
rio->read_buf_size = 256;
rio->read_buf = xmalloc(rio->read_buf_size);
- DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
+ DBG("RIO: Created read buffer (%u bytes)", rio->read_buf_size);
rio->read_rec_start = rio->read_buf;
}
+
rio->file.read_handler = rec_io_read_handler;
file_chg(&rio->file);
+ hook_del(ho);
rio->read_running = 1;
- DBG("RIO: Reading started");
+
+ rio->read_prev_avail = 0;
+ return rec_io_process_read_buf(rio);
+}
+
+static void
+rec_io_recalc_read(struct main_rec_io *rio)
+{
+ uns flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read;
+ uns run = rio->read_started && flow;
+ DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run);
+ if (run != rio->read_running)
+ {
+ if (run)
+ {
+ /*
+ * Since we need to rescan the read buffer for leftover records and we
+ * can be deep in the call stack at this moment, we better defer most
+ * of the work to a main_hook, which will be called in the next iteration
+ * of the main loop.
+ */
+ if (!hook_is_active(&rio->start_read_hook))
+ {
+ DBG("RIO: Scheduling start of reading");
+ hook_add(&rio->start_read_hook);
+ }
+ }
+ else
+ {
+ if (hook_is_active(&rio->start_read_hook))
+ {
+ DBG("RIO: Descheduling start of reading");
+ hook_del(&rio->start_read_hook);
+ }
+ rio->file.read_handler = NULL;
+ file_chg(&rio->file);
+ DBG("RIO: Reading stopped");
+ rio->read_running = 0;
+ }
+ }
+}
+
+void
+rec_io_start_read(struct main_rec_io *rio)
+{
+ ASSERT(rec_io_is_active(rio));
+ rio->read_started = 1;
+ rec_io_recalc_read(rio);
}
void
rec_io_stop_read(struct main_rec_io *rio)
{
- ASSERT(clist_is_linked(&rio->file.n));
- if (!rio->read_running)
- return;
- rio->file.read_handler = NULL;
- file_chg(&rio->file);
- rio->read_running = 0;
- DBG("RIO: Reading stopped");
+ ASSERT(rec_io_is_active(rio));
+ rio->read_started = 0;
+ rec_io_recalc_read(rio);
}
static void
rec_io_stop_write(struct main_rec_io *rio)
{
DBG("RIO WRITE: Stopping write");
- ASSERT(!rio->write_watermark);
+ // XXX: When we are called after a write error, there might still
+ // be some data queued, but we need not care.
rio->file.write_handler = NULL;
file_chg(&rio->file);
}
ret = HOOK_IDLE;
rec_io_stop_write(rio);
}
+ rec_io_recalc_read(rio);
// Call the hook, but carefully, because it can delete the RIO structure
if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
rec_io_write(struct main_rec_io *rio, void *data, uns len)
{
byte *bdata = data;
- ASSERT(clist_is_linked(&rio->file.n));
+ ASSERT(rec_io_is_active(rio));
if (!len)
return;
len -= l;
rio->write_watermark += l;
DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
+ rec_io_recalc_read(rio);
}
if (!rio->file.write_handler)
{
rio->read_rec_start[r-1] = 0;
printf("Read <%s>\n", rio->read_rec_start);
+ if (rio->read_rec_start[0] == '!')
+ {
+ rec_io_del(rio);
+ main_shut_down();
+ return ~0U;
+ }
rec_io_set_timeout(rio, 10000);
rio->read_rec_start[r-1] = '\n';
rec_io_write(rio, rio->read_rec_start, r);
static int ehand(struct main_rec_io *rio, int cause)
{
- if (cause < 0)
+ if (cause < 0 || cause == RIO_EVENT_EOF)
{
msg(L_ERROR, "Error %d", cause);
rec_io_del(rio);
rio.notify_handler = ehand;
// rio.read_rec_max = 40;
rio.write_buf_size = 4;
+ rio.write_throttle_read = 6;
rec_io_add(&rio, 0);
rec_io_start_read(&rio);
rec_io_set_timeout(&rio, 10000);
main_loop();
msg(L_INFO, "Finished.");
- if (clist_is_linked(&rio.file.n))
+ if (file_is_active(&rio.file))
rec_io_del(&rio);
main_cleanup();
return 0;