2 * UCW Library -- Thread Pools and Work Queues
4 * (c) 2006 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
11 #include "lib/workqueue.h"
/*
 * Start routine of a worker thread; worker_pool_init() passes it to
 * pthread_create() together with the thread's struct worker_thread as @arg.
 *
 * NOTE(review): the embedded original line numbers skip inside this function,
 * so some statements (e.g. whatever follows the `if' and whatever runs the
 * work item between get and put) are not visible in this excerpt.
 */
14 worker_thread_init(void *arg)
16 struct worker_thread *t = arg;
17 struct worker_pool *pool = t->pool;
/* Optional per-thread init hook; the call itself is presumably on the elided line -- confirm. */
19 if (pool->init_thread)
/* Wake worker_pool_init(), which waits on this semaphore after creating each thread. */
21 sem_post(pool->init_cleanup_sem);
/* Fetch the next request from the pool-wide request queue. */
25 struct work *w = raw_queue_get(&pool->requests);
/* Return the item to the `finished' queue of the work queue it names in reply_to. */
27 raw_queue_put(&w->reply_to->finished, w);
/*
 * Work callback installed as `.go' by worker_pool_cleanup(): runs the pool's
 * optional cleanup_thread hook for thread @t and then posts init_cleanup_sem,
 * on which worker_pool_cleanup() is waiting. The work item @w is unused.
 *
 * NOTE(review): anything after the sem_post (e.g. how the thread terminates)
 * is not visible in this excerpt.
 */
34 worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED)
36 if (t->pool->cleanup_thread)
37 t->pool->cleanup_thread(t);
38 sem_post(t->pool->init_cleanup_sem);
/*
 * Initialize worker pool @p and start p->num_threads worker threads.
 *
 * Reads from @p: stack_size, num_threads and the optional new_thread hook.
 * Sets up: the worker_threads list, the requests queue and init_cleanup_sem.
 * Threads are started one at a time: after each pthread_create() the caller
 * waits on init_cleanup_sem, which worker_thread_init() posts.
 *
 * NOTE(review): several original lines are elided in this excerpt (the
 * declaration of `attr', the action taken when attribute setup fails, the
 * condition guarding die(), any per-thread field setup).
 */
43 worker_pool_init(struct worker_pool *p)
45 clist_init(&p->worker_threads);
46 raw_queue_init(&p->requests);
47 p->init_cleanup_sem = sem_alloc();
/*
 * NOTE(review): POSIX specifies that pthread_attr_init() and
 * pthread_attr_setstacksize() return 0 on success and a positive error
 * number on failure, so these `< 0' comparisons can never be true.
 */
50 if (pthread_attr_init(&attr) < 0 ||
51 pthread_attr_setstacksize(&attr, p->stack_size) < 0)
54 for (uns i=0; i < p->num_threads; i++)
/* Use the pool's allocator hook when provided, otherwise a plain xmalloc(). */
56 struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
59 int err = pthread_create(&t->thread, &attr, worker_thread_init, t);
/*
 * NOTE(review): pthread_create() reports failure through its return value
 * (captured in `err'), not through errno; if `%m' expands to the errno
 * message here, the reported reason may be unrelated -- confirm.
 */
61 die("Unable to create thread: %m");
62 clist_add_tail(&p->worker_threads, &t->n);
/* Block until the new thread has posted the semaphore from worker_thread_init(). */
63 sem_wait(p->init_cleanup_sem);
66 pthread_attr_destroy(&attr);
/*
 * Shut down worker pool @p.
 *
 * For each of the p->num_threads threads, a work item whose `.go' callback is
 * worker_thread_signal_finish is queued on p->requests and the caller waits
 * on init_cleanup_sem, which that callback posts. Afterwards every thread on
 * p->worker_threads is joined, and the requests queue and the semaphore are
 * released.
 *
 * NOTE(review): the declaration of `w', the handling of `err' and anything
 * done with each list node after the join are on lines not visible here.
 */
70 worker_pool_cleanup(struct worker_pool *p)
72 for (uns i=0; i < p->num_threads; i++)
75 .go = worker_thread_signal_finish
/* The item is passed by address; presumably `w' is a local of this loop -- confirm. */
77 raw_queue_put(&p->requests, &w);
/* Wait until one worker has run the callback before queueing the next item. */
78 sem_wait(p->init_cleanup_sem);
81 struct worker_thread *tmp;
82 CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp)
84 int err = pthread_join(t->thread, NULL);
91 raw_queue_cleanup(&p->requests);
92 sem_free(p->init_cleanup_sem);
/*
 * Initialize raw queue @q: its mutex (default attributes), its list of
 * queued items and its semaphore (allocated with sem_alloc()).
 */
96 raw_queue_init(struct raw_queue *q)
98 pthread_mutex_init(&q->queue_mutex, NULL);
99 clist_init(&q->queue);
100 q->queue_sem = sem_alloc();
/*
 * Release the resources of raw queue @q (semaphore and mutex).
 * The queue must be empty; this is checked with ASSERT().
 */
104 raw_queue_cleanup(struct raw_queue *q)
106 ASSERT(clist_empty(&q->queue));
107 sem_free(q->queue_sem);
108 pthread_mutex_destroy(&q->queue_mutex);
/*
 * Append work item @w to the tail of raw queue @q while holding the queue
 * mutex, then post the queue semaphore.
 */
