2 * UCW Library -- Thread Pools and Work Queues
4 * (c) 2006 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
11 #include "lib/threads.h"
12 #include "lib/workqueue.h"
/*
 * Thread start routine handed to pthread_create() by worker_pool_init();
 * arg is the struct worker_thread describing this thread.
 * NOTE(review): the embedded source line numbers skip (21 -> 23 -> 27 -> 29),
 * so some statements of this function are not visible in this extract.
 */
16 worker_thread_init(void *arg)
18 struct worker_thread *t = arg;
19 struct worker_pool *pool = t->pool;
/* Optional per-thread start-up hook; presumably invoked on the missing line -- confirm. */
21 if (pool->init_thread)
/* Wake worker_pool_init(), which waits on this semaphore after creating each thread. */
23 sem_post(pool->init_cleanup_sem);
/* Fetch the next request from the pool's shared request queue. */
27 struct work *w = raw_queue_get(&pool->requests);
/* Deliver the work item to the `finished` queue of the work queue named by its reply_to field. */
29 raw_queue_put(&w->reply_to->finished, w);
/*
 * Work callback that worker_pool_cleanup() queues (as the .go member of a
 * work item) once per worker thread. The work argument is unused.
 */
36 worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED)
/* Run the pool's optional per-thread cleanup hook. */
38 if (t->pool->cleanup_thread)
39 t->pool->cleanup_thread(t);
/* Release worker_pool_cleanup(), which waits on this semaphore after queuing each such item. */
40 sem_post(t->pool->init_cleanup_sem);
/* NOTE(review): source line 41 is missing from this extract; presumably the thread terminates there -- confirm. */
/*
 * Initialize pool p and start p->num_threads worker threads.
 * Reads p->stack_size, p->num_threads and the optional p->new_thread hook;
 * sets up p->worker_threads, p->requests and p->init_cleanup_sem.
 * NOTE(review): several source lines (50-51, 54-55, 57, 59-60, 62) are
 * missing from this extract, including the declaration of attr.
 */
45 worker_pool_init(struct worker_pool *p)
47 clist_init(&p->worker_threads);
48 raw_queue_init(&p->requests);
49 p->init_cleanup_sem = sem_alloc();
/*
 * NOTE(review): POSIX specifies that pthread_attr_init() and
 * pthread_attr_setstacksize() return an error number on failure, not a
 * negative value, so these `< 0` tests look like they can never fire -- confirm.
 * `a ? : b` is the GNU extension meaning "a if non-zero, else b".
 */
52 if (pthread_attr_init(&attr) < 0 ||
53 pthread_attr_setstacksize(&attr, p->stack_size ? : default_thread_stack_size) < 0)
56 for (uns i=0; i < p->num_threads; i++)
/* Let the caller allocate the (possibly larger) thread structure, else allocate a plain one. */
58 struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
61 int err = pthread_create(&t->thread, &attr, worker_thread_init, t);
/*
 * NOTE(review): pthread_create() reports failure through its return value
 * (err); if %m expands errno here, the message may not show the real cause -- confirm.
 */
63 die("Unable to create thread: %m");
64 clist_add_tail(&p->worker_threads, &t->n);
/* Wait for the new thread's sem_post() in worker_thread_init() before creating the next one. */
65 sem_wait(p->init_cleanup_sem);
68 pthread_attr_destroy(&attr);
/*
 * Shut down pool p: ask each worker thread to finish, join all threads, then
 * release the request queue and the init/cleanup semaphore.
 * NOTE(review): source lines 75-76, 78, 81-82, 85 and 87-92 are missing from
 * this extract (among them the declaration of the work item w and whatever
 * follows pthread_join()).
 */
