]> mj.ucw.cz Git - libucw.git/blob - lib/workqueue.c
workqueue: Added non-blocking wait functions.
[libucw.git] / lib / workqueue.c
1 /*
2  *      UCW Library -- Thread Pools and Work Queues
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "lib/lib.h"
11 #include "lib/workqueue.h"
12
13 static void *
14 worker_thread_init(void *arg)
15 {
16   struct worker_thread *t = arg;
17   struct worker_pool *pool = t->pool;
18
19   if (pool->init_thread)
20     pool->init_thread(t);
21   sem_post(pool->init_cleanup_sem);
22
23   for (;;)
24     {
25       struct work *w = raw_queue_get(&pool->requests);
26       w->go(t, w);
27       raw_queue_put(&w->reply_to->finished, w);
28     }
29
30   return NULL;
31 }
32
33 static void
34 worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED)
35 {
36   if (t->pool->cleanup_thread)
37     t->pool->cleanup_thread(t);
38   sem_post(t->pool->init_cleanup_sem);
39   pthread_exit(NULL);
40 }
41
42 void
43 worker_pool_init(struct worker_pool *p)
44 {
45   clist_init(&p->worker_threads);
46   raw_queue_init(&p->requests);
47   p->init_cleanup_sem = sem_alloc();
48
49   pthread_attr_t attr;
50   if (pthread_attr_init(&attr) < 0 ||
51       pthread_attr_setstacksize(&attr, p->stack_size) < 0)
52     ASSERT(0);
53
54   for (uns i=0; i < p->num_threads; i++)
55     {
56       struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
57       t->pool = p;
58       t->id = i;
59       int err = pthread_create(&t->thread, &attr, worker_thread_init, t);
60       if (err)
61         die("Unable to create thread: %m");
62       clist_add_tail(&p->worker_threads, &t->n);
63       sem_wait(p->init_cleanup_sem);
64     }
65
66   pthread_attr_destroy(&attr);
67 }
68
69 void
70 worker_pool_cleanup(struct worker_pool *p)
71 {
72   for (uns i=0; i < p->num_threads; i++)
73     {
74       struct work w = {
75         .go = worker_thread_signal_finish
76       };
77       raw_queue_put(&p->requests, &w);
78       sem_wait(p->init_cleanup_sem);
79     }
80
81   struct worker_thread *tmp;
82   CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp)
83     {
84       int err = pthread_join(t->thread, NULL);
85       ASSERT(!err);
86       if (p->free_thread)
87         p->free_thread(t);
88       else
89         xfree(t);
90     }
91   raw_queue_cleanup(&p->requests);
92   sem_free(p->init_cleanup_sem);
93 }
94
95 void
96 raw_queue_init(struct raw_queue *q)
97 {
98   pthread_mutex_init(&q->queue_mutex, NULL);
99   clist_init(&q->queue);
100   q->queue_sem = sem_alloc();
101 }
102
103 void
104 raw_queue_cleanup(struct raw_queue *q)
105 {
106   ASSERT(clist_empty(&q->queue));
107   sem_free(q->queue_sem);
108   pthread_mutex_destroy(&q->queue_mutex);
109 }
110
111 void
112 raw_queue_put(struct raw_queue *q, struct work *w)
113 {
114   pthread_mutex_lock(&q->queue_mutex);
115   clist_add_tail(&q->queue, &w->n);
116   pthread_mutex_unlock(&q->queue_mutex);
117   sem_post(q->queue_sem);
118 }
119
120 static inline struct work *
121 raw_queue_do_get(struct raw_queue *q)
122 {
123   pthread_mutex_lock(&q->queue_mutex);
124   struct work *w = clist_head(&q->queue);
125   ASSERT(w);
126   clist_remove(&w->n);
127   pthread_mutex_unlock(&q->queue_mutex);
128   return w;
129 }
130
131 struct work *
132 raw_queue_get(struct raw_queue *q)
133 {
134   sem_wait(q->queue_sem);
135   return raw_queue_do_get(q);
136 }
137
138 struct work *
139 raw_queue_try_get(struct raw_queue *q)
140 {
141   if (!sem_trywait(q->queue_sem))
142     return raw_queue_do_get(q);
143   else
144     return NULL;
145 }
146
147 void
148 work_queue_init(struct worker_pool *p, struct work_queue *q)
149 {
150   q->pool = p;
151   q->nr_running = 0;
152   raw_queue_init(&q->finished);
153 }
154
155 void
156 work_queue_cleanup(struct work_queue *q)
157 {
158   ASSERT(!q->nr_running);
159   raw_queue_cleanup(&q->finished);
160 }
161
162 void
163 work_submit(struct work_queue *q, struct work *w)
164 {
165   ASSERT(w->go);
166   w->reply_to = q;
167   raw_queue_put(&q->pool->requests, w);
168   q->nr_running++;
169 }
170
171 static struct work *
172 work_do_wait(struct work_queue *q, int try)
173 {
174   while (q->nr_running)
175     {
176       struct work *w = (try ? raw_queue_try_get : raw_queue_get)(&q->finished);
177       if (!w)
178         return NULL;
179       q->nr_running--;
180       if (w->returned)
181         w->returned(q, w);
182       else
183         return w;
184     }
185   return NULL;
186 }
187
188 struct work *
189 work_wait(struct work_queue *q)
190 {
191   return work_do_wait(q, 0);
192 }
193
194 struct work *
195 work_try_wait(struct work_queue *q)
196 {
197   return work_do_wait(q, 1);
198 }
199
200 #ifdef TEST
201
202 #include <unistd.h>
203
204 static void wt_init(struct worker_thread *t)
205 {
206   log(L_INFO, "INIT %d", t->id);
207 }
208
209 static void wt_cleanup(struct worker_thread *t)
210 {
211   log(L_INFO, "CLEANUP %d", t->id);
212 }
213
214 struct w {
215   struct work w;
216   uns id;
217 };
218
219 static void go(struct worker_thread *t, struct work *w)
220 {
221   log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
222   usleep(1);
223 }
224
225 int main(void)
226 {
227   struct worker_pool pool = {
228     .num_threads = 10,
229     .stack_size = 65536,
230     .init_thread = wt_init,
231     .cleanup_thread = wt_cleanup
232   };
233   worker_pool_init(&pool);
234
235   struct work_queue q;
236   work_queue_init(&pool, &q);
237   for (uns i=0; i<500; i++)
238     {
239       struct w *w = xmalloc_zero(sizeof(*w));
240       w->w.go = go;
241       w->id = i;
242       work_submit(&q, &w->w);
243     }
244
245   struct w *w;
246   while (w = (struct w *) work_wait(&q))
247     log(L_INFO, "Finished request %d", w->id);
248
249   work_queue_cleanup(&q);
250   worker_pool_cleanup(&pool);
251   return 0;
252 }
253
254 #endif