112 raw_queue_put(struct raw_queue *q, struct work *w)
114 pthread_mutex_lock(&q->queue_mutex);
115 clist_add_tail(&q->queue, &w->n);
116 pthread_mutex_unlock(&q->queue_mutex);
/* Posted after the mutex is released; raw_queue_get()/raw_queue_try_get() wait on it. */
117 sem_post(q->queue_sem);
/*
 * Common helper of raw_queue_get() and raw_queue_try_get(): looks at the
 * head of @q's list under the queue mutex. Both callers acquire q->queue_sem
 * before calling it.
 *
 * NOTE(review): the statements between taking the head and unlocking, and
 * the return statement, are not visible in this excerpt; presumably the head
 * is unlinked and returned -- confirm.
 */
120 static inline struct work *
121 raw_queue_do_get(struct raw_queue *q)
123 pthread_mutex_lock(&q->queue_mutex);
124 struct work *w = clist_head(&q->queue);
127 pthread_mutex_unlock(&q->queue_mutex);
/*
 * Wait on @q's semaphore (posted by raw_queue_put()) and then return
 * whatever raw_queue_do_get() yields for @q.
 */
132 raw_queue_get(struct raw_queue *q)
134 sem_wait(q->queue_sem);
135 return raw_queue_do_get(q);
/*
 * Non-waiting variant of raw_queue_get(): only when sem_trywait() returns 0
 * (i.e. the semaphore was acquired) is raw_queue_do_get() called.
 *
 * NOTE(review): the branch taken when sem_trywait() fails is not visible in
 * this excerpt.
 */
139 raw_queue_try_get(struct raw_queue *q)
141 if (!sem_trywait(q->queue_sem))
142 return raw_queue_do_get(q);
/*
 * Initialize work queue @q; its `finished' raw queue is set up here.
 *
 * NOTE(review): lines that would use @p (presumably binding the queue to the
 * pool, cf. q->pool in work_submit()) are not visible in this excerpt.
 */
148 work_queue_init(struct worker_pool *p, struct work_queue *q)
152 raw_queue_init(&q->finished);
/*
 * Release work queue @q. Asserts that q->nr_running is zero and then cleans
 * up the `finished' raw queue (which itself asserts that it is empty).
 */
156 work_queue_cleanup(struct work_queue *q)
158 ASSERT(!q->nr_running);
159 raw_queue_cleanup(&q->finished);
/*
 * Submit work item @w through work queue @q: the item is appended to the
 * request queue of the pool that @q refers to.
 *
 * NOTE(review): the lines before the put are elided here; presumably they
 * record @q in the item and account for it in q->nr_running -- confirm.
 */
163 work_submit(struct work_queue *q, struct work *w)
167 raw_queue_put(&q->pool->requests, w);
/*
 * Shared implementation of work_wait() (@try == 0) and work_try_wait()
 * (@try != 0). While q->nr_running is non-zero it takes an item from
 * q->finished, using raw_queue_try_get() when @try is set and the waiting
 * raw_queue_get() otherwise.
 *
 * NOTE(review): the rest of the loop body and the return statements are not
 * visible in this excerpt.
 */
172 work_do_wait(struct work_queue *q, int try)
174 while (q->nr_running)
176 struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
/* Calls work_do_wait() on @q with try == 0, i.e. using the waiting raw_queue_get(). */
189 work_wait(struct work_queue *q)
191 return work_do_wait(q, 0);
/* Calls work_do_wait() on @q with try == 1, i.e. using raw_queue_try_get(). */
195 work_try_wait(struct work_queue *q)
197 return work_do_wait(q, 1);
/* Test hook installed as the pool's init_thread: logs the id of thread @t. */
204 static void wt_init(struct worker_thread *t)
206 log(L_INFO, "INIT %d", t->id);
/* Test hook installed as the pool's cleanup_thread: logs the id of thread @t. */
209 static void wt_cleanup(struct worker_thread *t)
211 log(L_INFO, "CLEANUP %d", t->id);
/*
 * Test work callback: logs the id of thread @t and the id of the request.
 *
 * NOTE(review): the definition of struct w is not visible in this excerpt;
 * the cast assumes its struct work member is the first member -- confirm.
 */
219 static void go(struct worker_thread *t, struct work *w)
221 log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
/*
 * Test driver body: starts a pool with the logging hooks, submits 500
 * requests through one work queue, waits for them and tears everything down.
 *
 * NOTE(review): the enclosing function header, the remaining pool fields,
 * the declaration of `q' and the per-request field setup are on lines not
 * visible in this excerpt.
 */
227 struct worker_pool pool = {
230 .init_thread = wt_init,
231 .cleanup_thread = wt_cleanup
233 worker_pool_init(&pool);
236 work_queue_init(&pool, &q);
237 for (uns i=0; i<500; i++)
239 struct w *w = xmalloc_zero(sizeof(*w));
242 work_submit(&q, &w->w);
/*
 * Collect finished requests until work_wait() yields NULL.
 * NOTE(review): no release of the xmalloc_zero()'d requests is visible here.
 */
246 while (w = (struct w *) work_wait(&q))
247 log(L_INFO, "Finished request %d", w->id);
/* The queue is cleaned up before the pool it was initialized with. */
249 work_queue_cleanup(&q);
250 worker_pool_cleanup(&pool);