2 * UCW Library -- Main Loop: Record I/O
4 * (c) 2011 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
13 #include "ucw/mainloop.h"
// Timer callback of a record I/O object (installed as rio->timer.handler
// by rec_io_add). Recovers the owning main_rec_io from tm->data, removes
// the timer and reports RIO_ERR_TIMEOUT through the notify handler.
29 rec_io_timer_expired(struct main_timer *tm)
31 struct main_rec_io *rio = tm->data;
// The timer is removed before the owner is notified.
32 timer_del(&rio->timer);
33 rio->notify_handler(rio, RIO_ERR_TIMEOUT);
// Prepares a record I/O object for use with file descriptor fd.
// The statements visible here point the timeout timer at
// rec_io_timer_expired (with the rio itself as callback data) and
// initialize both write-buffer lists.
// NOTE(review): how fd is attached to rio->file is not visible in this
// excerpt -- confirm against the full source.
37 rec_io_add(struct main_rec_io *rio, int fd)
41 rio->timer.handler = rec_io_timer_expired;
42 rio->timer.data = rio;
43 clist_init(&rio->idle_write_buffers);
44 clist_init(&rio->busy_write_buffers);
// Tears down a record I/O object: removes its timeout timer and, judging
// by the debug messages, releases the read buffer and all write buffers.
// NOTE(review): the actual deallocation calls are not visible in this
// excerpt.
48 rec_io_del(struct main_rec_io *rio)
50 timer_del(&rio->timer);
55 DBG("RIO: Freeing read buffer");
// Drain the idle list first; once it is empty, continue with the busy list.
61 while ((b = clist_remove_head(&rio->idle_write_buffers)) || (b = clist_remove_head(&rio->busy_write_buffers)))
63 DBG("RIO: Freeing write buffer");
// Read handler installed on the underlying main_file by rec_io_start_read.
// Makes room in the read buffer if needed, reads more data from fi->fd,
// and then repeatedly offers the buffered data to rio->read_handler, which
// returns the number of bytes it consumed (0 = needs more data).
// Returns HOOK_RETRY while reading is still enabled, HOOK_IDLE otherwise.
69 rec_io_read_handler(struct main_file *fi)
// NOTE(review): the cast presumes the main_file is the first member of
// struct main_rec_io -- confirm against the structure definition.
71 struct main_rec_io *rio = (struct main_rec_io *) fi;
// A non-zero read_rec_max acts as a limit on the buffered record size:
// once that much data is pending without being consumed, reading is
// stopped and the owner is told the record is too large.
73 if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
75 rec_io_stop_read(rio);
76 rio->notify_handler(rio, RIO_ERR_READ_RECORD_TOO_LARGE);
// Buffer layout as offsets from read_buf:
// [rec_start_pos, rec_end_pos) holds the pending data, the rest up to
// read_buf_size is free space available for the next read().
81 uns rec_start_pos = rio->read_rec_start - rio->read_buf;
82 uns rec_end_pos = rec_start_pos + rio->read_avail;
83 uns free_space = rio->read_buf_size - rec_end_pos;
84 DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u",
85 rec_start_pos, rio->read_avail, rio->read_prev_avail,
86 free_space, rio->read_buf_size);
// When at most 1/8 of the buffer is free, make room before reading.
88 if (free_space <= rio->read_buf_size/8)
// If the pending record starts in the second half of the buffer, shifting
// it to the front reclaims at least half of the buffer.
90 if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2)
92 // Moving the partial record to the start of the buffer
93 DBG("RIO READ: Moving partial record to start");
// memmove, since source and destination belong to the same buffer.
94 memmove(rio->read_buf, rio->read_rec_start, rio->read_avail);
95 rio->read_rec_start = rio->read_buf;
// Presumably the alternative branch: double the buffer. The record start
// is re-derived from its offset because xrealloc may move the buffer.
99 DBG("RIO READ: Resizing buffer");
100 rio->read_buf_size *= 2;
101 rio->read_buf = xrealloc(rio->read_buf, rio->read_buf_size);
102 rio->read_rec_start = rio->read_buf + rec_start_pos;
// Append newly read data right behind the pending data.
107 int l = read(fi->fd, rio->read_buf + rec_end_pos, free_space);
108 DBG("RIO READ: Read %d bytes", l);
// EINTR and EAGAIN are not reported as errors; any other errno stops
// reading and is signalled as RIO_ERR_READ.
// NOTE(review): the enclosing check of l is not visible in this excerpt.
111 if (errno != EINTR && errno != EAGAIN)
113 DBG("RIO READ: Signalling error");
114 rec_io_stop_read(rio);
115 rio->notify_handler(rio, RIO_ERR_READ);
// End of file: stop reading and notify the owner.
121 DBG("RIO READ: Signalling EOF");
122 rec_io_stop_read(rio);
123 rio->notify_handler(rio, RIO_ERR_READ_EOF);
// Remember how much data was already present before this read;
// rec_io_parse_line() starts its scan at read_prev_avail.
126 rio->read_prev_avail = rio->read_avail;
127 rio->read_avail += l;
128 DBG("RIO READ: Available: %u bytes", rio->read_avail);
// Let the user's handler consume records for as long as reading is still
// enabled and the handler reports a non-zero number of consumed bytes.
131 while (rio->read_running && (got = rio->read_handler(rio)))
133 DBG("RIO READ: Ate %u bytes", got);
134 rio->read_rec_start += got;
135 rio->read_avail -= got;
// A new record begins here, so none of the remaining data counts as
// previously seen.
136 rio->read_prev_avail = 0;
// Nothing pending: rewind to the start of the buffer.
137 if (!rio->read_avail)
139 DBG("RIO READ: Resetting buffer");
140 rio->read_rec_start = rio->read_buf;
144 DBG("RIO READ: Want more");
145 return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
// Enables reading on a record I/O object: sets up the read buffer,
// installs rec_io_read_handler on the underlying file and marks reading
// as running. The rio's file must already be linked (asserted).
149 rec_io_start_read(struct main_rec_io *rio)
151 ASSERT(clist_is_linked(&rio->file.n));
// Already reading.
// NOTE(review): the statement guarded by this check is not visible here.
152 if (rio->read_running)
// Default buffer size of 256 bytes when the caller did not choose one.
156 if (!rio->read_buf_size)
157 rio->read_buf_size = 256;
// NOTE(review): the condition guarding this allocation is not visible in
// this excerpt; presumably the buffer is created only once -- confirm.
158 rio->read_buf = xmalloc(rio->read_buf_size);
159 DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
160 rio->read_rec_start = rio->read_buf;
// Register the read handler and propagate the change with file_chg().
162 rio->file.read_handler = rec_io_read_handler;
163 file_chg(&rio->file);
164 rio->read_running = 1;
165 DBG("RIO: Reading started");
// Disables reading on a record I/O object: removes the read handler from
// the underlying file and clears read_running. The rio's file must be
// linked (asserted).
169 rec_io_stop_read(struct main_rec_io *rio)
171 ASSERT(clist_is_linked(&rio->file.n));
// Not reading at the moment.
// NOTE(review): the statement guarded by this check is not visible here.
172 if (!rio->read_running)
174 rio->file.read_handler = NULL;
175 file_chg(&rio->file);
176 rio->read_running = 0;
177 DBG("RIO: Reading stopped");
// Removes the write handler from the underlying file and propagates the
// change with file_chg(). Asserts that no buffered data is pending
// (write_watermark == 0).
// NOTE(review): rec_io_write_handler also calls this after a failed
// write(), where unwritten data still appears to be queued and
// write_watermark would be non-zero, so the assertion looks like it can
// fire on the error path -- verify.
181 rec_io_stop_write(struct main_rec_io *rio)
183 DBG("RIO WRITE: Stopping write");
184 ASSERT(!rio->write_watermark);
185 rio->file.write_handler = NULL;
186 file_chg(&rio->file);
// Write handler installed on the underlying main_file by rec_io_write.
// Writes the unwritten part of the first busy buffer to fi->fd, recycles
// the buffer once it is completely written, lowers write_watermark by the
// number of bytes written and notifies the owner about the progress.
190 rec_io_write_handler(struct main_file *fi)
// NOTE(review): the cast presumes the main_file is the first member of
// struct main_rec_io -- confirm against the structure definition.
192 struct main_rec_io *rio = (struct main_rec_io *) fi;
// The oldest queued buffer sits at the head of the busy list.
193 struct rio_buffer *b = clist_head(&rio->busy_write_buffers);
// Presumably reached when there is nothing left to write (guard not
// visible in this excerpt).
196 rec_io_stop_write(rio);
// Try to write everything between the written and full marks.
200 int l = write(fi->fd, b->buf + b->written, b->full - b->written);
201 DBG("RIO WRITE: Written %d bytes", l);
// EINTR and EAGAIN are not reported as errors; any other errno stops
// writing and is signalled as RIO_ERR_WRITE.
// NOTE(review): the enclosing check of l is not visible in this excerpt.
204 if (errno != EINTR && errno != EAGAIN)
206 rec_io_stop_write(rio);
207 rio->notify_handler(rio, RIO_ERR_WRITE);
// A completely written buffer goes to the idle list for later reuse by
// rec_io_get_buffer().
212 if (b->written == b->full)
214 DBG("RIO WRITE: Written full buffer");
216 clist_add_tail(&rio->idle_write_buffers, &b->n);
// write_watermark tracks the total number of bytes still buffered.
219 rio->write_watermark -= l;
220 int ret = HOOK_RETRY;
// Everything has been written out: stop watching for writability.
221 if (!rio->write_watermark)
224 rec_io_stop_write(rio);
227 // Call the hook, but carefully, because it can delete the RIO structure
228 if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
// Obtains an empty write buffer: an idle buffer is reused when one is
// available, otherwise a new one is allocated with room for
// write_buf_size bytes of data. The buffer's full and written marks are
// reset to zero.
233 static struct rio_buffer *
234 rec_io_get_buffer(struct main_rec_io *rio)
// Take a buffer from the tail of the idle list, if there is any.
236 struct rio_buffer *b = clist_remove_tail(&rio->idle_write_buffers);
238 DBG("RIO WRITE: Recycled old buffer");
// Default data size of 1024 bytes when the caller did not choose one.
241 if (!rio->write_buf_size)
242 rio->write_buf_size = 1024;
// The data area follows the buffer header in the same allocation.
243 b = xmalloc(sizeof(struct rio_buffer) + rio->write_buf_size);
244 DBG("RIO WRITE: Allocated new buffer");
246 b->full = b->written = 0;
// Queues len bytes from data for writing. The data is copied into write
// buffers on the busy list (a new buffer is appended whenever the last one
// is full), write_watermark grows by the amount buffered, and the write
// handler is installed if it is not active yet. The rio's file must be
// linked (asserted).
// NOTE(review): the loop over the input and the declaration of bdata are
// not visible in this excerpt.
251 rec_io_write(struct main_rec_io *rio, void *data, uns len)
254 ASSERT(clist_is_linked(&rio->file.n));
// Continue filling the most recently queued buffer when it has room.
260 struct rio_buffer *b = clist_tail(&rio->busy_write_buffers);
261 if (!b || b->full >= rio->write_buf_size)
263 b = rec_io_get_buffer(rio);
264 clist_add_tail(&rio->busy_write_buffers, &b->n);
// Copy as much as fits into the remaining space of this buffer.
266 uns l = MIN(len, rio->write_buf_size - b->full);
267 memcpy(b->buf + b->full, bdata, l);
271 rio->write_watermark += l;
272 DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
// Start watching for writability unless the handler is installed already.
275 if (!rio->file.write_handler)
277 DBG("RIO WRITE: Starting write");
278 rio->file.write_handler = rec_io_write_handler;
279 file_chg(&rio->file);
// (Re)sets the timeout of a record I/O object: the current timer is
// removed and the timer is added again, relative to the current time, by
// expires_delta. When it fires, rec_io_timer_expired reports
// RIO_ERR_TIMEOUT.
// NOTE(review): any condition guarding timer_add_rel() is not visible in
// this excerpt.
284 rec_io_set_timeout(struct main_rec_io *rio, timestamp_t expires_delta)
// The cast to uns only affects the debug message.
286 DBG("RIO: Setting timeout %u", (uns) expires_delta);
288 timer_del(&rio->timer);
290 timer_add_rel(&rio->timer, expires_delta);
// Helper for line-oriented read handlers: looks for a '\n' in the pending
// record. Only bytes from read_prev_avail up to read_avail are scanned,
// i.e. the part that arrived since the previous attempt.
// NOTE(review): the return statements are not visible in this excerpt;
// the caller rhand() treats the result as the line length including the
// newline, with zero meaning "no complete line yet" -- confirm.
294 rec_io_parse_line(struct main_rec_io *rio)
296 for (uns i = rio->read_prev_avail; i < rio->read_avail; i++)
297 if (rio->read_rec_start[i] == '\n')
// Test read handler: prints each received line, restarts the timeout
// and echoes the line back through rec_io_write().
304 static uns rhand(struct main_rec_io *rio)
306 uns r = rec_io_parse_line(rio);
// Temporarily replace the byte at r-1 (the newline found by the parser,
// presumably) with a NUL so that the line can be printed as a C string.
309 rio->read_rec_start[r-1] = 0;
310 printf("Read <%s>\n", rio->read_rec_start);
311 rec_io_set_timeout(rio, 10000);
// Put the newline back before echoing the r bytes of the line.
312 rio->read_rec_start[r-1] = '\n';
313 rec_io_write(rio, rio->read_rec_start, r);
// Test notify handler: logs the reported cause either as an error or as
// an informational event.
// NOTE(review): the logic selecting between the two messages and the
// return value are not visible in this excerpt.
318 static int ehand(struct main_rec_io *rio, int cause)
322 msg(L_ERROR, "Error %d", cause);
329 msg(L_INFO, "Event %d", cause);
// Test harness body (the enclosing function header is not visible in this
// excerpt). A zero-initialized rio is configured with the test handlers
// and with deliberately tiny 4-byte read and write buffers, then reading
// is started with a timeout of 10000.
340 struct main_rec_io rio = {};
341 rio.read_buf_size = 4;
342 rio.read_handler = rhand;
343 rio.notify_handler = ehand;
344 // rio.read_rec_max = 40;
345 rio.write_buf_size = 4;
347 rec_io_start_read(&rio);
348 rec_io_set_timeout(&rio, 10000);
353 msg(L_INFO, "Finished.");
// Checks whether the rio's file is still linked; the statement guarded by
// this check is not visible in this excerpt.
355 if (clist_is_linked(&rio.file.n))