2 * UCW Library -- Main Loop: Record I/O
4 * (c) 2011--2012 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
13 #include <ucw/mainloop.h>
/*
 * Timer handler installed on rio->timer: fetches the owning record I/O
 * structure from tm->data, deletes the timer and reports RIO_ERR_TIMEOUT
 * to the user's notify handler.
 */
29 rec_io_timer_expired(struct main_timer *tm)
31 struct main_rec_io *rio = tm->data;
// The timer is deleted before the notification is delivered.
32 timer_del(&rio->timer);
33 rio->notify_handler(rio, RIO_ERR_TIMEOUT);
// Declared ahead of its definition, because rec_io_add() stores its address in start_read_hook.handler.
36 static int rec_io_deferred_start_read(struct main_hook *ho);
/*
 * Set up the internal parts of a record I/O structure: the timeout timer
 * and the deferred start-of-read hook both get their handler and a
 * back-pointer to rio, and both write-buffer lists are initialized.
 * NOTE(review): the statements that consume fd are not visible in this
 * excerpt -- confirm against the full file.
 */
39 rec_io_add(struct main_rec_io *rio, int fd)
// The handlers recover rio from the data pointer set here.
43 rio->timer.handler = rec_io_timer_expired;
44 rio->timer.data = rio;
45 rio->start_read_hook.handler = rec_io_deferred_start_read;
46 rio->start_read_hook.data = rio;
47 clist_init(&rio->idle_write_buffers);
48 clist_init(&rio->busy_write_buffers);
/*
 * Tear down a record I/O structure: tests rec_io_is_active(), removes the
 * timeout timer and the start-of-read hook, and drains both lists of
 * write buffers.
 */
52 rec_io_del(struct main_rec_io *rio)
// NOTE(review): the statement guarded by this test is not visible here;
// presumably an early return for an inactive rio -- confirm.
54 if (!rec_io_is_active(rio))
57 timer_del(&rio->timer);
58 hook_del(&rio->start_read_hook);
63 DBG("RIO: Freeing read buffer");
// Buffers are taken from the idle list first; when it yields nothing, the
// || falls through to the busy list. The loop ends once neither list
// yields a buffer.
69 while ((b = clist_remove_head(&rio->idle_write_buffers)) || (b = clist_remove_head(&rio->busy_write_buffers)))
71 DBG("RIO: Freeing write buffer");
/*
 * Let the user's read_handler consume data that is already buffered.
 * The handler is called repeatedly while reading is running and it keeps
 * returning a non-zero byte count.  The result is HOOK_RETRY when reading
 * is still running at the end and HOOK_IDLE otherwise.
 */
77 rec_io_process_read_buf(struct main_rec_io *rio)
// The loop ends when read_handler returns zero or when read_running gets cleared.
80 while (rio->read_running && (got = rio->read_handler(rio)))
82 DBG("RIO READ: Ate %u bytes", got);
// Skip over the consumed bytes and reset read_prev_avail to 0.
85 rio->read_rec_start += got;
86 rio->read_avail -= got;
87 rio->read_prev_avail = 0;
// NOTE(review): the condition guarding this reset is not visible here;
// presumably it applies once all buffered data has been consumed -- confirm.
90 DBG("RIO READ: Resetting buffer");
91 rio->read_rec_start = rio->read_buf;
95 DBG("RIO READ: Want more");
96 return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
/*
 * File read handler (installed as rio->file.read_handler when reading is
 * started).  It rejects records that have reached read_rec_max, makes room
 * in the read buffer when it is nearly full, reads more data from the
 * descriptor, reports read errors and EOF through notify_handler, and
 * finally hands the buffer over to rec_io_process_read_buf().
 */