72 worker_pool_cleanup(struct worker_pool *p)
/* One shutdown request per thread. */
74 for (uns i=0; i < p->num_threads; i++)
77 .go = worker_thread_signal_finish
/* The address of w is queued; the sem_wait() below returns only after a worker has run its callback and posted. */
79 raw_queue_put(&p->requests, &w);
80 sem_wait(p->init_cleanup_sem);
83 struct worker_thread *tmp;
84 CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp)
86 int err = pthread_join(t->thread, NULL);
93 raw_queue_cleanup(&p->requests);
94 sem_free(p->init_cleanup_sem);
/*
 * Initialize raw queue q: its mutex, the pri0_queue list, the queue semaphore
 * and the heap counters (heap_cnt and heap_max both start at 0).
 * NOTE(review): source line 103 is missing from this extract.
 */
98 raw_queue_init(struct raw_queue *q)
100 pthread_mutex_init(&q->queue_mutex, NULL);
101 clist_init(&q->pri0_queue);
102 q->queue_sem = sem_alloc();
104 q->heap_cnt = q->heap_max = 0;
/*
 * Tear down raw queue q. The queue must be empty: both the pri0_queue list
 * and the heap are asserted to hold no items.
 * NOTE(review): source line 112 is missing from this extract, so it is not
 * visible here whether the pri_heap array is released -- confirm.
 */
108 raw_queue_cleanup(struct raw_queue *q)
110 ASSERT(clist_empty(&q->pri0_queue));
111 ASSERT(!q->heap_cnt);
113 sem_free(q->queue_sem);
114 pthread_mutex_destroy(&q->queue_mutex);
/*
 * Heap ordering used with HEAP_INSERT/HEAP_DELMIN below: x sorts before y
 * when x has the numerically larger priority.
 */
117 #define PRI_LESS(x,y) ((x)->priority > (y)->priority)
/*
 * Add work item w to queue q and post the queue semaphore so that
 * raw_queue_get()/raw_queue_try_get() can pick it up.
 * Items are stored either in the pri0_queue list or in the pri_heap array.
 * NOTE(review): the test selecting between the two is on a source line
 * missing from this extract (lines 121, 123, 125-126, 128, 132, 136 are absent).
 */
120 raw_queue_put(struct raw_queue *q, struct work *w)
122 pthread_mutex_lock(&q->queue_mutex);
124 clist_add_tail(&q->pri0_queue, &w->n);
/* Grow the heap array when it is full: capacity starts at 16, then doubles. */
127 if (unlikely(q->heap_cnt >= q->heap_max))
129 struct work **old_heap = q->pri_heap;
130 q->heap_max = (q->heap_max ? 2*q->heap_max : 16);
/*
 * The heap is filled at indices 1..heap_cnt (see heap[++q->heap_cnt] below),
 * so a capacity of heap_max items needs heap_max+1 slots. Allocating only
 * heap_max slots let the store at index heap_max run one element past the
 * end of the array.
 */
131 q->pri_heap = xrealloc(old_heap, (q->heap_max+1) * sizeof(struct work *));
133 struct work **heap = q->pri_heap;
134 heap[++q->heap_cnt] = w;
135 HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
137 pthread_mutex_unlock(&q->queue_mutex);
/* Signal availability only after the mutex has been released. */
138 sem_post(q->queue_sem);
/*
 * Remove one work item from q under the queue mutex and return it.
 * Callers in this file first obtain the queue semaphore (sem_wait or
 * sem_trywait), which raw_queue_put() posts.
 * NOTE(review): source lines 143, 145-147, 149-153, 155, 157 and 159-160 are
 * missing from this extract, so the rule choosing between the pri0_queue list
 * and the heap is not visible.
 */
141 static inline struct work *
142 raw_queue_do_get(struct raw_queue *q)
144 pthread_mutex_lock(&q->queue_mutex);
/* List path: take the first node of pri0_queue. */
148 w = clist_head(&q->pri0_queue);
/* Heap path: remove the first element according to PRI_LESS. */
154 struct work **heap = q->pri_heap;
156 HEAP_DELMIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
158 pthread_mutex_unlock(&q->queue_mutex);
/*
 * Waiting fetch: sem_wait() on the queue semaphore (posted by
 * raw_queue_put()), then return the item taken by raw_queue_do_get().
 */
