]> mj.ucw.cz Git - libucw.git/blob - ucw/main-rec.c
3c4c0678b73fa9f783f771e970c0aaf70ca0b2f6
[libucw.git] / ucw / main-rec.c
1 /*
2  *      UCW Library -- Main Loop: Record I/O
3  *
4  *      (c) 2011 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "ucw/lib.h"
13 #include "ucw/mainloop.h"
14
15 #include <stdio.h>
16 #include <string.h>
17 #include <stdlib.h>
18 #include <unistd.h>
19 #include <errno.h>
20
21 struct rio_buffer {
22   cnode n;
23   uns full;
24   uns written;
25   byte buf[];
26 };
27
28 static void
29 rec_io_timer_expired(struct main_timer *tm)
30 {
31   struct main_rec_io *rio = tm->data;
32   timer_del(&rio->timer);
33   rio->notify_handler(rio, RIO_ERR_TIMEOUT);
34 }
35
36 void
37 rec_io_add(struct main_rec_io *rio, int fd)
38 {
39   rio->file.fd = fd;
40   file_add(&rio->file);
41   rio->timer.handler = rec_io_timer_expired;
42   rio->timer.data = rio;
43   clist_init(&rio->idle_write_buffers);
44   clist_init(&rio->busy_write_buffers);
45 }
46
47 void
48 rec_io_del(struct main_rec_io *rio)
49 {
50   timer_del(&rio->timer);
51   file_del(&rio->file);
52
53   if (rio->read_buf)
54     {
55       DBG("RIO: Freeing read buffer");
56       xfree(rio->read_buf);
57       rio->read_buf = NULL;
58     }
59
60   struct rio_buffer *b;
61   while ((b = clist_remove_head(&rio->idle_write_buffers)) || (b = clist_remove_head(&rio->busy_write_buffers)))
62     {
63       DBG("RIO: Freeing write buffer");
64       xfree(b);
65     }
66 }
67
68 static int
69 rec_io_read_handler(struct main_file *fi)
70 {
71   struct main_rec_io *rio = (struct main_rec_io *) fi;
72
73   if (rio->read_rec_max && rio->read_avail >= rio->read_rec_max)
74     {
75       rec_io_stop_read(rio);
76       rio->notify_handler(rio, RIO_ERR_RECORD_TOO_LARGE);
77       return HOOK_IDLE;
78     }
79
80 restart: ;
81   uns rec_start_pos = rio->read_rec_start - rio->read_buf;
82   uns rec_end_pos = rec_start_pos + rio->read_avail;
83   uns free_space = rio->read_buf_size - rec_end_pos;
84   DBG("RIO READ: rec_start=%u avail=%u prev_avail=%u free=%u/%u",
85     rec_start_pos, rio->read_avail, rio->read_prev_avail,
86     free_space, rio->read_buf_size);
87   // FIXME: Constants?
88   if (free_space <= rio->read_buf_size/8)
89     {
90       if (rec_start_pos && rec_start_pos >= rio->read_buf_size/2)
91         {
92           // Moving the partial record to the start of the buffer
93           DBG("RIO READ: Moving partial record to start");
94           memmove(rio->read_buf, rio->read_rec_start, rio->read_avail);
95           rio->read_rec_start = rio->read_buf;
96         }
97       else
98         {
99           DBG("RIO READ: Resizing buffer");
100           rio->read_buf_size *= 2;
101           rio->read_buf = xrealloc(rio->read_buf, rio->read_buf_size);
102           rio->read_rec_start = rio->read_buf + rec_start_pos;
103         }
104       goto restart;
105     }
106
107   int l = read(fi->fd, rio->read_buf + rec_end_pos, free_space);
108   DBG("RIO READ: Read %d bytes", l);
109   if (l < 0)
110     {
111       if (errno != EINTR && errno != EAGAIN)
112         {
113           DBG("RIO READ: Signalling error");
114           rec_io_stop_read(rio);
115           rio->notify_handler(rio, RIO_ERR_READ);
116         }
117       return HOOK_IDLE;
118     }
119   if (!l)
120     {
121       DBG("RIO READ: Signalling EOF");
122       rec_io_stop_read(rio);
123       rio->notify_handler(rio, RIO_EVENT_EOF);
124       return HOOK_IDLE;
125     }
126   rio->read_prev_avail = rio->read_avail;
127   rio->read_avail += l;
128   DBG("RIO READ: Available: %u bytes", rio->read_avail);
129
130   uns got;
131   while (rio->read_running && (got = rio->read_handler(rio)))
132     {
133       DBG("RIO READ: Ate %u bytes", got);
134       rio->read_rec_start += got;
135       rio->read_avail -= got;
136       rio->read_prev_avail = 0;
137       if (!rio->read_avail)
138         {
139           DBG("RIO READ: Resetting buffer");
140           rio->read_rec_start = rio->read_buf;
141           break;
142         }
143     }
144   DBG("RIO READ: Want more");
145   return (rio->read_running ? HOOK_RETRY : HOOK_IDLE);
146 }
147
148 static void
149 rec_io_recalc_read(struct main_rec_io *rio)
150 {
151   uns flow = !rio->write_throttle_read || rio->write_watermark < rio->write_throttle_read;
152   uns run = rio->read_started && flow;
153   DBG("RIO: Recalc read (flow=%u, start=%u) -> %u", flow, rio->read_started, run);
154   if (run != rio->read_running)
155     {
156       if (run)
157         {
158           if (!rio->read_buf)
159             {
160               if (!rio->read_buf_size)
161                 rio->read_buf_size = 256;
162               rio->read_buf = xmalloc(rio->read_buf_size);
163               DBG("RIO: Created buffer (%u bytes)", rio->read_buf_size);
164               rio->read_rec_start = rio->read_buf;
165             }
166           rio->file.read_handler = rec_io_read_handler;
167           file_chg(&rio->file);
168           DBG("RIO: Reading started");
169         }
170       else
171         {
172           rio->file.read_handler = NULL;
173           file_chg(&rio->file);
174           DBG("RIO: Reading stopped");
175         }
176       rio->read_running = run;
177     }
178 }
179
180 void
181 rec_io_start_read(struct main_rec_io *rio)
182 {
183   ASSERT(clist_is_linked(&rio->file.n));
184   rio->read_started = 1;
185   rec_io_recalc_read(rio);
186 }
187
188 void
189 rec_io_stop_read(struct main_rec_io *rio)
190 {
191   ASSERT(clist_is_linked(&rio->file.n));
192   rio->read_started = 0;
193   rec_io_recalc_read(rio);
194 }
195
196 static void
197 rec_io_stop_write(struct main_rec_io *rio)
198 {
199   DBG("RIO WRITE: Stopping write");
200   ASSERT(!rio->write_watermark);
201   rio->file.write_handler = NULL;
202   file_chg(&rio->file);
203 }
204
205 static int
206 rec_io_write_handler(struct main_file *fi)
207 {
208   struct main_rec_io *rio = (struct main_rec_io *) fi;
209   struct rio_buffer *b = clist_head(&rio->busy_write_buffers);
210   if (!b)
211     {
212       rec_io_stop_write(rio);
213       return HOOK_IDLE;
214     }
215
216   int l = write(fi->fd, b->buf + b->written, b->full - b->written);
217   DBG("RIO WRITE: Written %d bytes", l);
218   if (l < 0)
219     {
220       if (errno != EINTR && errno != EAGAIN)
221         {
222           rec_io_stop_write(rio);
223           rio->notify_handler(rio, RIO_ERR_WRITE);
224         }
225       return HOOK_IDLE;
226     }
227   b->written += l;
228   if (b->written == b->full)
229     {
230       DBG("RIO WRITE: Written full buffer");
231       clist_remove(&b->n);
232       clist_add_tail(&rio->idle_write_buffers, &b->n);
233     }
234
235   rio->write_watermark -= l;
236   int ret = HOOK_RETRY;
237   if (!rio->write_watermark)
238     {
239       ret = HOOK_IDLE;
240       rec_io_stop_write(rio);
241     }
242   rec_io_recalc_read(rio);
243
244   // Call the hook, but carefully, because it can delete the RIO structure
245   if (rio->notify_handler(rio, rio->write_watermark ? RIO_EVENT_PART_WRITTEN : RIO_EVENT_ALL_WRITTEN) == HOOK_IDLE)
246     ret = HOOK_IDLE;
247   return ret;
248 }
249
250 static struct rio_buffer *
251 rec_io_get_buffer(struct main_rec_io *rio)
252 {
253   struct rio_buffer *b = clist_remove_tail(&rio->idle_write_buffers);
254   if (b)
255     DBG("RIO WRITE: Recycled old buffer");
256   else
257     {
258       if (!rio->write_buf_size)
259         rio->write_buf_size = 1024;
260       b = xmalloc(sizeof(struct rio_buffer) + rio->write_buf_size);
261       DBG("RIO WRITE: Allocated new buffer");
262     }
263   b->full = b->written = 0;
264   return b;
265 }
266
267 void
268 rec_io_write(struct main_rec_io *rio, void *data, uns len)
269 {
270   byte *bdata = data;
271   ASSERT(clist_is_linked(&rio->file.n));
272   if (!len)
273     return;
274
275   while (len)
276     {
277       struct rio_buffer *b = clist_tail(&rio->busy_write_buffers);
278       if (!b || b->full >= rio->write_buf_size)
279         {
280           b = rec_io_get_buffer(rio);
281           clist_add_tail(&rio->busy_write_buffers, &b->n);
282         }
283       uns l = MIN(len, rio->write_buf_size - b->full);
284       memcpy(b->buf + b->full, bdata, l);
285       b->full += l;
286       bdata += l;
287       len -= l;
288       rio->write_watermark += l;
289       DBG("RIO WRITE: Buffered %u bytes of data (total %u)", l, rio->write_watermark);
290       rec_io_recalc_read(rio);
291     }
292
293   if (!rio->file.write_handler)
294     {
295       DBG("RIO WRITE: Starting write");
296       rio->file.write_handler = rec_io_write_handler;
297       file_chg(&rio->file);
298     }
299 }
300
301 void
302 rec_io_set_timeout(struct main_rec_io *rio, timestamp_t expires_delta)
303 {
304   DBG("RIO: Setting timeout %u", (uns) expires_delta);
305   if (!expires_delta)
306     timer_del(&rio->timer);
307   else
308     timer_add_rel(&rio->timer, expires_delta);
309 }
310
311 uns
312 rec_io_parse_line(struct main_rec_io *rio)
313 {
314   for (uns i = rio->read_prev_avail; i < rio->read_avail; i++)
315     if (rio->read_rec_start[i] == '\n')
316       return i+1;
317   return 0;
318 }
319
320 #ifdef TEST
321
322 static uns rhand(struct main_rec_io *rio)
323 {
324   uns r = rec_io_parse_line(rio);
325   if (r)
326     {
327       rio->read_rec_start[r-1] = 0;
328       printf("Read <%s>\n", rio->read_rec_start);
329       rec_io_set_timeout(rio, 10000);
330       rio->read_rec_start[r-1] = '\n';
331       rec_io_write(rio, rio->read_rec_start, r);
332     }
333   return r;
334 }
335
336 static int ehand(struct main_rec_io *rio, int cause)
337 {
338   if (cause < 0 || cause == RIO_EVENT_EOF)
339     {
340       msg(L_ERROR, "Error %d", cause);
341       rec_io_del(rio);
342       main_shut_down();
343       return HOOK_IDLE;
344     }
345   else
346     {
347       msg(L_INFO, "Event %d", cause);
348       return HOOK_RETRY;
349     }
350 }
351
352 int
353 main(void)
354 {
355   log_init(NULL);
356   main_init();
357
358   struct main_rec_io rio = {};
359   rio.read_buf_size = 4;
360   rio.read_handler = rhand;
361   rio.notify_handler = ehand;
362   // rio.read_rec_max = 40;
363   rio.write_buf_size = 4;
364   rio.write_throttle_read = 6;
365   rec_io_add(&rio, 0);
366   rec_io_start_read(&rio);
367   rec_io_set_timeout(&rio, 10000);
368
369   main_debug();
370
371   main_loop();
372   msg(L_INFO, "Finished.");
373
374   if (clist_is_linked(&rio.file.n))
375     rec_io_del(&rio);
376   main_cleanup();
377   return 0;
378 }
379
380 #endif