100 rec_io_read_handler(struct main_file *fi)
// NOTE(review): the cast assumes the main_file is the first member of
// struct main_rec_io -- confirm against the header.
102 struct main_rec_io *rio = (struct main_rec_io *) fi;
// A read_rec_max of zero disables the record size limit.
104 if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
106 rec_io_stop_read(rio);
107 rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE);
// Positions are kept as offsets from read_buf, so that they can be
// re-applied after the buffer is reallocated below.
112 uint rec_start_pos = rio->read_rec_start - rio->read_buf;
113 uint rec_end_pos = rec_start_pos + rio->read_avail;
114 uint free_space = rio->read_buf_size - rec_end_pos;
115 DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u",
116 rec_start_pos, rio->read_avail, rio->read_prev_avail,
117 free_space, rio->read_buf_size);
// Room is made only when at most 1/8 of the buffer is free.
118 if (free_space <= rio->read_buf_size/8)
// The partial record starts in the second half of the buffer, so moving
// it to the front releases at least half of the buffer.
120 if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2)
122 // Moving the partial record to the start of the buffer
123 DBG("RIO READ: Moving partial record to start");
124 memmove(rio->read_buf, rio->read_rec_start, rio->read_avail);
125 rio->read_rec_start = rio->read_buf;
// NOTE(review): presumably the alternative branch (the branching lines are
// not visible here): the buffer size is doubled and read_rec_start is
// re-derived from the saved offset in the (possibly relocated) buffer.
129 DBG("RIO READ: Resizing buffer");
130 rio->read_buf_size *= 2;
131 rio->read_buf = xrealloc(rio->read_buf, rio->read_buf_size);
132 rio->read_rec_start = rio->read_buf + rec_start_pos;
// Read at most free_space bytes to offset rec_end_pos of the buffer.
137 int l = read(fi->fd, rio->read_buf + rec_end_pos, free_space);
138 DBG("RIO READ: Read %d bytes", l);
// EINTR and EAGAIN are not reported as errors.
141 if (errno != EINTR && errno != EAGAIN)
143 DBG("RIO READ: Signalling error");
144 rec_io_stop_read(rio);
145 rio->notify_handler(rio, RIO_ERR_READ);
151 DBG("RIO READ: Signalling EOF");
152 rec_io_stop_read(rio);
153 rio->notify_handler(rio, RIO_EVENT_EOF);
// Remember how much was available before this read, then account for the
// newly read bytes.
156 rio->read_prev_avail = rio->read_avail;
157 rio->read_avail += l;
158 DBG("RIO READ: Available: %u bytes", rio->read_avail);
160 return rec_io_process_read_buf(rio);
/*
 * Hook handler behind start_read_hook: performs the actual start of
 * reading.  It picks a default read buffer size of 256 bytes when none was
 * configured, allocates the read buffer, installs rec_io_read_handler on
 * the file, marks reading as running and then lets
 * rec_io_process_read_buf() go through the buffer.
 */
164 rec_io_deferred_start_read(struct main_hook *ho)
166 struct main_rec_io *rio = ho->data;
168 DBG("RIO: Starting reading");
// NOTE(review): the condition under which the buffer is created is not
// visible here; presumably only when no read buffer exists yet -- confirm.
171 if (!rio->read_buf_size)
172 rio->read_buf_size = 256;
173 rio->read_buf = xmalloc(rio->read_buf_size);
174 DBG("RIO: Created read buffer (%u bytes)", rio->read_buf_size);
175 rio->read_rec_start = rio->read_buf;
178 rio->file.read_handler = rec_io_read_handler;
179 file_chg(&rio->file);
181 rio->read_running = 1;
// With read_prev_avail cleared, rec_io_parse_line() scans the buffered
// data from its beginning.
183 rio->read_prev_avail = 0;
184 return rec_io_process_read_buf(rio);
/*
 * Recompute whether reading should be running and switch it on or off
 * when the desired state differs from read_running.  Reading should run
 * when the user has started it (read_started) and the write side does not
 * throttle it.
 * NOTE(review): the lines selecting between the start path and the stop
 * path are not visible in this excerpt -- confirm against the full file.
 */
188 rec_io_recalc_read(struct main_rec_io *rio)
// Throttling is off when write_throttle_read is zero; otherwise reading is
// allowed only while write_watermark is below write_throttle_read.
190 uint flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read;
191 uint run = rio->read_started && flow;
192 DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run);
193 if (run != rio->read_running)
198 * Since we need to rescan the read buffer for leftover records and we
199 * can be deep in the call stack at this moment, we better defer most
200 * of the work to a main_hook, which will be called in the next iteration
// Start path: the hook is added only if it is not already active.
203 if (!hook_is_active(&rio->start_read_hook))
205 DBG("RIO: Scheduling start of reading");
206 hook_add(&rio->start_read_hook);
// Stop path: a still-pending start is cancelled, the file's read handler
// is removed and read_running is cleared.
211 if (hook_is_active(&rio->start_read_hook))
213 DBG("RIO: Descheduling start of reading");
214 hook_del(&rio->start_read_hook);
216 rio->file.read_handler = NULL;
217 file_chg(&rio->file);
218 DBG("RIO: Reading stopped");
219 rio->read_running = 0;
/*
 * Request reading on an active record I/O: sets read_started and lets
 * rec_io_recalc_read() decide whether reading actually runs.
 */
