]> mj.ucw.cz Git - libucw.git/commitdiff
Added a library module for distributing work between threads.
authorMartin Mares <mj@ucw.cz>
Mon, 11 Dec 2006 22:38:05 +0000 (23:38 +0100)
committerMartin Mares <mj@ucw.cz>
Mon, 11 Dec 2006 22:38:05 +0000 (23:38 +0100)
(Ported from the dev-sorter branch.)

lib/workqueue.c [new file with mode: 0644]
lib/workqueue.h [new file with mode: 0644]

diff --git a/lib/workqueue.c b/lib/workqueue.c
new file mode 100644 (file)
index 0000000..3d87a74
--- /dev/null
@@ -0,0 +1,255 @@
+/*
+ *     UCW Library -- Thread Pools and Work Queues
+ *
+ *     (c) 2006 Martin Mares <mj@ucw.cz>
+ *
+ *     This software may be freely distributed and used according to the terms
+ *     of the GNU Lesser General Public License.
+ */
+
+#include "lib/lib.h"
+#include "lib/threads.h"
+#include "lib/workqueue.h"
+
+static void *
+worker_thread_init(void *arg)
+{
+  struct worker_thread *t = arg;
+  struct worker_pool *pool = t->pool;
+
+  if (pool->init_thread)
+    pool->init_thread(t);
+  sem_post(pool->init_cleanup_sem);
+
+  for (;;)
+    {
+      struct work *w = raw_queue_get(&pool->requests);
+      w->go(t, w);
+      raw_queue_put(&w->reply_to->finished, w);
+    }
+
+  return NULL;
+}
+
+static void
+worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED)
+{
+  if (t->pool->cleanup_thread)
+    t->pool->cleanup_thread(t);
+  sem_post(t->pool->init_cleanup_sem);
+  pthread_exit(NULL);
+}
+
+void
+worker_pool_init(struct worker_pool *p)
+{
+  clist_init(&p->worker_threads);
+  raw_queue_init(&p->requests);
+  p->init_cleanup_sem = sem_alloc();
+
+  pthread_attr_t attr;
+  if (pthread_attr_init(&attr) < 0 ||
+      pthread_attr_setstacksize(&attr, p->stack_size ? : default_thread_stack_size) < 0)
+    ASSERT(0);
+
+  for (uns i=0; i < p->num_threads; i++)
+    {
+      struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
+      t->pool = p;
+      t->id = i;
+      int err = pthread_create(&t->thread, &attr, worker_thread_init, t);
+      if (err)
+       die("Unable to create thread: %m");
+      clist_add_tail(&p->worker_threads, &t->n);
+      sem_wait(p->init_cleanup_sem);
+    }
+
+  pthread_attr_destroy(&attr);
+}
+
+void
+worker_pool_cleanup(struct worker_pool *p)
+{
+  for (uns i=0; i < p->num_threads; i++)
+    {
+      struct work w = {
+       .go = worker_thread_signal_finish
+      };
+      raw_queue_put(&p->requests, &w);
+      sem_wait(p->init_cleanup_sem);
+    }
+
+  struct worker_thread *tmp;
+  CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp)
+    {
+      int err = pthread_join(t->thread, NULL);
+      ASSERT(!err);
+      if (p->free_thread)
+       p->free_thread(t);
+      else
+       xfree(t);
+    }
+  raw_queue_cleanup(&p->requests);
+  sem_free(p->init_cleanup_sem);
+}
+
+void
+raw_queue_init(struct raw_queue *q)
+{
+  pthread_mutex_init(&q->queue_mutex, NULL);
+  clist_init(&q->queue);
+  q->queue_sem = sem_alloc();
+}
+
+void
+raw_queue_cleanup(struct raw_queue *q)
+{
+  ASSERT(clist_empty(&q->queue));
+  sem_free(q->queue_sem);
+  pthread_mutex_destroy(&q->queue_mutex);
+}
+
+void
+raw_queue_put(struct raw_queue *q, struct work *w)
+{
+  pthread_mutex_lock(&q->queue_mutex);
+  clist_add_tail(&q->queue, &w->n);
+  pthread_mutex_unlock(&q->queue_mutex);
+  sem_post(q->queue_sem);
+}
+
+static inline struct work *
+raw_queue_do_get(struct raw_queue *q)
+{
+  pthread_mutex_lock(&q->queue_mutex);
+  struct work *w = clist_head(&q->queue);
+  ASSERT(w);
+  clist_remove(&w->n);
+  pthread_mutex_unlock(&q->queue_mutex);
+  return w;
+}
+
+struct work *
+raw_queue_get(struct raw_queue *q)
+{
+  sem_wait(q->queue_sem);
+  return raw_queue_do_get(q);
+}
+
+struct work *
+raw_queue_try_get(struct raw_queue *q)
+{
+  if (!sem_trywait(q->queue_sem))
+    return raw_queue_do_get(q);
+  else
+    return NULL;
+}
+
+void
+work_queue_init(struct worker_pool *p, struct work_queue *q)
+{
+  q->pool = p;
+  q->nr_running = 0;
+  raw_queue_init(&q->finished);
+}
+
+void
+work_queue_cleanup(struct work_queue *q)
+{
+  ASSERT(!q->nr_running);
+  raw_queue_cleanup(&q->finished);
+}
+
+void
+work_submit(struct work_queue *q, struct work *w)
+{
+  ASSERT(w->go);
+  w->reply_to = q;
+  raw_queue_put(&q->pool->requests, w);
+  q->nr_running++;
+}
+
+static struct work *
+work_do_wait(struct work_queue *q, int try)
+{
+  while (q->nr_running)
+    {
+      struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
+      if (!w)
+       return NULL;
+      q->nr_running--;
+      if (w->returned)
+       w->returned(q, w);
+      else
+       return w;
+    }
+  return NULL;
+}
+
+struct work *
+work_wait(struct work_queue *q)
+{
+  return work_do_wait(q, 0);
+}
+
+struct work *
+work_try_wait(struct work_queue *q)
+{
+  return work_do_wait(q, 1);
+}
+
+#ifdef TEST
+
+#include <unistd.h>
+
+static void wt_init(struct worker_thread *t)
+{
+  log(L_INFO, "INIT %d", t->id);
+}
+
+static void wt_cleanup(struct worker_thread *t)
+{
+  log(L_INFO, "CLEANUP %d", t->id);
+}
+
+struct w {
+  struct work w;
+  uns id;
+};
+
+static void go(struct worker_thread *t, struct work *w)
+{
+  log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
+  usleep(1);
+}
+
+int main(void)
+{
+  struct worker_pool pool = {
+    .num_threads = 10,
+    .stack_size = 65536,
+    .init_thread = wt_init,
+    .cleanup_thread = wt_cleanup
+  };
+  worker_pool_init(&pool);
+
+  struct work_queue q;
+  work_queue_init(&pool, &q);
+  for (uns i=0; i<500; i++)
+    {
+      struct w *w = xmalloc_zero(sizeof(*w));
+      w->w.go = go;
+      w->id = i;
+      work_submit(&q, &w->w);
+    }
+
+  struct w *w;
+  while (w = (struct w *) work_wait(&q))
+    log(L_INFO, "Finished request %d", w->id);
+
+  work_queue_cleanup(&q);
+  worker_pool_cleanup(&pool);
+  return 0;
+}
+
+#endif
diff --git a/lib/workqueue.h b/lib/workqueue.h
new file mode 100644 (file)
index 0000000..c16904a
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ *     UCW Library -- Thread Pools and Work Queues
+ *
+ *     (c) 2006 Martin Mares <mj@ucw.cz>
+ *
+ *     This software may be freely distributed and used according to the terms
+ *     of the GNU Lesser General Public License.
+ */
+
+#ifndef _UCW_WORKQUEUE_H
+#define _UCW_WORKQUEUE_H
+
+/*
+ *  A thread pool is a set of threads receiving work requests from a common queue,
+ *  each work request contains a pointer to a function inside the thread.
+ *
+ *  A work queue is an interface for submitting work requests. It's bound to a single
+ *  thread pool, it remembers running requests and gathers replies. A single work queue
+ *  should not be used by multiple threads simultaneously.
+ *
+ *  When a thread pool is initialized, new_thread() is called for every thread first,
+ *  allocating struct worker_thread (and user-defined thread context following it) for
+ *  each thread. Then the threads are fired and each of them executes the init_thread()
+ *  callback. These callbacks are serialized and worker_pool_init() function waits
+ *  until all of them finish.
+ */
+
+#include "lib/semaphore.h"
+#include "lib/clists.h"
+
+#include <pthread.h>
+
+struct worker_thread {                         // One of threads serving requests
+  cnode n;
+  pthread_t thread;
+  struct worker_pool *pool;
+  int id;                                      // Inside the pool
+  /* user-defined data can follow */
+};
+
+struct raw_queue {                             // Generic queue with locking
+  pthread_mutex_t queue_mutex;
+  clist queue;
+  sem_t *queue_sem;                            // Number of requests queued
+};
+
+struct worker_pool {
+  struct raw_queue requests;
+  uns num_threads;
+  uns stack_size;                              // 0 for default
+  struct worker_thread *(*new_thread)(void);   // default: xmalloc the struct
+  void (*free_thread)(struct worker_thread *t);        // default: xfree
+  void (*init_thread)(struct worker_thread *t);        // default: empty
+  void (*cleanup_thread)(struct worker_thread *t); // default: empty
+  clist worker_threads;
+  sem_t *init_cleanup_sem;
+};
+
+struct work_queue {
+  struct worker_pool *pool;
+  uns nr_running;                              // Number of requests in service
+  struct raw_queue finished;                   // Finished requests queue up here
+};
+
+struct work {                                  // A single request
+  cnode n;
+  struct work_queue *reply_to;                 // Where to queue the request when it's finished
+  void (*go)(struct worker_thread *t, struct work *w);         // Called inside the worker thread
+  void (*returned)(struct work_queue *q, struct work *w);      // Called when returned back, NULL if work_wait should return
+};
+
+void worker_pool_init(struct worker_pool *p);
+void worker_pool_cleanup(struct worker_pool *p);
+
+void raw_queue_init(struct raw_queue *q);
+void raw_queue_cleanup(struct raw_queue *q);
+void raw_queue_put(struct raw_queue *q, struct work *w);
+struct work *raw_queue_get(struct raw_queue *q);
+struct work *raw_queue_try_get(struct raw_queue *q);
+
+void work_queue_init(struct worker_pool *p, struct work_queue *q);
+void work_queue_cleanup(struct work_queue *q);
+void work_submit(struct work_queue *q, struct work *w);
+struct work *work_wait(struct work_queue *q);
+struct work *work_try_wait(struct work_queue *q);
+
+#endif /* !_UCW_WORKQUEUE_H */