2 * UCW Library -- Thread Pools and Work Queues
4 * (c) 2006 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
11 #include "lib/threads.h"
12 #include "lib/workqueue.h"
/*
 * Start routine handed to pthread_create() by worker_pool_init().
 * `arg` is the struct worker_thread describing this thread.
 *
 * Visible behaviour: checks the pool's init_thread hook, posts the pool's
 * init_cleanup_sem, takes a work item from the pool's request queue and
 * puts it on the `finished` queue of the work queue named by w->reply_to.
 *
 * NOTE(review): the body of the init_thread branch and whatever surrounds
 * the get/put pair are not visible in this excerpt -- presumably the hook
 * is called and each request's handler is run in a loop; confirm against
 * the full file.
 */
15 worker_thread_init(void *arg)
17 struct worker_thread *t = arg;
18 struct worker_pool *pool = t->pool;
20 if (pool->init_thread)
/* Posted once per thread; worker_pool_init() waits on this semaphore after creating each thread. */
22 sem_post(pool->init_cleanup_sem);
26 struct work *w = raw_queue_get(&pool->requests);
/* Hand the work item back to the queue it was submitted from. */
28 raw_queue_put(&w->reply_to->finished, w);
/*
 * Work handler that worker_pool_cleanup() installs as the `.go` member of
 * the work items it submits. Invoked with the worker thread `t`; the work
 * item itself is not used (marked UNUSED).
 *
 * Calls the pool's cleanup_thread hook when one is set, then posts
 * init_cleanup_sem, which worker_pool_cleanup() waits on.
 *
 * NOTE(review): any statements after the sem_post (e.g. terminating the
 * thread) are not visible in this excerpt -- confirm against the full file.
 */
35 worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED)
37 if (t->pool->cleanup_thread)
38 t->pool->cleanup_thread(t);
39 sem_post(t->pool->init_cleanup_sem);
/*
 * Initialize worker pool `p` and start p->num_threads worker threads.
 *
 * Reads from `p`: stack_size (0 selects default_thread_stack_size via the
 * GNU `?:` extension), num_threads and the optional new_thread callback.
 * Initializes: worker_threads list, requests queue, init_cleanup_sem.
 *
 * Each thread descriptor comes from p->new_thread() when that callback is
 * set, otherwise from xmalloc(). After creating a thread the function
 * waits on init_cleanup_sem, which worker_thread_init() posts, so threads
 * are brought up one at a time.
 *
 * NOTE(review): the declaration of `attr`, the statement guarded by the
 * attribute check, the test of `err` and any other descriptor setup are
 * not visible in this excerpt.
 */
44 worker_pool_init(struct worker_pool *p)
46 clist_init(&p->worker_threads);
47 raw_queue_init(&p->requests);
48 p->init_cleanup_sem = sem_alloc();
/*
 * NOTE(review): POSIX specifies that pthread_attr_init() and
 * pthread_attr_setstacksize() return 0 on success and a positive error
 * number on failure, so these `< 0` comparisons look like they can never
 * be true -- confirm and consider testing for non-zero instead.
 */
51 if (pthread_attr_init(&attr) < 0 ||
52 pthread_attr_setstacksize(&attr, p->stack_size ? : default_thread_stack_size) < 0)
55 for (uns i=0; i < p->num_threads; i++)
57 struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
60 int err = pthread_create(&t->thread, &attr, worker_thread_init, t);
/*
 * NOTE(review): pthread_create() reports failure through its return value
 * (`err`), not errno; if %m expands errno here, the message may not show
 * the real cause -- confirm.
 */
62 die("Unable to create thread: %m");
63 clist_add_tail(&p->worker_threads, &t->n);
/* Wait for the post issued by worker_thread_init() in the new thread. */
64 sem_wait(p->init_cleanup_sem);
67 pthread_attr_destroy(&attr);
/*
 * Shut down worker pool `p`.
 *
 * Phase 1: once per thread (p->num_threads times), submit a work item
 * whose `.go` handler is worker_thread_signal_finish() to the request
 * queue and wait on init_cleanup_sem, which that handler posts.
 * Phase 2: walk p->worker_threads and pthread_join() every thread.
 * Phase 3: release the request queue and the semaphore.
 *
 * NOTE(review): the declaration of `w`, the handling of `err` after
 * pthread_join() and any per-thread list removal / freeing are not visible
 * in this excerpt.
 */
71 worker_pool_cleanup(struct worker_pool *p)
73 for (uns i=0; i < p->num_threads; i++)
76 .go = worker_thread_signal_finish
/* The address of `w` itself is queued; the sem_wait below holds this iteration until a worker has run the handler. */
78 raw_queue_put(&p->requests, &w);
79 sem_wait(p->init_cleanup_sem);
82 struct worker_thread *tmp;
83 CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp)
85 int err = pthread_join(t->thread, NULL);
92 raw_queue_cleanup(&p->requests);
93 sem_free(p->init_cleanup_sem);
/*
 * Initialize raw queue `q`: its mutex (default attributes, NULL attr
 * argument), its list of queued items and its semaphore, which
 * raw_queue_put() posts and the getters wait on.
 */
97 raw_queue_init(struct raw_queue *q)
99 pthread_mutex_init(&q->queue_mutex, NULL);
100 clist_init(&q->queue);
101 q->queue_sem = sem_alloc();
/*
 * Release the resources of raw queue `q` (semaphore and mutex).
 * The queue is asserted to be empty when this is called.
 */
105 raw_queue_cleanup(struct raw_queue *q)
107 ASSERT(clist_empty(&q->queue));
108 sem_free(q->queue_sem);
109 pthread_mutex_destroy(&q->queue_mutex);
/*
 * Append work item `w` to the tail of raw queue `q`.
 * The list is modified while holding q->queue_mutex; q->queue_sem is
 * posted after the mutex has been released.
 */
