]> mj.ucw.cz Git - moe.git/blob - ucw/workqueue.c
MO-P: mo-cms-users knows how to tie users to machines
[moe.git] / ucw / workqueue.c
1 /*
2  *      UCW Library -- Thread Pools and Work Queues
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "ucw/lib.h"
11 #include "ucw/threads.h"
12 #include "ucw/workqueue.h"
13 #include "ucw/heap.h"
14
15 static void *
16 worker_thread_init(void *arg)
17 {
18   struct worker_thread *t = arg;
19   struct worker_pool *pool = t->pool;
20
21   if (pool->init_thread)
22     pool->init_thread(t);
23   sem_post(pool->init_cleanup_sem);
24
25   for (;;)
26     {
27       struct work *w = raw_queue_get(&pool->requests);
28       w->go(t, w);
29       raw_queue_put(&w->reply_to->finished, w);
30     }
31
32   return NULL;
33 }
34
35 static void
36 worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED)
37 {
38   if (t->pool->cleanup_thread)
39     t->pool->cleanup_thread(t);
40   sem_post(t->pool->init_cleanup_sem);
41   pthread_exit(NULL);
42 }
43
44 void
45 worker_pool_init(struct worker_pool *p)
46 {
47   clist_init(&p->worker_threads);
48   raw_queue_init(&p->requests);
49   p->init_cleanup_sem = sem_alloc();
50
51   pthread_attr_t attr;
52   if (pthread_attr_init(&attr) < 0 ||
53       pthread_attr_setstacksize(&attr, p->stack_size ? : ucwlib_thread_stack_size) < 0)
54     ASSERT(0);
55
56   for (uns i=0; i < p->num_threads; i++)
57     {
58       struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
59       t->pool = p;
60       t->id = i;
61       int err = pthread_create(&t->thread, &attr, worker_thread_init, t);
62       if (err)
63         die("Unable to create thread: %m");
64       clist_add_tail(&p->worker_threads, &t->n);
65       sem_wait(p->init_cleanup_sem);
66     }
67
68   pthread_attr_destroy(&attr);
69 }
70
71 void
72 worker_pool_cleanup(struct worker_pool *p)
73 {
74   for (uns i=0; i < p->num_threads; i++)
75     {
76       struct work w = {
77         .go = worker_thread_signal_finish
78       };
79       raw_queue_put(&p->requests, &w);
80       sem_wait(p->init_cleanup_sem);
81     }
82
83   struct worker_thread *tmp;
84   CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp)
85     {
86       int err = pthread_join(t->thread, NULL);
87       ASSERT(!err);
88       if (p->free_thread)
89         p->free_thread(t);
90       else
91         xfree(t);
92     }
93   raw_queue_cleanup(&p->requests);
94   sem_free(p->init_cleanup_sem);
95 }
96
97 void
98 raw_queue_init(struct raw_queue *q)
99 {
100   pthread_mutex_init(&q->queue_mutex, NULL);
101   clist_init(&q->pri0_queue);
102   q->queue_sem = sem_alloc();
103   q->pri_heap = NULL;
104   q->heap_cnt = q->heap_max = 0;
105 }
106
107 void
108 raw_queue_cleanup(struct raw_queue *q)
109 {
110   ASSERT(clist_empty(&q->pri0_queue));
111   ASSERT(!q->heap_cnt);
112   xfree(q->pri_heap);
113   sem_free(q->queue_sem);
114   pthread_mutex_destroy(&q->queue_mutex);
115 }
116
117 #define PRI_LESS(x,y) ((x)->priority > (y)->priority)
118
119 void
120 raw_queue_put(struct raw_queue *q, struct work *w)
121 {
122   pthread_mutex_lock(&q->queue_mutex);
123   if (!w->priority)
124     clist_add_tail(&q->pri0_queue, &w->n);
125   else
126     {
127       if (unlikely(q->heap_cnt >= q->heap_max))
128         {
129           struct work **old_heap = q->pri_heap;
130           q->heap_max = (q->heap_max ? 2*q->heap_max : 16);
131           q->pri_heap = xrealloc(old_heap, (q->heap_max + 1) * sizeof(struct work *));
132         }
133       struct work **heap = q->pri_heap;
134       heap[++q->heap_cnt] = w;
135       HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
136     }
137   pthread_mutex_unlock(&q->queue_mutex);
138   sem_post(q->queue_sem);
139 }
140
141 static inline struct work *
142 raw_queue_do_get(struct raw_queue *q)
143 {
144   pthread_mutex_lock(&q->queue_mutex);
145   struct work *w;
146   if (!q->heap_cnt)
147     {
148       w = clist_head(&q->pri0_queue);
149       ASSERT(w);
150       clist_remove(&w->n);
151     }
152   else
153     {
154       struct work **heap = q->pri_heap;
155       w = heap[1];
156       HEAP_DELMIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
157     }
158   pthread_mutex_unlock(&q->queue_mutex);
159   return w;
160 }
161
162 struct work *
163 raw_queue_get(struct raw_queue *q)
164 {
165   sem_wait(q->queue_sem);
166   return raw_queue_do_get(q);
167 }
168
169 struct work *
170 raw_queue_try_get(struct raw_queue *q)
171 {
172   if (!sem_trywait(q->queue_sem))
173     return raw_queue_do_get(q);
174   else
175     return NULL;
176 }
177
178 void
179 work_queue_init(struct worker_pool *p, struct work_queue *q)
180 {
181   q->pool = p;
182   q->nr_running = 0;
183   raw_queue_init(&q->finished);
184 }
185
186 void
187 work_queue_cleanup(struct work_queue *q)
188 {
189   ASSERT(!q->nr_running);
190   raw_queue_cleanup(&q->finished);
191 }
192
193 void
194 work_submit(struct work_queue *q, struct work *w)
195 {
196   ASSERT(w->go);
197   w->reply_to = q;
198   raw_queue_put(&q->pool->requests, w);
199   q->nr_running++;
200 }
201
202 static struct work *
203 work_do_wait(struct work_queue *q, int try)
204 {
205   if (!q->nr_running)
206     return NULL;
207   struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
208   if (!w)
209     return NULL;
210   q->nr_running--;
211   return w;
212 }
213
214 struct work *
215 work_wait(struct work_queue *q)
216 {
217   return work_do_wait(q, 0);
218 }
219
220 struct work *
221 work_try_wait(struct work_queue *q)
222 {
223   return work_do_wait(q, 1);
224 }
225
226 #ifdef TEST
227
228 #include <unistd.h>
229
230 static void wt_init(struct worker_thread *t)
231 {
232   msg(L_INFO, "INIT %d", t->id);
233 }
234
235 static void wt_cleanup(struct worker_thread *t)
236 {
237   msg(L_INFO, "CLEANUP %d", t->id);
238 }
239
240 struct w {
241   struct work w;
242   uns id;
243 };
244
245 static void go(struct worker_thread *t, struct work *w)
246 {
247   msg(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority);
248   usleep(1);
249 }
250
251 int main(void)
252 {
253   struct worker_pool pool = {
254     .num_threads = 10,
255     .stack_size = 65536,
256     .init_thread = wt_init,
257     .cleanup_thread = wt_cleanup
258   };
259   worker_pool_init(&pool);
260
261   struct work_queue q;
262   work_queue_init(&pool, &q);
263   for (uns i=0; i<500; i++)
264     {
265       struct w *w = xmalloc_zero(sizeof(*w));
266       w->w.go = go;
267       w->w.priority = (i < 250 ? i : 0);
268       w->id = i;
269       work_submit(&q, &w->w);
270       msg(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority);
271     }
272
273   struct w *w;
274   while (w = (struct w *) work_wait(&q))
275     msg(L_INFO, "Finished request %d", w->id);
276
277   work_queue_cleanup(&q);
278   worker_pool_cleanup(&pool);
279   return 0;
280 }
281
282 #endif