rio->notify_handler(rio, RIO_ERR_TIMEOUT);
}
+static int rec_io_deferred_start_read(struct main_hook *ho);
+
void
rec_io_add(struct main_rec_io *rio, int fd)
{
file_add(&rio->file);
rio->timer.handler = rec_io_timer_expired;
rio->timer.data = rio;
+ rio->start_read_hook.handler = rec_io_deferred_start_read;
+ rio->start_read_hook.data = rio;
clist_init(&rio->idle_write_buffers);
clist_init(&rio->busy_write_buffers);
}
rec_io_del(struct main_rec_io *rio)
{
timer_del(&rio->timer);
+ if (clist_is_linked(&rio->start_read_hook.n))
+ hook_del(&rio->start_read_hook);
file_del(&rio->file);
if (rio->read_buf)
}
}
+static int
+rec_io_process_read_buf(struct main_rec_io *rio)
+{
+ uns got;
+ while (rio->read_running && (got = rio->read_handler(rio)))
+ {
+ DBG("RIO READ: Ate %u bytes", got);
+ if (got == ~0U)
+ return HOOK_IDLE;
+ rio->read_rec_start += got;
+ rio->read_avail -= got;
+ rio->read_prev_avail = 0;
+ if (!rio->read_avail)
+ {
+ DBG("RIO READ: Resetting buffer");
+ rio->read_rec_start = rio->read_buf;
+ break;
+ }
+ }
+ DBG("RIO READ: Want more");
+ return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
+}
+
static int
rec_io_read_handler(struct main_file *fi)
{
rio->read_avail += l;
DBG("RIO READ: Available: %u bytes", rio->read_avail);
- uns got;
- while (rio->read_running && (got = rio->read_handler(rio)))
+ return rec_io_process_read_buf(rio);
+}
+
+static int
+rec_io_deferred_start_read(struct main_hook *ho)
+{
+ struct main_rec_io *rio = ho->data;
+
+ DBG("RIO: Starting reading");
+ if (!rio->read_buf)
{
- DBG("RIO READ: Ate %u bytes", got);
- if (got == ~0U)
- return HOOK_IDLE;
- rio->read_rec_start += got;
- rio->read_avail -= got;
- rio->read_prev_avail = 0;
- if (!rio->read_avail)
- {
- DBG("RIO READ: Resetting buffer");
- rio->read_rec_start = rio->read_buf;
- break;
- }
+ if (!rio->read_buf_size)
+ rio->read_buf_size = 256;
+ rio->read_buf = xmalloc(rio->read_buf_size);
+ DBG("RIO: Created read buffer (%u bytes)", rio->read_buf_size);
+ rio->read_rec_start = rio->read_buf;
}
- DBG("RIO READ: Want more");
- return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
+
+ rio->file.read_handler = rec_io_read_handler;
+ file_chg(&rio->file);
+ hook_del(ho);
+ rio->read_running = 1;
+
+ rio->read_prev_avail = 0;
+ return rec_io_process_read_buf(rio);
}
static void
{
if (run)
{
- if (!rio->read_buf)
+ /*
+ * Since we need to rescan the read buffer for leftover records and we
+ * can be deep in the call stack at this moment, we better defer most
+ * of the work to a main_hook, which will be called in the next iteration
+ * of the main loop.
+ */
+ if (!clist_is_linked(&rio->start_read_hook.n))
{
- if (!rio->read_buf_size)
- rio->read_buf_size = 256;
- rio->read_buf = xmalloc(rio->read_buf_size);
- DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
- rio->read_rec_start = rio->read_buf;
+ DBG("RIO: Scheduling start of reading");
+ hook_add(&rio->start_read_hook);
}
- rio->file.read_handler = rec_io_read_handler;
- file_chg(&rio->file);
- DBG("RIO: Reading started");
}
else
{
+ if (clist_is_linked(&rio->start_read_hook.n))
+ {
+ DBG("RIO: Descheduling start of reading");
+ hook_del(&rio->start_read_hook);
+ }
rio->file.read_handler = NULL;
file_chg(&rio->file);
DBG("RIO: Reading stopped");
+ rio->read_running = 0;
}
- rio->read_running = run;
}
}
int (*notify_handler)(struct main_rec_io *rio, int status); /* [*] Called to notify about errors and other events */
/* Returns either HOOK_RETRY or HOOK_IDLE. */
struct main_timer timer;
+ struct main_hook start_read_hook; /* Used internally to defer rec_io_start_read() */
void *data; /* [*] Data for use by the handlers */
};
/** Deactivate a record I/O structure. **/
void rec_io_del(struct main_rec_io *rio);
-/** Start reading. **/
+/**
+ * Start reading.
+ *
+ * When there were some data in the buffer (e.g., because @rec_io_stop_read()
+ * was called from the `read_handler`), it is processed as if it were read
+ * from the file once again. That is, `read_prev_avail` is reset to 0 and
+ * the `read_handler` is called to process all buffered data.
+ ***/
void rec_io_start_read(struct main_rec_io *rio);
/** Stop reading. **/