225 rec_io_start_read(struct main_rec_io *rio)
227 ASSERT(rec_io_is_active(rio));
228 rio->read_started = 1;
229 rec_io_recalc_read(rio);
/*
 * Withdraw the request for reading on an active record I/O: clears
 * read_started and lets rec_io_recalc_read() apply the change.
 */
233 rec_io_stop_read(struct main_rec_io *rio)
235 ASSERT(rec_io_is_active(rio));
236 rio->read_started = 0;
237 rec_io_recalc_read(rio);
/*
 * Stop writing: removes the file's write handler and calls file_chg() on
 * the file.  Queued write buffers are not touched here.
 */
241 rec_io_stop_write(struct main_rec_io *rio)
243 DBG("RIO WRITE: Stopping write");
244 // XXX: When we are called after a write error, there might still
245 // be some data queued, but we need not care.
246 rio->file.write_handler = NULL;
247 file_chg(&rio->file);
/*
 * File write handler: writes out the not yet written part of the first
 * busy buffer, recycles the buffer once it is completely written, lowers
 * write_watermark, re-evaluates read throttling and notifies the user
 * with RIO_EVENT_PART_WRITTEN or RIO_EVENT_ALL_WRITTEN.  Write errors
 * other than EINTR/EAGAIN are reported as RIO_ERR_WRITE.
 */
251 rec_io_write_handler(struct main_file *fi)
253 struct main_rec_io *rio = (struct main_rec_io *) fi;
254 struct rio_buffer *b = clist_head(&rio->busy_write_buffers);
// NOTE(review): presumably reached when no busy buffer is queued; the
// guarding test is not visible here -- confirm.
257 rec_io_stop_write(rio);
// Only the range [written, full) of the buffer remains to be sent.
261 int l = write(fi->fd, b->buf + b->written, b->full - b->written);
262 DBG("RIO WRITE: Written %d bytes", l);
// EINTR and EAGAIN are not reported as errors.
265 if (errno != EINTR && errno != EAGAIN)
267 rec_io_stop_write(rio);
268 rio->notify_handler(rio, RIO_ERR_WRITE);
// A completely written buffer goes to the idle list, where
// rec_io_get_buffer() can pick it up again.
273 if (b->written == b->full)
275 DBG("RIO WRITE: Written full buffer");
277 clist_add_tail(&rio->idle_write_buffers, &b->n);
280 rio->write_watermark -= l;
281 int ret = HOOK_RETRY;
// A watermark of zero means that no buffered data is left to write.
282 if (!rio->write_watermark)
285 rec_io_stop_write(rio);
// The watermark has changed, so read throttling is recalculated.
287 rec_io_recalc_read(rio);
289 // Call the hook, but carefully, because it can delete the RIO structure
290 if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
/*
 * Obtain a write buffer: one is taken from the tail of the idle list, or
 * a new one is allocated with room for write_buf_size bytes of data
 * (write_buf_size defaults to 1024 when it is zero).  The buffer's full
 * and written counters are set to zero.
 * NOTE(review): the test choosing between recycling and allocation is not
 * visible in this excerpt -- confirm against the full file.
 */
295 static struct rio_buffer *
296 rec_io_get_buffer(struct main_rec_io *rio)
298 struct rio_buffer *b = clist_remove_tail(&rio->idle_write_buffers);
300 DBG("RIO WRITE: Recycled old buffer");
303 if (!rio->write_buf_size)
304 rio->write_buf_size = 1024;
// The data area is allocated together with the buffer header.
305 b = xmalloc(sizeof(struct rio_buffer) + rio->write_buf_size);
306 DBG("RIO WRITE: Allocated new buffer");
308 b->full = b->written = 0;
/*
 * Queue data for writing on an active record I/O.  Data is copied into
 * the busy write buffers, each holding up to write_buf_size bytes; a
 * fresh buffer is appended when the list is empty or its tail is full.
 * write_watermark grows by the number of bytes buffered, read throttling
 * is re-evaluated, and the file's write handler is installed if it is not
 * installed already.
 * NOTE(review): the enclosing loop and the declaration of bdata are not
 * visible in this excerpt; presumably bdata is a byte pointer walking
 * through data -- confirm.
 */
