]> mj.ucw.cz Git - libucw.git/blob - ucw/mainloop.c
Fix the selfpipe in mainloop
[libucw.git] / ucw / mainloop.c
1 /*
2  *      UCW Library -- Main Loop
3  *
4  *      (c) 2004--2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "ucw/lib.h"
13 #include "ucw/mainloop.h"
14
15 #include <stdio.h>
16 #include <string.h>
17 #include <unistd.h>
18 #include <signal.h>
19 #include <fcntl.h>
20 #include <errno.h>
21 #include <time.h>
22 #include <sys/poll.h>
23 #include <sys/wait.h>
24 #include <sys/time.h>
25
26 timestamp_t main_now;
27 ucw_time_t main_now_seconds;
28 timestamp_t main_idle_time;
29 uns main_shutdown;
30
31 clist main_timer_list, main_file_list, main_hook_list, main_process_list;
32 static uns main_file_cnt;
33 static uns main_poll_table_obsolete, main_poll_table_size;
34 static struct pollfd *main_poll_table;
35 static uns main_sigchld_set_up;
36 static volatile sig_atomic_t chld_received = 0;
37 static int sig_pipe_recv, sig_pipe_send;
38
39 void
40 main_get_time(void)
41 {
42   struct timeval tv;
43   gettimeofday(&tv, NULL);
44   main_now_seconds = tv.tv_sec;
45   main_now = (timestamp_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
46   // DBG("It's %lld o'clock", (long long) main_now);
47 }
48
49 void
50 main_init(void)
51 {
52   DBG("MAIN: Initializing");
53   clist_init(&main_timer_list);
54   clist_init(&main_file_list);
55   clist_init(&main_hook_list);
56   clist_init(&main_process_list);
57   main_file_cnt = 0;
58   main_poll_table_obsolete = 1;
59   main_get_time();
60 }
61
62 void
63 timer_add(struct main_timer *tm, timestamp_t expires)
64 {
65   if (expires)
66     DBG("MAIN: Setting timer %p (expire at now+%lld)", tm, (long long)(expires-main_now));
67   else
68     DBG("MAIN: Clearing timer %p", tm);
69   if (tm->expires)
70     clist_remove(&tm->n);
71   tm->expires = expires;
72   if (expires)
73     {
74       cnode *t = main_timer_list.head.next;
75       while (t != &main_timer_list.head && ((struct main_timer *) t)->expires < expires)
76         t = t->next;
77       clist_insert_before(&tm->n, t);
78     }
79 }
80
81 void
82 timer_del(struct main_timer *tm)
83 {
84   timer_add(tm, 0);
85 }
86
87 static void
88 file_timer_expired(struct main_timer *tm)
89 {
90   struct main_file *fi = tm->data;
91   timer_del(&fi->timer);
92   if (fi->error_handler)
93     fi->error_handler(fi, MFERR_TIMEOUT);
94 }
95
96 void
97 file_add(struct main_file *fi)
98 {
99   DBG("MAIN: Adding file %p (fd=%d)", fi, fi->fd);
100   ASSERT(!fi->n.next);
101   clist_add_tail(&main_file_list, &fi->n);
102   fi->timer.handler = file_timer_expired;
103   fi->timer.data = fi;
104   main_file_cnt++;
105   main_poll_table_obsolete = 1;
106   if (fcntl(fi->fd, F_SETFL, O_NONBLOCK) < 0)
107     msg(L_ERROR, "Error setting fd %d to non-blocking mode: %m. Keep fingers crossed.", fi->fd);
108 }
109
110 void
111 file_chg(struct main_file *fi)
112 {
113   struct pollfd *p = fi->pollfd;
114   if (p)
115     {
116       p->events = 0;
117       if (fi->read_handler)
118         p->events |= POLLIN | POLLHUP | POLLERR;
119       if (fi->write_handler)
120         p->events |= POLLOUT | POLLERR;
121     }
122 }
123
124 void
125 file_del(struct main_file *fi)
126 {
127   DBG("MAIN: Deleting file %p (fd=%d)", fi, fi->fd);
128   ASSERT(fi->n.next);
129   timer_del(&fi->timer);
130   clist_remove(&fi->n);
131   main_file_cnt--;
132   main_poll_table_obsolete = 1;
133   fi->n.next = fi->n.prev = NULL;
134 }
135
136 static int
137 file_read_handler(struct main_file *fi)
138 {
139   while (fi->rpos < fi->rlen)
140     {
141       int l = read(fi->fd, fi->rbuf + fi->rpos, fi->rlen - fi->rpos);
142       DBG("MAIN: FD %d: read %d", fi->fd, l);
143       if (l < 0)
144         {
145           if (errno != EINTR && errno != EAGAIN && fi->error_handler)
146             fi->error_handler(fi, MFERR_READ);
147           return 0;
148         }
149       else if (!l)
150         break;
151       fi->rpos += l;
152     }
153   DBG("MAIN: FD %d done read %d of %d", fi->fd, fi->rpos, fi->rlen);
154   fi->read_handler = NULL;
155   file_chg(fi);
156   fi->read_done(fi);
157   return 1;
158 }
159
160 static int
161 file_write_handler(struct main_file *fi)
162 {
163   while (fi->wpos < fi->wlen)
164     {
165       int l = write(fi->fd, fi->wbuf + fi->wpos, fi->wlen - fi->wpos);
166       DBG("MAIN: FD %d: write %d", fi->fd, l);
167       if (l < 0)
168         {
169           if (errno != EINTR && errno != EAGAIN && fi->error_handler)
170             fi->error_handler(fi, MFERR_WRITE);
171           return 0;
172         }
173       fi->wpos += l;
174     }
175   DBG("MAIN: FD %d done write %d", fi->fd, fi->wpos);
176   fi->write_handler = NULL;
177   file_chg(fi);
178   fi->write_done(fi);
179   return 1;
180 }
181
182 void
183 file_read(struct main_file *fi, void *buf, uns len)
184 {
185   ASSERT(fi->n.next);
186   if (len)
187     {
188       fi->read_handler = file_read_handler;
189       fi->rbuf = buf;
190       fi->rpos = 0;
191       fi->rlen = len;
192     }
193   else
194     {
195       fi->read_handler = NULL;
196       fi->rbuf = NULL;
197       fi->rpos = fi->rlen = 0;
198     }
199   file_chg(fi);
200 }
201
202 void
203 file_write(struct main_file *fi, void *buf, uns len)
204 {
205   ASSERT(fi->n.next);
206   if (len)
207     {
208       fi->write_handler = file_write_handler;
209       fi->wbuf = buf;
210       fi->wpos = 0;
211       fi->wlen = len;
212     }
213   else
214     {
215       fi->write_handler = NULL;
216       fi->wbuf = NULL;
217       fi->wpos = fi->wlen = 0;
218     }
219   file_chg(fi);
220 }
221
222 void
223 file_set_timeout(struct main_file *fi, timestamp_t expires)
224 {
225   ASSERT(fi->n.next);
226   timer_add(&fi->timer, expires);
227 }
228
229 void
230 file_close_all(void)
231 {
232   CLIST_FOR_EACH(struct main_file *, f, main_file_list)
233     close(f->fd);
234 }
235
236 void
237 hook_add(struct main_hook *ho)
238 {
239   DBG("MAIN: Adding hook %p", ho);
240   ASSERT(!ho->n.next);
241   clist_add_tail(&main_hook_list, &ho->n);
242 }
243
244 void
245 hook_del(struct main_hook *ho)
246 {
247   DBG("MAIN: Deleting hook %p", ho);
248   ASSERT(ho->n.next);
249   clist_remove(&ho->n);
250   ho->n.next = ho->n.prev = NULL;
251 }
252
253 static void
254 main_sigchld_handler(int x UNUSED)
255 {
256   int old_errno = errno;
257   DBG("SIGCHLD received");
258   chld_received = 1;
259   ssize_t result;
260   while((result = write(sig_pipe_send, "c", 1)) == -1 && errno == EINTR);
261   if(result == -1 && errno != EAGAIN)
262     die("Could not write to selfpipe: %m");
263   errno = old_errno;
264 }
265
266 static int
267 dummy_read_handler(struct main_file *mp)
268 {
269   char buffer[1024];
270   ssize_t result = read(mp->fd, buffer, 1024);
271   if(result == -1 && errno != EAGAIN && errno != EINTR)
272     die("Could not read from selfpipe: %m");
273   file_chg(mp);
274   return result == 1024;
275 }
276
277 static void
278 pipe_configure(int fd)
279 {
280   int flags;
281   if((flags = fcntl(fd, F_GETFL)) == -1 || fcntl(fd, F_SETFL, flags|O_NONBLOCK))
282     die("Could not set file descriptor %d to non-blocking: %m", fd);
283   if((flags = fcntl(fd, F_GETFD)) == -1 || fcntl(fd, F_SETFD, flags|O_CLOEXEC))
284     die("Could not set file descriptor %d to close-on-exec: %m", fd);
285 }
286
287 void
288 process_add(struct main_process *mp)
289 {
290   DBG("MAIN: Adding process %p (pid=%d)", mp, mp->pid);
291   ASSERT(!mp->n.next);
292   ASSERT(mp->handler);
293   clist_add_tail(&main_process_list, &mp->n);
294   if (!main_sigchld_set_up)
295     {
296       int pipe_result[2];
297       if(pipe(pipe_result) == -1)
298         die("Could not create selfpipe:%m");
299       pipe_configure(pipe_result[0]);
300       pipe_configure(pipe_result[1]);
301       sig_pipe_recv = pipe_result[0];
302       sig_pipe_send = pipe_result[1];
303       static struct main_file self_pipe;
304       self_pipe = (struct main_file) {
305         .fd = sig_pipe_recv,
306         .read_handler = dummy_read_handler
307       };
308       file_add(&self_pipe);
309       struct sigaction sa;
310       bzero(&sa, sizeof(sa));
311       sa.sa_handler = main_sigchld_handler;
312       sa.sa_flags = SA_NOCLDSTOP | SA_RESTART;
313       sigaction(SIGCHLD, &sa, NULL);
314       main_sigchld_set_up = 1;
315       chld_received = 1; // The signal may have come before the handler
316     }
317 }
318
319 void
320 process_del(struct main_process *mp)
321 {
322   DBG("MAIN: Deleting process %p (pid=%d)", mp, mp->pid);
323   ASSERT(mp->n.next);
324   clist_remove(&mp->n);
325   mp->n.next = NULL;
326 }
327
328 int
329 process_fork(struct main_process *mp)
330 {
331   pid_t pid = fork();
332   if (pid < 0)
333     {
334       DBG("MAIN: Fork failed");
335       mp->status = -1;
336       format_exit_status(mp->status_msg, -1);
337       mp->handler(mp);
338       return 1;
339     }
340   else if (!pid)
341     return 0;
342   else
343     {
344       DBG("MAIN: Forked process %d", (int) pid);
345       mp->pid = pid;
346       process_add(mp);
347       return 1;
348     }
349 }
350
351 void
352 main_debug(void)
353 {
354 #ifdef CONFIG_DEBUG
355   msg(L_DEBUG, "### Main loop status on %lld", (long long)main_now);
356   msg(L_DEBUG, "\tActive timers:");
357   struct main_timer *tm;
358   CLIST_WALK(tm, main_timer_list)
359     msg(L_DEBUG, "\t\t%p (expires %lld, data %p)", tm, (long long)(tm->expires ? tm->expires-main_now : 999999), tm->data);
360   struct main_file *fi;
361   msg(L_DEBUG, "\tActive files:");
362   CLIST_WALK(fi, main_file_list)
363     msg(L_DEBUG, "\t\t%p (fd %d, rh %p, wh %p, eh %p, expires %lld, data %p)",
364         fi, fi->fd, fi->read_handler, fi->write_handler, fi->error_handler,
365         (long long)(fi->timer.expires ? fi->timer.expires-main_now : 999999), fi->data);
366   msg(L_DEBUG, "\tActive hooks:");
367   struct main_hook *ho;
368   CLIST_WALK(ho, main_hook_list)
369     msg(L_DEBUG, "\t\t%p (func %p, data %p)", ho, ho->handler, ho->data);
370   msg(L_DEBUG, "\tActive processes:");
371   struct main_process *pr;
372   CLIST_WALK(pr, main_process_list)
373     msg(L_DEBUG, "\t\t%p (pid %d, data %p)", pr, pr->pid, pr->data);
374 #endif
375 }
376
377 static void
378 main_rebuild_poll_table(void)
379 {
380   struct main_file *fi;
381   if (main_poll_table_size < main_file_cnt)
382     {
383       if (main_poll_table)
384         xfree(main_poll_table);
385       else
386         main_poll_table_size = 1;
387       while (main_poll_table_size < main_file_cnt)
388         main_poll_table_size *= 2;
389       main_poll_table = xmalloc(sizeof(struct pollfd) * main_poll_table_size);
390     }
391   struct pollfd *p = main_poll_table;
392   DBG("MAIN: Rebuilding poll table: %d of %d entries set", main_file_cnt, main_poll_table_size);
393   CLIST_WALK(fi, main_file_list)
394     {
395       p->fd = fi->fd;
396       fi->pollfd = p++;
397       file_chg(fi);
398     }
399   main_poll_table_obsolete = 0;
400 }
401
402 void
403 main_loop(void)
404 {
405   DBG("MAIN: Entering main_loop");
406   ASSERT(main_timer_list.head.next);
407
408   struct main_file *fi;
409   struct main_hook *ho;
410   struct main_timer *tm;
411   struct main_process *pr;
412   cnode *tmp;
413
414   main_get_time();
415   for (;;)
416     {
417       timestamp_t wake = main_now + 1000000000;
418       while ((tm = clist_head(&main_timer_list)) && tm->expires <= main_now)
419         {
420           DBG("MAIN: Timer %p expired at now-%lld", tm, (long long)(main_now - tm->expires));
421           tm->handler(tm);
422         }
423       int hook_min = HOOK_RETRY;
424       int hook_max = HOOK_SHUTDOWN;
425       CLIST_WALK_DELSAFE(ho, main_hook_list, tmp)
426         {
427           DBG("MAIN: Hook %p", ho);
428           int ret = ho->handler(ho);
429           hook_min = MIN(hook_min, ret);
430           hook_max = MAX(hook_max, ret);
431         }
432       if (hook_min == HOOK_SHUTDOWN ||
433           hook_min == HOOK_DONE && hook_max == HOOK_DONE ||
434           main_shutdown)
435         {
436           DBG("MAIN: Shut down by %s", main_shutdown ? "main_shutdown" : "a hook");
437           return;
438         }
439       if (hook_max == HOOK_RETRY)
440         wake = 0;
441       if (main_poll_table_obsolete)
442         main_rebuild_poll_table();
443       if (chld_received && !clist_empty(&main_process_list))
444         {
445           int stat;
446           pid_t pid;
447           wake = MIN(wake, main_now + 10000);
448           chld_received = 0;
449           while ((pid = waitpid(-1, &stat, WNOHANG)) > 0)
450             {
451               DBG("MAIN: Child %d exited with status %x", pid, stat);
452               CLIST_WALK(pr, main_process_list)
453                 if (pr->pid == pid)
454                   {
455                     pr->status = stat;
456                     process_del(pr);
457                     format_exit_status(pr->status_msg, pr->status);
458                     DBG("MAIN: Calling process exit handler");
459                     pr->handler(pr);
460                     break;
461                   }
462               wake = 0;
463             }
464         }
465       if ((tm = clist_head(&main_timer_list)) && tm->expires < wake)
466         wake = tm->expires;
467       main_get_time();
468       int timeout = (wake ? wake - main_now : 0);
469       DBG("MAIN: Poll for %d fds and timeout %d ms", main_file_cnt, timeout);
470       int p = poll(main_poll_table, main_file_cnt, timeout);
471       timestamp_t old_now = main_now;
472       main_get_time();
473       main_idle_time += main_now - old_now;
474       if (p > 0)
475         {
476           struct pollfd *p = main_poll_table;
477           CLIST_WALK(fi, main_file_list)
478             {
479               if (p->revents & (POLLIN | POLLHUP | POLLERR))
480                 {
481                   do
482                     DBG("MAIN: Read event on fd %d", p->fd);
483                   while (fi->read_handler && fi->read_handler(fi) && !main_poll_table_obsolete);
484                   if (main_poll_table_obsolete) /* File entries have been inserted or deleted => better not risk continuing to nowhere */
485                     break;
486                 }
487               if (p->revents & (POLLOUT | POLLERR))
488                 {
489                   do
490                     DBG("MAIN: Write event on fd %d", p->fd);
491                   while (fi->write_handler && fi->write_handler(fi) && !main_poll_table_obsolete);
492                   if (main_poll_table_obsolete)
493                     break;
494                 }
495               p++;
496             }
497         }
498     }
499 }
500
501 #ifdef TEST
502
503 static struct main_process mp;
504 static struct main_file fin, fout;
505 static struct main_hook hook;
506 static struct main_timer tm;
507
508 static byte rb[16];
509
510 static void dread(struct main_file *fi)
511 {
512   if (fi->rpos < fi->rlen)
513     {
514       msg(L_INFO, "Read EOF");
515       file_del(fi);
516     }
517   else
518     {
519       msg(L_INFO, "Read done");
520       file_read(fi, rb, sizeof(rb));
521     }
522 }
523
524 static void derror(struct main_file *fi, int cause)
525 {
526   msg(L_INFO, "Error: %m !!! (cause %d)", cause);
527   file_del(fi);
528 }
529
530 static void dwrite(struct main_file *fi UNUSED)
531 {
532   msg(L_INFO, "Write done");
533 }
534
535 static int dhook(struct main_hook *ho UNUSED)
536 {
537   msg(L_INFO, "Hook called");
538   return 0;
539 }
540
541 static void dtimer(struct main_timer *tm)
542 {
543   msg(L_INFO, "Timer tick");
544   timer_add(tm, main_now + 10000);
545 }
546
547 static void dentry(void)
548 {
549   msg(L_INFO, "*** SUBPROCESS START ***");
550   sleep(2);
551   msg(L_INFO, "*** SUBPROCESS FINISH ***");
552   exit(0);
553 }
554
555 static void dexit(struct main_process *pr)
556 {
557   msg(L_INFO, "Subprocess %d exited with status %x", pr->pid, pr->status);
558 }
559
560 int
561 main(void)
562 {
563   log_init(NULL);
564   main_init();
565
566   fin.fd = 0;
567   fin.read_done = dread;
568   fin.error_handler = derror;
569   file_add(&fin);
570   file_read(&fin, rb, sizeof(rb));
571
572   fout.fd = 1;
573   fout.write_done = dwrite;
574   fout.error_handler = derror;
575   file_add(&fout);
576   file_write(&fout, "Hello, world!\n", 14);
577
578   hook.handler = dhook;
579   hook_add(&hook);
580
581   tm.handler = dtimer;
582   timer_add(&tm, main_now + 1000);
583
584   mp.handler = dexit;
585   if (!process_fork(&mp))
586     dentry();
587
588   main_debug();
589
590   main_loop();
591   msg(L_INFO, "Finished.");
592 }
593
594 #endif