113 raw_queue_put(struct raw_queue *q, struct work *w)
115 pthread_mutex_lock(&q->queue_mutex);
116 clist_add_tail(&q->queue, &w->n);
117 pthread_mutex_unlock(&q->queue_mutex);
118 sem_post(q->queue_sem);
/*
 * Helper shared by raw_queue_get() and raw_queue_try_get(), both of which
 * call it only after acquiring q->queue_sem. Reads the head of q->queue
 * while holding q->queue_mutex.
 *
 * NOTE(review): the statements between reading the head and unlocking,
 * and the return statement, are not visible in this excerpt -- presumably
 * the head is unlinked and returned; confirm against the full file.
 */
121 static inline struct work *
122 raw_queue_do_get(struct raw_queue *q)
124 pthread_mutex_lock(&q->queue_mutex);
125 struct work *w = clist_head(&q->queue);
128 pthread_mutex_unlock(&q->queue_mutex);
/*
 * Wait on q->queue_sem, then return the result of raw_queue_do_get(q).
 * raw_queue_put() posts the semaphore once per queued item.
 */
133 raw_queue_get(struct raw_queue *q)
135 sem_wait(q->queue_sem);
136 return raw_queue_do_get(q);
/*
 * Variant of raw_queue_get() that uses sem_trywait() instead of
 * sem_wait(): when sem_trywait() returns 0 the result of
 * raw_queue_do_get(q) is returned.
 *
 * NOTE(review): the path taken when sem_trywait() returns non-zero is not
 * visible in this excerpt -- presumably a null pointer is returned; confirm.
 */
140 raw_queue_try_get(struct raw_queue *q)
142 if (!sem_trywait(q->queue_sem))
143 return raw_queue_do_get(q);
/*
 * Initialize work queue `q` for use with worker pool `p`; the visible part
 * sets up the queue of finished work items.
 *
 * NOTE(review): lines of this function are missing from this excerpt --
 * presumably q->pool and q->nr_running (read by work_submit() and
 * work_do_wait()) are set here; confirm against the full file.
 */
149 work_queue_init(struct worker_pool *p, struct work_queue *q)
153 raw_queue_init(&q->finished);
/*
 * Tear down work queue `q`. Asserts that q->nr_running is zero, then
 * releases the queue of finished items (which raw_queue_cleanup() in turn
 * asserts to be empty).
 */
157 work_queue_cleanup(struct work_queue *q)
159 ASSERT(!q->nr_running);
160 raw_queue_cleanup(&q->finished);
/*
 * Submit work item `w` through work queue `q`: the item is appended to the
 * request queue of q->pool, from which worker threads take requests.
 *
 * NOTE(review): lines before the raw_queue_put() are missing from this
 * excerpt -- presumably w->reply_to is set to `q` and q->nr_running is
 * updated; confirm against the full file.
 */
164 work_submit(struct work_queue *q, struct work *w)
168 raw_queue_put(&q->pool->requests, w);
/*
 * Common implementation behind work_wait() and work_try_wait().
 * While q->nr_running is non-zero, fetches an item from q->finished using
 * raw_queue_try_get() when `try` is non-zero and raw_queue_get() otherwise.
 *
 * NOTE(review): the rest of the loop body and the return statements are
 * not visible in this excerpt; confirm against the full file.
 */
173 work_do_wait(struct work_queue *q, int try)
175 while (q->nr_running)
177 struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
/*
 * Return the result of work_do_wait(q, 0), i.e. the variant that fetches
 * finished work with raw_queue_get() (which waits on the queue semaphore).
 */
190 work_wait(struct work_queue *q)
192 return work_do_wait(q, 0);
/*
 * Return the result of work_do_wait(q, 1), i.e. the variant that fetches
 * finished work with raw_queue_try_get() (sem_trywait() instead of
 * sem_wait()).
 */
196 work_try_wait(struct work_queue *q)
198 return work_do_wait(q, 1);
/* Test hook installed as the pool's init_thread callback: logs "INIT <thread id>" at L_INFO. */
205 static void wt_init(struct worker_thread *t)
207 log(L_INFO, "INIT %d", t->id);
/* Test hook installed as the pool's cleanup_thread callback: logs "CLEANUP <thread id>" at L_INFO. */
210 static void wt_cleanup(struct worker_thread *t)
212 log(L_INFO, "CLEANUP %d", t->id);
/*
 * Test work handler: logs the id of the worker thread and the id of the
 * request being processed. The work item is cast to `struct w`, whose
 * definition is not visible in this excerpt.
 * NOTE(review): any further statements of this handler are not visible here.
 */
220 static void go(struct worker_thread *t, struct work *w)
222 log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
/*
 * Test driver body: builds a worker pool with the wt_init/wt_cleanup hooks,
 * submits 500 zero-allocated requests through one work queue, logs each
 * request as it is returned by work_wait(), then tears down the queue and
 * the pool (queue first, pool second).
 *
 * NOTE(review): the enclosing function header, the remaining pool
 * initializers, the declarations of `q` and `w` and the per-request setup
 * are not visible in this excerpt.
 */
228 struct worker_pool pool = {
231 .init_thread = wt_init,
232 .cleanup_thread = wt_cleanup
234 worker_pool_init(&pool);
237 work_queue_init(&pool, &q);
238 for (uns i=0; i<500; i++)
240 struct w *w = xmalloc_zero(sizeof(*w));
243 work_submit(&q, &w->w);
/* Loop ends when work_wait() yields a null pointer. */
247 while (w = (struct w *) work_wait(&q))
248 log(L_INFO, "Finished request %d", w->id);
250 work_queue_cleanup(&q);
251 worker_pool_cleanup(&pool);