]> mj.ucw.cz Git - libucw.git/blob - lib/asio.c
Creation of temporary files is now thread-safe (ugh).
[libucw.git] / lib / asio.c
1 /*
2  *      UCW Library -- Asynchronous I/O
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "lib/lib.h"
13 #include "lib/asio.h"
14
15 #include <string.h>
16 #include <unistd.h>
17 #include <errno.h>
18
19 static uns asio_num_users;
20 static struct worker_pool asio_wpool;
21 static pthread_mutex_t asio_init_lock;
22
23 static void CONSTRUCTOR
24 asio_global_init(void)
25 {
26   pthread_mutex_init(&asio_init_lock, NULL);
27 }
28
29 static void
30 asio_init_unlocked(void)
31 {
32   if (asio_num_users++)
33     return;
34
35   DBG("ASIO: INIT");
36   asio_wpool.num_threads = 1;
37   asio_wpool.stack_size = 65536;
38   worker_pool_init(&asio_wpool);
39 }
40
41 static void
42 asio_cleanup_unlocked(void)
43 {
44   if (--asio_num_users)
45     return;
46
47   DBG("ASIO: CLEANUP");
48   worker_pool_cleanup(&asio_wpool);
49 }
50
51 void
52 asio_init_queue(struct asio_queue *q)
53 {
54   pthread_mutex_lock(&asio_init_lock);
55   asio_init_unlocked();
56   pthread_mutex_unlock(&asio_init_lock);
57
58   DBG("ASIO: New queue %p", q);
59   ASSERT(q->buffer_size);
60   q->allocated_requests = 0;
61   q->running_requests = 0;
62   q->running_writebacks = 0;
63   q->use_count = 0;
64   clist_init(&q->idle_list);
65   clist_init(&q->done_list);
66   work_queue_init(&asio_wpool, &q->queue);
67 }
68
69 void
70 asio_cleanup_queue(struct asio_queue *q)
71 {
72   DBG("ASIO: Removing queue %p", q);
73   ASSERT(!q->running_requests);
74   ASSERT(!q->running_writebacks);
75   ASSERT(!q->allocated_requests);
76   ASSERT(clist_empty(&q->done_list));
77
78   struct asio_request *r;
79   while (r = clist_head(&q->idle_list))
80     {
81       clist_remove(&r->work.n);
82       big_free(r->buffer, q->buffer_size);
83       xfree(r);
84     }
85
86   work_queue_cleanup(&q->queue);
87
88   pthread_mutex_lock(&asio_init_lock);
89   asio_cleanup_unlocked();
90   pthread_mutex_unlock(&asio_init_lock);
91 }
92
93 struct asio_request *
94 asio_get(struct asio_queue *q)
95 {
96   q->allocated_requests++;
97   struct asio_request *r = clist_head(&q->idle_list);
98   if (!r)
99     {
100       r = xmalloc_zero(sizeof(*r));
101       r->queue = q;
102       r->buffer = big_alloc(q->buffer_size);
103       DBG("ASIO: Got %p (new)", r);
104     }
105   else
106     {
107       clist_remove(&r->work.n);
108       DBG("ASIO: Got %p", r);
109     }
110   r->op = ASIO_FREE;
111   r->fd = -1;
112   r->len = 0;
113   r->status = -1;
114   r->returned_errno = -1;
115   r->submitted = 0;
116   return r;
117 }
118
119 static int
120 asio_raw_wait(struct asio_queue *q)
121 {
122   struct asio_request *r = (struct asio_request *) work_wait(&q->queue);
123   if (!r)
124     return 0;
125   r->submitted = 0;
126   q->running_requests--;
127   if (r->op == ASIO_WRITE_BACK)
128     {
129       DBG("ASIO: Finished writeback %p", r);
130       if (r->status < 0)
131         die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno));
132       if (r->status != (int)r->len)
133         die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len);
134       q->running_writebacks--;
135       asio_put(r);
136     }
137   else
138     clist_add_tail(&q->done_list, &r->work.n);
139   return 1;
140 }
141
142 static void
143 asio_handler(struct worker_thread *t UNUSED, struct work *w)
144 {
145   struct asio_request *r = (struct asio_request *) w;
146
147   DBG("ASIO: Servicing %p (%s on fd=%d, len=%d)", r,
148       (char*[]) { "?", "READ", "WRITE", "WRITEBACK" }[r->op], r->fd, r->len);
149   errno = 0;
150   switch (r->op)
151     {
152     case ASIO_READ:
153       r->status = read(r->fd, r->buffer, r->len);
154       break;
155     case ASIO_WRITE:
156     case ASIO_WRITE_BACK:
157       r->status = write(r->fd, r->buffer, r->len);
158       break;
159     default:
160       die("ASIO: Got unknown request type %d", r->op);
161     }
162   r->returned_errno = errno;
163   DBG("ASIO: Finished %p (status=%d, errno=%d)", r, r->status, r->returned_errno);
164 }
165
166 void
167 asio_submit(struct asio_request *r)
168 {
169   struct asio_queue *q = r->queue;
170   DBG("ASIO: Submitting %p on queue %p", r, q);
171   ASSERT(r->op != ASIO_FREE);
172   ASSERT(!r->submitted);
173   if (r->op == ASIO_WRITE_BACK)
174     {
175       while (q->running_writebacks >= q->max_writebacks)
176         {
177           DBG("ASIO: Waiting for free writebacks");
178           if (!asio_raw_wait(q))
179             ASSERT(0);
180         }
181       q->running_writebacks++;
182     }
183   q->running_requests++;
184   r->submitted = 1;
185   r->work.go = asio_handler;
186   r->work.returned = NULL;
187   work_submit(&q->queue, &r->work);
188 }
189
190 struct asio_request *
191 asio_wait(struct asio_queue *q)
192 {
193   struct asio_request *r;
194   while (!(r = clist_head(&q->done_list)))
195     {
196       DBG("ASIO: Waiting on queue %p", q);
197       if (!asio_raw_wait(q))
198         return NULL;
199     }
200   clist_remove(&r->work.n);
201   DBG("ASIO: Done %p", r);
202   return r;
203 }
204
205 void
206 asio_put(struct asio_request *r)
207 {
208   struct asio_queue *q = r->queue;
209   DBG("ASIO: Put %p", r);
210   ASSERT(!r->submitted);
211   ASSERT(q->allocated_requests);
212   clist_add_tail(&q->idle_list, &r->work.n);
213   q->allocated_requests--;
214 }
215
216 void
217 asio_sync(struct asio_queue *q)
218 {
219   DBG("ASIO: Syncing queue %p", q);
220   while (q->running_requests)
221     if (!asio_raw_wait(q))
222       ASSERT(0);
223 }
224
225 #ifdef TEST
226
227 int main(void)
228 {
229   struct asio_queue q;
230   struct asio_request *r;
231
232   q.buffer_size = 4096;
233   q.max_writebacks = 2;
234   asio_init_queue(&q);
235
236 #if 0
237
238   for (;;)
239     {
240       r = asio_get(&q);
241       r->op = ASIO_READ;
242       r->fd = 0;
243       r->len = q.buffer_size;
244       asio_submit(r);
245       r = asio_wait(&q);
246       ASSERT(r);
247       if (r->status <= 0)
248         {
249           asio_put(r);
250           break;
251         }
252       r->op = ASIO_WRITE_BACK;
253       r->fd = 1;
254       r->len = r->status;
255       asio_submit(r);
256     }
257   asio_sync(&q);
258
259 #else
260
261   r = asio_get(&q);
262   r->op = ASIO_READ;
263   r->fd = 0;
264   r->len = 1;
265   asio_submit(r);
266   r = asio_wait(&q);
267   ASSERT(r);
268   asio_put(r);
269
270   for (uns i=0; i<10; i++)
271     {
272       r = asio_get(&q);
273       r->op = ASIO_WRITE_BACK;
274       r->fd = 1;
275       r->len = 1;
276       r->buffer[0] = 'A' + i;
277       asio_submit(r);
278     }
279   asio_sync(&q);
280
281   r = asio_get(&q);
282   r->op = ASIO_WRITE;
283   r->fd = 1;
284   r->len = 1;
285   r->buffer[0] = '\n';
286   asio_submit(r);
287   r = asio_wait(&q);
288   ASSERT(r);
289   asio_put(r);
290
291 #endif
292
293   asio_cleanup_queue(&q);
294   return 0;
295 }
296
297 #endif