2 * UCW Library -- Thread Pools and Work Queues
4 * (c) 2006 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
11 #include <ucw/threads.h>
12 #include <ucw/workqueue.h>
/*
 * Thread start routine: worker_pool_init() passes this function to
 * pthread_create() together with the thread's struct worker_thread as `arg`.
 *
 * It checks the pool's optional init_thread hook, posts init_cleanup_sem
 * (worker_pool_init() waits on that semaphore after creating each thread),
 * then takes work items from the pool's request queue and puts each one on
 * w->reply_to->finished.
 *
 * NOTE(review): the call of the init hook, the loop around the get/put pair
 * and the invocation of the work item's handler are not visible here --
 * presumably init_thread(t), an endless loop and w->go(t, w); confirm.
 */
16 worker_thread_init(void *arg)
18 struct worker_thread *t = arg;
19 struct worker_pool *pool = t->pool;
21 if (pool->init_thread)
/* Signal the creator that this thread has got past its init stage. */
23 sem_post(pool->init_cleanup_sem);
/* raw_queue_get() waits on the queue's semaphore until a request is available. */
27 struct work *w = raw_queue_get(&pool->requests);
/* Hand the work item back through the `finished` queue reached via w->reply_to. */
29 raw_queue_put(&w->reply_to->finished, w);
/*
 * Work handler used for shutting a worker down: worker_pool_cleanup() queues
 * a work item whose .go member points here. It calls the pool's optional
 * cleanup_thread hook for this thread and then posts init_cleanup_sem, which
 * worker_pool_cleanup() waits on. The work item argument is not used.
 *
 * NOTE(review): presumably the thread terminates itself right after the
 * post (the statement is not visible here) -- confirm.
 */
36 worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED)
38 if (t->pool->cleanup_thread)
39 t->pool->cleanup_thread(t);
40 sem_post(t->pool->init_cleanup_sem);
/*
 * Initialize a worker pool and start its threads.
 *
 * Reads from `p`: stack_size, num_threads and the optional new_thread hook.
 * Initializes the list of worker threads, the request queue and the
 * init_cleanup_sem semaphore, then creates p->num_threads threads running
 * worker_thread_init(). After each pthread_create() it waits on
 * init_cleanup_sem, so thread i has posted the semaphore (i.e. passed its
 * init stage) before thread i+1 is created.
 *
 * A failing pthread_create() is reported through die().
 */
45 worker_pool_init(struct worker_pool *p)
47 clist_init(&p->worker_threads);
48 raw_queue_init(&p->requests);
49 p->init_cleanup_sem = sem_alloc();
/*
 * Stack size: p->stack_size when non-zero, otherwise ucwlib_thread_stack_size
 * (`a ? : b` is the GNU conditional with omitted middle operand).
 * NOTE(review): POSIX specifies that pthread_attr_init() and
 * pthread_attr_setstacksize() return 0 or a (positive) error number, so the
 * `< 0` comparisons look like they can never be true -- confirm and
 * consider testing for non-zero instead.
 */
52 if (pthread_attr_init(&attr) < 0 ||
53 pthread_attr_setstacksize(&attr, p->stack_size ? : ucwlib_thread_stack_size) < 0)
56 for (uns i=0; i < p->num_threads; i++)
/* The thread structure comes from the new_thread hook if one is set, else from xmalloc(). */
58 struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
61 int err = pthread_create(&t->thread, &attr, worker_thread_init, t);
/*
 * NOTE(review): pthread_create() returns the error number in `err` and is
 * not specified to set errno; if %m expands to the errno message, the
 * reported reason may be unrelated -- confirm.
 */
63 die("Unable to create thread: %m");
64 clist_add_tail(&p->worker_threads, &t->n);
/* Wait for the post issued by worker_thread_init(). */
65 sem_wait(p->init_cleanup_sem);
68 pthread_attr_destroy(&attr);
/*
 * Stop all threads of the pool and release the pool's resources.
 *
 * For each of the p->num_threads threads it queues a work item whose handler
 * is worker_thread_signal_finish() and waits on init_cleanup_sem until a
 * worker has processed it. Afterwards every thread on p->worker_threads is
 * joined, and the request queue and the semaphore are destroyed.
 *
 * raw_queue_cleanup() asserts that the request queue is empty.
 */
