From: Martin Mares Date: Mon, 11 Dec 2006 22:38:05 +0000 (+0100) Subject: Added a library module for distributing work between threads. X-Git-Tag: holmes-import~507^2~19 X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=e6b2a1b6b54dd9bf49f47431769225028dc8aa8d;p=libucw.git Added a library module for distributing work between threads. (Ported from the dev-sorter branch.) --- diff --git a/lib/workqueue.c b/lib/workqueue.c new file mode 100644 index 00000000..3d87a74e --- /dev/null +++ b/lib/workqueue.c @@ -0,0 +1,255 @@ +/* + * UCW Library -- Thread Pools and Work Queues + * + * (c) 2006 Martin Mares + * + * This software may be freely distributed and used according to the terms + * of the GNU Lesser General Public License. + */ + +#include "lib/lib.h" +#include "lib/threads.h" +#include "lib/workqueue.h" + +static void * +worker_thread_init(void *arg) +{ + struct worker_thread *t = arg; + struct worker_pool *pool = t->pool; + + if (pool->init_thread) + pool->init_thread(t); + sem_post(pool->init_cleanup_sem); + + for (;;) + { + struct work *w = raw_queue_get(&pool->requests); + w->go(t, w); + raw_queue_put(&w->reply_to->finished, w); + } + + return NULL; +} + +static void +worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED) +{ + if (t->pool->cleanup_thread) + t->pool->cleanup_thread(t); + sem_post(t->pool->init_cleanup_sem); + pthread_exit(NULL); +} + +void +worker_pool_init(struct worker_pool *p) +{ + clist_init(&p->worker_threads); + raw_queue_init(&p->requests); + p->init_cleanup_sem = sem_alloc(); + + pthread_attr_t attr; + if (pthread_attr_init(&attr) < 0 || + pthread_attr_setstacksize(&attr, p->stack_size ? : default_thread_stack_size) < 0) + ASSERT(0); + + for (uns i=0; i < p->num_threads; i++) + { + struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t))); + t->pool = p; + t->id = i; + int err = pthread_create(&t->thread, &attr, worker_thread_init, t); + if (err) + die("Unable to create thread: %m"); + clist_add_tail(&p->worker_threads, &t->n); + sem_wait(p->init_cleanup_sem); + } + + pthread_attr_destroy(&attr); +} + +void +worker_pool_cleanup(struct worker_pool *p) +{ + for (uns i=0; i < p->num_threads; i++) + { + struct work w = { + .go = worker_thread_signal_finish + }; + raw_queue_put(&p->requests, &w); + sem_wait(p->init_cleanup_sem); + } + + struct worker_thread *tmp; + CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp) + { + int err = pthread_join(t->thread, NULL); + ASSERT(!err); + if (p->free_thread) + p->free_thread(t); + else + xfree(t); + } + raw_queue_cleanup(&p->requests); + sem_free(p->init_cleanup_sem); +} + +void +raw_queue_init(struct raw_queue *q) +{ + pthread_mutex_init(&q->queue_mutex, NULL); + clist_init(&q->queue); + q->queue_sem = sem_alloc(); +} + +void +raw_queue_cleanup(struct raw_queue *q) +{ + ASSERT(clist_empty(&q->queue)); + sem_free(q->queue_sem); + pthread_mutex_destroy(&q->queue_mutex); +} + +void +raw_queue_put(struct raw_queue *q, struct work *w) +{ + pthread_mutex_lock(&q->queue_mutex); + clist_add_tail(&q->queue, &w->n); + pthread_mutex_unlock(&q->queue_mutex); + sem_post(q->queue_sem); +} + +static inline struct work * +raw_queue_do_get(struct raw_queue *q) +{ + pthread_mutex_lock(&q->queue_mutex); + struct work *w = clist_head(&q->queue); + ASSERT(w); + clist_remove(&w->n); + pthread_mutex_unlock(&q->queue_mutex); + return w; +} + +struct work * +raw_queue_get(struct raw_queue *q) +{ + sem_wait(q->queue_sem); + return raw_queue_do_get(q); +} + +struct work * +raw_queue_try_get(struct raw_queue *q) +{ + if (!sem_trywait(q->queue_sem)) + return raw_queue_do_get(q); + else + return NULL; +} + +void +work_queue_init(struct worker_pool *p, struct work_queue *q) +{ + q->pool = p; + q->nr_running = 0; + raw_queue_init(&q->finished); +} + +void +work_queue_cleanup(struct work_queue *q) +{ + ASSERT(!q->nr_running); + raw_queue_cleanup(&q->finished); +} + +void +work_submit(struct work_queue *q, struct work *w) +{ + ASSERT(w->go); + w->reply_to = q; + raw_queue_put(&q->pool->requests, w); + q->nr_running++; +} + +static struct work * +work_do_wait(struct work_queue *q, int try) +{ + while (q->nr_running) + { + struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished); + if (!w) + return NULL; + q->nr_running--; + if (w->returned) + w->returned(q, w); + else + return w; + } + return NULL; +} + +struct work * +work_wait(struct work_queue *q) +{ + return work_do_wait(q, 0); +} + +struct work * +work_try_wait(struct work_queue *q) +{ + return work_do_wait(q, 1); +} + +#ifdef TEST + +#include + +static void wt_init(struct worker_thread *t) +{ + log(L_INFO, "INIT %d", t->id); +} + +static void wt_cleanup(struct worker_thread *t) +{ + log(L_INFO, "CLEANUP %d", t->id); +} + +struct w { + struct work w; + uns id; +}; + +static void go(struct worker_thread *t, struct work *w) +{ + log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id); + usleep(1); +} + +int main(void) +{ + struct worker_pool pool = { + .num_threads = 10, + .stack_size = 65536, + .init_thread = wt_init, + .cleanup_thread = wt_cleanup + }; + worker_pool_init(&pool); + + struct work_queue q; + work_queue_init(&pool, &q); + for (uns i=0; i<500; i++) + { + struct w *w = xmalloc_zero(sizeof(*w)); + w->w.go = go; + w->id = i; + work_submit(&q, &w->w); + } + + struct w *w; + while (w = (struct w *) work_wait(&q)) + log(L_INFO, "Finished request %d", w->id); + + work_queue_cleanup(&q); + worker_pool_cleanup(&pool); + return 0; +} + +#endif diff --git a/lib/workqueue.h b/lib/workqueue.h new file mode 100644 index 00000000..c16904ab --- /dev/null +++ b/lib/workqueue.h @@ -0,0 +1,87 @@ +/* + * UCW Library -- Thread Pools and Work Queues + * + * (c) 2006 Martin Mares + * + * This software may be freely distributed and used according to the terms + * of the GNU Lesser General Public License. + */ + +#ifndef _UCW_WORKQUEUE_H +#define _UCW_WORKQUEUE_H + +/* + * A thread pool is a set of threads receiving work requests from a common queue, + * each work request contains a pointer to a function inside the thread. + * + * A work queue is an interface for submitting work requests. It's bound to a single + * thread pool, it remembers running requests and gathers replies. A single work queue + * should not be used by multiple threads simultaneously. + * + * When a thread pool is initialized, new_thread() is called for every thread first, + * allocating struct worker_thread (and user-defined thread context following it) for + * each thread. Then the threads are fired and each of them executes the init_thread() + * callback. These callbacks are serialized and worker_pool_init() function waits + * until all of them finish. + */ + +#include "lib/semaphore.h" +#include "lib/clists.h" + +#include + +struct worker_thread { // One of threads serving requests + cnode n; + pthread_t thread; + struct worker_pool *pool; + int id; // Inside the pool + /* user-defined data can follow */ +}; + +struct raw_queue { // Generic queue with locking + pthread_mutex_t queue_mutex; + clist queue; + sem_t *queue_sem; // Number of requests queued +}; + +struct worker_pool { + struct raw_queue requests; + uns num_threads; + uns stack_size; // 0 for default + struct worker_thread *(*new_thread)(void); // default: xmalloc the struct + void (*free_thread)(struct worker_thread *t); // default: xfree + void (*init_thread)(struct worker_thread *t); // default: empty + void (*cleanup_thread)(struct worker_thread *t); // default: empty + clist worker_threads; + sem_t *init_cleanup_sem; +}; + +struct work_queue { + struct worker_pool *pool; + uns nr_running; // Number of requests in service + struct raw_queue finished; // Finished requests queue up here +}; + +struct work { // A single request + cnode n; + struct work_queue *reply_to; // Where to queue the request when it's finished + void (*go)(struct worker_thread *t, struct work *w); // Called inside the worker thread + void (*returned)(struct work_queue *q, struct work *w); // Called when returned back, NULL if work_wait should return +}; + +void worker_pool_init(struct worker_pool *p); +void worker_pool_cleanup(struct worker_pool *p); + +void raw_queue_init(struct raw_queue *q); +void raw_queue_cleanup(struct raw_queue *q); +void raw_queue_put(struct raw_queue *q, struct work *w); +struct work *raw_queue_get(struct raw_queue *q); +struct work *raw_queue_try_get(struct raw_queue *q); + +void work_queue_init(struct worker_pool *p, struct work_queue *q); +void work_queue_cleanup(struct work_queue *q); +void work_submit(struct work_queue *q, struct work *w); +struct work *work_wait(struct work_queue *q); +struct work *work_try_wait(struct work_queue *q); + +#endif /* !_UCW_WORKQUEUE_H */