]> mj.ucw.cz Git - libucw.git/blob - ucw/mainloop.c
90ee874d6ab737df1301b302ba472a9435cadb75
[libucw.git] / ucw / mainloop.c
1 /*
2  *      UCW Library -- Main Loop
3  *
4  *      (c) 2004--2010 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "ucw/lib.h"
13 #include "ucw/heap.h"
14 #include "ucw/mainloop.h"
15 #include "ucw/threads.h"
16 #include "ucw/gary.h"
17
18 #include <stdio.h>
19 #include <string.h>
20 #include <stdlib.h>
21 #include <unistd.h>
22 #include <signal.h>
23 #include <fcntl.h>
24 #include <errno.h>
25 #include <time.h>
26 #include <sys/poll.h>
27 #include <sys/wait.h>
28 #include <sys/time.h>
29
30 #define MAIN_TIMER_LESS(x,y) ((x)->expires < (y)->expires)
31 #define MAIN_TIMER_SWAP(heap,a,b,t) (t=heap[a], heap[a]=heap[b], heap[b]=t, heap[a]->index=(a), heap[b]->index=(b))
32
33 // FIXME: Delivery of signals to threads?
34 static uns main_sigchld_set_up;
35 static volatile sig_atomic_t chld_received = 0;
36
37 #ifdef O_CLOEXEC
38 // On recent Linux systems, O_CLOEXEC flag is available and we can get around
39 // the race condition of poll().
40 #define USE_SELF_PIPE
41 static int sig_pipe_recv, sig_pipe_send;
42 #endif
43
44 static void
45 do_main_get_time(struct main_context *m)
46 {
47   struct timeval tv;
48   gettimeofday(&tv, NULL);
49   m->now_seconds = tv.tv_sec;
50   m->now = (timestamp_t)tv.tv_sec * 1000 + tv.tv_usec / 1000;
51 }
52
53 struct main_context *
54 main_new(void)
55 {
56   struct main_context *m = xmalloc_zero(sizeof(*m));
57
58   DBG("MAIN: New context");
59   clist_init(&m->file_list);
60   clist_init(&m->hook_list);
61   clist_init(&m->hook_done_list);
62   clist_init(&m->process_list);
63   m->poll_table_obsolete = 1;
64   do_main_get_time(m);
65
66   return m;
67 }
68
69 void
70 main_delete(struct main_context *m)
71 {
72   ASSERT(clist_empty(&m->file_list));
73   ASSERT(clist_empty(&m->hook_list));
74   ASSERT(clist_empty(&m->hook_done_list));
75   ASSERT(clist_empty(&m->process_list));
76   if (m->timer_table)
77     GARY_FREE(m->timer_table);
78   // FIXME: Free poll table
79   xfree(m);
80 }
81
82 struct main_context *
83 main_switch_context(struct main_context *m)
84 {
85   struct ucwlib_context *c = ucwlib_thread_context();
86   struct main_context *m0 = c->main_context;
87   c->main_context = m;
88   return m0;
89 }
90
91 struct main_context *
92 main_current(void)
93 {
94   struct ucwlib_context *c = ucwlib_thread_context();
95   struct main_context *m = c->main_context;
96   ASSERT(m);
97   return m;
98 }
99
100 void
101 main_init(void)
102 {
103   struct main_context *m = main_switch_context(main_new());
104   ASSERT(!m);
105 }
106
107 void
108 main_cleanup(void)
109 {
110   struct main_context *m = main_switch_context(NULL);
111   main_delete(m);
112 }
113
114 void
115 main_get_time(void)
116 {
117   do_main_get_time(main_current());
118 }
119
120 static inline uns
121 count_timers(struct main_context *m)
122 {
123   return GARY_SIZE(m->timer_table) - 1;
124 }
125
126 void
127 timer_add(struct main_timer *tm, timestamp_t expires)
128 {
129   struct main_context *m = main_current();
130
131   if (!m->timer_table)
132     {
133       GARY_INIT(m->timer_table, 1);
134       m->timer_table[0] = NULL;
135     }
136
137   if (expires)
138     DBG("MAIN: Setting timer %p (expire at now+%lld)", tm, (long long)(expires - m->now));
139   else
140     DBG("MAIN: Clearing timer %p", tm);
141   uns num_timers = count_timers(m);
142   if (tm->expires < expires)
143     {
144       if (!tm->expires)
145         {
146           tm->expires = expires;
147           tm->index = num_timers + 1;
148           *GARY_PUSH(m->timer_table, 1) = tm;
149           HEAP_INSERT(struct main_timer *, m->timer_table, tm->index, MAIN_TIMER_LESS, MAIN_TIMER_SWAP);
150         }
151       else
152         {
153           tm->expires = expires;
154           HEAP_INCREASE(struct main_timer *, m->timer_table, num_timers, MAIN_TIMER_LESS, MAIN_TIMER_SWAP, tm->index);
155         }
156     }
157   else if (tm->expires > expires)
158     {
159       if (!expires)
160         {
161           ASSERT(tm->index && tm->index <= num_timers);
162           HEAP_DELETE(struct main_timer *, m->timer_table, num_timers, MAIN_TIMER_LESS, MAIN_TIMER_SWAP, tm->index);
163           tm->index = 0;
164           tm->expires = 0;
165           GARY_POP(m->timer_table, 1);
166         }
167       else
168         {
169           tm->expires = expires;
170           HEAP_DECREASE(struct main_timer *, m->timer_table, num_timers, MAIN_TIMER_LESS, MAIN_TIMER_SWAP, tm->index);
171         }
172     }
173 }
174
175 void
176 timer_del(struct main_timer *tm)
177 {
178   timer_add(tm, 0);
179 }
180
181 static void
182 file_timer_expired(struct main_timer *tm)
183 {
184   struct main_file *fi = tm->data;
185   timer_del(&fi->timer);
186   if (fi->error_handler)
187     fi->error_handler(fi, MFERR_TIMEOUT);
188 }
189
190 void
191 file_add(struct main_file *fi)
192 {
193   struct main_context *m = main_current();
194
195   DBG("MAIN: Adding file %p (fd=%d)", fi, fi->fd);
196   ASSERT(!fi->n.next);
197   clist_add_tail(&m->file_list, &fi->n);
198   fi->timer.handler = file_timer_expired;
199   fi->timer.data = fi;
200   m->file_cnt++;
201   m->poll_table_obsolete = 1;
202   if (fcntl(fi->fd, F_SETFL, O_NONBLOCK) < 0)
203     msg(L_ERROR, "Error setting fd %d to non-blocking mode: %m. Keep fingers crossed.", fi->fd);
204 }
205
206 void
207 file_chg(struct main_file *fi)
208 {
209   struct pollfd *p = fi->pollfd;
210   if (p)
211     {
212       p->events = 0;
213       if (fi->read_handler)
214         p->events |= POLLIN | POLLHUP | POLLERR;
215       if (fi->write_handler)
216         p->events |= POLLOUT | POLLERR;
217     }
218 }
219
220 void
221 file_del(struct main_file *fi)
222 {
223   struct main_context *m = main_current();
224
225   DBG("MAIN: Deleting file %p (fd=%d)", fi, fi->fd);
226   ASSERT(fi->n.next);
227   timer_del(&fi->timer);
228   clist_remove(&fi->n);
229   m->file_cnt--;
230   m->poll_table_obsolete = 1;
231   fi->n.next = fi->n.prev = NULL;
232 }
233
234 static int
235 file_read_handler(struct main_file *fi)
236 {
237   while (fi->rpos < fi->rlen)
238     {
239       int l = read(fi->fd, fi->rbuf + fi->rpos, fi->rlen - fi->rpos);
240       DBG("MAIN: FD %d: read %d", fi->fd, l);
241       if (l < 0)
242         {
243           if (errno != EINTR && errno != EAGAIN && fi->error_handler)
244             fi->error_handler(fi, MFERR_READ);
245           return 0;
246         }
247       else if (!l)
248         break;
249       fi->rpos += l;
250     }
251   DBG("MAIN: FD %d done read %d of %d", fi->fd, fi->rpos, fi->rlen);
252   fi->read_handler = NULL;
253   file_chg(fi);
254   fi->read_done(fi);
255   return 1;
256 }
257
258 static int
259 file_write_handler(struct main_file *fi)
260 {
261   while (fi->wpos < fi->wlen)
262     {
263       int l = write(fi->fd, fi->wbuf + fi->wpos, fi->wlen - fi->wpos);
264       DBG("MAIN: FD %d: write %d", fi->fd, l);
265       if (l < 0)
266         {
267           if (errno != EINTR && errno != EAGAIN && fi->error_handler)
268             fi->error_handler(fi, MFERR_WRITE);
269           return 0;
270         }
271       fi->wpos += l;
272     }
273   DBG("MAIN: FD %d done write %d", fi->fd, fi->wpos);
274   fi->write_handler = NULL;
275   file_chg(fi);
276   fi->write_done(fi);
277   return 1;
278 }
279
280 void
281 file_read(struct main_file *fi, void *buf, uns len)
282 {
283   ASSERT(fi->n.next);
284   if (len)
285     {
286       fi->read_handler = file_read_handler;
287       fi->rbuf = buf;
288       fi->rpos = 0;
289       fi->rlen = len;
290     }
291   else
292     {
293       fi->read_handler = NULL;
294       fi->rbuf = NULL;
295       fi->rpos = fi->rlen = 0;
296     }
297   file_chg(fi);
298 }
299
300 void
301 file_write(struct main_file *fi, void *buf, uns len)
302 {
303   ASSERT(fi->n.next);
304   if (len)
305     {
306       fi->write_handler = file_write_handler;
307       fi->wbuf = buf;
308       fi->wpos = 0;
309       fi->wlen = len;
310     }
311   else
312     {
313       fi->write_handler = NULL;
314       fi->wbuf = NULL;
315       fi->wpos = fi->wlen = 0;
316     }
317   file_chg(fi);
318 }
319
320 void
321 file_set_timeout(struct main_file *fi, timestamp_t expires)
322 {
323   ASSERT(fi->n.next);
324   timer_add(&fi->timer, expires);
325 }
326
327 void
328 file_close_all(void)
329 {
330   struct main_context *m = main_current();
331
332   CLIST_FOR_EACH(struct main_file *, f, m->file_list)
333     close(f->fd);
334 }
335
336 void
337 hook_add(struct main_hook *ho)
338 {
339   struct main_context *m = main_current();
340
341   DBG("MAIN: Adding hook %p", ho);
342   ASSERT(!ho->n.next);
343   clist_add_tail(&m->hook_list, &ho->n);
344 }
345
346 void
347 hook_del(struct main_hook *ho)
348 {
349   DBG("MAIN: Deleting hook %p", ho);
350   ASSERT(ho->n.next);
351   clist_remove(&ho->n);
352   ho->n.next = ho->n.prev = NULL;
353 }
354
355 #ifdef USE_SELF_PIPE
356 static void
357 main_sigchld_handler(int x UNUSED)
358 {
359   int old_errno = errno;
360   DBG("SIGCHLD received");
361   chld_received = 1;
362   ssize_t result;
363   while((result = write(sig_pipe_send, "c", 1)) == -1 && errno == EINTR);
364   if(result == -1 && errno != EAGAIN)
365     msg(L_SIGHANDLER|L_ERROR, "Could not write to self-pipe: %m");
366   errno = old_errno;
367 }
368
369 static int
370 dummy_read_handler(struct main_file *mp)
371 {
372   char buffer[1024];
373   ssize_t result = read(mp->fd, buffer, 1024);
374   if(result == -1 && errno != EAGAIN)
375     msg(L_ERROR, "Could not read from selfpipe: %m");
376   file_chg(mp);
377   return result == 1024;
378 }
379
380 static void
381 pipe_configure(int fd)
382 {
383   int flags;
384   if((flags = fcntl(fd, F_GETFL)) == -1 || fcntl(fd, F_SETFL, flags|O_NONBLOCK))
385     die("Could not set file descriptor %d to non-blocking: %m", fd);
386   if((flags = fcntl(fd, F_GETFD)) == -1 || fcntl(fd, F_SETFD, flags|O_CLOEXEC))
387     die("Could not set file descriptor %d to close-on-exec: %m", fd);
388 }
389 #else
390 static void
391 main_sigchld_handler(int x UNUSED)
392 {
393   DBG("SIGCHLD received");
394 }
395 #endif
396
397 void
398 process_add(struct main_process *mp)
399 {
400   struct main_context *m = main_current();
401
402   DBG("MAIN: Adding process %p (pid=%d)", mp, mp->pid);
403   ASSERT(!mp->n.next);
404   ASSERT(mp->handler);
405   clist_add_tail(&m->process_list, &mp->n);
406   if (!main_sigchld_set_up)
407     {
408 #ifdef USE_SELF_PIPE
409       int pipe_result[2];
410       if(pipe(pipe_result) == -1)
411         die("Could not create selfpipe:%m");
412       pipe_configure(pipe_result[0]);
413       pipe_configure(pipe_result[1]);
414       sig_pipe_recv = pipe_result[0];
415       sig_pipe_send = pipe_result[1];
416       static struct main_file self_pipe;
417       self_pipe = (struct main_file) {
418         .fd = sig_pipe_recv,
419         .read_handler = dummy_read_handler
420       };
421       file_add(&self_pipe);
422 #endif
423       struct sigaction sa;
424       bzero(&sa, sizeof(sa));
425       sa.sa_handler = main_sigchld_handler;
426       sa.sa_flags = SA_NOCLDSTOP | SA_RESTART;
427       sigaction(SIGCHLD, &sa, NULL);
428       main_sigchld_set_up = 1;
429       chld_received = 1; // The signal may have come before the handler
430     }
431 }
432
433 void
434 process_del(struct main_process *mp)
435 {
436   DBG("MAIN: Deleting process %p (pid=%d)", mp, mp->pid);
437   ASSERT(mp->n.next);
438   clist_remove(&mp->n);
439   mp->n.next = NULL;
440 }
441
442 int
443 process_fork(struct main_process *mp)
444 {
445   pid_t pid = fork();
446   if (pid < 0)
447     {
448       DBG("MAIN: Fork failed");
449       mp->status = -1;
450       format_exit_status(mp->status_msg, -1);
451       mp->handler(mp);
452       return 1;
453     }
454   else if (!pid)
455     return 0;
456   else
457     {
458       DBG("MAIN: Forked process %d", (int) pid);
459       mp->pid = pid;
460       process_add(mp);
461       return 1;
462     }
463 }
464
465 void
466 main_debug_context(struct main_context *m UNUSED)
467 {
468 #ifdef CONFIG_DEBUG
469   msg(L_DEBUG, "### Main loop status on %lld", (long long) m->now);
470   msg(L_DEBUG, "\tActive timers:");
471   uns num_timers = count_timers(m);
472   for (uns i = 1; i <= num_timers; i++)
473     {
474       struct main_timer *tm = m->timer_table[i];
475       msg(L_DEBUG, "\t\t%p (expires %lld, data %p)", tm, (long long)(tm->expires ? tm->expires - m->now : 999999), tm->data);
476     }
477   struct main_file *fi;
478   msg(L_DEBUG, "\tActive files:");
479   CLIST_WALK(fi, m->file_list)
480     msg(L_DEBUG, "\t\t%p (fd %d, rh %p, wh %p, eh %p, expires %lld, data %p)",
481         fi, fi->fd, fi->read_handler, fi->write_handler, fi->error_handler,
482         (long long)(fi->timer.expires ? fi->timer.expires - m->now : 999999), fi->data);
483   msg(L_DEBUG, "\tActive hooks:");
484   struct main_hook *ho;
485   CLIST_WALK(ho, m->hook_done_list)
486     msg(L_DEBUG, "\t\t%p (func %p, data %p)", ho, ho->handler, ho->data);
487   CLIST_WALK(ho, m->hook_list)
488     msg(L_DEBUG, "\t\t%p (func %p, data %p)", ho, ho->handler, ho->data);
489   msg(L_DEBUG, "\tActive processes:");
490   struct main_process *pr;
491   CLIST_WALK(pr, m->process_list)
492     msg(L_DEBUG, "\t\t%p (pid %d, data %p)", pr, pr->pid, pr->data);
493 #endif
494 }
495
496 static void
497 main_rebuild_poll_table(struct main_context *m)
498 {
499   struct main_file *fi;
500   if (m->poll_table_size < m->file_cnt)
501     {
502       if (m->poll_table)
503         xfree(m->poll_table);
504       else
505         m->poll_table_size = 1;
506       while (m->poll_table_size < m->file_cnt)
507         m->poll_table_size *= 2;
508       m->poll_table = xmalloc(sizeof(struct pollfd) * m->poll_table_size);
509     }
510   struct pollfd *p = m->poll_table;
511   DBG("MAIN: Rebuilding poll table: %d of %d entries set", m->file_cnt, m->poll_table_size);
512   CLIST_WALK(fi, m->file_list)
513     {
514       p->fd = fi->fd;
515       fi->pollfd = p++;
516       file_chg(fi);
517     }
518   m->poll_table_obsolete = 0;
519 }
520
521 void
522 main_loop(void)
523 {
524   DBG("MAIN: Entering main_loop");
525   struct main_context *m = main_current();
526
527   struct main_file *fi;
528   struct main_hook *ho;
529   struct main_timer *tm;
530   struct main_process *pr;
531
532   do_main_get_time(m);
533   for (;;)
534     {
535       timestamp_t wake = m->now + 1000000000;
536       while (GARY_SIZE(m->timer_table) > 1 && (tm = m->timer_table[1])->expires <= m->now)
537         {
538           DBG("MAIN: Timer %p expired at now-%lld", tm, (long long)(m->now - tm->expires));
539           tm->handler(tm);
540         }
541       int hook_min = HOOK_RETRY;
542       int hook_max = HOOK_SHUTDOWN;
543       while (ho = clist_remove_head(&m->hook_list))
544         {
545           clist_add_tail(&m->hook_done_list, &ho->n);
546           DBG("MAIN: Hook %p", ho);
547           int ret = ho->handler(ho);
548           hook_min = MIN(hook_min, ret);
549           hook_max = MAX(hook_max, ret);
550         }
551       clist_move(&m->hook_list, &m->hook_done_list);
552       if (hook_min == HOOK_SHUTDOWN ||
553           hook_min == HOOK_DONE && hook_max == HOOK_DONE ||
554           m->shutdown)
555         {
556           DBG("MAIN: Shut down by %s", m->shutdown ? "main_shutdown" : "a hook");
557           return;
558         }
559       if (hook_max == HOOK_RETRY)
560         wake = 0;
561       if (m->poll_table_obsolete)
562         main_rebuild_poll_table(m);
563 #ifndef USE_SELF_PIPE
564       // We don't have a reliable flag without the self-pipe.
565       chld_received = 1;
566 #endif
567       if (chld_received && !clist_empty(&m->process_list))
568         {
569           int stat;
570           pid_t pid;
571           wake = MIN(wake, m->now + 10000);
572           chld_received = 0;
573           while ((pid = waitpid(-1, &stat, WNOHANG)) > 0)
574             {
575               DBG("MAIN: Child %d exited with status %x", pid, stat);
576               CLIST_WALK(pr, m->process_list)
577                 if (pr->pid == pid)
578                   {
579                     pr->status = stat;
580                     process_del(pr);
581                     format_exit_status(pr->status_msg, pr->status);
582                     DBG("MAIN: Calling process exit handler");
583                     pr->handler(pr);
584                     break;
585                   }
586               wake = 0;
587             }
588         }
589       if (count_timers(m) && (tm = m->timer_table[1])->expires < wake)
590         wake = tm->expires;
591       do_main_get_time(m);
592       int timeout = ((wake > m->now) ? wake - m->now : 0);
593       DBG("MAIN: Poll for %d fds and timeout %d ms", m->file_cnt, timeout);
594       int p = poll(m->poll_table, m->file_cnt, timeout);
595       timestamp_t old_now = m->now;
596       do_main_get_time(m);
597       m->idle_time += m->now - old_now;
598       if (p > 0)
599         {
600           struct pollfd *p = m->poll_table;
601           CLIST_WALK(fi, m->file_list)
602             {
603               if (p->revents & (POLLIN | POLLHUP | POLLERR))
604                 {
605                   do
606                     DBG("MAIN: Read event on fd %d", p->fd);
607                   while (fi->read_handler && fi->read_handler(fi) && !m->poll_table_obsolete);
608                   if (m->poll_table_obsolete)   /* File entries have been inserted or deleted => better not risk continuing to nowhere */
609                     break;
610                 }
611               if (p->revents & (POLLOUT | POLLERR))
612                 {
613                   do
614                     DBG("MAIN: Write event on fd %d", p->fd);
615                   while (fi->write_handler && fi->write_handler(fi) && !m->poll_table_obsolete);
616                   if (m->poll_table_obsolete)
617                     break;
618                 }
619               p++;
620             }
621         }
622     }
623 }
624
625 #ifdef TEST
626
627 static struct main_process mp;
628 static struct main_file fin, fout;
629 static struct main_hook hook;
630 static struct main_timer tm;
631
632 static byte rb[16];
633
634 static void dread(struct main_file *fi)
635 {
636   if (fi->rpos < fi->rlen)
637     {
638       msg(L_INFO, "Read EOF");
639       file_del(fi);
640     }
641   else
642     {
643       msg(L_INFO, "Read done");
644       file_read(fi, rb, sizeof(rb));
645     }
646 }
647
648 static void derror(struct main_file *fi, int cause)
649 {
650   msg(L_INFO, "Error: %m !!! (cause %d)", cause);
651   file_del(fi);
652 }
653
654 static void dwrite(struct main_file *fi UNUSED)
655 {
656   msg(L_INFO, "Write done");
657 }
658
659 static int dhook(struct main_hook *ho UNUSED)
660 {
661   msg(L_INFO, "Hook called");
662   return 0;
663 }
664
665 static void dtimer(struct main_timer *tm)
666 {
667   msg(L_INFO, "Timer tick");
668   timer_add(tm, main_get_now() + 10000);
669 }
670
671 static void dentry(void)
672 {
673   msg(L_INFO, "*** SUBPROCESS START ***");
674   sleep(2);
675   msg(L_INFO, "*** SUBPROCESS FINISH ***");
676   exit(0);
677 }
678
679 static void dexit(struct main_process *pr)
680 {
681   msg(L_INFO, "Subprocess %d exited with status %x", pr->pid, pr->status);
682 }
683
684 int
685 main(void)
686 {
687   log_init(NULL);
688   main_init();
689
690   fin.fd = 0;
691   fin.read_done = dread;
692   fin.error_handler = derror;
693   file_add(&fin);
694   file_read(&fin, rb, sizeof(rb));
695
696   fout.fd = 1;
697   fout.write_done = dwrite;
698   fout.error_handler = derror;
699   file_add(&fout);
700   file_write(&fout, "Hello, world!\n", 14);
701
702   hook.handler = dhook;
703   hook_add(&hook);
704
705   tm.handler = dtimer;
706   timer_add(&tm, main_get_now() + 1000);
707
708   mp.handler = dexit;
709   if (!process_fork(&mp))
710     dentry();
711
712   main_debug();
713
714   main_loop();
715   msg(L_INFO, "Finished.");
716 }
717
718 #endif