]> mj.ucw.cz Git - libucw.git/blob - ucw/main-rec.c
Main record-based I/O: allow freeing of recio from read handlers
[libucw.git] / ucw / main-rec.c
1 /*
2  *      UCW Library -- Main Loop: Record I/O
3  *
4  *      (c) 2011 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "ucw/lib.h"
13 #include "ucw/mainloop.h"
14
15 #include <stdio.h>
16 #include <string.h>
17 #include <stdlib.h>
18 #include <unistd.h>
19 #include <errno.h>
20
21 struct rio_buffer {
22   cnode n;
23   uns full;
24   uns written;
25   byte buf[];
26 };
27
28 static void
29 rec_io_timer_expired(struct main_timer *tm)
30 {
31   struct main_rec_io *rio = tm->data;
32   timer_del(&rio->timer);
33   rio->notify_handler(rio, RIO_ERR_TIMEOUT);
34 }
35
36 void
37 rec_io_add(struct main_rec_io *rio, int fd)
38 {
39   rio->file.fd = fd;
40   file_add(&rio->file);
41   rio->timer.handler = rec_io_timer_expired;
42   rio->timer.data = rio;
43   clist_init(&rio->idle_write_buffers);
44   clist_init(&rio->busy_write_buffers);
45 }
46
47 void
48 rec_io_del(struct main_rec_io *rio)
49 {
50   timer_del(&rio->timer);
51   file_del(&rio->file);
52
53   if (rio->read_buf)
54     {
55       DBG("RIO: Freeing read buffer");
56       xfree(rio->read_buf);
57       rio->read_buf = NULL;
58     }
59
60   struct rio_buffer *b;
61   while ((b = clist_remove_head(&rio->idle_write_buffers)) || (b = clist_remove_head(&rio->busy_write_buffers)))
62     {
63       DBG("RIO: Freeing write buffer");
64       xfree(b);
65     }
66 }
67
68 static int
69 rec_io_read_handler(struct main_file *fi)
70 {
71   struct main_rec_io *rio = (struct main_rec_io *) fi;
72
73   if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
74     {
75       rec_io_stop_read(rio);
76       rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE);
77       return HOOK_IDLE;
78     }
79
80 restart: ;
81   uns rec_start_pos = rio->read_rec_start - rio->read_buf;
82   uns rec_end_pos = rec_start_pos + rio->read_avail;
83   uns free_space = rio->read_buf_size - rec_end_pos;
84   DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u",
85     rec_start_pos, rio->read_avail, rio->read_prev_avail,
86     free_space, rio->read_buf_size);
87   // FIXME: Constants?
88   if (free_space <= rio->read_buf_size/8)
89     {
90       if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2)
91         {
92           // Moving the partial record to the start of the buffer
93           DBG("RIO READ: Moving partial record to start");
94           memmove(rio->read_buf, rio->read_rec_start, rio->read_avail);
95           rio->read_rec_start = rio->read_buf;
96         }
97       else
98         {
99           DBG("RIO READ: Resizing buffer");
100           rio->read_buf_size *= 2;
101           rio->read_buf = xrealloc(rio->read_buf, rio->read_buf_size);
102           rio->read_rec_start = rio->read_buf + rec_start_pos;
103         }
104       goto restart;
105     }
106
107   int l = read(fi->fd, rio->read_buf + rec_end_pos, free_space);
108   DBG("RIO READ: Read %d bytes", l);
109   if (l < 0)
110     {
111       if (errno != EINTR && errno != EAGAIN)
112         {
113           DBG("RIO READ: Signalling error");
114           rec_io_stop_read(rio);
115           rio->notify_handler(rio, RIO_ERR_READ);
116         }
117       return HOOK_IDLE;
118     }
119   if (!l)
120     {
121       DBG("RIO READ: Signalling EOF");
122       rec_io_stop_read(rio);
123       rio->notify_handler(rio, RIO_EVENT_EOF);
124       return HOOK_IDLE;
125     }
126   rio->read_prev_avail = rio->read_avail;
127   rio->read_avail += l;
128   DBG("RIO READ: Available: %u bytes", rio->read_avail);
129
130   uns got;
131   while (rio->read_running && (got = rio->read_handler(rio)))
132     {
133       DBG("RIO READ: Ate %u bytes", got);
134       if (got == ~0U)
135         return HOOK_IDLE;
136       rio->read_rec_start += got;
137       rio->read_avail -= got;
138       rio->read_prev_avail = 0;
139       if (!rio->read_avail)
140         {
141           DBG("RIO READ: Resetting buffer");
142           rio->read_rec_start = rio->read_buf;
143           break;
144         }
145     }
146   DBG("RIO READ: Want more");
147   return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
148 }
149
150 static void
151 rec_io_recalc_read(struct main_rec_io *rio)
152 {
153   uns flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read;
154   uns run = rio->read_started && flow;
155   DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run);
156   if (run != rio->read_running)
157     {
158       if (run)
159         {
160           if (!rio->read_buf)
161             {
162               if (!rio->read_buf_size)
163                 rio->read_buf_size = 256;
164               rio->read_buf = xmalloc(rio->read_buf_size);
165               DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
166               rio->read_rec_start = rio->read_buf;
167             }
168           rio->file.read_handler = rec_io_read_handler;
169           file_chg(&rio->file);
170           DBG("RIO: Reading started");
171         }
172       else
173         {
174           rio->file.read_handler = NULL;
175           file_chg(&rio->file);
176           DBG("RIO: Reading stopped");
177         }
178       rio->read_running = run;
179     }
180 }
181
182 void
183 rec_io_start_read(struct main_rec_io *rio)
184 {
185   ASSERT(clist_is_linked(&rio->file.n));
186   rio->read_started = 1;
187   rec_io_recalc_read(rio);
188 }
189
190 void
191 rec_io_stop_read(struct main_rec_io *rio)
192 {
193   ASSERT(clist_is_linked(&rio->file.n));
194   rio->read_started = 0;
195   rec_io_recalc_read(rio);
196 }
197
198 static void
199 rec_io_stop_write(struct main_rec_io *rio)
200 {
201   DBG("RIO WRITE: Stopping write");
202   ASSERT(!rio->write_watermark);
203   rio->file.write_handler = NULL;
204   file_chg(&rio->file);
205 }
206
207 static int
208 rec_io_write_handler(struct main_file *fi)
209 {
210   struct main_rec_io *rio = (struct main_rec_io *) fi;
211   struct rio_buffer *b = clist_head(&rio->busy_write_buffers);
212   if (!b)
213     {
214       rec_io_stop_write(rio);
215       return HOOK_IDLE;
216     }
217
218   int l = write(fi->fd, b->buf + b->written, b->full - b->written);
219   DBG("RIO WRITE: Written %d bytes", l);
220   if (l < 0)
221     {
222       if (errno != EINTR && errno != EAGAIN)
223         {
224           rec_io_stop_write(rio);
225           rio->notify_handler(rio, RIO_ERR_WRITE);
226         }
227       return HOOK_IDLE;
228     }
229   b->written += l;
230   if (b->written == b->full)
231     {
232       DBG("RIO WRITE: Written full buffer");
233       clist_remove(&b->n);
234       clist_add_tail(&rio->idle_write_buffers, &b->n);
235     }
236
237   rio->write_watermark -= l;
238   int ret = HOOK_RETRY;
239   if (!rio->write_watermark)
240     {
241       ret = HOOK_IDLE;
242       rec_io_stop_write(rio);
243     }
244   rec_io_recalc_read(rio);
245
246   // Call the hook, but carefully, because it can delete the RIO structure
247   if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
248     ret = HOOK_IDLE;
249   return ret;
250 }
251
252 static struct rio_buffer *
253 rec_io_get_buffer(struct main_rec_io *rio)
254 {
255   struct rio_buffer *b = clist_remove_tail(&rio->idle_write_buffers);
256   if (b)
257     DBG("RIO WRITE: Recycled old buffer");
258   else
259     {
260       if (!rio->write_buf_size)
261         rio->write_buf_size = 1024;
262       b = xmalloc(sizeof(struct rio_buffer) + rio->write_buf_size);
263       DBG("RIO WRITE: Allocated new buffer");
264     }
265   b->full = b->written = 0;
266   return b;
267 }
268
269 void
270 rec_io_write(struct main_rec_io *rio, void *data, uns len)
271 {
272   byte *bdata = data;
273   ASSERT(clist_is_linked(&rio->file.n));
274   if (!len)
275     return;
276
277   while (len)
278     {
279       struct rio_buffer *b = clist_tail(&rio->busy_write_buffers);
280       if (!b || b->full >= rio->write_buf_size)
281         {
282           b = rec_io_get_buffer(rio);
283           clist_add_tail(&rio->busy_write_buffers, &b->n);
284         }
285       uns l = MIN(len, rio->write_buf_size - b->full);
286       memcpy(b->buf + b->full, bdata, l);
287       b->full += l;
288       bdata += l;
289       len -= l;
290       rio->write_watermark += l;
291       DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
292       rec_io_recalc_read(rio);
293     }
294
295   if (!rio->file.write_handler)
296     {
297       DBG("RIO WRITE: Starting write");
298       rio->file.write_handler = rec_io_write_handler;
299       file_chg(&rio->file);
300     }
301 }
302
303 void
304 rec_io_set_timeout(struct main_rec_io *rio, timestamp_t expires_delta)
305 {
306   DBG("RIO: Setting timeout %u", (uns) expires_delta);
307   if (!expires_delta)
308     timer_del(&rio->timer);
309   else
310     timer_add_rel(&rio->timer, expires_delta);
311 }
312
313 uns
314 rec_io_parse_line(struct main_rec_io *rio)
315 {
316   for (uns i = rio->read_prev_avail; i < rio->read_avail; i++)
317     if (rio->read_rec_start[i] == '\n')
318       return i+1;
319   return 0;
320 }
321
322 #ifdef TEST
323
324 static uns rhand(struct main_rec_io *rio)
325 {
326   uns r = rec_io_parse_line(rio);
327   if (r)
328     {
329       rio->read_rec_start[r-1] = 0;
330       printf("Read <%s>\n", rio->read_rec_start);
331       if (rio->read_rec_start[0] == '!')
332         {
333           rec_io_del(rio);
334           main_shut_down();
335           return ~0U;
336         }
337       rec_io_set_timeout(rio, 10000);
338       rio->read_rec_start[r-1] = '\n';
339       rec_io_write(rio, rio->read_rec_start, r);
340     }
341   return r;
342 }
343
344 static int ehand(struct main_rec_io *rio, int cause)
345 {
346   if (cause < 0 || cause == RIO_EVENT_EOF)
347     {
348       msg(L_ERROR, "Error %d", cause);
349       rec_io_del(rio);
350       main_shut_down();
351       return HOOK_IDLE;
352     }
353   else
354     {
355       msg(L_INFO, "Event %d", cause);
356       return HOOK_RETRY;
357     }
358 }
359
360 int
361 main(void)
362 {
363   log_init(NULL);
364   main_init();
365
366   struct main_rec_io rio = {};
367   rio.read_buf_size = 4;
368   rio.read_handler = rhand;
369   rio.notify_handler = ehand;
370   // rio.read_rec_max = 40;
371   rio.write_buf_size = 4;
372   rio.write_throttle_read = 6;
373   rec_io_add(&rio, 0);
374   rec_io_start_read(&rio);
375   rec_io_set_timeout(&rio, 10000);
376
377   main_debug();
378
379   main_loop();
380   msg(L_INFO, "Finished.");
381
382   if (clist_is_linked(&rio.file.n))
383     rec_io_del(&rio);
384   main_cleanup();
385   return 0;
386 }
387
388 #endif