]> mj.ucw.cz Git - libucw.git/blob - ucw/main-rec.c
Tests: xtypes-test sets an explicit timezone
[libucw.git] / ucw / main-rec.c
1 /*
2  *      UCW Library -- Main Loop: Record I/O
3  *
4  *      (c) 2011--2012 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include <ucw/lib.h>
13 #include <ucw/mainloop.h>
14
15 #include <stdio.h>
16 #include <string.h>
17 #include <stdlib.h>
18 #include <unistd.h>
19 #include <errno.h>
20
21 struct rio_buffer {
22   cnode n;
23   uint full;
24   uint written;
25   byte buf[];
26 };
27
28 static void
29 rec_io_timer_expired(struct main_timer *tm)
30 {
31   struct main_rec_io *rio = tm->data;
32   timer_del(&rio->timer);
33   rio->notify_handler(rio, RIO_ERR_TIMEOUT);
34 }
35
36 static int rec_io_deferred_start_read(struct main_hook *ho);
37
38 void
39 rec_io_add(struct main_rec_io *rio, int fd)
40 {
41   rio->file.fd = fd;
42   file_add(&rio->file);
43   rio->timer.handler = rec_io_timer_expired;
44   rio->timer.data = rio;
45   rio->start_read_hook.handler = rec_io_deferred_start_read;
46   rio->start_read_hook.data = rio;
47   clist_init(&rio->idle_write_buffers);
48   clist_init(&rio->busy_write_buffers);
49 }
50
51 void
52 rec_io_del(struct main_rec_io *rio)
53 {
54   if (!rec_io_is_active(rio))
55     return;
56
57   timer_del(&rio->timer);
58   hook_del(&rio->start_read_hook);
59   file_del(&rio->file);
60
61   rio->file.read_handler = NULL;
62   rio->read_started = rio->read_running = 0;
63   rio->read_avail = rio->read_prev_avail = 0;
64   if (rio->read_buf)
65     {
66       DBG("RIO: Freeing read buffer");
67       xfree(rio->read_buf);
68       rio->read_buf = rio->read_rec_start = NULL;
69     }
70
71   rio->file.write_handler = NULL;
72   rio->write_watermark = 0;
73   struct rio_buffer *b;
74   while ((b = clist_remove_head(&rio->idle_write_buffers)) || (b = clist_remove_head(&rio->busy_write_buffers)))
75     {
76       DBG("RIO: Freeing write buffer");
77       xfree(b);
78     }
79 }
80
81 static int
82 rec_io_process_read_buf(struct main_rec_io *rio)
83 {
84   uint got;
85   ASSERT(rio->read_prev_avail < rio->read_avail);
86   while (rio->read_running && (got = rio->read_handler(rio)))
87     {
88       DBG("RIO READ: Ate %u bytes", got);
89       if (got == ~0U)
90         return HOOK_IDLE;
91       ASSERT(got <= rio->read_avail);
92       rio->read_rec_start += got;
93       rio->read_avail -= got;
94       rio->read_prev_avail = 0;
95       if (!rio->read_avail)
96         {
97           DBG("RIO READ: Resetting buffer");
98           rio->read_rec_start = rio->read_buf;
99           break;
100         }
101     }
102   DBG("RIO READ: Want more");
103   return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
104 }
105
106 static int
107 rec_io_read_handler(struct main_file *fi)
108 {
109   struct main_rec_io *rio = (struct main_rec_io *) fi;
110
111   if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
112     {
113       rec_io_stop_read(rio);
114       rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE);
115       return HOOK_IDLE;
116     }
117
118 restart: ;
119   uint rec_start_pos = rio->read_rec_start - rio->read_buf;
120   uint rec_end_pos = rec_start_pos + rio->read_avail;
121   uint free_space = rio->read_buf_size - rec_end_pos;
122   DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u",
123     rec_start_pos, rio->read_avail, rio->read_prev_avail,
124     free_space, rio->read_buf_size);
125   if (free_space <= rio->read_buf_size/8)
126     {
127       if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2)
128         {
129           // Moving the partial record to the start of the buffer
130           DBG("RIO READ: Moving partial record to start");
131           memmove(rio->read_buf, rio->read_rec_start, rio->read_avail);
132           rio->read_rec_start = rio->read_buf;
133         }
134       else
135         {
136           DBG("RIO READ: Resizing buffer");
137           rio->read_buf_size *= 2;
138           rio->read_buf = xrealloc(rio->read_buf, rio->read_buf_size);
139           rio->read_rec_start = rio->read_buf + rec_start_pos;
140         }
141       goto restart;
142     }
143
144   int l = read(fi->fd, rio->read_buf + rec_end_pos, free_space);
145   DBG("RIO READ: Read %d bytes", l);
146   if (l < 0)
147     {
148       if (errno != EINTR && errno != EAGAIN)
149         {
150           DBG("RIO READ: Signalling error");
151           rec_io_stop_read(rio);
152           rio->notify_handler(rio, RIO_ERR_READ);
153         }
154       return HOOK_IDLE;
155     }
156   if (!l)
157     {
158       DBG("RIO READ: Signalling EOF");
159       rec_io_stop_read(rio);
160       rio->notify_handler(rio, RIO_EVENT_EOF);
161       return HOOK_IDLE;
162     }
163   rio->read_prev_avail = rio->read_avail;
164   rio->read_avail += l;
165   DBG("RIO READ: Available: %u bytes", rio->read_avail);
166
167   return rec_io_process_read_buf(rio);
168 }
169
170 static int
171 rec_io_deferred_start_read(struct main_hook *ho)
172 {
173   struct main_rec_io *rio = ho->data;
174
175   DBG("RIO: Starting reading");
176   if (!rio->read_buf)
177     {
178       if (!rio->read_buf_size)
179         rio->read_buf_size = 256;
180       rio->read_buf = xmalloc(rio->read_buf_size);
181       DBG("RIO: Created read buffer (%u bytes)", rio->read_buf_size);
182       rio->read_rec_start = rio->read_buf;
183     }
184
185   rio->file.read_handler = rec_io_read_handler;
186   file_chg(&rio->file);
187   hook_del(ho);
188   rio->read_running = 1;
189
190   rio->read_prev_avail = 0;
191   if (rio->read_avail)
192     rec_io_process_read_buf(rio);
193
194   return HOOK_IDLE;
195 }
196
197 static void
198 rec_io_recalc_read(struct main_rec_io *rio)
199 {
200   uint flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read;
201   uint run = rio->read_started && flow;
202   DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run);
203   if (run != rio->read_running)
204     {
205       if (run)
206         {
207           /*
208            * Since we need to rescan the read buffer for leftover records and we
209            * can be deep in the call stack at this moment, we better defer most
210            * of the work to a main_hook, which will be called in the next iteration
211            * of the main loop.
212            */
213           if (!hook_is_active(&rio->start_read_hook))
214             {
215               DBG("RIO: Scheduling start of reading");
216               hook_add(&rio->start_read_hook);
217             }
218         }
219       else
220         {
221           if (hook_is_active(&rio->start_read_hook))
222             {
223               DBG("RIO: Descheduling start of reading");
224               hook_del(&rio->start_read_hook);
225             }
226           rio->file.read_handler = NULL;
227           file_chg(&rio->file);
228           DBG("RIO: Reading stopped");
229           rio->read_running = 0;
230         }
231     }
232 }
233
234 void
235 rec_io_start_read(struct main_rec_io *rio)
236 {
237   ASSERT(rec_io_is_active(rio));
238   rio->read_started = 1;
239   rec_io_recalc_read(rio);
240 }
241
242 void
243 rec_io_stop_read(struct main_rec_io *rio)
244 {
245   ASSERT(rec_io_is_active(rio));
246   rio->read_started = 0;
247   rec_io_recalc_read(rio);
248 }
249
250 static void
251 rec_io_stop_write(struct main_rec_io *rio)
252 {
253   DBG("RIO WRITE: Stopping write");
254   // XXX: When we are called after a write error, there might still
255   // be some data queued, but we need not care.
256   rio->file.write_handler = NULL;
257   file_chg(&rio->file);
258 }
259
260 static int
261 rec_io_write_handler(struct main_file *fi)
262 {
263   struct main_rec_io *rio = (struct main_rec_io *) fi;
264   struct rio_buffer *b = clist_head(&rio->busy_write_buffers);
265   if (!b)
266     {
267       rec_io_stop_write(rio);
268       return HOOK_IDLE;
269     }
270
271   int l = write(fi->fd, b->buf + b->written, b->full - b->written);
272   DBG("RIO WRITE: Written %d bytes", l);
273   if (l < 0)
274     {
275       if (errno != EINTR && errno != EAGAIN)
276         {
277           rec_io_stop_write(rio);
278           rio->notify_handler(rio, RIO_ERR_WRITE);
279         }
280       return HOOK_IDLE;
281     }
282   b->written += l;
283   if (b->written == b->full)
284     {
285       DBG("RIO WRITE: Written full buffer");
286       clist_remove(&b->n);
287       clist_add_tail(&rio->idle_write_buffers, &b->n);
288     }
289
290   rio->write_watermark -= l;
291   int ret = HOOK_RETRY;
292   if (!rio->write_watermark)
293     {
294       ret = HOOK_IDLE;
295       rec_io_stop_write(rio);
296     }
297   rec_io_recalc_read(rio);
298
299   // Call the hook, but carefully, because it can delete the RIO structure
300   if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
301     ret = HOOK_IDLE;
302   return ret;
303 }
304
305 static struct rio_buffer *
306 rec_io_get_buffer(struct main_rec_io *rio)
307 {
308   struct rio_buffer *b = clist_remove_tail(&rio->idle_write_buffers);
309   if (b)
310     DBG("RIO WRITE: Recycled old buffer");
311   else
312     {
313       if (!rio->write_buf_size)
314         rio->write_buf_size = 1024;
315       b = xmalloc(sizeof(struct rio_buffer) + rio->write_buf_size);
316       DBG("RIO WRITE: Allocated new buffer");
317     }
318   b->full = b->written = 0;
319   return b;
320 }
321
322 void
323 rec_io_write(struct main_rec_io *rio, const void *data, uint len)
324 {
325   const byte *bdata = data;
326   ASSERT(rec_io_is_active(rio));
327   if (!len)
328     return;
329
330   while (len)
331     {
332       struct rio_buffer *b = clist_tail(&rio->busy_write_buffers);
333       if (!b || b->full >= rio->write_buf_size)
334         {
335           b = rec_io_get_buffer(rio);
336           clist_add_tail(&rio->busy_write_buffers, &b->n);
337         }
338       uint l = MIN(len, rio->write_buf_size - b->full);
339       memcpy(b->buf + b->full, bdata, l);
340       b->full += l;
341       bdata += l;
342       len -= l;
343       rio->write_watermark += l;
344       DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
345       rec_io_recalc_read(rio);
346     }
347
348   if (!rio->file.write_handler)
349     {
350       DBG("RIO WRITE: Starting write");
351       rio->file.write_handler = rec_io_write_handler;
352       file_chg(&rio->file);
353     }
354 }
355
356 void
357 rec_io_set_timeout(struct main_rec_io *rio, timestamp_t expires_delta)
358 {
359   DBG("RIO: Setting timeout %u", (uint) expires_delta);
360   if (!expires_delta)
361     timer_del(&rio->timer);
362   else
363     timer_add_rel(&rio->timer, expires_delta);
364 }
365
366 uint
367 rec_io_parse_line(struct main_rec_io *rio)
368 {
369   for (uint i = rio->read_prev_avail; i < rio->read_avail; i++)
370     if (rio->read_rec_start[i] == '\n')
371       return i+1;
372   return 0;
373 }
374
375 #ifdef TEST
376
377 static uint rhand(struct main_rec_io *rio)
378 {
379   uint r = rec_io_parse_line(rio);
380   if (r)
381     {
382       rio->read_rec_start[r-1] = 0;
383       printf("Read <%s>\n", rio->read_rec_start);
384       if (rio->read_rec_start[0] == '!')
385         {
386           rec_io_del(rio);
387           main_shut_down();
388           return ~0U;
389         }
390       rec_io_set_timeout(rio, 10000);
391       rio->read_rec_start[r-1] = '\n';
392       rec_io_write(rio, rio->read_rec_start, r);
393     }
394   return r;
395 }
396
397 static int ehand(struct main_rec_io *rio, int cause)
398 {
399   if (cause < 0 || cause == RIO_EVENT_EOF)
400     {
401       msg(L_ERROR, "Error %d", cause);
402       rec_io_del(rio);
403       main_shut_down();
404       return HOOK_IDLE;
405     }
406   else
407     {
408       msg(L_INFO, "Event %d", cause);
409       return HOOK_RETRY;
410     }
411 }
412
413 int
414 main(void)
415 {
416   log_init(NULL);
417   main_init();
418
419   struct main_rec_io rio = {};
420   rio.read_buf_size = 4;
421   rio.read_handler = rhand;
422   rio.notify_handler = ehand;
423   // rio.read_rec_max = 40;
424   rio.write_buf_size = 4;
425   rio.write_throttle_read = 6;
426   rec_io_add(&rio, 0);
427   rec_io_start_read(&rio);
428   rec_io_set_timeout(&rio, 10000);
429
430   main_debug();
431
432   main_loop();
433   msg(L_INFO, "Finished.");
434
435   if (file_is_active(&rio.file))
436     rec_io_del(&rio);
437   main_cleanup();
438   return 0;
439 }
440
441 #endif