]> mj.ucw.cz Git - libucw.git/blob - lib/workqueue.c
Implemented an asynchronous I/O library module.
[libucw.git] / lib / workqueue.c
1 /*
2  *      UCW Library -- Thread Pools and Work Queues
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "lib/lib.h"
11 #include "lib/workqueue.h"
12
13 static void *
14 worker_thread_init(void *arg)
15 {
16   struct worker_thread *t = arg;
17   struct worker_pool *pool = t->pool;
18
19   if (pool->init_thread)
20     pool->init_thread(t);
21   sem_post(pool->init_cleanup_sem);
22
23   for (;;)
24     {
25       struct work *w = raw_queue_get(&pool->requests);
26       w->go(t, w);
27       raw_queue_put(&w->reply_to->finished, w);
28     }
29
30   return NULL;
31 }
32
33 static void
34 worker_thread_signal_finish(struct worker_thread *t, struct work *w UNUSED)
35 {
36   if (t->pool->cleanup_thread)
37     t->pool->cleanup_thread(t);
38   sem_post(t->pool->init_cleanup_sem);
39   pthread_exit(NULL);
40 }
41
42 void
43 worker_pool_init(struct worker_pool *p)
44 {
45   clist_init(&p->worker_threads);
46   raw_queue_init(&p->requests);
47   p->init_cleanup_sem = sem_alloc();
48
49   pthread_attr_t attr;
50   if (pthread_attr_init(&attr) < 0 ||
51       pthread_attr_setstacksize(&attr, p->stack_size) < 0)
52     ASSERT(0);
53
54   for (uns i=0; i < p->num_threads; i++)
55     {
56       struct worker_thread *t = (p->new_thread ? p->new_thread() : xmalloc(sizeof(*t)));
57       t->pool = p;
58       t->id = i;
59       int err = pthread_create(&t->thread, &attr, worker_thread_init, t);
60       if (err)
61         die("Unable to create thread: %m");
62       clist_add_tail(&p->worker_threads, &t->n);
63       sem_wait(p->init_cleanup_sem);
64     }
65
66   pthread_attr_destroy(&attr);
67 }
68
69 void
70 worker_pool_cleanup(struct worker_pool *p)
71 {
72   for (uns i=0; i < p->num_threads; i++)
73     {
74       struct work w = {
75         .go = worker_thread_signal_finish
76       };
77       raw_queue_put(&p->requests, &w);
78       sem_wait(p->init_cleanup_sem);
79     }
80
81   struct worker_thread *tmp;
82   CLIST_FOR_EACH_DELSAFE(struct worker_thread *, t, p->worker_threads, tmp)
83     {
84       int err = pthread_join(t->thread, NULL);
85       ASSERT(!err);
86       if (p->free_thread)
87         p->free_thread(t);
88       else
89         xfree(t);
90     }
91   raw_queue_cleanup(&p->requests);
92   sem_free(p->init_cleanup_sem);
93 }
94
95 void
96 raw_queue_init(struct raw_queue *q)
97 {
98   pthread_mutex_init(&q->queue_mutex, NULL);
99   clist_init(&q->queue);
100   q->queue_sem = sem_alloc();
101 }
102
103 void
104 raw_queue_cleanup(struct raw_queue *q)
105 {
106   ASSERT(clist_empty(&q->queue));
107   sem_free(q->queue_sem);
108   pthread_mutex_destroy(&q->queue_mutex);
109 }
110
111 void
112 raw_queue_put(struct raw_queue *q, struct work *w)
113 {
114   pthread_mutex_lock(&q->queue_mutex);
115   clist_add_tail(&q->queue, &w->n);
116   pthread_mutex_unlock(&q->queue_mutex);
117   sem_post(q->queue_sem);
118 }
119
120 struct work *
121 raw_queue_get(struct raw_queue *q)
122 {
123   sem_wait(q->queue_sem);
124   pthread_mutex_lock(&q->queue_mutex);
125   struct work *w = clist_head(&q->queue);
126   ASSERT(w);
127   clist_remove(&w->n);
128   pthread_mutex_unlock(&q->queue_mutex);
129   return w;
130 }
131
132 void
133 work_queue_init(struct worker_pool *p, struct work_queue *q)
134 {
135   q->pool = p;
136   q->nr_running = 0;
137   raw_queue_init(&q->finished);
138 }
139
140 void
141 work_queue_cleanup(struct work_queue *q)
142 {
143   ASSERT(!q->nr_running);
144   raw_queue_cleanup(&q->finished);
145 }
146
147 void
148 work_submit(struct work_queue *q, struct work *w)
149 {
150   ASSERT(w->go);
151   w->reply_to = q;
152   raw_queue_put(&q->pool->requests, w);
153   q->nr_running++;
154 }
155
156 struct work *
157 work_wait(struct work_queue *q)
158 {
159   while (q->nr_running)
160     {
161       struct work *w = raw_queue_get(&q->finished);
162       q->nr_running--;
163       if (w->returned)
164         w->returned(q, w);
165       else
166         return w;
167     }
168   return NULL;
169 }
170
171 #ifdef TEST
172
173 #include <unistd.h>
174
175 static void wt_init(struct worker_thread *t)
176 {
177   log(L_INFO, "INIT %d", t->id);
178 }
179
180 static void wt_cleanup(struct worker_thread *t)
181 {
182   log(L_INFO, "CLEANUP %d", t->id);
183 }
184
185 struct w {
186   struct work w;
187   uns id;
188 };
189
190 static void go(struct worker_thread *t, struct work *w)
191 {
192   log(L_INFO, "GO %d: request %d", t->id, ((struct w *)w)->id);
193   usleep(1);
194 }
195
196 int main(void)
197 {
198   struct worker_pool pool = {
199     .num_threads = 10,
200     .stack_size = 65536,
201     .init_thread = wt_init,
202     .cleanup_thread = wt_cleanup
203   };
204   worker_pool_init(&pool);
205
206   struct work_queue q;
207   work_queue_init(&pool, &q);
208   for (uns i=0; i<500; i++)
209     {
210       struct w *w = xmalloc_zero(sizeof(*w));
211       w->w.go = go;
212       w->id = i;
213       work_submit(&q, &w->w);
214     }
215
216   struct w *w;
217   while (w = (struct w *) work_wait(&q))
218     log(L_INFO, "Finished request %d", w->id);
219
220   work_queue_cleanup(&q);
221   worker_pool_cleanup(&pool);
222   return 0;
223 }
224
225 #endif