]> mj.ucw.cz Git - libucw.git/blob - ucw/workqueue.c
mainloop.h: Typo fix.
[libucw.git] / ucw / workqueue.c
1 /*
2  *      UCW Library -- Thread Pools and Work Queues
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include <ucw/lib.h>
11 #include <ucw/threads.h>
12 #include <ucw/workqueue.h>
13 #include <ucw/heap.h>
14
15 static void *
16 worker_thread_init(void *arg)
17 {
18   struct worker_thread *t = arg;
19   struct worker_pool *pool = t->pool;
20
21   if (pool->init_thread)
22     pool->init_thread(t);
23   sem_post(pool->init_cleanup_sem);
24
25   for (;;)
26     {
27       struct work *w = raw_queue_get(&pool->requests);
28       w->go(t, w);
29       raw_queue_put(&w->reply_to->finished, w);
30     }
31
32   return NULL;
33 }
34
35 static void
36 worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED)
37 {
38   if (t->pool->cleanup_thread)
39     t->pool->cleanup_thread(t);
40   sem_post(t->pool->init_cleanup_sem);
41   pthread_exit(NULL);
42 }
43
44 void
45 worker_pool_init(struct worker_pool *p)
46 {
47   clist_init(&p->worker_threads);
48   raw_queue_init(&p->requests);
49   p->init_cleanup_sem = sem_alloc();
50
51   pthread_attr_t attr;
52   if (pthread_attr_init(&attr) < 0 ||
53       pthread_attr_setstacksize(&attr, p->stack_size ? : ucwlib_thread_stack_size) < 0)
54     ASSERT(0);
55
56   for (uns i=0; i < p->num_threads; i++)
57     {
58       struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
59       t->pool = p;
60       t->id = i;
61       int err = pthread_create(&t->thread, &attr, worker_thread_init, t);
62       if (err)
63         die("Unable to create thread: %m");
64       clist_add_tail(&p->worker_threads, &t->n);
65       sem_wait(p->init_cleanup_sem);
66     }
67
68   pthread_attr_destroy(&attr);
69 }
70
71 void
72 worker_pool_cleanup(struct worker_pool *p)
73 {
74   for (uns i=0; i < p->num_threads; i++)
75     {
76       struct work w = {
77         .go = worker_thread_signal_finish
78       };
79       raw_queue_put(&p->requests, &w);
80       sem_wait(p->init_cleanup_sem);
81     }
82
83   struct worker_thread *tmp;
84   CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp)
85     {
86       int err = pthread_join(t->thread, NULL);
87       ASSERT(!err);
88       if (p->free_thread)
89         p->free_thread(t);
90       else
91         xfree(t);
92     }
93   raw_queue_cleanup(&p->requests);
94   sem_free(p->init_cleanup_sem);
95 }
96
97 void
98 raw_queue_init(struct raw_queue *q)
99 {
100   pthread_mutex_init(&q->queue_mutex, NULL);
101   clist_init(&q->pri0_queue);
102   q->queue_sem = sem_alloc();
103   q->pri_heap = NULL;
104   q->heap_cnt = q->heap_max = 0;
105 }
106
107 void
108 raw_queue_cleanup(struct raw_queue *q)
109 {
110   ASSERT(clist_empty(&q->pri0_queue));
111   ASSERT(!q->heap_cnt);
112   xfree(q->pri_heap);
113   sem_free(q->queue_sem);
114   pthread_mutex_destroy(&q->queue_mutex);
115 }
116
117 #define PRI_LESS(x,y) ((x)->priority > (y)->priority)
118
119 void
120 raw_queue_put(struct raw_queue *q, struct work *w)
121 {
122   pthread_mutex_lock(&q->queue_mutex);
123   if (!w->priority)
124     clist_add_tail(&q->pri0_queue, &w->n);
125   else
126     {
127       if (unlikely(q->heap_cnt >= q->heap_max))
128         {
129           struct work **old_heap = q->pri_heap;
130           q->heap_max = (q->heap_max ? 2*q->heap_max : 16);
131           q->pri_heap = xrealloc(old_heap, (q->heap_max + 1) * sizeof(struct work *));
132         }
133       struct work **heap = q->pri_heap;
134       HEAP_INSERT(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP, w);
135     }
136   pthread_mutex_unlock(&q->queue_mutex);
137   sem_post(q->queue_sem);
138 }
139
140 static inline struct work *
141 raw_queue_do_get(struct raw_queue *q)
142 {
143   pthread_mutex_lock(&q->queue_mutex);
144   struct work *w;
145   if (!q->heap_cnt)
146     {
147       w = clist_head(&q->pri0_queue);
148       ASSERT(w);
149       clist_remove(&w->n);
150     }
151   else
152     {
153       struct work **heap = q->pri_heap;
154       w = heap[1];
155       HEAP_DELETE_MIN(struct work *, heap, q->heap_cnt, PRI_LESS, HEAP_SWAP);
156     }
157   pthread_mutex_unlock(&q->queue_mutex);
158   return w;
159 }
160
161 struct work *
162 raw_queue_get(struct raw_queue *q)
163 {
164   sem_wait(q->queue_sem);
165   return raw_queue_do_get(q);
166 }
167
168 struct work *
169 raw_queue_try_get(struct raw_queue *q)
170 {
171   if (!sem_trywait(q->queue_sem))
172     return raw_queue_do_get(q);
173   else
174     return NULL;
175 }
176
177 void
178 work_queue_init(struct worker_pool *p, struct work_queue *q)
179 {
180   q->pool = p;
181   q->nr_running = 0;
182   raw_queue_init(&q->finished);
183 }
184
185 void
186 work_queue_cleanup(struct work_queue *q)
187 {
188   ASSERT(!q->nr_running);
189   raw_queue_cleanup(&q->finished);
190 }
191
192 void
193 work_submit(struct work_queue *q, struct work *w)
194 {
195   ASSERT(w->go);
196   w->reply_to = q;
197   raw_queue_put(&q->pool->requests, w);
198   q->nr_running++;
199 }
200
201 static struct work *
202 work_do_wait(struct work_queue *q, int try)
203 {
204   if (!q->nr_running)
205     return NULL;
206   struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
207   if (!w)
208     return NULL;
209   q->nr_running--;
210   return w;
211 }
212
213 struct work *
214 work_wait(struct work_queue *q)
215 {
216   return work_do_wait(q, 0);
217 }
218
219 struct work *
220 work_try_wait(struct work_queue *q)
221 {
222   return work_do_wait(q, 1);
223 }
224
225 #ifdef TEST
226
227 #include <unistd.h>
228
229 static void wt_init(struct worker_thread *t)
230 {
231   msg(L_INFO, "INIT %d", t->id);
232 }
233
234 static void wt_cleanup(struct worker_thread *t)
235 {
236   msg(L_INFO, "CLEANUP %d", t->id);
237 }
238
239 struct w {
240   struct work w;
241   uns id;
242 };
243
244 static void go(struct worker_thread *t, struct work *w)
245 {
246   msg(L_INFO, "GO %d: request %d (pri %d)", t->id, ((struct w *)w)->id, w->priority);
247   usleep(1);
248 }
249
250 int main(void)
251 {
252   struct worker_pool pool = {
253     .num_threads = 10,
254     .stack_size = 65536,
255     .init_thread = wt_init,
256     .cleanup_thread = wt_cleanup
257   };
258   worker_pool_init(&pool);
259
260   struct work_queue q;
261   work_queue_init(&pool, &q);
262   for (uns i=0; i<500; i++)
263     {
264       struct w *w = xmalloc_zero(sizeof(*w));
265       w->w.go = go;
266       w->w.priority = (i < 250 ? i : 0);
267       w->id = i;
268       work_submit(&q, &w->w);
269       msg(L_INFO, "Submitted request %d (pri %d)", w->id, w->w.priority);
270     }
271
272   struct w *w;
273   while (w = (struct w *) work_wait(&q))
274     msg(L_INFO, "Finished request %d", w->id);
275
276   work_queue_cleanup(&q);
277   worker_pool_cleanup(&pool);
278   return 0;
279 }
280
281 #endif