2 * UCW Library -- Thread Pools and Work Queues
4 * (c) 2006 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
11 #include "lib/workqueue.h"
14 worker_thread_init(void *arg)
16 struct worker_thread *t = arg;
17 struct worker_pool *pool = t->pool;
19 if (pool->init_thread)
21 sem_post(pool->init_cleanup_sem);
25 struct work *w = raw_queue_get(&pool->requests);
27 raw_queue_put(&w->reply_to->finished, w);
34 worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED)
36 if (t->pool->cleanup_thread)
37 t->pool->cleanup_thread(t);
38 sem_post(t->pool->init_cleanup_sem);
43 worker_pool_init(struct worker_pool *p)
45 clist_init(&p->worker_threads);
46 raw_queue_init(&p->requests);
47 p->init_cleanup_sem = sem_alloc();
50 if (pthread_attr_init(&attr) < 0 ||
51 pthread_attr_setstacksize(&attr, p->stack_size) < 0)
54 for (uns i=0; i < p->num_threads; i++)
56 struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
59 int err = pthread_create(&t->thread, &attr, worker_thread_init, t);
61 die("Unable to create thread: %m");
62 clist_add_tail(&p->worker_threads, &t->n);
63 sem_wait(p->init_cleanup_sem);
66 pthread_attr_destroy(&attr);
70 worker_pool_cleanup(struct worker_pool *p)
72 for (uns i=0; i < p->num_threads; i++)
75 .go = worker_thread_signal_finish
77 raw_queue_put(&p->requests, &w);
78 sem_wait(p->init_cleanup_sem);
81 struct worker_thread *tmp;
82 CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp)
84 int err = pthread_join(t->thread, NULL);
91 raw_queue_cleanup(&p->requests);
92 sem_free(p->init_cleanup_sem);
96 raw_queue_init(struct raw_queue *q)
98 pthread_mutex_init(&q->queue_mutex, NULL);
99 clist_init(&q->queue);
100 q->queue_sem = sem_alloc();
104 raw_queue_cleanup(struct raw_queue *q)
106 ASSERT(clist_empty(&q->queue));
107 sem_free(q->queue_sem);
108 pthread_mutex_destroy(&q->queue_mutex);
112 raw_queue_put(struct raw_queue *q, struct work *w)
114 pthread_mutex_lock(&q->queue_mutex);
115 clist_add_tail(&q->queue, &w->n);
116 pthread_mutex_unlock(&q->queue_mutex);
117 sem_post(q->queue_sem);
121 raw_queue_get(struct raw_queue *q)
123 sem_wait(q->queue_sem);
124 pthread_mutex_lock(&q->queue_mutex);
125 struct work *w = clist_head(&q->queue);
128 pthread_mutex_unlock(&q->queue_mutex);
133 work_queue_init(struct worker_pool *p, struct work_queue *q)
137 raw_queue_init(&q->finished);
141 work_queue_cleanup(struct work_queue *q)
143 ASSERT(!q->nr_running);
144 raw_queue_cleanup(&q->finished);
148 work_submit(struct work_queue *q, struct work *w)
152 raw_queue_put(&q->pool->requests, w);
157 work_wait(struct work_queue *q)
159 while (q->nr_running)
161 struct work *w = raw_queue_get(&q->finished);
175 static void wt_init(struct worker_thread *t)
177 log(L_INFO, "INIT %d", t->id);
180 static void wt_cleanup(struct worker_thread *t)
182 log(L_INFO, "CLEANUP %d", t->id);
190 static void go(struct worker_thread *t, struct work *w)
192 log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
198 struct worker_pool pool = {
201 .init_thread = wt_init,
202 .cleanup_thread = wt_cleanup
204 worker_pool_init(&pool);
207 work_queue_init(&pool, &q);
208 for (uns i=0; i<500; i++)
210 struct w *w = xmalloc_zero(sizeof(*w));
213 work_submit(&q, &w->w);
217 while (w = (struct w *) work_wait(&q))
218 log(L_INFO, "Finished request %d", w->id);
220 work_queue_cleanup(&q);
221 worker_pool_cleanup(&pool);