X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=ucw%2Fmainloop.c;h=4138aae1ec237aa42d79d90a56f6833f901b42b4;hb=a1fdee4766b969fd0786e3286ce878a2a1142bbd;hp=4c8e2cd43bd3835330c81f8a7881f6de4d9f45ca;hpb=e8060fa4647c5c4ccf07efb91a35f65c543dfc4f;p=libucw.git diff --git a/ucw/mainloop.c b/ucw/mainloop.c index 4c8e2cd4..4138aae1 100644 --- a/ucw/mainloop.c +++ b/ucw/mainloop.c @@ -10,6 +10,7 @@ #undef LOCAL_DEBUG #include "ucw/lib.h" +#include "ucw/heap.h" #include "ucw/mainloop.h" #include @@ -25,13 +26,30 @@ timestamp_t main_now; ucw_time_t main_now_seconds; +timestamp_t main_idle_time; uns main_shutdown; -clist main_timer_list, main_file_list, main_hook_list, main_process_list; +#define GBUF_PREFIX(x) main_timer_table_##x +#define GBUF_TYPE struct main_timer * +#include "ucw/gbuf.h" +static uns main_timer_cnt; +static main_timer_table_t main_timer_table; +#define MAIN_TIMER_LESS(x,y) ((x)->expires < (y)->expires) +#define MAIN_TIMER_SWAP(heap,a,b,t) (t=heap[a], heap[a]=heap[b], heap[b]=t, heap[a]->index=(a), heap[b]->index=(b)) + +clist main_file_list, main_hook_list, main_hook_done_list, main_process_list; static uns main_file_cnt; static uns main_poll_table_obsolete, main_poll_table_size; static struct pollfd *main_poll_table; static uns main_sigchld_set_up; +static volatile sig_atomic_t chld_received = 0; + +#ifdef O_CLOEXEC +// On recent Linux systems, O_CLOEXEC flag is available and we can get around +// the race condition of poll(). +#define USE_SELF_PIPE +static int sig_pipe_recv, sig_pipe_send; +#endif void main_get_time(void) @@ -47,9 +65,10 @@ void main_init(void) { DBG("MAIN: Initializing"); - clist_init(&main_timer_list); + main_timer_cnt = 0; clist_init(&main_file_list); clist_init(&main_hook_list); + clist_init(&main_hook_done_list); clist_init(&main_process_list); main_file_cnt = 0; main_poll_table_obsolete = 1; @@ -63,15 +82,36 @@ timer_add(struct main_timer *tm, timestamp_t expires) DBG("MAIN: Setting timer %p (expire at now+%lld)", tm, (long long)(expires-main_now)); else DBG("MAIN: Clearing timer %p", tm); - if (tm->expires) - clist_remove(&tm->n); - tm->expires = expires; - if (expires) + if (tm->expires < expires) + { + if (!tm->expires) + { + tm->expires = expires; + tm->index = ++main_timer_cnt; + main_timer_table_grow(&main_timer_table, tm->index + 1); + main_timer_table.ptr[tm->index] = tm; + HEAP_INSERT(struct main_timer *, main_timer_table.ptr, main_timer_cnt, MAIN_TIMER_LESS, MAIN_TIMER_SWAP); + } + else + { + tm->expires = expires; + HEAP_INCREASE(struct main_timer *, main_timer_table.ptr, main_timer_cnt, MAIN_TIMER_LESS, MAIN_TIMER_SWAP, tm->index); + } + } + else if (tm->expires > expires) { - cnode *t = main_timer_list.head.next; - while (t != &main_timer_list.head && ((struct main_timer *) t)->expires < expires) - t = t->next; - clist_insert_before(&tm->n, t); + if (!expires) + { + ASSERT(tm->index && tm->index <= main_timer_cnt); + HEAP_DELETE(struct main_timer *, main_timer_table.ptr, main_timer_cnt, MAIN_TIMER_LESS, MAIN_TIMER_SWAP, tm->index); + tm->index = 0; + tm->expires = 0; + } + else + { + tm->expires = expires; + HEAP_DECREASE(struct main_timer *, main_timer_table.ptr, main_timer_cnt, MAIN_TIMER_LESS, MAIN_TIMER_SWAP, tm->index); + } } } @@ -247,11 +287,47 @@ hook_del(struct main_hook *ho) ho->n.next = ho->n.prev = NULL; } +#ifdef USE_SELF_PIPE +static void +main_sigchld_handler(int x UNUSED) +{ + int old_errno = errno; + DBG("SIGCHLD received"); + chld_received = 1; + ssize_t result; + while((result = write(sig_pipe_send, "c", 1)) == -1 && errno == EINTR); + if(result == -1 && errno != EAGAIN) + msg(L_SIGHANDLER|L_ERROR, "Could not write to self-pipe: %m"); + errno = old_errno; +} + +static int +dummy_read_handler(struct main_file *mp) +{ + char buffer[1024]; + ssize_t result = read(mp->fd, buffer, 1024); + if(result == -1 && errno != EAGAIN) + msg(L_ERROR, "Could not read from selfpipe: %m"); + file_chg(mp); + return result == 1024; +} + +static void +pipe_configure(int fd) +{ + int flags; + if((flags = fcntl(fd, F_GETFL)) == -1 || fcntl(fd, F_SETFL, flags|O_NONBLOCK)) + die("Could not set file descriptor %d to non-blocking: %m", fd); + if((flags = fcntl(fd, F_GETFD)) == -1 || fcntl(fd, F_SETFD, flags|O_CLOEXEC)) + die("Could not set file descriptor %d to close-on-exec: %m", fd); +} +#else static void main_sigchld_handler(int x UNUSED) { DBG("SIGCHLD received"); } +#endif void process_add(struct main_process *mp) @@ -262,12 +338,28 @@ process_add(struct main_process *mp) clist_add_tail(&main_process_list, &mp->n); if (!main_sigchld_set_up) { +#ifdef USE_SELF_PIPE + int pipe_result[2]; + if(pipe(pipe_result) == -1) + die("Could not create selfpipe:%m"); + pipe_configure(pipe_result[0]); + pipe_configure(pipe_result[1]); + sig_pipe_recv = pipe_result[0]; + sig_pipe_send = pipe_result[1]; + static struct main_file self_pipe; + self_pipe = (struct main_file) { + .fd = sig_pipe_recv, + .read_handler = dummy_read_handler + }; + file_add(&self_pipe); +#endif struct sigaction sa; bzero(&sa, sizeof(sa)); sa.sa_handler = main_sigchld_handler; sa.sa_flags = SA_NOCLDSTOP | SA_RESTART; sigaction(SIGCHLD, &sa, NULL); main_sigchld_set_up = 1; + chld_received = 1; // The signal may have come before the handler } } @@ -309,9 +401,11 @@ main_debug(void) #ifdef CONFIG_DEBUG msg(L_DEBUG, "### Main loop status on %lld", (long long)main_now); msg(L_DEBUG, "\tActive timers:"); - struct main_timer *tm; - CLIST_WALK(tm, main_timer_list) - msg(L_DEBUG, "\t\t%p (expires %lld, data %p)", tm, (long long)(tm->expires ? tm->expires-main_now : 999999), tm->data); + for (uns i = 1; i <= main_timer_cnt; i++) + { + struct main_timer *tm = main_timer_table.ptr[i]; + msg(L_DEBUG, "\t\t%p (expires %lld, data %p)", tm, (long long)(tm->expires ? tm->expires-main_now : 999999), tm->data); + } struct main_file *fi; msg(L_DEBUG, "\tActive files:"); CLIST_WALK(fi, main_file_list) @@ -320,6 +414,8 @@ main_debug(void) (long long)(fi->timer.expires ? fi->timer.expires-main_now : 999999), fi->data); msg(L_DEBUG, "\tActive hooks:"); struct main_hook *ho; + CLIST_WALK(ho, main_hook_done_list) + msg(L_DEBUG, "\t\t%p (func %p, data %p)", ho, ho->handler, ho->data); CLIST_WALK(ho, main_hook_list) msg(L_DEBUG, "\t\t%p (func %p, data %p)", ho, ho->handler, ho->data); msg(L_DEBUG, "\tActive processes:"); @@ -358,32 +454,33 @@ void main_loop(void) { DBG("MAIN: Entering main_loop"); - ASSERT(main_timer_list.head.next); + ASSERT(main_hook_list.head.next); struct main_file *fi; struct main_hook *ho; struct main_timer *tm; struct main_process *pr; - cnode *tmp; + main_get_time(); for (;;) { - main_get_time(); timestamp_t wake = main_now + 1000000000; - while ((tm = clist_head(&main_timer_list)) && tm->expires <= main_now) + while (main_timer_cnt && (tm = main_timer_table.ptr[1])->expires <= main_now) { DBG("MAIN: Timer %p expired at now-%lld", tm, (long long)(main_now - tm->expires)); tm->handler(tm); } int hook_min = HOOK_RETRY; int hook_max = HOOK_SHUTDOWN; - CLIST_WALK_DELSAFE(ho, main_hook_list, tmp) + while (ho = clist_remove_head(&main_hook_list)) { + clist_add_tail(&main_hook_done_list, &ho->n); DBG("MAIN: Hook %p", ho); int ret = ho->handler(ho); hook_min = MIN(hook_min, ret); hook_max = MAX(hook_max, ret); } + clist_move(&main_hook_list, &main_hook_done_list); if (hook_min == HOOK_SHUTDOWN || hook_min == HOOK_DONE && hook_max == HOOK_DONE || main_shutdown) @@ -395,11 +492,16 @@ main_loop(void) wake = 0; if (main_poll_table_obsolete) main_rebuild_poll_table(); - if (!clist_empty(&main_process_list)) +#ifndef USE_SELF_PIPE + // We don't have a reliable flag without the self-pipe. + chld_received = 1; +#endif + if (chld_received && !clist_empty(&main_process_list)) { int stat; pid_t pid; wake = MIN(wake, main_now + 10000); + chld_received = 0; while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) { DBG("MAIN: Child %d exited with status %x", pid, stat); @@ -416,15 +518,18 @@ main_loop(void) wake = 0; } } - /* FIXME: Here is a small race window where SIGCHLD can come unnoticed. */ - if ((tm = clist_head(&main_timer_list)) && tm->expires < wake) + if (main_timer_cnt && (tm = main_timer_table.ptr[1])->expires < wake) wake = tm->expires; - int timeout = (wake ? wake - main_now : 0); + main_get_time(); + int timeout = ((wake > main_now) ? wake - main_now : 0); DBG("MAIN: Poll for %d fds and timeout %d ms", main_file_cnt, timeout); - if (poll(main_poll_table, main_file_cnt, timeout)) + int p = poll(main_poll_table, main_file_cnt, timeout); + timestamp_t old_now = main_now; + main_get_time(); + main_idle_time += main_now - old_now; + if (p > 0) { struct pollfd *p = main_poll_table; - main_get_time(); CLIST_WALK(fi, main_file_list) { if (p->revents & (POLLIN | POLLHUP | POLLERR)) @@ -462,50 +567,50 @@ static void dread(struct main_file *fi) { if (fi->rpos < fi->rlen) { - log(L_INFO, "Read EOF"); + msg(L_INFO, "Read EOF"); file_del(fi); } else { - log(L_INFO, "Read done"); + msg(L_INFO, "Read done"); file_read(fi, rb, sizeof(rb)); } } static void derror(struct main_file *fi, int cause) { - log(L_INFO, "Error: %m !!! (cause %d)", cause); + msg(L_INFO, "Error: %m !!! (cause %d)", cause); file_del(fi); } static void dwrite(struct main_file *fi UNUSED) { - log(L_INFO, "Write done"); + msg(L_INFO, "Write done"); } static int dhook(struct main_hook *ho UNUSED) { - log(L_INFO, "Hook called"); + msg(L_INFO, "Hook called"); return 0; } static void dtimer(struct main_timer *tm) { - log(L_INFO, "Timer tick"); + msg(L_INFO, "Timer tick"); timer_add(tm, main_now + 10000); } static void dentry(void) { - log(L_INFO, "*** SUBPROCESS START ***"); + msg(L_INFO, "*** SUBPROCESS START ***"); sleep(2); - log(L_INFO, "*** SUBPROCESS FINISH ***"); + msg(L_INFO, "*** SUBPROCESS FINISH ***"); exit(0); } static void dexit(struct main_process *pr) { - log(L_INFO, "Subprocess %d exited with status %x", pr->pid, pr->status); + msg(L_INFO, "Subprocess %d exited with status %x", pr->pid, pr->status); } int @@ -539,7 +644,7 @@ main(void) main_debug(); main_loop(); - log(L_INFO, "Finished."); + msg(L_INFO, "Finished."); } #endif