2 * UCW Library -- Main Loop: Record I/O
4 * (c) 2011--2012 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
13 #include <ucw/mainloop.h>
29 rec_io_timer_expired(struct main_timer *tm)
31 struct main_rec_io *rio = tm->data;
32 timer_del(&rio->timer);
33 rio->notify_handler(rio, RIO_ERR_TIMEOUT);
36 static int rec_io_deferred_start_read(struct main_hook *ho);
39 rec_io_add(struct main_rec_io *rio, int fd)
43 rio->timer.handler = rec_io_timer_expired;
44 rio->timer.data = rio;
45 rio->start_read_hook.handler = rec_io_deferred_start_read;
46 rio->start_read_hook.data = rio;
47 clist_init(&rio->idle_write_buffers);
48 clist_init(&rio->busy_write_buffers);
52 rec_io_del(struct main_rec_io *rio)
54 if (!rec_io_is_active(rio))
57 timer_del(&rio->timer);
58 hook_del(&rio->start_read_hook);
61 rio->file.read_handler = NULL;
62 rio->read_started = rio->read_running = 0;
63 rio->read_avail = rio->read_prev_avail = 0;
66 DBG("RIO: Freeing read buffer");
68 rio->read_buf = rio->read_rec_start = NULL;
71 rio->file.write_handler = NULL;
72 rio->write_watermark = 0;
74 while ((b = clist_remove_head(&rio->idle_write_buffers)) || (b = clist_remove_head(&rio->busy_write_buffers)))
76 DBG("RIO: Freeing write buffer");
82 rec_io_process_read_buf(struct main_rec_io *rio)
85 ASSERT(rio->read_prev_avail < rio->read_avail);
86 while (rio->read_running && (got = rio->read_handler(rio)))
88 DBG("RIO READ: Ate %u bytes", got);
91 ASSERT(got <= rio->read_avail);
92 rio->read_rec_start += got;
93 rio->read_avail -= got;
94 rio->read_prev_avail = 0;
97 DBG("RIO READ: Resetting buffer");
98 rio->read_rec_start = rio->read_buf;
102 DBG("RIO READ: Want more");
103 return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
107 rec_io_read_handler(struct main_file *fi)
109 struct main_rec_io *rio = (struct main_rec_io *) fi;
111 if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
113 rec_io_stop_read(rio);
114 rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE);
119 uint rec_start_pos = rio->read_rec_start - rio->read_buf;
120 uint rec_end_pos = rec_start_pos + rio->read_avail;
121 uint free_space = rio->read_buf_size - rec_end_pos;
122 DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u",
123 rec_start_pos, rio->read_avail, rio->read_prev_avail,
124 free_space, rio->read_buf_size);
125 if (free_space <= rio->read_buf_size/8)
127 if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2)
129 // Moving the partial record to the start of the buffer
130 DBG("RIO READ: Moving partial record to start");
131 memmove(rio->read_buf, rio->read_rec_start, rio->read_avail);
132 rio->read_rec_start = rio->read_buf;
136 DBG("RIO READ: Resizing buffer");
137 rio->read_buf_size *= 2;
138 rio->read_buf = xrealloc(rio->read_buf, rio->read_buf_size);
139 rio->read_rec_start = rio->read_buf + rec_start_pos;
144 int l = read(fi->fd, rio->read_buf + rec_end_pos, free_space);
145 DBG("RIO READ: Read %d bytes", l);
148 if (errno != EINTR && errno != EAGAIN)
150 DBG("RIO READ: Signalling error");
151 rec_io_stop_read(rio);
152 rio->notify_handler(rio, RIO_ERR_READ);
158 DBG("RIO READ: Signalling EOF");
159 rec_io_stop_read(rio);
160 rio->notify_handler(rio, RIO_EVENT_EOF);
163 rio->read_prev_avail = rio->read_avail;
164 rio->read_avail += l;
165 DBG("RIO READ: Available: %u bytes", rio->read_avail);
167 return rec_io_process_read_buf(rio);
171 rec_io_deferred_start_read(struct main_hook *ho)
173 struct main_rec_io *rio = ho->data;
175 DBG("RIO: Starting reading");
178 if (!rio->read_buf_size)
179 rio->read_buf_size = 256;
180 rio->read_buf = xmalloc(rio->read_buf_size);
181 DBG("RIO: Created read buffer (%u bytes)", rio->read_buf_size);
182 rio->read_rec_start = rio->read_buf;
185 rio->file.read_handler = rec_io_read_handler;
186 file_chg(&rio->file);
188 rio->read_running = 1;
190 rio->read_prev_avail = 0;
192 rec_io_process_read_buf(rio);
198 rec_io_recalc_read(struct main_rec_io *rio)
200 uint flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read;
201 uint run = rio->read_started && flow;
202 DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run);
203 if (run != rio->read_running)
208 * Since we need to rescan the read buffer for leftover records and we
209 * can be deep in the call stack at this moment, we better defer most
210 * of the work to a main_hook, which will be called in the next iteration
213 if (!hook_is_active(&rio->start_read_hook))
215 DBG("RIO: Scheduling start of reading");
216 hook_add(&rio->start_read_hook);
221 if (hook_is_active(&rio->start_read_hook))
223 DBG("RIO: Descheduling start of reading");
224 hook_del(&rio->start_read_hook);
226 rio->file.read_handler = NULL;
227 file_chg(&rio->file);
228 DBG("RIO: Reading stopped");
229 rio->read_running = 0;
235 rec_io_start_read(struct main_rec_io *rio)
237 ASSERT(rec_io_is_active(rio));
238 rio->read_started = 1;
239 rec_io_recalc_read(rio);
243 rec_io_stop_read(struct main_rec_io *rio)
245 ASSERT(rec_io_is_active(rio));
246 rio->read_started = 0;
247 rec_io_recalc_read(rio);
251 rec_io_stop_write(struct main_rec_io *rio)
253 DBG("RIO WRITE: Stopping write");
254 // XXX: When we are called after a write error, there might still
255 // be some data queued, but we need not care.
256 rio->file.write_handler = NULL;
257 file_chg(&rio->file);
261 rec_io_write_handler(struct main_file *fi)
263 struct main_rec_io *rio = (struct main_rec_io *) fi;
264 struct rio_buffer *b = clist_head(&rio->busy_write_buffers);
267 rec_io_stop_write(rio);
271 int l = write(fi->fd, b->buf + b->written, b->full - b->written);
272 DBG("RIO WRITE: Written %d bytes", l);
275 if (errno != EINTR && errno != EAGAIN)
277 rec_io_stop_write(rio);
278 rio->notify_handler(rio, RIO_ERR_WRITE);
283 if (b->written == b->full)
285 DBG("RIO WRITE: Written full buffer");
287 clist_add_tail(&rio->idle_write_buffers, &b->n);
290 rio->write_watermark -= l;
291 int ret = HOOK_RETRY;
292 if (!rio->write_watermark)
295 rec_io_stop_write(rio);
297 rec_io_recalc_read(rio);
299 // Call the hook, but carefully, because it can delete the RIO structure
300 if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
305 static struct rio_buffer *
306 rec_io_get_buffer(struct main_rec_io *rio)
308 struct rio_buffer *b = clist_remove_tail(&rio->idle_write_buffers);
310 DBG("RIO WRITE: Recycled old buffer");
313 if (!rio->write_buf_size)
314 rio->write_buf_size = 1024;
315 b = xmalloc(sizeof(struct rio_buffer) + rio->write_buf_size);
316 DBG("RIO WRITE: Allocated new buffer");
318 b->full = b->written = 0;
323 rec_io_write(struct main_rec_io *rio, const void *data, uint len)
325 const byte *bdata = data;
326 ASSERT(rec_io_is_active(rio));
332 struct rio_buffer *b = clist_tail(&rio->busy_write_buffers);
333 if (!b || b->full >= rio->write_buf_size)
335 b = rec_io_get_buffer(rio);
336 clist_add_tail(&rio->busy_write_buffers, &b->n);
338 uint l = MIN(len, rio->write_buf_size - b->full);
339 memcpy(b->buf + b->full, bdata, l);
343 rio->write_watermark += l;
344 DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
345 rec_io_recalc_read(rio);
348 if (!rio->file.write_handler)
350 DBG("RIO WRITE: Starting write");
351 rio->file.write_handler = rec_io_write_handler;
352 file_chg(&rio->file);
357 rec_io_set_timeout(struct main_rec_io *rio, timestamp_t expires_delta)
359 DBG("RIO: Setting timeout %u", (uint) expires_delta);
361 timer_del(&rio->timer);
363 timer_add_rel(&rio->timer, expires_delta);
367 rec_io_parse_line(struct main_rec_io *rio)
369 for (uint i = rio->read_prev_avail; i < rio->read_avail; i++)
370 if (rio->read_rec_start[i] == '\n')
377 static uint rhand(struct main_rec_io *rio)
379 uint r = rec_io_parse_line(rio);
382 rio->read_rec_start[r-1] = 0;
383 printf("Read <%s>\n", rio->read_rec_start);
384 if (rio->read_rec_start[0] == '!')
390 rec_io_set_timeout(rio, 10000);
391 rio->read_rec_start[r-1] = '\n';
392 rec_io_write(rio, rio->read_rec_start, r);
397 static int ehand(struct main_rec_io *rio, int cause)
399 if (cause < 0 || cause == RIO_EVENT_EOF)
401 msg(L_ERROR, "Error %d", cause);
408 msg(L_INFO, "Event %d", cause);
419 struct main_rec_io rio = {};
420 rio.read_buf_size = 4;
421 rio.read_handler = rhand;
422 rio.notify_handler = ehand;
423 // rio.read_rec_max = 40;
424 rio.write_buf_size = 4;
425 rio.write_throttle_read = 6;
427 rec_io_start_read(&rio);
428 rec_io_set_timeout(&rio, 10000);
433 msg(L_INFO, "Finished.");
435 if (file_is_active(&rio.file))