X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=ucw%2Fmainloop.c;h=7eb8c8acdf35fe59861dc0fccb76a0a6ac0b9971;hb=0f73a9b90ec53017512c34f7dab56be3a50d87b1;hp=6bdf1e014203e160bd37fda173dbd15ecd0bcc07;hpb=a27367df4d44e08ed719f6450bce960ca4b9cccc;p=libucw.git diff --git a/ucw/mainloop.c b/ucw/mainloop.c index 6bdf1e01..7eb8c8ac 100644 --- a/ucw/mainloop.c +++ b/ucw/mainloop.c @@ -1,7 +1,7 @@ /* * UCW Library -- Main Loop * - * (c) 2004--2010 Martin Mares + * (c) 2004--2011 Martin Mares * * This software may be freely distributed and used according to the terms * of the GNU Lesser General Public License. @@ -34,11 +34,20 @@ #define THREAD_SIGMASK sigprocmask #endif +#ifdef CONFIG_UCW_EPOLL +#include +#endif + #define MAIN_TIMER_LESS(x,y) ((x)->expires < (y)->expires) #define MAIN_TIMER_SWAP(heap,a,b,t) (t=heap[a], heap[a]=heap[b], heap[b]=t, heap[a]->index=(a), heap[b]->index=(b)) +#define EPOLL_BUF_SIZE 256 + +static void file_del_ctx(struct main_context *m, struct main_file *fi); +static void signal_del_ctx(struct main_context *m, struct main_signal *ms); + static void -do_main_get_time(struct main_context *m) +main_get_time_ctx(struct main_context *m) { struct timeval tv; gettimeofday(&tv, NULL); @@ -46,6 +55,35 @@ do_main_get_time(struct main_context *m) m->now = (timestamp_t)tv.tv_sec * 1000 + tv.tv_usec / 1000; } +static struct main_context * +main_current_nocheck(void) +{ + return ucwlib_thread_context()->main_context; +} + +struct main_context * +main_current(void) +{ + struct main_context *m = main_current_nocheck(); + ASSERT(m); + return m; +} + +static int +main_is_current(struct main_context *m) +{ + return (m == main_current_nocheck()); +} + +static inline uns +count_timers(struct main_context *m) +{ + if (m->timer_table) + return GARY_SIZE(m->timer_table) - 1; + else + return 0; +} + struct main_context * main_new(void) { @@ -53,39 +91,107 @@ main_new(void) DBG("MAIN: New context"); clist_init(&m->file_list); + clist_init(&m->file_active_list); clist_init(&m->hook_list); clist_init(&m->hook_done_list); clist_init(&m->process_list); clist_init(&m->signal_list); +#ifdef CONFIG_UCW_EPOLL + m->epoll_fd = epoll_create(64); + if (m->epoll_fd < 0) + die("epoll_create() failed: %m"); + m->epoll_events = xmalloc(EPOLL_BUF_SIZE * sizeof(struct epoll_event *)); + clist_init(&m->file_recalc_list); +#else m->poll_table_obsolete = 1; - do_main_get_time(m); +#endif + main_get_time_ctx(m); sigemptyset(&m->want_signals); m->sig_pipe_recv = m->sig_pipe_send = -1; return m; } -void -main_delete(struct main_context *m) +static void +main_prepare_delete(struct main_context *m) { + /* + * If the context is current, deactivate it first. But beware, + * we must not call functions that depend on the current context. + */ + if (main_is_current(m)) + main_switch_context(NULL); + + // Close epoll descriptor early enough, it might be shared after fork! +#ifdef CONFIG_UCW_EPOLL + xfree(m->epoll_events); + close(m->epoll_fd); + m->epoll_fd = -1; +#else + GARY_FREE(m->poll_table); + GARY_FREE(m->poll_file_table); +#endif + if (m->sigchld_handler) - signal_del(m->sigchld_handler); + { + signal_del_ctx(m, m->sigchld_handler); + xfree(m->sigchld_handler); + } if (m->sig_pipe_file) - file_del(m->sig_pipe_file); + { + file_del_ctx(m, m->sig_pipe_file); + xfree(m->sig_pipe_file); + } if (m->sig_pipe_recv >= 0) { close(m->sig_pipe_recv); close(m->sig_pipe_send); } +} + +static void +main_do_delete(struct main_context *m) +{ + GARY_FREE(m->timer_table); + xfree(m); +} + +void +main_delete(struct main_context *m) +{ + if (!m) + return; + + main_prepare_delete(m); ASSERT(clist_empty(&m->file_list)); + ASSERT(clist_empty(&m->file_active_list)); +#ifdef CONFIG_UCW_EPOLL + ASSERT(clist_empty(&m->file_recalc_list)); +#endif ASSERT(clist_empty(&m->hook_list)); ASSERT(clist_empty(&m->hook_done_list)); ASSERT(clist_empty(&m->process_list)); ASSERT(clist_empty(&m->signal_list)); - GARY_FREE(m->timer_table); - GARY_FREE(m->poll_table); - xfree(m); - // FIXME: Some mechanism for cleaning up after fork() + ASSERT(!count_timers(m)); + main_do_delete(m); +} + +void +main_destroy(struct main_context *m) +{ + if (!m) + return; + main_prepare_delete(m); + + // Close all files + clist_insert_list_after(&m->file_active_list, m->file_list.head.prev); +#ifdef CONFIG_UCW_EPOLL + clist_insert_list_after(&m->file_recalc_list, m->file_list.head.prev); +#endif + CLIST_FOR_EACH(struct main_file *, f, m->file_list) + close(f->fd); + + main_do_delete(m); } struct main_context * @@ -108,15 +214,6 @@ main_switch_context(struct main_context *m) return m0; } -struct main_context * -main_current(void) -{ - struct ucwlib_context *c = ucwlib_thread_context(); - struct main_context *m = c->main_context; - ASSERT(m); - return m; -} - void main_init(void) { @@ -127,20 +224,19 @@ main_init(void) void main_cleanup(void) { - struct main_context *m = main_switch_context(NULL); - main_delete(m); + main_delete(main_current_nocheck()); } void -main_get_time(void) +main_teardown(void) { - do_main_get_time(main_current()); + main_destroy(main_current_nocheck()); } -static inline uns -count_timers(struct main_context *m) +void +main_get_time(void) { - return GARY_SIZE(m->timer_table) - 1; + main_get_time_ctx(main_current()); } void @@ -205,6 +301,17 @@ timer_del(struct main_timer *tm) timer_add(tm, 0); } +static uns +file_want_events(struct main_file *fi) +{ + uns events = 0; + if (fi->read_handler) + events |= POLLIN | POLLHUP | POLLERR; + if (fi->write_handler) + events |= POLLOUT | POLLERR; + return events; +} + void file_add(struct main_file *fi) { @@ -214,7 +321,17 @@ file_add(struct main_file *fi) ASSERT(!clist_is_linked(&fi->n)); clist_add_tail(&m->file_list, &fi->n); m->file_cnt++; +#ifdef CONFIG_UCW_EPOLL + struct epoll_event evt = { + .events = file_want_events(fi), + .data.ptr = fi, + }; + if (epoll_ctl(m->epoll_fd, EPOLL_CTL_ADD, fi->fd, &evt) < 0) + die("epoll_ctl() failed: %m"); + fi->last_want_events = evt.events; +#else m->poll_table_obsolete = 1; +#endif if (fcntl(fi->fd, F_SETFL, O_NONBLOCK) < 0) msg(L_ERROR, "Error setting fd %d to non-blocking mode: %m. Keep fingers crossed.", fi->fd); } @@ -222,36 +339,37 @@ file_add(struct main_file *fi) void file_chg(struct main_file *fi) { +#ifdef CONFIG_UCW_EPOLL + clist_remove(&fi->n); + clist_add_tail(&main_current()->file_recalc_list, &fi->n); +#else struct pollfd *p = fi->pollfd; if (p) - { - p->events = 0; - if (fi->read_handler) - p->events |= POLLIN | POLLHUP | POLLERR; - if (fi->write_handler) - p->events |= POLLOUT | POLLERR; - } + p->events = file_want_events(fi); +#endif } -void -file_del(struct main_file *fi) +static void +file_del_ctx(struct main_context *m, struct main_file *fi) { - struct main_context *m = main_current(); - + // XXX: Can be called on a non-current context DBG("MAIN: Deleting file %p (fd=%d)", fi, fi->fd); + ASSERT(clist_is_linked(&fi->n)); clist_unlink(&fi->n); m->file_cnt--; +#ifdef CONFIG_UCW_EPOLL + if (m->epoll_fd >= 0 && epoll_ctl(m->epoll_fd, EPOLL_CTL_DEL, fi->fd, NULL) < 0) + die("epoll_ctl() failed: %m"); +#else m->poll_table_obsolete = 1; +#endif } void -file_close_all(void) +file_del(struct main_file *fi) { - struct main_context *m = main_current(); - - CLIST_FOR_EACH(struct main_file *, f, m->file_list) - close(f->fd); + file_del_ctx(main_current(), fi); } void @@ -444,11 +562,10 @@ signal_add(struct main_signal *ms) sigaddset(&m->want_signals, ms->signum); } -void -signal_del(struct main_signal *ms) +static void +signal_del_ctx(struct main_context *m, struct main_signal *ms) { - struct main_context *m = main_current(); - + // XXX: Can be called on a non-current context DBG("MAIN: Deleting signal %p (sig=%d)", ms, ms->signum); ASSERT(clist_is_linked(&ms->n)); @@ -460,69 +577,112 @@ signal_del(struct main_signal *ms) another++; if (!another) { - sigset_t ss; - sigemptyset(&ss); - sigaddset(&ss, ms->signum); - THREAD_SIGMASK(SIG_BLOCK, &ss, NULL); + if (main_is_current(m)) + { + sigset_t ss; + sigemptyset(&ss); + sigaddset(&ss, ms->signum); + THREAD_SIGMASK(SIG_BLOCK, &ss, NULL); + } sigdelset(&m->want_signals, ms->signum); } } void -main_debug_context(struct main_context *m UNUSED) +signal_del(struct main_signal *ms) { + signal_del_ctx(main_current(), ms); +} + #ifdef CONFIG_DEBUG + +void +file_debug(struct main_file *fi) +{ + msg(L_DEBUG, "\t\t%p (fd %d, rh %p, wh %p, data %p)", + fi, fi->fd, fi->read_handler, fi->write_handler, fi->data); +} + +void +hook_debug(struct main_hook *ho) +{ + msg(L_DEBUG, "\t\t%p (func %p, data %p)", ho, ho->handler, ho->data); +} + +void +signal_debug(struct main_signal *sg) +{ + if (sg->signum < 0) + msg(L_DEBUG, "\t\t(placeholder)"); + else + msg(L_DEBUG, "\t\t%p (sig %d, func %p, data %p)", sg, sg->signum, sg->handler, sg->data); +} + +static void +timer_debug_ctx(struct main_context *m, struct main_timer *tm) +{ + msg(L_DEBUG, "\t\t%p (expires %lld, data %p)", tm, (long long)(tm->expires - m->now), tm->data); +} + +void +timer_debug(struct main_timer *tm) +{ + timer_debug_ctx(main_current(), tm); +} + +void +process_debug(struct main_process *pr) +{ + msg(L_DEBUG, "\t\t%p (pid %d, func %p, data %p)", pr, pr->pid, pr->handler, pr->data); +} + +void +main_debug_context(struct main_context *m UNUSED) +{ msg(L_DEBUG, "### Main loop status on %lld", (long long) m->now); msg(L_DEBUG, "\tActive timers:"); uns num_timers = count_timers(m); for (uns i = 1; i <= num_timers; i++) - { - struct main_timer *tm = m->timer_table[i]; - msg(L_DEBUG, "\t\t%p (expires %lld, data %p)", tm, (long long)(tm->expires ? tm->expires - m->now : 999999), tm->data); - } + timer_debug(m->timer_table[i]); msg(L_DEBUG, "\tActive files:"); CLIST_FOR_EACH(struct main_file *, fi, m->file_list) - msg(L_DEBUG, "\t\t%p (fd %d, rh %p, wh %p, data %p)", - fi, fi->fd, fi->read_handler, fi->write_handler, fi->data); - // FIXME: Can we display status of block_io requests somehow? + file_debug(fi); + CLIST_FOR_EACH(struct main_file *, fi, m->file_active_list) + file_debug(fi); +#ifdef CONFIG_UCW_EPOLL + CLIST_FOR_EACH(struct main_file *, fi, m->file_recalc_list) + file_debug(fi); +#endif msg(L_DEBUG, "\tActive hooks:"); CLIST_FOR_EACH(struct main_hook *, ho, m->hook_done_list) - msg(L_DEBUG, "\t\t%p (func %p, data %p)", ho, ho->handler, ho->data); + hook_debug(ho); CLIST_FOR_EACH(struct main_hook *, ho, m->hook_list) - msg(L_DEBUG, "\t\t%p (func %p, data %p)", ho, ho->handler, ho->data); + hook_debug(ho); msg(L_DEBUG, "\tActive processes:"); CLIST_FOR_EACH(struct main_process *, pr, m->process_list) - msg(L_DEBUG, "\t\t%p (pid %d, func %p, data %p)", pr, pr->pid, pr->handler, pr->data); + process_debug(pr); msg(L_DEBUG, "\tActive signal catchers:"); CLIST_FOR_EACH(struct main_signal *, sg, m->signal_list) - if (sg->signum < 0) - msg(L_DEBUG, "\t\t(placeholder)"); - else - msg(L_DEBUG, "\t\t%p (sig %d, func %p, data %p)", sg, sg->signum, sg->handler, sg->data); -#endif + signal_debug(sg); } -static void -main_rebuild_poll_table(struct main_context *m) -{ - GARY_INIT_OR_RESIZE(m->poll_table, m->file_cnt); - DBG("MAIN: Rebuilding poll table: %d entries", m->file_cnt); +#else - struct pollfd *p = m->poll_table; - CLIST_FOR_EACH(struct main_file *, fi, m->file_list) - { - p->fd = fi->fd; - fi->pollfd = p++; - file_chg(fi); - } - m->poll_table_obsolete = 0; -} +// Stubs +void file_debug(struct main_file *fi UNUSED) { } +void hook_debug(struct main_hook *ho UNUSED) { } +void signal_debug(struct main_signal *sg UNUSED) { } +void timer_debug(struct main_timer *tm UNUSED) { } +void process_debug(struct main_process *pr UNUSED) { } +void main_debug_context(struct main_context *m UNUSED) { } + +#endif static void process_timers(struct main_context *m) { struct main_timer *tm; - while (GARY_SIZE(m->timer_table) > 1 && (tm = m->timer_table[1])->expires <= m->now) + while (count_timers(m) && (tm = m->timer_table[1])->expires <= m->now) { DBG("MAIN: Timer %p expired at now-%lld", tm, (long long)(m->now - tm->expires)); tm->handler(tm); @@ -549,7 +709,7 @@ process_hooks(struct main_context *m) hook_min == HOOK_DONE && hook_max == HOOK_DONE || m->shutdown) { - DBG("MAIN: Shut down by %s", m->shutdown ? "main_shutdown" : "a hook"); + DBG("MAIN: Shut down by %s", m->shutdown ? "main_shut_down" : "a hook"); return HOOK_SHUTDOWN; } if (hook_max == HOOK_RETRY) @@ -558,13 +718,62 @@ process_hooks(struct main_context *m) return HOOK_IDLE; } +#ifdef CONFIG_UCW_EPOLL + +static void +recalc_files(struct main_context *m) +{ + struct main_file *fi; + + while (fi = clist_remove_head(&m->file_recalc_list)) + { + struct epoll_event evt = { + .events = file_want_events(fi), + .data.ptr = fi, + }; + if (evt.events != fi->last_want_events) + { + DBG("MAIN: Changing requested events for fd %d to %x", fi->fd, evt.events); + fi->last_want_events = evt.events; + if (epoll_ctl(main_current()->epoll_fd, EPOLL_CTL_MOD, fi->fd, &evt) < 0) + die("epoll_ctl() failed: %m"); + } + clist_add_tail(&m->file_list, &fi->n); + } +} + +#else + +static void +rebuild_poll_table(struct main_context *m) +{ + GARY_INIT_OR_RESIZE(m->poll_table, m->file_cnt); + GARY_INIT_OR_RESIZE(m->poll_file_table, m->file_cnt); + DBG("MAIN: Rebuilding poll table: %d entries", m->file_cnt); + + struct pollfd *p = m->poll_table; + struct main_file **pf = m->poll_file_table; + CLIST_FOR_EACH(struct main_file *, fi, m->file_list) + { + p->fd = fi->fd; + p->events = file_want_events(fi); + fi->pollfd = p++; + *pf++ = fi; + } + m->poll_table_obsolete = 0; +} + +#endif + void main_loop(void) { DBG("MAIN: Entering main_loop"); struct main_context *m = main_current(); - do_main_get_time(m); + main_get_time_ctx(m); + m->shutdown = 0; + for (;;) { timestamp_t wake = m->now + 1000000000; @@ -578,144 +787,102 @@ main_loop(void) break; default: ; } + + int timeout = 0; + if (!m->single_step) + { + if (count_timers(m)) + wake = MIN(wake, m->timer_table[1]->expires); + main_get_time_ctx(m); + timeout = ((wake > m->now) ? wake - m->now : 0); + } + +#ifdef CONFIG_UCW_EPOLL + recalc_files(m); + DBG("MAIN: Epoll for %d fds and timeout %d ms", m->file_cnt, timeout); + int n = epoll_wait(m->epoll_fd, m->epoll_events, EPOLL_BUF_SIZE, timeout); +#else if (m->poll_table_obsolete) - main_rebuild_poll_table(m); - if (count_timers(m)) - wake = MIN(wake, m->timer_table[1]->expires); - do_main_get_time(m); - int timeout = ((wake > m->now) ? wake - m->now : 0); + rebuild_poll_table(m); DBG("MAIN: Poll for %d fds and timeout %d ms", m->file_cnt, timeout); - int p = poll(m->poll_table, m->file_cnt, timeout); + int n = poll(m->poll_table, m->file_cnt, timeout); +#endif + + DBG("\t-> %d events", n); + if (n < 0 && errno != EAGAIN && errno != EINTR) + die("(e)poll failed: %m"); timestamp_t old_now = m->now; - do_main_get_time(m); + main_get_time_ctx(m); m->idle_time += m->now - old_now; - if (p > 0) + + if (n <= 0) { - struct pollfd *p = m->poll_table; - CLIST_FOR_EACH(struct main_file *, fi, m->file_list) - { - if (p->revents & (POLLIN | POLLHUP | POLLERR)) - { - do - DBG("MAIN: Read event on fd %d", p->fd); - while (fi->read_handler && fi->read_handler(fi) && !m->poll_table_obsolete); - if (m->poll_table_obsolete) /* File entries have been inserted or deleted => better not risk continuing to nowhere */ - break; - } - if (p->revents & (POLLOUT | POLLERR)) - { - do - DBG("MAIN: Write event on fd %d", p->fd); - while (fi->write_handler && fi->write_handler(fi) && !m->poll_table_obsolete); - if (m->poll_table_obsolete) - break; - } - p++; - } + if (m->single_step) + return; + else + continue; } - } -} - -#ifdef TEST - -static struct main_process mp; -static struct main_block_io fin, fout; -static struct main_hook hook; -static struct main_timer tm; -static struct main_signal sg; -static byte rb[16]; + // Relink all files with a pending event to file_active_list +#ifdef CONFIG_UCW_EPOLL + for (int i=0; iepoll_events[i]; + struct main_file *fi = e->data.ptr; + clist_remove(&fi->n); + clist_add_tail(&m->file_active_list, &fi->n); + fi->events = e->events; + } +#else + struct pollfd *p = m->poll_table; + struct main_file **pf = m->poll_file_table; + for (uns i=0; i < m->file_cnt; i++) + if (p[i].revents) + { + struct main_file *fi = pf[i]; + clist_remove(&fi->n); + clist_add_tail(&m->file_active_list, &fi->n); + fi->events = p[i].revents; + } +#endif -static void dread(struct main_block_io *bio) -{ - if (bio->rpos < bio->rlen) - { - msg(L_INFO, "Read EOF"); - block_io_del(bio); - } - else - { - msg(L_INFO, "Read done"); - block_io_read(bio, rb, sizeof(rb)); + /* + * Process the buffered file events. This is pretty tricky, since + * user callbacks can modify the file structure or even destroy it. + * In such cases, we detect that the structure was relinked and stop + * processing its events, leaving them for the next iteration of the + * main loop. + */ + struct main_file *fi; + while (fi = clist_head(&m->file_active_list)) + { + if (fi->events & (POLLIN | POLLHUP | POLLERR)) + { + fi->events &= ~(POLLIN | POLLHUP | POLLERR); + do + DBG("MAIN: Read event on fd %d", fi->fd); + while (fi->read_handler && fi->read_handler(fi)); + continue; + } + if (fi->events & (POLLOUT | POLLERR)) + { + fi->events &= ~(POLLOUT | POLLERR); + do + DBG("MAIN: Write event on fd %d", fi->fd); + while (fi->write_handler && fi->write_handler(fi)); + continue; + } + clist_remove(&fi->n); + clist_add_tail(&m->file_list, &fi->n); + } } } -static void derror(struct main_block_io *bio, int cause) -{ - msg(L_INFO, "Error: %m !!! (cause %d)", cause); - block_io_del(bio); -} - -static void dwrite(struct main_block_io *bio UNUSED) -{ - msg(L_INFO, "Write done"); -} - -static int dhook(struct main_hook *ho UNUSED) -{ - msg(L_INFO, "Hook called"); - return 0; -} - -static void dtimer(struct main_timer *tm) -{ - msg(L_INFO, "Timer tick"); - timer_add_rel(tm, 11000); - timer_add_rel(tm, 10000); -} - -static void dentry(void) -{ - msg(L_INFO, "*** SUBPROCESS START ***"); - sleep(2); - msg(L_INFO, "*** SUBPROCESS FINISH ***"); - exit(0); -} - -static void dexit(struct main_process *pr) -{ - msg(L_INFO, "Subprocess %d exited with status %x", pr->pid, pr->status); -} - -static void dsignal(struct main_signal *sg UNUSED) -{ - msg(L_INFO, "SIGINT received (use Ctrl-\\ to really quit)"); -} - -int -main(void) +void +main_step(void) { - log_init(NULL); - main_init(); - - fin.read_done = dread; - fin.error_handler = derror; - block_io_add(&fin, 0); - block_io_read(&fin, rb, sizeof(rb)); - - fout.write_done = dwrite; - fout.error_handler = derror; - block_io_add(&fout, 1); - block_io_write(&fout, "Hello, world!\n", 14); - - hook.handler = dhook; - hook_add(&hook); - - tm.handler = dtimer; - timer_add_rel(&tm, 1000); - - sg.signum = SIGINT; - sg.handler = dsignal; - signal_add(&sg); - - mp.handler = dexit; - if (!process_fork(&mp)) - dentry(); - - main_debug(); - + struct main_context *m = main_current(); + m->single_step = 1; main_loop(); - msg(L_INFO, "Finished."); + m->single_step = 0; } - -#endif