72 worker_pool_cleanup(struct worker_pool *p)
74 for (uns i=0; i < p->num_threads; i++)
/*
 * NOTE(review): `w` appears to be a work item local to this loop body (its
 * declaration is not visible here); if so, the sem_wait() below is what
 * keeps it alive until the worker is done with it -- confirm.
 */
77 .go = worker_thread_signal_finish
79 raw_queue_put(&p->requests, &w);
80 sem_wait(p->init_cleanup_sem);
83 struct worker_thread *tmp;
/* Deletion-safe iteration over the list of worker threads. */
84 CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp)
/* NOTE(review): handling of `err` and release of `t` are not visible here -- confirm. */
86 int err = pthread_join(t->thread, NULL);
93 raw_queue_cleanup(&p->requests);
94 sem_free(p->init_cleanup_sem);
/*
 * Initialize a raw queue: its mutex (default attributes), the list used
 * for the pri0_queue, the semaphore queue_sem, and the heap counters
 * (no elements, no capacity).
 *
 * NOTE(review): raw_queue_put() passes q->pri_heap to xrealloc() the first
 * time the heap grows, so pri_heap presumably has to be set to NULL here;
 * that assignment is not visible -- confirm.
 */
98 raw_queue_init(struct raw_queue *q)
100 pthread_mutex_init(&q->queue_mutex, NULL);
101 clist_init(&q->pri0_queue);
102 q->queue_sem = sem_alloc();
104 q->heap_cnt = q->heap_max = 0;
/*
 * Destroy a raw queue. The queue must be empty: both the pri0_queue list
 * and the priority heap are asserted to contain no elements. Frees the
 * semaphore and destroys the mutex.
 *
 * NOTE(review): releasing of the q->pri_heap array is not visible here --
 * confirm it is freed.
 */
108 raw_queue_cleanup(struct raw_queue *q)
110 ASSERT(clist_empty(&q->pri0_queue));
111 ASSERT(!q->heap_cnt);
113 sem_free(q->queue_sem);
114 pthread_mutex_destroy(&q->queue_mutex);
/*
 * Ordering predicate handed to HEAP_INSERT / HEAP_DELETE_MIN: x sorts before
 * y when x has the numerically larger priority, so the heap's "minimum" is
 * presumably the work item with the highest priority.
 */
117 #define PRI_LESS(x,y) ((x)->priority > (y)->priority)
/*
 * Add work item `w` to raw queue `q`.
 *
 * With queue_mutex held, the item is either appended to the pri0_queue list
 * or inserted into the priority heap (ordered by PRI_LESS). After the mutex
 * is released, queue_sem is posted once, so the semaphore counts the queued
 * items.
 *
 * NOTE(review): the condition selecting list vs. heap is not visible here;
 * presumably items with w->priority == 0 go to the list -- confirm.
 */
120 raw_queue_put(struct raw_queue *q, struct work *w)
122 pthread_mutex_lock(&q->queue_mutex);
124 clist_add_tail(&q->pri0_queue, &w->n);
/* Heap is full: double its capacity (16 slots on first use). */
127 if (unlikely(q->heap_cnt >= q->heap_max))
129 struct work **old_heap = q->pri_heap;
130 q->heap_max = (q->heap_max ? 2*q->heap_max : 16);
/* One extra slot is allocated -- presumably because the heap macros index from 1; confirm. */
131 q->pri_heap = xrealloc(old_heap, (q->heap_max + 1) * sizeof(struct work *));
133 struct work **heap = q->pri_heap;
134 HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP, w);
136 pthread_mutex_unlock(&q->queue_mutex);
137 sem_post(q->queue_sem);
/*
 * Take one work item out of raw queue `q` with queue_mutex held: either
 * the head of the pri0_queue list or the top element of the priority heap.
 *
 * Both callers (raw_queue_get() and raw_queue_try_get()) acquire queue_sem
 * first, so an item should be available when this runs.
 *
 * NOTE(review): the list/heap choice, the removal from the list and the
 * return statement are not visible here -- presumably the heap is served
 * first whenever it is non-empty; confirm.
 */
140 static inline struct work *
141 raw_queue_do_get(struct raw_queue *q)
143 pthread_mutex_lock(&q->queue_mutex);
147 w = clist_head(&q->pri0_queue);
153 struct work **heap = q->pri_heap;
155 HEAP_DELETE_MIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
157 pthread_mutex_unlock(&q->queue_mutex);
/*
 * Blocking get: waits on queue_sem (posted once per raw_queue_put()) and
 * then returns the item obtained by raw_queue_do_get().
 */
