]> mj.ucw.cz Git - libucw.git/blob - ucw/main-rec.c
Extended types: Cleaned up unit name parsing
[libucw.git] / ucw / main-rec.c
1 /*
2  *      UCW Library -- Main Loop: Record I/O
3  *
4  *      (c) 2011--2012 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include <ucw/lib.h>
13 #include <ucw/mainloop.h>
14
15 #include <stdio.h>
16 #include <string.h>
17 #include <stdlib.h>
18 #include <unistd.h>
19 #include <errno.h>
20
21 struct rio_buffer {
22   cnode n;
23   uint full;
24   uint written;
25   byte buf[];
26 };
27
28 static void
29 rec_io_timer_expired(struct main_timer *tm)
30 {
31   struct main_rec_io *rio = tm->data;
32   timer_del(&rio->timer);
33   rio->notify_handler(rio, RIO_ERR_TIMEOUT);
34 }
35
36 static int rec_io_deferred_start_read(struct main_hook *ho);
37
38 void
39 rec_io_add(struct main_rec_io *rio, int fd)
40 {
41   rio->file.fd = fd;
42   file_add(&rio->file);
43   rio->timer.handler = rec_io_timer_expired;
44   rio->timer.data = rio;
45   rio->start_read_hook.handler = rec_io_deferred_start_read;
46   rio->start_read_hook.data = rio;
47   clist_init(&rio->idle_write_buffers);
48   clist_init(&rio->busy_write_buffers);
49 }
50
51 void
52 rec_io_del(struct main_rec_io *rio)
53 {
54   if (!rec_io_is_active(rio))
55     return;
56
57   timer_del(&rio->timer);
58   hook_del(&rio->start_read_hook);
59   file_del(&rio->file);
60
61   if (rio->read_buf)
62     {
63       DBG("RIO: Freeing read buffer");
64       xfree(rio->read_buf);
65       rio->read_buf = NULL;
66     }
67
68   struct rio_buffer *b;
69   while ((b = clist_remove_head(&rio->idle_write_buffers)) || (b = clist_remove_head(&rio->busy_write_buffers)))
70     {
71       DBG("RIO: Freeing write buffer");
72       xfree(b);
73     }
74 }
75
76 static int
77 rec_io_process_read_buf(struct main_rec_io *rio)
78 {
79   uint got;
80   while (rio->read_running && (got = rio->read_handler(rio)))
81     {
82       DBG("RIO READ: Ate %u bytes", got);
83       if (got == ~0U)
84         return HOOK_IDLE;
85       rio->read_rec_start += got;
86       rio->read_avail -= got;
87       rio->read_prev_avail = 0;
88       if (!rio->read_avail)
89         {
90           DBG("RIO READ: Resetting buffer");
91           rio->read_rec_start = rio->read_buf;
92           break;
93         }
94     }
95   DBG("RIO READ: Want more");
96   return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
97 }
98
99 static int
100 rec_io_read_handler(struct main_file *fi)
101 {
102   struct main_rec_io *rio = (struct main_rec_io *) fi;
103
104   if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
105     {
106       rec_io_stop_read(rio);
107       rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE);
108       return HOOK_IDLE;
109     }
110
111 restart: ;
112   uint rec_start_pos = rio->read_rec_start - rio->read_buf;
113   uint rec_end_pos = rec_start_pos + rio->read_avail;
114   uint free_space = rio->read_buf_size - rec_end_pos;
115   DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u",
116     rec_start_pos, rio->read_avail, rio->read_prev_avail,
117     free_space, rio->read_buf_size);
118   if (free_space <= rio->read_buf_size/8)
119     {
120       if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2)
121         {
122           // Moving the partial record to the start of the buffer
123           DBG("RIO READ: Moving partial record to start");
124           memmove(rio->read_buf, rio->read_rec_start, rio->read_avail);
125           rio->read_rec_start = rio->read_buf;
126         }
127       else
128         {
129           DBG("RIO READ: Resizing buffer");
130           rio->read_buf_size *= 2;
131           rio->read_buf = xrealloc(rio->read_buf, rio->read_buf_size);
132           rio->read_rec_start = rio->read_buf + rec_start_pos;
133         }
134       goto restart;
135     }
136
137   int l = read(fi->fd, rio->read_buf + rec_end_pos, free_space);
138   DBG("RIO READ: Read %d bytes", l);
139   if (l < 0)
140     {
141       if (errno != EINTR && errno != EAGAIN)
142         {
143           DBG("RIO READ: Signalling error");
144           rec_io_stop_read(rio);
145           rio->notify_handler(rio, RIO_ERR_READ);
146         }
147       return HOOK_IDLE;
148     }
149   if (!l)
150     {
151       DBG("RIO READ: Signalling EOF");
152       rec_io_stop_read(rio);
153       rio->notify_handler(rio, RIO_EVENT_EOF);
154       return HOOK_IDLE;
155     }
156   rio->read_prev_avail = rio->read_avail;
157   rio->read_avail += l;
158   DBG("RIO READ: Available: %u bytes", rio->read_avail);
159
160   return rec_io_process_read_buf(rio);
161 }
162
163 static int
164 rec_io_deferred_start_read(struct main_hook *ho)
165 {
166   struct main_rec_io *rio = ho->data;
167
168   DBG("RIO: Starting reading");
169   if (!rio->read_buf)
170     {
171       if (!rio->read_buf_size)
172         rio->read_buf_size = 256;
173       rio->read_buf = xmalloc(rio->read_buf_size);
174       DBG("RIO: Created read buffer (%u bytes)", rio->read_buf_size);
175       rio->read_rec_start = rio->read_buf;
176     }
177
178   rio->file.read_handler = rec_io_read_handler;
179   file_chg(&rio->file);
180   hook_del(ho);
181   rio->read_running = 1;
182
183   rio->read_prev_avail = 0;
184   return rec_io_process_read_buf(rio);
185 }
186
187 static void
188 rec_io_recalc_read(struct main_rec_io *rio)
189 {
190   uint flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read;
191   uint run = rio->read_started && flow;
192   DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run);
193   if (run != rio->read_running)
194     {
195       if (run)
196         {
197           /*
198            * Since we need to rescan the read buffer for leftover records and we
199            * can be deep in the call stack at this moment, we better defer most
200            * of the work to a main_hook, which will be called in the next iteration
201            * of the main loop.
202            */
203           if (!hook_is_active(&rio->start_read_hook))
204             {
205               DBG("RIO: Scheduling start of reading");
206               hook_add(&rio->start_read_hook);
207             }
208         }
209       else
210         {
211           if (hook_is_active(&rio->start_read_hook))
212             {
213               DBG("RIO: Descheduling start of reading");
214               hook_del(&rio->start_read_hook);
215             }
216           rio->file.read_handler = NULL;
217           file_chg(&rio->file);
218           DBG("RIO: Reading stopped");
219           rio->read_running = 0;
220         }
221     }
222 }
223
224 void
225 rec_io_start_read(struct main_rec_io *rio)
226 {
227   ASSERT(rec_io_is_active(rio));
228   rio->read_started = 1;
229   rec_io_recalc_read(rio);
230 }
231
232 void
233 rec_io_stop_read(struct main_rec_io *rio)
234 {
235   ASSERT(rec_io_is_active(rio));
236   rio->read_started = 0;
237   rec_io_recalc_read(rio);
238 }
239
240 static void
241 rec_io_stop_write(struct main_rec_io *rio)
242 {
243   DBG("RIO WRITE: Stopping write");
244   // XXX: When we are called after a write error, there might still
245   // be some data queued, but we need not care.
246   rio->file.write_handler = NULL;
247   file_chg(&rio->file);
248 }
249
250 static int
251 rec_io_write_handler(struct main_file *fi)
252 {
253   struct main_rec_io *rio = (struct main_rec_io *) fi;
254   struct rio_buffer *b = clist_head(&rio->busy_write_buffers);
255   if (!b)
256     {
257       rec_io_stop_write(rio);
258       return HOOK_IDLE;
259     }
260
261   int l = write(fi->fd, b->buf + b->written, b->full - b->written);
262   DBG("RIO WRITE: Written %d bytes", l);
263   if (l < 0)
264     {
265       if (errno != EINTR && errno != EAGAIN)
266         {
267           rec_io_stop_write(rio);
268           rio->notify_handler(rio, RIO_ERR_WRITE);
269         }
270       return HOOK_IDLE;
271     }
272   b->written += l;
273   if (b->written == b->full)
274     {
275       DBG("RIO WRITE: Written full buffer");
276       clist_remove(&b->n);
277       clist_add_tail(&rio->idle_write_buffers, &b->n);
278     }
279
280   rio->write_watermark -= l;
281   int ret = HOOK_RETRY;
282   if (!rio->write_watermark)
283     {
284       ret = HOOK_IDLE;
285       rec_io_stop_write(rio);
286     }
287   rec_io_recalc_read(rio);
288
289   // Call the hook, but carefully, because it can delete the RIO structure
290   if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
291     ret = HOOK_IDLE;
292   return ret;
293 }
294
295 static struct rio_buffer *
296 rec_io_get_buffer(struct main_rec_io *rio)
297 {
298   struct rio_buffer *b = clist_remove_tail(&rio->idle_write_buffers);
299   if (b)
300     DBG("RIO WRITE: Recycled old buffer");
301   else
302     {
303       if (!rio->write_buf_size)
304         rio->write_buf_size = 1024;
305       b = xmalloc(sizeof(struct rio_buffer) + rio->write_buf_size);
306       DBG("RIO WRITE: Allocated new buffer");
307     }
308   b->full = b->written = 0;
309   return b;
310 }
311
312 void
313 rec_io_write(struct main_rec_io *rio, void *data, uint len)
314 {
315   byte *bdata = data;
316   ASSERT(rec_io_is_active(rio));
317   if (!len)
318     return;
319
320   while (len)
321     {
322       struct rio_buffer *b = clist_tail(&rio->busy_write_buffers);
323       if (!b || b->full >= rio->write_buf_size)
324         {
325           b = rec_io_get_buffer(rio);
326           clist_add_tail(&rio->busy_write_buffers, &b->n);
327         }
328       uint l = MIN(len, rio->write_buf_size - b->full);
329       memcpy(b->buf + b->full, bdata, l);
330       b->full += l;
331       bdata += l;
332       len -= l;
333       rio->write_watermark += l;
334       DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
335       rec_io_recalc_read(rio);
336     }
337
338   if (!rio->file.write_handler)
339     {
340       DBG("RIO WRITE: Starting write");
341       rio->file.write_handler = rec_io_write_handler;
342       file_chg(&rio->file);
343     }
344 }
345
346 void
347 rec_io_set_timeout(struct main_rec_io *rio, timestamp_t expires_delta)
348 {
349   DBG("RIO: Setting timeout %u", (uint) expires_delta);
350   if (!expires_delta)
351     timer_del(&rio->timer);
352   else
353     timer_add_rel(&rio->timer, expires_delta);
354 }
355
356 uint
357 rec_io_parse_line(struct main_rec_io *rio)
358 {
359   for (uint i = rio->read_prev_avail; i < rio->read_avail; i++)
360     if (rio->read_rec_start[i] == '\n')
361       return i+1;
362   return 0;
363 }
364
365 #ifdef TEST
366
367 static uint rhand(struct main_rec_io *rio)
368 {
369   uint r = rec_io_parse_line(rio);
370   if (r)
371     {
372       rio->read_rec_start[r-1] = 0;
373       printf("Read <%s>\n", rio->read_rec_start);
374       if (rio->read_rec_start[0] == '!')
375         {
376           rec_io_del(rio);
377           main_shut_down();
378           return ~0U;
379         }
380       rec_io_set_timeout(rio, 10000);
381       rio->read_rec_start[r-1] = '\n';
382       rec_io_write(rio, rio->read_rec_start, r);
383     }
384   return r;
385 }
386
387 static int ehand(struct main_rec_io *rio, int cause)
388 {
389   if (cause < 0 || cause == RIO_EVENT_EOF)
390     {
391       msg(L_ERROR, "Error %d", cause);
392       rec_io_del(rio);
393       main_shut_down();
394       return HOOK_IDLE;
395     }
396   else
397     {
398       msg(L_INFO, "Event %d", cause);
399       return HOOK_RETRY;
400     }
401 }
402
403 int
404 main(void)
405 {
406   log_init(NULL);
407   main_init();
408
409   struct main_rec_io rio = {};
410   rio.read_buf_size = 4;
411   rio.read_handler = rhand;
412   rio.notify_handler = ehand;
413   // rio.read_rec_max = 40;
414   rio.write_buf_size = 4;
415   rio.write_throttle_read = 6;
416   rec_io_add(&rio, 0);
417   rec_io_start_read(&rio);
418   rec_io_set_timeout(&rio, 10000);
419
420   main_debug();
421
422   main_loop();
423   msg(L_INFO, "Finished.");
424
425   if (file_is_active(&rio.file))
426     rec_io_del(&rio);
427   main_cleanup();
428   return 0;
429 }
430
431 #endif