X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=ucw%2Fmainloop.c;h=b2b07c5082476bedbdc023e094f49f6feb02a28b;hb=6efdc514c193f18c9ef840096750c37e78a01bf6;hp=d6d9c367a495af46f48f724a4b40118390789fb2;hpb=066065620026b8a99e77fc82a6009fd2adea86d7;p=libucw.git diff --git a/ucw/mainloop.c b/ucw/mainloop.c index d6d9c367..b2b07c50 100644 --- a/ucw/mainloop.c +++ b/ucw/mainloop.c @@ -1,7 +1,7 @@ /* * UCW Library -- Main Loop * - * (c) 2004--2006 Martin Mares + * (c) 2004--2012 Martin Mares * * This software may be freely distributed and used according to the terms * of the GNU Lesser General Public License. @@ -9,11 +9,17 @@ #undef LOCAL_DEBUG -#include "ucw/lib.h" -#include "ucw/mainloop.h" +#include +#include +#include +#include +#include +#include +#include #include #include +#include #include #include #include @@ -23,312 +29,407 @@ #include #include -timestamp_t main_now; -ucw_time_t main_now_seconds; -timestamp_t main_idle_time; -uns main_shutdown; - -clist main_timer_list, main_file_list, main_hook_list, main_process_list; -static uns main_file_cnt; -static uns main_poll_table_obsolete, main_poll_table_size; -static struct pollfd *main_poll_table; -static uns main_sigchld_set_up; -static volatile sig_atomic_t chld_received = 0; - -#ifdef O_CLOEXEC -// On recent Linux systems, O_CLOEXEC flag is available and we can get around -// the race condition of poll(). -#define USE_SELF_PIPE -static int sig_pipe_recv, sig_pipe_send; +#ifdef CONFIG_UCW_THREADS +#include +#define THREAD_SIGMASK pthread_sigmask +#else +#define THREAD_SIGMASK sigprocmask #endif -void -main_get_time(void) +#ifdef CONFIG_UCW_EPOLL +#include +#endif + +#define MAIN_TIMER_LESS(x,y) ((x)->expires < (y)->expires) +#define MAIN_TIMER_SWAP(heap,a,b,t) (t=heap[a], heap[a]=heap[b], heap[b]=t, heap[a]->index=(a), heap[b]->index=(b)) + +#define EPOLL_BUF_SIZE 256 + +static void file_del_ctx(struct main_context *m, struct main_file *fi); +static void signal_del_ctx(struct main_context *m, struct main_signal *ms); + +static void +main_get_time_ctx(struct main_context *m) { - struct timeval tv; - gettimeofday(&tv, NULL); - main_now_seconds = tv.tv_sec; - main_now = (timestamp_t)tv.tv_sec * 1000 + tv.tv_usec / 1000; - // DBG("It's %lld o'clock", (long long) main_now); + m->now = get_timestamp(); } -void -main_init(void) +static struct main_context * +main_current_nocheck(void) { - DBG("MAIN: Initializing"); - clist_init(&main_timer_list); - clist_init(&main_file_list); - clist_init(&main_hook_list); - clist_init(&main_process_list); - main_file_cnt = 0; - main_poll_table_obsolete = 1; - main_get_time(); + return ucwlib_thread_context()->main_context; } -void -timer_add(struct main_timer *tm, timestamp_t expires) +struct main_context * +main_current(void) { - if (expires) - DBG("MAIN: Setting timer %p (expire at now+%lld)", tm, (long long)(expires-main_now)); + struct main_context *m = main_current_nocheck(); + ASSERT(m); + return m; +} + +static int +main_is_current(struct main_context *m) +{ + return (m == main_current_nocheck()); +} + +static inline uns +count_timers(struct main_context *m) +{ + if (m->timer_table) + return GARY_SIZE(m->timer_table) - 1; else - DBG("MAIN: Clearing timer %p", tm); - if (tm->expires) - clist_remove(&tm->n); - tm->expires = expires; - if (expires) + return 0; +} + +struct main_context * +main_new(void) +{ + struct main_context *m = xmalloc_zero(sizeof(*m)); + + DBG("MAIN: New context"); + clist_init(&m->file_list); + clist_init(&m->file_active_list); + clist_init(&m->hook_list); + clist_init(&m->hook_done_list); + clist_init(&m->process_list); + clist_init(&m->signal_list); +#ifdef CONFIG_UCW_EPOLL + m->epoll_fd = epoll_create(64); + if (m->epoll_fd < 0) + die("epoll_create() failed: %m"); + m->epoll_events = xmalloc(EPOLL_BUF_SIZE * sizeof(struct epoll_event)); + clist_init(&m->file_recalc_list); +#else + m->poll_table_obsolete = 1; +#endif + main_get_time_ctx(m); + sigemptyset(&m->want_signals); + m->sig_pipe_recv = m->sig_pipe_send = -1; + + return m; +} + +static void +main_prepare_delete(struct main_context *m) +{ + /* + * If the context is current, deactivate it first. But beware, + * we must not call functions that depend on the current context. + */ + if (main_is_current(m)) + main_switch_context(NULL); + + // Close epoll descriptor early enough, it might be shared after fork! +#ifdef CONFIG_UCW_EPOLL + xfree(m->epoll_events); + close(m->epoll_fd); + m->epoll_fd = -1; +#else + GARY_FREE(m->poll_table); + GARY_FREE(m->poll_file_table); +#endif + + if (m->sigchld_handler) + { + signal_del_ctx(m, m->sigchld_handler); + xfree(m->sigchld_handler); + } + if (m->sig_pipe_file) + { + file_del_ctx(m, m->sig_pipe_file); + xfree(m->sig_pipe_file); + } + if (m->sig_pipe_recv >= 0) { - cnode *t = main_timer_list.head.next; - while (t != &main_timer_list.head && ((struct main_timer *) t)->expires < expires) - t = t->next; - clist_insert_before(&tm->n, t); + close(m->sig_pipe_recv); + close(m->sig_pipe_send); } } -void -timer_del(struct main_timer *tm) +static void +main_do_delete(struct main_context *m) { - timer_add(tm, 0); + GARY_FREE(m->timer_table); + xfree(m); } -static void -file_timer_expired(struct main_timer *tm) +void +main_delete(struct main_context *m) { - struct main_file *fi = tm->data; - timer_del(&fi->timer); - if (fi->error_handler) - fi->error_handler(fi, MFERR_TIMEOUT); + if (!m) + return; + + main_prepare_delete(m); + ASSERT(clist_empty(&m->file_list)); + ASSERT(clist_empty(&m->file_active_list)); +#ifdef CONFIG_UCW_EPOLL + ASSERT(clist_empty(&m->file_recalc_list)); +#endif + ASSERT(clist_empty(&m->hook_list)); + ASSERT(clist_empty(&m->hook_done_list)); + ASSERT(clist_empty(&m->process_list)); + ASSERT(clist_empty(&m->signal_list)); + ASSERT(!count_timers(m)); + main_do_delete(m); } void -file_add(struct main_file *fi) +main_destroy(struct main_context *m) { - DBG("MAIN: Adding file %p (fd=%d)", fi, fi->fd); - ASSERT(!fi->n.next); - clist_add_tail(&main_file_list, &fi->n); - fi->timer.handler = file_timer_expired; - fi->timer.data = fi; - main_file_cnt++; - main_poll_table_obsolete = 1; - if (fcntl(fi->fd, F_SETFL, O_NONBLOCK) < 0) - msg(L_ERROR, "Error setting fd %d to non-blocking mode: %m. Keep fingers crossed.", fi->fd); + if (!m) + return; + main_prepare_delete(m); + + // Close all files + clist_insert_list_after(&m->file_active_list, m->file_list.head.prev); +#ifdef CONFIG_UCW_EPOLL + clist_insert_list_after(&m->file_recalc_list, m->file_list.head.prev); +#endif + CLIST_FOR_EACH(struct main_file *, f, m->file_list) + close(f->fd); + + main_do_delete(m); } -void -file_chg(struct main_file *fi) +struct main_context * +main_switch_context(struct main_context *m) { - struct pollfd *p = fi->pollfd; - if (p) - { - p->events = 0; - if (fi->read_handler) - p->events |= POLLIN | POLLHUP | POLLERR; - if (fi->write_handler) - p->events |= POLLOUT | POLLERR; - } + struct ucwlib_context *c = ucwlib_thread_context(); + struct main_context *m0 = c->main_context; + + /* + * Not only we need to switch the signal sets of the two contexts, + * but it is also necessary to avoid invoking a signal handler + * in the middle of changing c->main_context. + */ + if (m0 && !clist_empty(&m0->signal_list)) + THREAD_SIGMASK(SIG_BLOCK, &m0->want_signals, NULL); + c->main_context = m; + if (m && !clist_empty(&m->signal_list)) + THREAD_SIGMASK(SIG_UNBLOCK, &m->want_signals, NULL); + + return m0; } void -file_del(struct main_file *fi) +main_init(void) { - DBG("MAIN: Deleting file %p (fd=%d)", fi, fi->fd); - ASSERT(fi->n.next); - timer_del(&fi->timer); - clist_remove(&fi->n); - main_file_cnt--; - main_poll_table_obsolete = 1; - fi->n.next = fi->n.prev = NULL; + struct main_context *m = main_switch_context(main_new()); + ASSERT(!m); } -static int -file_read_handler(struct main_file *fi) +void +main_cleanup(void) { - while (fi->rpos < fi->rlen) - { - int l = read(fi->fd, fi->rbuf + fi->rpos, fi->rlen - fi->rpos); - DBG("MAIN: FD %d: read %d", fi->fd, l); - if (l < 0) - { - if (errno != EINTR && errno != EAGAIN && fi->error_handler) - fi->error_handler(fi, MFERR_READ); - return 0; - } - else if (!l) - break; - fi->rpos += l; - } - DBG("MAIN: FD %d done read %d of %d", fi->fd, fi->rpos, fi->rlen); - fi->read_handler = NULL; - file_chg(fi); - fi->read_done(fi); - return 1; + main_delete(main_current_nocheck()); } -static int -file_write_handler(struct main_file *fi) +void +main_teardown(void) { - while (fi->wpos < fi->wlen) - { - int l = write(fi->fd, fi->wbuf + fi->wpos, fi->wlen - fi->wpos); - DBG("MAIN: FD %d: write %d", fi->fd, l); - if (l < 0) - { - if (errno != EINTR && errno != EAGAIN && fi->error_handler) - fi->error_handler(fi, MFERR_WRITE); - return 0; - } - fi->wpos += l; - } - DBG("MAIN: FD %d done write %d", fi->fd, fi->wpos); - fi->write_handler = NULL; - file_chg(fi); - fi->write_done(fi); - return 1; + main_destroy(main_current_nocheck()); } void -file_read(struct main_file *fi, void *buf, uns len) +main_get_time(void) { - ASSERT(fi->n.next); - if (len) - { - fi->read_handler = file_read_handler; - fi->rbuf = buf; - fi->rpos = 0; - fi->rlen = len; - } - else - { - fi->read_handler = NULL; - fi->rbuf = NULL; - fi->rpos = fi->rlen = 0; - } - file_chg(fi); + main_get_time_ctx(main_current()); } void -file_write(struct main_file *fi, void *buf, uns len) +timer_add(struct main_timer *tm, timestamp_t expires) { - ASSERT(fi->n.next); - if (len) + struct main_context *m = main_current(); + + if (!m->timer_table) { - fi->write_handler = file_write_handler; - fi->wbuf = buf; - fi->wpos = 0; - fi->wlen = len; + GARY_INIT(m->timer_table, 1); + m->timer_table[0] = NULL; } + + if (expires) + DBG("MAIN: Setting timer %p (expire at now+%lld)", tm, (long long)(expires - m->now)); else + DBG("MAIN: Clearing timer %p", tm); + uns num_timers = count_timers(m); + if (tm->expires < expires) { - fi->write_handler = NULL; - fi->wbuf = NULL; - fi->wpos = fi->wlen = 0; + if (!tm->expires) + { + tm->expires = expires; + tm->index = num_timers + 1; + *GARY_PUSH(m->timer_table, 1) = tm; + HEAP_INSERT(struct main_timer *, m->timer_table, tm->index, MAIN_TIMER_LESS, MAIN_TIMER_SWAP); + } + else + { + tm->expires = expires; + HEAP_INCREASE(struct main_timer *, m->timer_table, num_timers, MAIN_TIMER_LESS, MAIN_TIMER_SWAP, tm->index); + } + } + else if (tm->expires > expires) + { + if (!expires) + { + ASSERT(tm->index && tm->index <= num_timers); + HEAP_DELETE(struct main_timer *, m->timer_table, num_timers, MAIN_TIMER_LESS, MAIN_TIMER_SWAP, tm->index); + tm->index = 0; + tm->expires = 0; + GARY_POP(m->timer_table, 1); + } + else + { + tm->expires = expires; + HEAP_DECREASE(struct main_timer *, m->timer_table, num_timers, MAIN_TIMER_LESS, MAIN_TIMER_SWAP, tm->index); + } } - file_chg(fi); } void -file_set_timeout(struct main_file *fi, timestamp_t expires) +timer_add_rel(struct main_timer *tm, timestamp_t expires_delta) { - ASSERT(fi->n.next); - timer_add(&fi->timer, expires); + struct main_context *m = main_current(); + return timer_add(tm, m->now + expires_delta); } void -file_close_all(void) +timer_del(struct main_timer *tm) { - CLIST_FOR_EACH(struct main_file *, f, main_file_list) - close(f->fd); + timer_add(tm, 0); +} + +static uns +file_want_events(struct main_file *fi) +{ + uns events = 0; + if (fi->read_handler) + events |= POLLIN; + if (fi->write_handler) + events |= POLLOUT; + return events; } void -hook_add(struct main_hook *ho) +file_add(struct main_file *fi) { - DBG("MAIN: Adding hook %p", ho); - ASSERT(!ho->n.next); - clist_add_tail(&main_hook_list, &ho->n); + struct main_context *m = main_current(); + + DBG("MAIN: Adding file %p (fd=%d)", fi, fi->fd); + ASSERT(!file_is_active(fi)); + clist_add_tail(&m->file_list, &fi->n); + m->file_cnt++; +#ifdef CONFIG_UCW_EPOLL + struct epoll_event evt = { + .events = file_want_events(fi), + .data.ptr = fi, + }; + if (epoll_ctl(m->epoll_fd, EPOLL_CTL_ADD, fi->fd, &evt) < 0) + die("epoll_ctl() failed: %m"); + fi->last_want_events = evt.events; +#else + m->poll_table_obsolete = 1; +#endif + if (fcntl(fi->fd, F_SETFL, O_NONBLOCK) < 0) + msg(L_ERROR, "Error setting fd %d to non-blocking mode: %m. Keep fingers crossed.", fi->fd); } void -hook_del(struct main_hook *ho) +file_chg(struct main_file *fi) { - DBG("MAIN: Deleting hook %p", ho); - ASSERT(ho->n.next); - clist_remove(&ho->n); - ho->n.next = ho->n.prev = NULL; +#ifdef CONFIG_UCW_EPOLL + clist_remove(&fi->n); + clist_add_tail(&main_current()->file_recalc_list, &fi->n); +#else + struct pollfd *p = fi->pollfd; + if (p) + p->events = file_want_events(fi); +#endif } -#ifdef USE_SELF_PIPE static void -main_sigchld_handler(int x UNUSED) +file_del_ctx(struct main_context *m, struct main_file *fi) { - int old_errno = errno; - DBG("SIGCHLD received"); - chld_received = 1; - ssize_t result; - while((result = write(sig_pipe_send, "c", 1)) == -1 && errno == EINTR); - if(result == -1 && errno != EAGAIN) - msg(L_SIGHANDLER|L_ERROR, "Could not write to self-pipe: %m"); - errno = old_errno; + // XXX: Can be called on a non-current context + DBG("MAIN: Deleting file %p (fd=%d)", fi, fi->fd); + + if (!file_is_active(fi)) + return; + clist_unlink(&fi->n); + m->file_cnt--; +#ifdef CONFIG_UCW_EPOLL + if (m->epoll_fd >= 0 && epoll_ctl(m->epoll_fd, EPOLL_CTL_DEL, fi->fd, NULL) < 0) + die("epoll_ctl() failed: %m"); +#else + m->poll_table_obsolete = 1; +#endif } -static int -dummy_read_handler(struct main_file *mp) +void +file_del(struct main_file *fi) { - char buffer[1024]; - ssize_t result = read(mp->fd, buffer, 1024); - if(result == -1 && errno != EAGAIN) - msg(L_ERROR, "Could not read from selfpipe: %m"); - file_chg(mp); - return result == 1024; + file_del_ctx(main_current(), fi); } -static void -pipe_configure(int fd) +void +hook_add(struct main_hook *ho) { - int flags; - if((flags = fcntl(fd, F_GETFL)) == -1 || fcntl(fd, F_SETFL, flags|O_NONBLOCK)) - die("Could not set file descriptor %d to non-blocking: %m", fd); - if((flags = fcntl(fd, F_GETFD)) == -1 || fcntl(fd, F_SETFD, flags|O_CLOEXEC)) - die("Could not set file descriptor %d to close-on-exec: %m", fd); + struct main_context *m = main_current(); + + DBG("MAIN: Adding hook %p", ho); + if (hook_is_active(ho)) + clist_unlink(&ho->n); + clist_add_tail(&m->hook_list, &ho->n); } -#else + +void +hook_del(struct main_hook *ho) +{ + DBG("MAIN: Deleting hook %p", ho); + if (hook_is_active(ho)) + clist_unlink(&ho->n); +} + static void -main_sigchld_handler(int x UNUSED) +sigchld_received(struct main_signal *sg UNUSED) { - DBG("SIGCHLD received"); + struct main_context *m = main_current(); + int stat; + pid_t pid; + + while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) + { + DBG("MAIN: Child %d exited with status %x", pid, stat); + CLIST_FOR_EACH(struct main_process *, pr, m->process_list) + if (pr->pid == pid) + { + pr->status = stat; + process_del(pr); + format_exit_status(pr->status_msg, pr->status); + DBG("MAIN: Calling process exit handler"); + pr->handler(pr); + break; + } + } } -#endif void process_add(struct main_process *mp) { + struct main_context *m = main_current(); + DBG("MAIN: Adding process %p (pid=%d)", mp, mp->pid); - ASSERT(!mp->n.next); + ASSERT(!process_is_active(mp)); ASSERT(mp->handler); - clist_add_tail(&main_process_list, &mp->n); - if (!main_sigchld_set_up) + clist_add_tail(&m->process_list, &mp->n); + if (!m->sigchld_handler) { -#ifdef USE_SELF_PIPE - int pipe_result[2]; - if(pipe(pipe_result) == -1) - die("Could not create selfpipe:%m"); - pipe_configure(pipe_result[0]); - pipe_configure(pipe_result[1]); - sig_pipe_recv = pipe_result[0]; - sig_pipe_send = pipe_result[1]; - static struct main_file self_pipe; - self_pipe = (struct main_file) { - .fd = sig_pipe_recv, - .read_handler = dummy_read_handler - }; - file_add(&self_pipe); -#endif - struct sigaction sa; - bzero(&sa, sizeof(sa)); - sa.sa_handler = main_sigchld_handler; - sa.sa_flags = SA_NOCLDSTOP | SA_RESTART; - sigaction(SIGCHLD, &sa, NULL); - main_sigchld_set_up = 1; - chld_received = 1; // The signal may have come before the handler + struct main_signal *sg = xmalloc_zero(sizeof(*sg)); + m->sigchld_handler = sg; + sg->signum = SIGCHLD; + sg->handler = sigchld_received; + signal_add(sg); } } @@ -336,9 +437,8 @@ void process_del(struct main_process *mp) { DBG("MAIN: Deleting process %p (pid=%d)", mp, mp->pid); - ASSERT(mp->n.next); - clist_remove(&mp->n); - mp->n.next = NULL; + if (process_is_active(mp)) + clist_unlink(&mp->n); } int @@ -364,251 +464,427 @@ process_fork(struct main_process *mp) } } -void -main_debug(void) +static int +pipe_read_handler(struct main_file *mf UNUSED) { -#ifdef CONFIG_DEBUG - msg(L_DEBUG, "### Main loop status on %lld", (long long)main_now); - msg(L_DEBUG, "\tActive timers:"); - struct main_timer *tm; - CLIST_WALK(tm, main_timer_list) - msg(L_DEBUG, "\t\t%p (expires %lld, data %p)", tm, (long long)(tm->expires ? tm->expires-main_now : 999999), tm->data); - struct main_file *fi; - msg(L_DEBUG, "\tActive files:"); - CLIST_WALK(fi, main_file_list) - msg(L_DEBUG, "\t\t%p (fd %d, rh %p, wh %p, eh %p, expires %lld, data %p)", - fi, fi->fd, fi->read_handler, fi->write_handler, fi->error_handler, - (long long)(fi->timer.expires ? fi->timer.expires-main_now : 999999), fi->data); - msg(L_DEBUG, "\tActive hooks:"); - struct main_hook *ho; - CLIST_WALK(ho, main_hook_list) - msg(L_DEBUG, "\t\t%p (func %p, data %p)", ho, ho->handler, ho->data); - msg(L_DEBUG, "\tActive processes:"); - struct main_process *pr; - CLIST_WALK(pr, main_process_list) - msg(L_DEBUG, "\t\t%p (pid %d, data %p)", pr, pr->pid, pr->data); -#endif -} + struct main_context *m = main_current(); + int signum; + int n = read(m->sig_pipe_recv, &signum, sizeof(signum)); -static void -main_rebuild_poll_table(void) -{ - struct main_file *fi; - if (main_poll_table_size < main_file_cnt) + if (n < 0) { - if (main_poll_table) - xfree(main_poll_table); - else - main_poll_table_size = 1; - while (main_poll_table_size < main_file_cnt) - main_poll_table_size *= 2; - main_poll_table = xmalloc(sizeof(struct pollfd) * main_poll_table_size); + if (errno != EAGAIN) + msg(L_ERROR, "Error reading signal pipe: %m"); + return 0; } - struct pollfd *p = main_poll_table; - DBG("MAIN: Rebuilding poll table: %d of %d entries set", main_file_cnt, main_poll_table_size); - CLIST_WALK(fi, main_file_list) + ASSERT(n == sizeof(signum)); + + DBG("MAIN: Sigpipe: received signal %d", signum); + struct main_signal iter = { .signum = -1 }; + struct main_signal *sg = clist_head(&m->signal_list); + while (sg) { - p->fd = fi->fd; - fi->pollfd = p++; - file_chg(fi); + if (sg->signum == signum) + { + DBG("MAIN: Sigpipe: invoking handler %p", sg); + clist_insert_after(&iter.n, &sg->n); + sg->handler(sg); + sg = clist_next(&m->signal_list, &iter.n); + clist_remove(&iter.n); + } + else + sg = clist_next(&m->signal_list, &sg->n); } - main_poll_table_obsolete = 0; + + return 1; +} + +static void +pipe_configure(int fd) +{ + int flags; + if ((flags = fcntl(fd, F_GETFL)) < 0 || fcntl(fd, F_SETFL, flags|O_NONBLOCK) < 0) + die("Could not set file descriptor %d to non-blocking: %m", fd); +} + +static void +pipe_setup(struct main_context *m) +{ + DBG("MAIN: Sigpipe: Setting up the pipe"); + + int pipe_result[2]; + if (pipe(pipe_result) == -1) + die("Could not create signal pipe: %m"); + pipe_configure(pipe_result[0]); + pipe_configure(pipe_result[1]); + m->sig_pipe_recv = pipe_result[0]; + m->sig_pipe_send = pipe_result[1]; + + struct main_file *f = xmalloc_zero(sizeof(*f)); + m->sig_pipe_file = f; + f->fd = m->sig_pipe_recv; + f->read_handler = pipe_read_handler; + file_add(f); +} + +static void +signal_handler_pipe(int signum) +{ + struct main_context *m = main_current(); +#ifdef LOCAL_DEBUG + msg(L_DEBUG | L_SIGHANDLER, "MAIN: Sigpipe: sending signal %d down the drain", signum); +#endif + write(m->sig_pipe_send, &signum, sizeof(signum)); } void -main_loop(void) +signal_add(struct main_signal *ms) { - DBG("MAIN: Entering main_loop"); - ASSERT(main_timer_list.head.next); + struct main_context *m = main_current(); - struct main_file *fi; - struct main_hook *ho; - struct main_timer *tm; - struct main_process *pr; - cnode *tmp; + DBG("MAIN: Adding signal %p (sig=%d)", ms, ms->signum); - main_get_time(); - for (;;) + ASSERT(!signal_is_active(ms)); + // Adding at the head of the list is better if we are in the middle of walking the list. + clist_add_head(&m->signal_list, &ms->n); + if (m->sig_pipe_recv < 0) + pipe_setup(m); + + struct sigaction sa = { + .sa_handler = signal_handler_pipe, + .sa_flags = SA_NOCLDSTOP | SA_RESTART, + }; + sigaction(ms->signum, &sa, NULL); + + sigset_t ss; + sigemptyset(&ss); + sigaddset(&ss, ms->signum); + THREAD_SIGMASK(SIG_UNBLOCK, &ss, NULL); + sigaddset(&m->want_signals, ms->signum); +} + +static void +signal_del_ctx(struct main_context *m, struct main_signal *ms) +{ + // XXX: Can be called on a non-current context + DBG("MAIN: Deleting signal %p (sig=%d)", ms, ms->signum); + + if (!signal_is_active(ms)) + return; + clist_unlink(&ms->n); + + int another = 0; + CLIST_FOR_EACH(struct main_signal *, s, m->signal_list) + if (s->signum == ms->signum) + another++; + if (!another) { - timestamp_t wake = main_now + 1000000000; - while ((tm = clist_head(&main_timer_list)) && tm->expires <= main_now) - { - DBG("MAIN: Timer %p expired at now-%lld", tm, (long long)(main_now - tm->expires)); - tm->handler(tm); - } - int hook_min = HOOK_RETRY; - int hook_max = HOOK_SHUTDOWN; - CLIST_WALK_DELSAFE(ho, main_hook_list, tmp) - { - DBG("MAIN: Hook %p", ho); - int ret = ho->handler(ho); - hook_min = MIN(hook_min, ret); - hook_max = MAX(hook_max, ret); - } - if (hook_min == HOOK_SHUTDOWN || - hook_min == HOOK_DONE && hook_max == HOOK_DONE || - main_shutdown) - { - DBG("MAIN: Shut down by %s", main_shutdown ? "main_shutdown" : "a hook"); - return; - } - if (hook_max == HOOK_RETRY) - wake = 0; - if (main_poll_table_obsolete) - main_rebuild_poll_table(); -#ifndef USE_SELF_PIPE - // We don't have a reliable flag without the self-pipe. - chld_received = 1; -#endif - if (chld_received && !clist_empty(&main_process_list)) - { - int stat; - pid_t pid; - wake = MIN(wake, main_now + 10000); - chld_received = 0; - while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) - { - DBG("MAIN: Child %d exited with status %x", pid, stat); - CLIST_WALK(pr, main_process_list) - if (pr->pid == pid) - { - pr->status = stat; - process_del(pr); - format_exit_status(pr->status_msg, pr->status); - DBG("MAIN: Calling process exit handler"); - pr->handler(pr); - break; - } - wake = 0; - } - } - if ((tm = clist_head(&main_timer_list)) && tm->expires < wake) - wake = tm->expires; - main_get_time(); - int timeout = (wake ? wake - main_now : 0); - DBG("MAIN: Poll for %d fds and timeout %d ms", main_file_cnt, timeout); - int p = poll(main_poll_table, main_file_cnt, timeout); - timestamp_t old_now = main_now; - main_get_time(); - main_idle_time += main_now - old_now; - if (p > 0) + if (main_is_current(m)) { - struct pollfd *p = main_poll_table; - CLIST_WALK(fi, main_file_list) - { - if (p->revents & (POLLIN | POLLHUP | POLLERR)) - { - do - DBG("MAIN: Read event on fd %d", p->fd); - while (fi->read_handler && fi->read_handler(fi) && !main_poll_table_obsolete); - if (main_poll_table_obsolete) /* File entries have been inserted or deleted => better not risk continuing to nowhere */ - break; - } - if (p->revents & (POLLOUT | POLLERR)) - { - do - DBG("MAIN: Write event on fd %d", p->fd); - while (fi->write_handler && fi->write_handler(fi) && !main_poll_table_obsolete); - if (main_poll_table_obsolete) - break; - } - p++; - } + sigset_t ss; + sigemptyset(&ss); + sigaddset(&ss, ms->signum); + THREAD_SIGMASK(SIG_BLOCK, &ss, NULL); } + sigdelset(&m->want_signals, ms->signum); } } -#ifdef TEST +void +signal_del(struct main_signal *ms) +{ + signal_del_ctx(main_current(), ms); +} -static struct main_process mp; -static struct main_file fin, fout; -static struct main_hook hook; -static struct main_timer tm; +#ifdef CONFIG_UCW_DEBUG -static byte rb[16]; +void +file_debug(struct main_file *fi) +{ + msg(L_DEBUG, "\t\t%p (fd %d, rh %p, wh %p, data %p)", + fi, fi->fd, fi->read_handler, fi->write_handler, fi->data); +} -static void dread(struct main_file *fi) +void +hook_debug(struct main_hook *ho) { - if (fi->rpos < fi->rlen) - { - msg(L_INFO, "Read EOF"); - file_del(fi); - } - else - { - msg(L_INFO, "Read done"); - file_read(fi, rb, sizeof(rb)); - } + msg(L_DEBUG, "\t\t%p (func %p, data %p)", ho, ho->handler, ho->data); } -static void derror(struct main_file *fi, int cause) +void +signal_debug(struct main_signal *sg) { - msg(L_INFO, "Error: %m !!! (cause %d)", cause); - file_del(fi); + if (sg->signum < 0) + msg(L_DEBUG, "\t\t(placeholder)"); + else + msg(L_DEBUG, "\t\t%p (sig %d, func %p, data %p)", sg, sg->signum, sg->handler, sg->data); } -static void dwrite(struct main_file *fi UNUSED) +static void +timer_debug_ctx(struct main_context *m, struct main_timer *tm) { - msg(L_INFO, "Write done"); + msg(L_DEBUG, "\t\t%p (expires %lld, data %p)", tm, (long long)(tm->expires - m->now), tm->data); } -static int dhook(struct main_hook *ho UNUSED) +void +timer_debug(struct main_timer *tm) { - msg(L_INFO, "Hook called"); - return 0; + timer_debug_ctx(main_current(), tm); } -static void dtimer(struct main_timer *tm) +void +process_debug(struct main_process *pr) { - msg(L_INFO, "Timer tick"); - timer_add(tm, main_now + 10000); + msg(L_DEBUG, "\t\t%p (pid %d, func %p, data %p)", pr, pr->pid, pr->handler, pr->data); } -static void dentry(void) +void +main_debug_context(struct main_context *m UNUSED) { - msg(L_INFO, "*** SUBPROCESS START ***"); - sleep(2); - msg(L_INFO, "*** SUBPROCESS FINISH ***"); - exit(0); + msg(L_DEBUG, "### Main loop status on %lld", (long long) m->now); + msg(L_DEBUG, "\tActive timers:"); + uns num_timers = count_timers(m); + for (uns i = 1; i <= num_timers; i++) + timer_debug(m->timer_table[i]); + msg(L_DEBUG, "\tActive files:"); + CLIST_FOR_EACH(struct main_file *, fi, m->file_list) + file_debug(fi); + CLIST_FOR_EACH(struct main_file *, fi, m->file_active_list) + file_debug(fi); +#ifdef CONFIG_UCW_EPOLL + CLIST_FOR_EACH(struct main_file *, fi, m->file_recalc_list) + file_debug(fi); +#endif + msg(L_DEBUG, "\tActive hooks:"); + CLIST_FOR_EACH(struct main_hook *, ho, m->hook_done_list) + hook_debug(ho); + CLIST_FOR_EACH(struct main_hook *, ho, m->hook_list) + hook_debug(ho); + msg(L_DEBUG, "\tActive processes:"); + CLIST_FOR_EACH(struct main_process *, pr, m->process_list) + process_debug(pr); + msg(L_DEBUG, "\tActive signal catchers:"); + CLIST_FOR_EACH(struct main_signal *, sg, m->signal_list) + signal_debug(sg); } -static void dexit(struct main_process *pr) +#else + +// Stubs +void file_debug(struct main_file *fi UNUSED) { } +void hook_debug(struct main_hook *ho UNUSED) { } +void signal_debug(struct main_signal *sg UNUSED) { } +void timer_debug(struct main_timer *tm UNUSED) { } +void process_debug(struct main_process *pr UNUSED) { } +void main_debug_context(struct main_context *m UNUSED) { } + +#endif + +static void +process_timers(struct main_context *m) { - msg(L_INFO, "Subprocess %d exited with status %x", pr->pid, pr->status); + struct main_timer *tm; + while (count_timers(m) && (tm = m->timer_table[1])->expires <= m->now) + { + DBG("MAIN: Timer %p expired at now-%lld", tm, (long long)(m->now - tm->expires)); + tm->handler(tm); + } } -int -main(void) +static enum main_hook_return +process_hooks(struct main_context *m) { - log_init(NULL); - main_init(); + int hook_min = HOOK_RETRY; + int hook_max = HOOK_SHUTDOWN; + struct main_hook *ho; - fin.fd = 0; - fin.read_done = dread; - fin.error_handler = derror; - file_add(&fin); - file_read(&fin, rb, sizeof(rb)); + while (ho = clist_remove_head(&m->hook_list)) + { + clist_add_tail(&m->hook_done_list, &ho->n); + DBG("MAIN: Hook %p", ho); + int ret = ho->handler(ho); + hook_min = MIN(hook_min, ret); + hook_max = MAX(hook_max, ret); + } + clist_move(&m->hook_list, &m->hook_done_list); + if (hook_min == HOOK_SHUTDOWN || + hook_min == HOOK_DONE && hook_max == HOOK_DONE || + m->shutdown) + { + DBG("MAIN: Shut down by %s", m->shutdown ? "main_shut_down" : "a hook"); + return HOOK_SHUTDOWN; + } + if (hook_max == HOOK_RETRY) + return HOOK_RETRY; + else + return HOOK_IDLE; +} - fout.fd = 1; - fout.write_done = dwrite; - fout.error_handler = derror; - file_add(&fout); - file_write(&fout, "Hello, world!\n", 14); +#ifdef CONFIG_UCW_EPOLL - hook.handler = dhook; - hook_add(&hook); +static void +recalc_files(struct main_context *m) +{ + struct main_file *fi; - tm.handler = dtimer; - timer_add(&tm, main_now + 1000); + while (fi = clist_remove_head(&m->file_recalc_list)) + { + struct epoll_event evt = { + .events = file_want_events(fi), + .data.ptr = fi, + }; + if (evt.events != fi->last_want_events) + { + DBG("MAIN: Changing requested events for fd %d to %x", fi->fd, evt.events); + fi->last_want_events = evt.events; + if (epoll_ctl(main_current()->epoll_fd, EPOLL_CTL_MOD, fi->fd, &evt) < 0) + die("epoll_ctl() failed: %m"); + } + clist_add_tail(&m->file_list, &fi->n); + } +} - mp.handler = dexit; - if (!process_fork(&mp)) - dentry(); +#else - main_debug(); +static void +rebuild_poll_table(struct main_context *m) +{ + GARY_INIT_OR_RESIZE(m->poll_table, m->file_cnt); + GARY_INIT_OR_RESIZE(m->poll_file_table, m->file_cnt); + DBG("MAIN: Rebuilding poll table: %d entries", m->file_cnt); - main_loop(); - msg(L_INFO, "Finished."); + struct pollfd *p = m->poll_table; + struct main_file **pf = m->poll_file_table; + CLIST_FOR_EACH(struct main_file *, fi, m->file_list) + { + p->fd = fi->fd; + p->events = file_want_events(fi); + fi->pollfd = p++; + *pf++ = fi; + } + m->poll_table_obsolete = 0; } #endif + +void +main_loop(void) +{ + DBG("MAIN: Entering main_loop"); + struct main_context *m = main_current(); + + main_get_time_ctx(m); + m->shutdown = 0; + + for (;;) + { + timestamp_t wake = m->now + 1000000000; + process_timers(m); + switch (process_hooks(m)) + { + case HOOK_SHUTDOWN: + return; + case HOOK_RETRY: + wake = 0; + break; + default: ; + } + + int timeout = 0; + if (!m->single_step) + { + if (count_timers(m)) + wake = MIN(wake, m->timer_table[1]->expires); + main_get_time_ctx(m); + timeout = ((wake > m->now) ? wake - m->now : 0); + } + +#ifdef CONFIG_UCW_EPOLL + recalc_files(m); + DBG("MAIN: Epoll for %d fds and timeout %d ms", m->file_cnt, timeout); + int n = epoll_wait(m->epoll_fd, m->epoll_events, EPOLL_BUF_SIZE, timeout); +#else + if (m->poll_table_obsolete) + rebuild_poll_table(m); + DBG("MAIN: Poll for %d fds and timeout %d ms", m->file_cnt, timeout); + int n = poll(m->poll_table, m->file_cnt, timeout); +#endif + + DBG("\t-> %d events", n); + if (n < 0 && errno != EAGAIN && errno != EINTR) + die("(e)poll failed: %m"); + timestamp_t old_now = m->now; + main_get_time_ctx(m); + m->idle_time += m->now - old_now; + + if (n <= 0) + { + if (m->single_step) + return; + else + continue; + } + + // Relink all files with a pending event to file_active_list +#ifdef CONFIG_UCW_EPOLL + for (int i=0; iepoll_events[i]; + struct main_file *fi = e->data.ptr; + clist_remove(&fi->n); + clist_add_tail(&m->file_active_list, &fi->n); + fi->events = e->events; + } +#else + struct pollfd *p = m->poll_table; + struct main_file **pf = m->poll_file_table; + for (uns i=0; i < m->file_cnt; i++) + if (p[i].revents) + { + struct main_file *fi = pf[i]; + clist_remove(&fi->n); + clist_add_tail(&m->file_active_list, &fi->n); + fi->events = p[i].revents; + } +#endif + + /* + * Process the buffered file events. This is pretty tricky, since + * user callbacks can modify the file structure or even destroy it. + * In such cases, we detect that the structure was relinked and stop + * processing its events, leaving them for the next iteration of the + * main loop. + */ + struct main_file *fi; + while (fi = clist_head(&m->file_active_list)) + { + if (fi->read_handler && (fi->events & (POLLIN | POLLHUP))) + { + fi->events &= ~(POLLIN | POLLHUP); + do + DBG("MAIN: Read event on fd %d", fi->fd); + while (fi->read_handler && fi->read_handler(fi)); + continue; + } + if (fi->write_handler && (fi->events & (POLLOUT | POLLHUP | POLLERR))) + { + fi->events &= ~(POLLOUT | POLLHUP | POLLERR); + do + DBG("MAIN: Write event on fd %d", fi->fd); + while (fi->write_handler && fi->write_handler(fi)); + continue; + } + clist_remove(&fi->n); + clist_add_tail(&m->file_list, &fi->n); + } + } +} + +void +main_step(void) +{ + struct main_context *m = main_current(); + m->single_step = 1; + main_loop(); + m->single_step = 0; +}