162 raw_queue_get(struct raw_queue *q)
164 sem_wait(q->queue_sem);
165 return raw_queue_do_get(q);
/*
 * Non-blocking get: if queue_sem can be taken without waiting
 * (sem_trywait() returns 0), returns the item obtained by
 * raw_queue_do_get().
 *
 * NOTE(review): the result for the "nothing queued" case is not visible
 * here -- presumably NULL; confirm.
 */
169 raw_queue_try_get(struct raw_queue *q)
171 if (!sem_trywait(q->queue_sem))
172 return raw_queue_do_get(q);
/*
 * Initialize work queue `q`; its `finished` raw queue is set up here.
 *
 * NOTE(review): the use of `p` is not visible here -- presumably it is
 * stored as q->pool (work_submit() reads q->pool) and q->nr_running is
 * reset; confirm.
 */
178 work_queue_init(struct worker_pool *p, struct work_queue *q)
182 raw_queue_init(&q->finished);
/*
 * Destroy work queue `q`. Asserts that q->nr_running is zero, then
 * cleans up the `finished` raw queue (raw_queue_cleanup() in turn asserts
 * that this queue is empty).
 */
186 work_queue_cleanup(struct work_queue *q)
188 ASSERT(!q->nr_running);
189 raw_queue_cleanup(&q->finished);
/*
 * Submit work item `w` via work queue `q`: the item is put on the request
 * queue of the pool q->pool, where the worker threads pick it up.
 *
 * NOTE(review): the statements before the put are not visible here --
 * presumably they set w->reply_to (worker_thread_init() relies on it) and
 * count the item in q->nr_running; confirm.
 */
193 work_submit(struct work_queue *q, struct work *w)
197 raw_queue_put(&q->pool->requests, w);
/*
 * Common part of work_wait() and work_try_wait(): fetches a work item from
 * q->finished, using raw_queue_try_get() when `try` is non-zero and the
 * blocking raw_queue_get() otherwise.
 *
 * NOTE(review): the surrounding statements (any early exit, bookkeeping of
 * q->nr_running, the return) are not visible here -- confirm.
 */
202 work_do_wait(struct work_queue *q, int try)
206 struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
/* Returns the result of work_do_wait() with try == 0, i.e. using the blocking get. */
214 work_wait(struct work_queue *q)
216 return work_do_wait(q, 0);
/* Returns the result of work_do_wait() with try == 1, i.e. using the non-blocking get. */
220 work_try_wait(struct work_queue *q)
222 return work_do_wait(q, 1);
/* Self-test: init_thread hook, logs the id of the starting worker thread. */
229 static void wt_init(struct worker_thread *t)
231 msg(L_INFO, "INIT %d", t->id);
/* Self-test: cleanup_thread hook, logs the id of the worker thread being shut down. */
234 static void wt_cleanup(struct worker_thread *t)
236 msg(L_INFO, "CLEANUP %d", t->id);
/*
 * Self-test: work handler; logs the worker's id and the request's id and
 * priority.
 * The cast from struct work * to struct w * assumes the embedded struct
 * work is the first member of struct w -- confirm against its definition.
 */
244 static void go(struct worker_thread *t, struct work *w)
246 msg(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority);
/*
 * Self-test body (the enclosing function header is not visible here,
 * presumably main()): starts a pool with the logging hooks, submits 500
 * requests through one work queue, collects the finished ones and tears
 * everything down.
 */
252 struct worker_pool pool = {
255 .init_thread = wt_init,
256 .cleanup_thread = wt_cleanup
258 worker_pool_init(&pool);
261 work_queue_init(&pool, &q);
262 for (uns i=0; i<500; i++)
264 struct w *w = xmalloc_zero(sizeof(*w));
/* The first 250 requests get priority i (request 0 thus has priority 0); the rest get priority 0. */
266 w->w.priority = (i < 250 ? i : 0);
268 work_submit(&q, &w->w);
269 msg(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority);
/* The assignment in the condition is intentional: loop until work_wait() yields NULL. */
273 while (w = (struct w *) work_wait(&q))
274 msg(L_INFO, "Finished request %d", w->id);
/* The work queue is cleaned up before the pool. */
276 work_queue_cleanup(&q);
277 worker_pool_cleanup(&pool);