]> mj.ucw.cz Git - libucw.git/blob - ucw/main-rec.c
Merge branch 'dev-mainloop' of ssh://git.ucw.cz/projects/libucw/GIT/libucw into dev...
[libucw.git] / ucw / main-rec.c
1 /*
2  *      UCW Library -- Main Loop: Record I/O
3  *
4  *      (c) 2011 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "ucw/lib.h"
13 #include "ucw/mainloop.h"
14
15 #include <stdio.h>
16 #include <string.h>
17 #include <stdlib.h>
18 #include <unistd.h>
19 #include <errno.h>
20
21 struct rio_buffer {
22   cnode n;
23   uns full;
24   uns written;
25   byte buf[];
26 };
27
28 static void
29 rec_io_timer_expired(struct main_timer *tm)
30 {
31   struct main_rec_io *rio = tm->data;
32   timer_del(&rio->timer);
33   rio->notify_handler(rio, RIO_ERR_TIMEOUT);
34 }
35
36 static int rec_io_deferred_start_read(struct main_hook *ho);
37
38 void
39 rec_io_add(struct main_rec_io *rio, int fd)
40 {
41   rio->file.fd = fd;
42   file_add(&rio->file);
43   rio->timer.handler = rec_io_timer_expired;
44   rio->timer.data = rio;
45   rio->start_read_hook.handler = rec_io_deferred_start_read;
46   rio->start_read_hook.data = rio;
47   clist_init(&rio->idle_write_buffers);
48   clist_init(&rio->busy_write_buffers);
49 }
50
51 void
52 rec_io_del(struct main_rec_io *rio)
53 {
54   timer_del(&rio->timer);
55   if (hook_is_active(&rio->start_read_hook))
56     hook_del(&rio->start_read_hook);
57   file_del(&rio->file);
58
59   if (rio->read_buf)
60     {
61       DBG("RIO: Freeing read buffer");
62       xfree(rio->read_buf);
63       rio->read_buf = NULL;
64     }
65
66   struct rio_buffer *b;
67   while ((b = clist_remove_head(&rio->idle_write_buffers)) || (b = clist_remove_head(&rio->busy_write_buffers)))
68     {
69       DBG("RIO: Freeing write buffer");
70       xfree(b);
71     }
72 }
73
74 static int
75 rec_io_process_read_buf(struct main_rec_io *rio)
76 {
77   uns got;
78   while (rio->read_running && (got = rio->read_handler(rio)))
79     {
80       DBG("RIO READ: Ate %u bytes", got);
81       if (got == ~0U)
82         return HOOK_IDLE;
83       rio->read_rec_start += got;
84       rio->read_avail -= got;
85       rio->read_prev_avail = 0;
86       if (!rio->read_avail)
87         {
88           DBG("RIO READ: Resetting buffer");
89           rio->read_rec_start = rio->read_buf;
90           break;
91         }
92     }
93   DBG("RIO READ: Want more");
94   return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
95 }
96
97 static int
98 rec_io_read_handler(struct main_file *fi)
99 {
100   struct main_rec_io *rio = (struct main_rec_io *) fi;
101
102   if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
103     {
104       rec_io_stop_read(rio);
105       rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE);
106       return HOOK_IDLE;
107     }
108
109 restart: ;
110   uns rec_start_pos = rio->read_rec_start - rio->read_buf;
111   uns rec_end_pos = rec_start_pos + rio->read_avail;
112   uns free_space = rio->read_buf_size - rec_end_pos;
113   DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u",
114     rec_start_pos, rio->read_avail, rio->read_prev_avail,
115     free_space, rio->read_buf_size);
116   if (free_space <= rio->read_buf_size/8)
117     {
118       if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2)
119         {
120           // Moving the partial record to the start of the buffer
121           DBG("RIO READ: Moving partial record to start");
122           memmove(rio->read_buf, rio->read_rec_start, rio->read_avail);
123           rio->read_rec_start = rio->read_buf;
124         }
125       else
126         {
127           DBG("RIO READ: Resizing buffer");
128           rio->read_buf_size *= 2;
129           rio->read_buf = xrealloc(rio->read_buf, rio->read_buf_size);
130           rio->read_rec_start = rio->read_buf + rec_start_pos;
131         }
132       goto restart;
133     }
134
135   int l = read(fi->fd, rio->read_buf + rec_end_pos, free_space);
136   DBG("RIO READ: Read %d bytes", l);
137   if (l < 0)
138     {
139       if (errno != EINTR && errno != EAGAIN)
140         {
141           DBG("RIO READ: Signalling error");
142           rec_io_stop_read(rio);
143           rio->notify_handler(rio, RIO_ERR_READ);
144         }
145       return HOOK_IDLE;
146     }
147   if (!l)
148     {
149       DBG("RIO READ: Signalling EOF");
150       rec_io_stop_read(rio);
151       rio->notify_handler(rio, RIO_EVENT_EOF);
152       return HOOK_IDLE;
153     }
154   rio->read_prev_avail = rio->read_avail;
155   rio->read_avail += l;
156   DBG("RIO READ: Available: %u bytes", rio->read_avail);
157
158   return rec_io_process_read_buf(rio);
159 }
160
161 static int
162 rec_io_deferred_start_read(struct main_hook *ho)
163 {
164   struct main_rec_io *rio = ho->data;
165
166   DBG("RIO: Starting reading");
167   if (!rio->read_buf)
168     {
169       if (!rio->read_buf_size)
170         rio->read_buf_size = 256;
171       rio->read_buf = xmalloc(rio->read_buf_size);
172       DBG("RIO: Created read buffer (%u bytes)", rio->read_buf_size);
173       rio->read_rec_start = rio->read_buf;
174     }
175
176   rio->file.read_handler = rec_io_read_handler;
177   file_chg(&rio->file);
178   hook_del(ho);
179   rio->read_running = 1;
180
181   rio->read_prev_avail = 0;
182   return rec_io_process_read_buf(rio);
183 }
184
185 static void
186 rec_io_recalc_read(struct main_rec_io *rio)
187 {
188   uns flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read;
189   uns run = rio->read_started && flow;
190   DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run);
191   if (run != rio->read_running)
192     {
193       if (run)
194         {
195           /*
196            * Since we need to rescan the read buffer for leftover records and we
197            * can be deep in the call stack at this moment, we better defer most
198            * of the work to a main_hook, which will be called in the next iteration
199            * of the main loop.
200            */
201           if (!hook_is_active(&rio->start_read_hook))
202             {
203               DBG("RIO: Scheduling start of reading");
204               hook_add(&rio->start_read_hook);
205             }
206         }
207       else
208         {
209           if (hook_is_active(&rio->start_read_hook))
210             {
211               DBG("RIO: Descheduling start of reading");
212               hook_del(&rio->start_read_hook);
213             }
214           rio->file.read_handler = NULL;
215           file_chg(&rio->file);
216           DBG("RIO: Reading stopped");
217           rio->read_running = 0;
218         }
219     }
220 }
221
222 void
223 rec_io_start_read(struct main_rec_io *rio)
224 {
225   ASSERT(rec_io_is_active(rio));
226   rio->read_started = 1;
227   rec_io_recalc_read(rio);
228 }
229
230 void
231 rec_io_stop_read(struct main_rec_io *rio)
232 {
233   ASSERT(rec_io_is_active(rio));
234   rio->read_started = 0;
235   rec_io_recalc_read(rio);
236 }
237
238 static void
239 rec_io_stop_write(struct main_rec_io *rio)
240 {
241   DBG("RIO WRITE: Stopping write");
242   // XXX: When we are called after a write error, there might still
243   // be some data queued, but we need not care.
244   rio->file.write_handler = NULL;
245   file_chg(&rio->file);
246 }
247
248 static int
249 rec_io_write_handler(struct main_file *fi)
250 {
251   struct main_rec_io *rio = (struct main_rec_io *) fi;
252   struct rio_buffer *b = clist_head(&rio->busy_write_buffers);
253   if (!b)
254     {
255       rec_io_stop_write(rio);
256       return HOOK_IDLE;
257     }
258
259   int l = write(fi->fd, b->buf + b->written, b->full - b->written);
260   DBG("RIO WRITE: Written %d bytes", l);
261   if (l < 0)
262     {
263       if (errno != EINTR && errno != EAGAIN)
264         {
265           rec_io_stop_write(rio);
266           rio->notify_handler(rio, RIO_ERR_WRITE);
267         }
268       return HOOK_IDLE;
269     }
270   b->written += l;
271   if (b->written == b->full)
272     {
273       DBG("RIO WRITE: Written full buffer");
274       clist_remove(&b->n);
275       clist_add_tail(&rio->idle_write_buffers, &b->n);
276     }
277
278   rio->write_watermark -= l;
279   int ret = HOOK_RETRY;
280   if (!rio->write_watermark)
281     {
282       ret = HOOK_IDLE;
283       rec_io_stop_write(rio);
284     }
285   rec_io_recalc_read(rio);
286
287   // Call the hook, but carefully, because it can delete the RIO structure
288   if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
289     ret = HOOK_IDLE;
290   return ret;
291 }
292
293 static struct rio_buffer *
294 rec_io_get_buffer(struct main_rec_io *rio)
295 {
296   struct rio_buffer *b = clist_remove_tail(&rio->idle_write_buffers);
297   if (b)
298     DBG("RIO WRITE: Recycled old buffer");
299   else
300     {
301       if (!rio->write_buf_size)
302         rio->write_buf_size = 1024;
303       b = xmalloc(sizeof(struct rio_buffer) + rio->write_buf_size);
304       DBG("RIO WRITE: Allocated new buffer");
305     }
306   b->full = b->written = 0;
307   return b;
308 }
309
310 void
311 rec_io_write(struct main_rec_io *rio, void *data, uns len)
312 {
313   byte *bdata = data;
314   ASSERT(rec_io_is_active(rio));
315   if (!len)
316     return;
317
318   while (len)
319     {
320       struct rio_buffer *b = clist_tail(&rio->busy_write_buffers);
321       if (!b || b->full >= rio->write_buf_size)
322         {
323           b = rec_io_get_buffer(rio);
324           clist_add_tail(&rio->busy_write_buffers, &b->n);
325         }
326       uns l = MIN(len, rio->write_buf_size - b->full);
327       memcpy(b->buf + b->full, bdata, l);
328       b->full += l;
329       bdata += l;
330       len -= l;
331       rio->write_watermark += l;
332       DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
333       rec_io_recalc_read(rio);
334     }
335
336   if (!rio->file.write_handler)
337     {
338       DBG("RIO WRITE: Starting write");
339       rio->file.write_handler = rec_io_write_handler;
340       file_chg(&rio->file);
341     }
342 }
343
344 void
345 rec_io_set_timeout(struct main_rec_io *rio, timestamp_t expires_delta)
346 {
347   DBG("RIO: Setting timeout %u", (uns) expires_delta);
348   if (!expires_delta)
349     timer_del(&rio->timer);
350   else
351     timer_add_rel(&rio->timer, expires_delta);
352 }
353
354 uns
355 rec_io_parse_line(struct main_rec_io *rio)
356 {
357   for (uns i = rio->read_prev_avail; i < rio->read_avail; i++)
358     if (rio->read_rec_start[i] == '\n')
359       return i+1;
360   return 0;
361 }
362
363 #ifdef TEST
364
365 static uns rhand(struct main_rec_io *rio)
366 {
367   uns r = rec_io_parse_line(rio);
368   if (r)
369     {
370       rio->read_rec_start[r-1] = 0;
371       printf("Read <%s>\n", rio->read_rec_start);
372       if (rio->read_rec_start[0] == '!')
373         {
374           rec_io_del(rio);
375           main_shut_down();
376           return ~0U;
377         }
378       rec_io_set_timeout(rio, 10000);
379       rio->read_rec_start[r-1] = '\n';
380       rec_io_write(rio, rio->read_rec_start, r);
381     }
382   return r;
383 }
384
385 static int ehand(struct main_rec_io *rio, int cause)
386 {
387   if (cause < 0 || cause == RIO_EVENT_EOF)
388     {
389       msg(L_ERROR, "Error %d", cause);
390       rec_io_del(rio);
391       main_shut_down();
392       return HOOK_IDLE;
393     }
394   else
395     {
396       msg(L_INFO, "Event %d", cause);
397       return HOOK_RETRY;
398     }
399 }
400
401 int
402 main(void)
403 {
404   log_init(NULL);
405   main_init();
406
407   struct main_rec_io rio = {};
408   rio.read_buf_size = 4;
409   rio.read_handler = rhand;
410   rio.notify_handler = ehand;
411   // rio.read_rec_max = 40;
412   rio.write_buf_size = 4;
413   rio.write_throttle_read = 6;
414   rec_io_add(&rio, 0);
415   rec_io_start_read(&rio);
416   rec_io_set_timeout(&rio, 10000);
417
418   main_debug();
419
420   main_loop();
421   msg(L_INFO, "Finished.");
422
423   if (file_is_active(&rio.file))
424     rec_io_del(&rio);
425   main_cleanup();
426   return 0;
427 }
428
429 #endif