313 rec_io_write(struct main_rec_io *rio, void *data, uint len)
316 ASSERT(rec_io_is_active(rio));
322 struct rio_buffer *b = clist_tail(&rio->busy_write_buffers);
323 if (!b || b->full >= rio->write_buf_size)
325 b = rec_io_get_buffer(rio);
326 clist_add_tail(&rio->busy_write_buffers, &b->n);
// Copy as much as fits into the remaining space of the buffer.
328 uint l = MIN(len, rio->write_buf_size - b->full);
329 memcpy(b->buf + b->full, bdata, l);
333 rio->write_watermark += l;
334 DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
// The watermark has changed, so read throttling is recalculated.
335 rec_io_recalc_read(rio);
338 if (!rio->file.write_handler)
340 DBG("RIO WRITE: Starting write");
341 rio->file.write_handler = rec_io_write_handler;
342 file_chg(&rio->file);
/*
 * Change the timeout of the record I/O, which is kept in rio->timer and
 * given as a delta passed to timer_add_rel().
 * NOTE(review): both timer_del() and timer_add_rel() appear below, but the
 * condition selecting between them is not visible in this excerpt --
 * confirm against the full file.
 */
347 rec_io_set_timeout(struct main_rec_io *rio, timestamp_t expires_delta)
// The debug message prints the delta truncated to uint.
349 DBG("RIO: Setting timeout %u", (uint) expires_delta);
351 timer_del(&rio->timer);
353 timer_add_rel(&rio->timer, expires_delta);
/*
 * Look for a newline in the current record.  The scan starts at
 * read_prev_avail, i.e., it skips the bytes that were already available
 * before the latest read, and it stops before read_avail.
 * NOTE(review): the return statements are not visible in this excerpt;
 * rhand() uses the result r as a length with the newline at index r-1 --
 * confirm.
 */
357 rec_io_parse_line(struct main_rec_io *rio)
359 for (uint i = rio->read_prev_avail; i < rio->read_avail; i++)
360 if (rio->read_rec_start[i] == '\n')
/*
 * Read handler of the test program: asks rec_io_parse_line() for a line,
 * prints it, sets the timeout again and writes the line back to the same
 * record I/O.
 */
367 static uint rhand(struct main_rec_io *rio)
369 uint r = rec_io_parse_line(rio);
// The byte at r-1 (presumably the newline) is replaced by a terminator
// for the duration of the printf().
372 rio->read_rec_start[r-1] = 0;
373 printf("Read <%s>\n", rio->read_rec_start);
// NOTE(review): what happens to lines starting with '!' is not visible here.
374 if (rio->read_rec_start[0] == '!')
380 rec_io_set_timeout(rio, 10000);
// Put the newline back before the r bytes of the line are written out.
381 rio->read_rec_start[r-1] = '\n';
382 rec_io_write(rio, rio->read_rec_start, r);
/*
 * Notify handler of the test program: negative causes and RIO_EVENT_EOF
 * are logged as errors, everything else is logged as an event.
 * NOTE(review): the rest of each branch and the return values are not
 * visible in this excerpt.
 */
387 static int ehand(struct main_rec_io *rio, int cause)
389 if (cause < 0 || cause == RIO_EVENT_EOF)
391 msg(L_ERROR, "Error %d", cause);
398 msg(L_INFO, "Event %d", cause);
// Body of the test program (its function header is not visible in this
// excerpt).  It uses a zero-initialized rio with rhand/ehand as handlers,
// 4-byte read and write buffer sizes, and reading throttled once
// write_watermark reaches 6.
409 struct main_rec_io rio = {};
410 rio.read_buf_size = 4;
411 rio.read_handler = rhand;
412 rio.notify_handler = ehand;
413 // rio.read_rec_max = 40;
414 rio.write_buf_size = 4;
415 rio.write_throttle_read = 6;
417 rec_io_start_read(&rio);
418 rec_io_set_timeout(&rio, 10000);
423 msg(L_INFO, "Finished.");
// NOTE(review): the statement guarded by this test is not visible here.
425 if (file_is_active(&rio.file))