]> mj.ucw.cz Git - libucw.git/blob - lib/workqueue.c
Removed an old FIXME.
[libucw.git] / lib / workqueue.c
1 /*
2  *      UCW Library -- Thread Pools and Work Queues
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "lib/lib.h"
11 #include "lib/threads.h"
12 #include "lib/workqueue.h"
13
14 static void *
15 worker_thread_init(void *arg)
16 {
17   struct worker_thread *t = arg;
18   struct worker_pool *pool = t->pool;
19
20   if (pool->init_thread)
21     pool->init_thread(t);
22   sem_post(pool->init_cleanup_sem);
23
24   for (;;)
25     {
26       struct work *w = raw_queue_get(&pool->requests);
27       w->go(t, w);
28       raw_queue_put(&w->reply_to->finished, w);
29     }
30
31   return NULL;
32 }
33
34 static void
35 worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED)
36 {
37   if (t->pool->cleanup_thread)
38     t->pool->cleanup_thread(t);
39   sem_post(t->pool->init_cleanup_sem);
40   pthread_exit(NULL);
41 }
42
43 void
44 worker_pool_init(struct worker_pool *p)
45 {
46   clist_init(&p->worker_threads);
47   raw_queue_init(&p->requests);
48   p->init_cleanup_sem = sem_alloc();
49
50   pthread_attr_t attr;
51   if (pthread_attr_init(&attr) < 0 ||
52       pthread_attr_setstacksize(&attr, p->stack_size ? : default_thread_stack_size) < 0)
53     ASSERT(0);
54
55   for (uns i=0; i < p->num_threads; i++)
56     {
57       struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
58       t->pool = p;
59       t->id = i;
60       int err = pthread_create(&t->thread, &attr, worker_thread_init, t);
61       if (err)
62         die("Unable to create thread: %m");
63       clist_add_tail(&p->worker_threads, &t->n);
64       sem_wait(p->init_cleanup_sem);
65     }
66
67   pthread_attr_destroy(&attr);
68 }
69
70 void
71 worker_pool_cleanup(struct worker_pool *p)
72 {
73   for (uns i=0; i < p->num_threads; i++)
74     {
75       struct work w = {
76         .go = worker_thread_signal_finish
77       };
78       raw_queue_put(&p->requests, &w);
79       sem_wait(p->init_cleanup_sem);
80     }
81
82   struct worker_thread *tmp;
83   CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp)
84     {
85       int err = pthread_join(t->thread, NULL);
86       ASSERT(!err);
87       if (p->free_thread)
88         p->free_thread(t);
89       else
90         xfree(t);
91     }
92   raw_queue_cleanup(&p->requests);
93   sem_free(p->init_cleanup_sem);
94 }
95
96 void
97 raw_queue_init(struct raw_queue *q)
98 {
99   pthread_mutex_init(&q->queue_mutex, NULL);
100   clist_init(&q->queue);
101   q->queue_sem = sem_alloc();
102 }
103
104 void
105 raw_queue_cleanup(struct raw_queue *q)
106 {
107   ASSERT(clist_empty(&q->queue));
108   sem_free(q->queue_sem);
109   pthread_mutex_destroy(&q->queue_mutex);
110 }
111
112 void
113 raw_queue_put(struct raw_queue *q, struct work *w)
114 {
115   pthread_mutex_lock(&q->queue_mutex);
116   clist_add_tail(&q->queue, &w->n);
117   pthread_mutex_unlock(&q->queue_mutex);
118   sem_post(q->queue_sem);
119 }
120
121 static inline struct work *
122 raw_queue_do_get(struct raw_queue *q)
123 {
124   pthread_mutex_lock(&q->queue_mutex);
125   struct work *w = clist_head(&q->queue);
126   ASSERT(w);
127   clist_remove(&w->n);
128   pthread_mutex_unlock(&q->queue_mutex);
129   return w;
130 }
131
132 struct work *
133 raw_queue_get(struct raw_queue *q)
134 {
135   sem_wait(q->queue_sem);
136   return raw_queue_do_get(q);
137 }
138
139 struct work *
140 raw_queue_try_get(struct raw_queue *q)
141 {
142   if (!sem_trywait(q->queue_sem))
143     return raw_queue_do_get(q);
144   else
145     return NULL;
146 }
147
148 void
149 work_queue_init(struct worker_pool *p, struct work_queue *q)
150 {
151   q->pool = p;
152   q->nr_running = 0;
153   raw_queue_init(&q->finished);
154 }
155
156 void
157 work_queue_cleanup(struct work_queue *q)
158 {
159   ASSERT(!q->nr_running);
160   raw_queue_cleanup(&q->finished);
161 }
162
163 void
164 work_submit(struct work_queue *q, struct work *w)
165 {
166   ASSERT(w->go);
167   w->reply_to = q;
168   raw_queue_put(&q->pool->requests, w);
169   q->nr_running++;
170 }
171
172 static struct work *
173 work_do_wait(struct work_queue *q, int try)
174 {
175   while (q->nr_running)
176     {
177       struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
178       if (!w)
179         return NULL;
180       q->nr_running--;
181       if (w->returned)
182         w->returned(q, w);
183       else
184         return w;
185     }
186   return NULL;
187 }
188
189 struct work *
190 work_wait(struct work_queue *q)
191 {
192   return work_do_wait(q, 0);
193 }
194
195 struct work *
196 work_try_wait(struct work_queue *q)
197 {
198   return work_do_wait(q, 1);
199 }
200
201 #ifdef TEST
202
203 #include <unistd.h>
204
205 static void wt_init(struct worker_thread *t)
206 {
207   log(L_INFO, "INIT %d", t->id);
208 }
209
210 static void wt_cleanup(struct worker_thread *t)
211 {
212   log(L_INFO, "CLEANUP %d", t->id);
213 }
214
215 struct w {
216   struct work w;
217   uns id;
218 };
219
220 static void go(struct worker_thread *t, struct work *w)
221 {
222   log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
223   usleep(1);
224 }
225
226 int main(void)
227 {
228   struct worker_pool pool = {
229     .num_threads = 10,
230     .stack_size = 65536,
231     .init_thread = wt_init,
232     .cleanup_thread = wt_cleanup
233   };
234   worker_pool_init(&pool);
235
236   struct work_queue q;
237   work_queue_init(&pool, &q);
238   for (uns i=0; i<500; i++)
239     {
240       struct w *w = xmalloc_zero(sizeof(*w));
241       w->w.go = go;
242       w->id = i;
243       work_submit(&q, &w->w);
244     }
245
246   struct w *w;
247   while (w = (struct w *) work_wait(&q))
248     log(L_INFO, "Finished request %d", w->id);
249
250   work_queue_cleanup(&q);
251   worker_pool_cleanup(&pool);
252   return 0;
253 }
254
255 #endif