163 raw_queue_get(struct raw_queue *q)
165 sem_wait(q->queue_sem);
166 return raw_queue_do_get(q);
/*
 * Non-waiting fetch: if sem_trywait() succeeds (returns 0), return the item
 * taken by raw_queue_do_get().
 * NOTE(review): the fall-through path (source lines 174-175) is missing from
 * this extract; presumably it returns NULL -- confirm.
 */
170 raw_queue_try_get(struct raw_queue *q)
172 if (!sem_trywait(q->queue_sem))
173 return raw_queue_do_get(q);
/*
 * Initialize work queue q for use with worker pool p; sets up the queue of
 * finished work items.
 * NOTE(review): source lines 180-182 are missing from this extract;
 * presumably they bind q to p and reset its counters -- confirm.
 */
179 work_queue_init(struct worker_pool *p, struct work_queue *q)
183 raw_queue_init(&q->finished);
/*
 * Tear down work queue q. Asserts that q->nr_running is zero, then cleans up
 * the finished queue (which itself asserts that it is empty).
 */
187 work_queue_cleanup(struct work_queue *q)
189 ASSERT(!q->nr_running);
190 raw_queue_cleanup(&q->finished);
/*
 * Submit work item w through work queue q by placing it on the request queue
 * of q's worker pool.
 * NOTE(review): source lines 195-197 are missing from this extract;
 * presumably they set up w and q's bookkeeping before queuing -- confirm.
 */
194 work_submit(struct work_queue *q, struct work *w)
198 raw_queue_put(&q->pool->requests, w);
/*
 * Common helper of work_wait() and work_try_wait(): fetch one item from
 * q->finished, using raw_queue_try_get() when try is non-zero and
 * raw_queue_get() otherwise.
 * NOTE(review): source lines 204-206 and 208-212 are missing from this
 * extract, so the handling around the fetch and the return value are not visible.
 */
203 work_do_wait(struct work_queue *q, int try)
207 struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
/* Fetch a finished work item from q via work_do_wait() with try = 0 (uses the waiting raw_queue_get()). */
215 work_wait(struct work_queue *q)
217 return work_do_wait(q, 0);
/* Fetch a finished work item from q via work_do_wait() with try = 1 (uses the non-waiting raw_queue_try_get()). */
221 work_try_wait(struct work_queue *q)
223 return work_do_wait(q, 1);
/* Test hook installed as the pool's init_thread callback: logs the id of the thread. */
230 static void wt_init(struct worker_thread *t)
232 log(L_INFO, "INIT %d", t->id);
/* Test hook installed as the pool's cleanup_thread callback: logs the id of the thread. */
235 static void wt_cleanup(struct worker_thread *t)
237 log(L_INFO, "CLEANUP %d", t->id);
/*
 * Test work callback: logs the worker thread id, the request id and its priority.
 * The work pointer is cast to the test's struct w to read id; this assumes the
 * struct work member is the first member of struct w (its definition is not
 * visible in this extract) -- confirm.
 */
245 static void go(struct worker_thread *t, struct work *w)
247 log(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority);
/*
 * Test driver body: start a worker pool with the logging hooks, submit 500
 * requests through one work queue, log each as it finishes, then clean up.
 * NOTE(review): the enclosing function header and source lines 254-255,
 * 258, 260-261, 264, 266, 268 and 271-273 are missing from this extract.
 */
253 struct worker_pool pool = {
256 .init_thread = wt_init,
257 .cleanup_thread = wt_cleanup
259 worker_pool_init(&pool);
262 work_queue_init(&pool, &q);
263 for (uns i=0; i<500; i++)
265 struct w *w = xmalloc_zero(sizeof(*w));
/* Requests 0..249 get priority equal to their index; 250..499 get priority 0. */
267 w->w.priority = (i < 250 ? i : 0);
269 work_submit(&q, &w->w);
270 log(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority);
/* Assignment inside the condition is intentional: loop until work_wait() yields a null pointer. */
274 while (w = (struct w *) work_wait(&q))
275 log(L_INFO, "Finished request %d", w->id);
/* The queue is cleaned up before the pool it was initialized with. */
277 work_queue_cleanup(&q);
278 worker_pool_cleanup(&pool);