if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
{
rec_io_stop_read(rio);
- rio->notify_handler(rio, RIO_ERR_READ_RECORD_TOO_LARGE);
+ rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE);
return HOOK_IDLE;
}
{
DBG("RIO READ: Signalling EOF");
rec_io_stop_read(rio);
- rio->notify_handler(rio, RIO_ERR_READ_EOF);
+ rio->notify_handler(rio, RIO_EVENT_EOF);
return HOOK_IDLE;
}
rio->read_prev_avail = rio->read_avail;
return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
}
+static void
+rec_io_recalc_read(struct main_rec_io *rio)
+{
+ uns flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read;
+ uns run = rio->read_started && flow;
+ DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run);
+ if (run != rio->read_running)
+ {
+ if (run)
+ {
+ if (!rio->read_buf)
+ {
+ if (!rio->read_buf_size)
+ rio->read_buf_size = 256;
+ rio->read_buf = xmalloc(rio->read_buf_size);
+ DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
+ rio->read_rec_start = rio->read_buf;
+ }
+ rio->file.read_handler = rec_io_read_handler;
+ file_chg(&rio->file);
+ DBG("RIO: Reading started");
+ }
+ else
+ {
+ rio->file.read_handler = NULL;
+ file_chg(&rio->file);
+ DBG("RIO: Reading stopped");
+ }
+ rio->read_running = run;
+ }
+}
+
void
rec_io_start_read(struct main_rec_io *rio)
{
ASSERT(clist_is_linked(&rio->file.n));
- if (rio->read_running)
- return;
- if (!rio->read_buf)
- {
- if (!rio->read_buf_size)
- rio->read_buf_size = 256;
- rio->read_buf = xmalloc(rio->read_buf_size);
- DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
- rio->read_rec_start = rio->read_buf;
- }
- rio->file.read_handler = rec_io_read_handler;
- file_chg(&rio->file);
- rio->read_running = 1;
- DBG("RIO: Reading started");
+ rio->read_started = 1;
+ rec_io_recalc_read(rio);
}
void
rec_io_stop_read(struct main_rec_io *rio)
{
ASSERT(clist_is_linked(&rio->file.n));
- if (!rio->read_running)
- return;
- rio->file.read_handler = NULL;
- file_chg(&rio->file);
- rio->read_running = 0;
- DBG("RIO: Reading stopped");
+ rio->read_started = 0;
+ rec_io_recalc_read(rio);
}
static void
ret = HOOK_IDLE;
rec_io_stop_write(rio);
}
+ rec_io_recalc_read(rio);
// Call the hook, but carefully, because it can delete the RIO structure
if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
len -= l;
rio->write_watermark += l;
DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
+ rec_io_recalc_read(rio);
}
if (!rio->file.write_handler)
static int ehand(struct main_rec_io *rio, int cause)
{
- if (cause < 0)
+ if (cause < 0 || cause == RIO_EVENT_EOF)
{
msg(L_ERROR, "Error %d", cause);
rec_io_del(rio);
rio.notify_handler = ehand;
// rio.read_rec_max = 40;
rio.write_buf_size = 4;
+ rio.write_throttle_read = 6;
rec_io_add(&rio, 0);
rec_io_start_read(&rio);
rec_io_set_timeout(&rio, 10000);
/** The record I/O structure. **/
struct main_rec_io {
struct main_file file;
- byte *read_buf; /* Reading half */
+ byte *read_buf;
byte *read_rec_start; /* [*] Start of current record */
uns read_avail; /* [*] How much data is available */
- uns read_prev_avail; /* [*] How much data was available in previous read_done */
- uns read_buf_size; /* [*] Buffer size allocated (can set before rec_io_add()) */
- uns read_running; /* Reading requested */
+ uns read_prev_avail; /* [*] How much data was available in previous read_handler */
+ uns read_buf_size; /* [*] Read buffer size allocated (can be set before rec_io_add()) */
+ uns read_started; /* Reading requested by user */
+ uns read_running; /* Reading really runs (read_started && not stopped by write_throttle_read) */
uns read_rec_max; /* [*] Maximum record size (0=unlimited) */
clist busy_write_buffers;
clist idle_write_buffers;
- uns write_buf_size;
- uns write_watermark;
- uns write_throttle;
- uns (*read_handler)(struct main_rec_io *rio); /* [*] FIXME; describe EOF */
- // FIXME: returns...
- int (*notify_handler)(struct main_rec_io *rio, int status); /* [*] Handler to call on errors */
+ uns write_buf_size; /* [*] Write buffer size allocated (can be set before rec_io_add()) */
+ uns write_watermark; /* [*] How many data are waiting to be written */
+ uns write_throttle_read; /* [*] If more than write_throttle_read bytes are buffered, stop reading; 0=no stopping */
+ uns (*read_handler)(struct main_rec_io *rio); /* [*] Called whenever more bytes are read; returns 0 (want more) or number of bytes eaten */
+ int (*notify_handler)(struct main_rec_io *rio, int status); /* [*] Called to notify about errors and other events */
+ /* Returns either HOOK_RETRY or HOOK_IDLE. */
struct main_timer timer;
void *data; /* [*] Data for use by the handlers */
};
/** Deactivate a record I/O structure. **/
void rec_io_del(struct main_rec_io *rio);
+/** Start reading. **/
void rec_io_start_read(struct main_rec_io *rio);
+
+/** Stop reading. **/
void rec_io_stop_read(struct main_rec_io *rio);
-void rec_io_set_timeout(struct main_rec_io *bio, timestamp_t expires_delta);
-uns rec_io_parse_line(struct main_rec_io *rio);
+/** Analogous to @block_io_set_timeout(). **/
+void rec_io_set_timeout(struct main_rec_io *bio, timestamp_t expires_delta);
void rec_io_write(struct main_rec_io *rio, void *data, uns len);
-// All errors except timeout are fatal
+/**
+ * An auxiliary function used for parsing of lines. When called in the @read_handler,
+ * it searches for the end of line character. When a complete line is found, the length
+ * of the line (including the end of line character) is returned. Otherwise, it returns zero.
+ **/
+uns rec_io_parse_line(struct main_rec_io *rio);
+
+/**
+ * Specifies what kind of error or other event happened, when the @notify_handler
+ * is called. In case of I/O errors, `errno` is still set.
+ *
+ * Upon @RIO_ERR_READ, @RIO_ERR_RECORD_TOO_LARGE and @RIO_EVENT_EOF, reading is stopped
+ * automatically. Upon @RIO_ERR_WRITE, writing is stopped. Upon @RIO_ERR_TIMEOUT, only the
+ * timer is deactivated.
+ *
+ * In all cases, the notification handler is allowed to call @rec_io_del(), but it
+ * must return @HOOK_IDLE in such cases.
+ **/
enum rec_io_notify_status {
- RIO_ERR_READ = -1,
- RIO_ERR_WRITE = -2,
- RIO_ERR_TIMEOUT = -3,
- RIO_ERR_READ_RECORD_TOO_LARGE = -4,
- RIO_ERR_READ_EOF = -5,
- RIO_EVENT_ALL_WRITTEN = 1,
- RIO_EVENT_PART_WRITTEN = 2,
+ RIO_ERR_READ = -1, /* read() returned an error, errno set */
+ RIO_ERR_WRITE = -2, /* write() returned an error, errno set */
+ RIO_ERR_TIMEOUT = -3, /* A timeout has occurred */
+ RIO_ERR_RECORD_TOO_LARGE = -4, /* Read: read_rec_max has been exceeded */
+ RIO_EVENT_ALL_WRITTEN = 1, /* All buffered data has been written */
+ RIO_EVENT_PART_WRITTEN = 2, /* Some buffered data has been written, but more remains */
+ RIO_EVENT_EOF = 3, /* Read: EOF seen */
};
/***