]> mj.ucw.cz Git - libucw.git/blob - lib/asio.c
312dd6071c3f448822604998b39dcf36045bdffb
[libucw.git] / lib / asio.c
1 /*
2  *      UCW Library -- Asynchronous I/O
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "lib/lib.h"
13 #include "lib/asio.h"
14 #include "lib/threads.h"
15
16 #include <string.h>
17 #include <unistd.h>
18 #include <errno.h>
19
20 static uns asio_num_users;
21 static struct worker_pool asio_wpool;
22
23 static void
24 asio_init_unlocked(void)
25 {
26   if (asio_num_users++)
27     return;
28
29   DBG("ASIO: INIT");
30   asio_wpool.num_threads = 1;
31   asio_wpool.stack_size = 65536;
32   worker_pool_init(&asio_wpool);
33 }
34
35 static void
36 asio_cleanup_unlocked(void)
37 {
38   if (--asio_num_users)
39     return;
40
41   DBG("ASIO: CLEANUP");
42   worker_pool_cleanup(&asio_wpool);
43 }
44
45 void
46 asio_init_queue(struct asio_queue *q)
47 {
48   ucwlib_lock();
49   asio_init_unlocked();
50   ucwlib_unlock();
51
52   DBG("ASIO: New queue %p", q);
53   ASSERT(q->buffer_size);
54   q->allocated_requests = 0;
55   q->running_requests = 0;
56   q->running_writebacks = 0;
57   q->use_count = 0;
58   clist_init(&q->idle_list);
59   clist_init(&q->done_list);
60   work_queue_init(&asio_wpool, &q->queue);
61 }
62
63 void
64 asio_cleanup_queue(struct asio_queue *q)
65 {
66   DBG("ASIO: Removing queue %p", q);
67   ASSERT(!q->running_requests);
68   ASSERT(!q->running_writebacks);
69   ASSERT(!q->allocated_requests);
70   ASSERT(clist_empty(&q->done_list));
71
72   struct asio_request *r;
73   while (r = clist_head(&q->idle_list))
74     {
75       clist_remove(&r->work.n);
76       big_free(r->buffer, q->buffer_size);
77       xfree(r);
78     }
79
80   work_queue_cleanup(&q->queue);
81
82   ucwlib_lock();
83   asio_cleanup_unlocked();
84   ucwlib_unlock();
85 }
86
87 struct asio_request *
88 asio_get(struct asio_queue *q)
89 {
90   q->allocated_requests++;
91   struct asio_request *r = clist_head(&q->idle_list);
92   if (!r)
93     {
94       r = xmalloc_zero(sizeof(*r));
95       r->queue = q;
96       r->buffer = big_alloc(q->buffer_size);
97       DBG("ASIO: Got %p (new)", r);
98     }
99   else
100     {
101       clist_remove(&r->work.n);
102       DBG("ASIO: Got %p", r);
103     }
104   r->op = ASIO_FREE;
105   r->fd = -1;
106   r->len = 0;
107   r->status = -1;
108   r->returned_errno = -1;
109   r->submitted = 0;
110   return r;
111 }
112
113 static int
114 asio_raw_wait(struct asio_queue *q)
115 {
116   struct asio_request *r = (struct asio_request *) work_wait(&q->queue);
117   if (!r)
118     return 0;
119   r->submitted = 0;
120   q->running_requests--;
121   if (r->op == ASIO_WRITE_BACK)
122     {
123       DBG("ASIO: Finished writeback %p", r);
124       if (r->status < 0)
125         die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno));
126       if (r->status != (int)r->len)
127         die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len);
128       q->running_writebacks--;
129       asio_put(r);
130     }
131   else
132     clist_add_tail(&q->done_list, &r->work.n);
133   return 1;
134 }
135
136 static void
137 asio_handler(struct worker_thread *t UNUSED, struct work *w)
138 {
139   struct asio_request *r = (struct asio_request *) w;
140
141   DBG("ASIO: Servicing %p (%s on fd=%d, len=%d)", r,
142       (char*[]) { "?", "READ", "WRITE", "WRITEBACK" }[r->op], r->fd, r->len);
143   errno = 0;
144   switch (r->op)
145     {
146     case ASIO_READ:
147       r->status = read(r->fd, r->buffer, r->len);
148       break;
149     case ASIO_WRITE:
150     case ASIO_WRITE_BACK:
151       r->status = write(r->fd, r->buffer, r->len);
152       break;
153     default:
154       die("ASIO: Got unknown request type %d", r->op);
155     }
156   r->returned_errno = errno;
157   DBG("ASIO: Finished %p (status=%d, errno=%d)", r, r->status, r->returned_errno);
158 }
159
160 void
161 asio_submit(struct asio_request *r)
162 {
163   struct asio_queue *q = r->queue;
164   DBG("ASIO: Submitting %p on queue %p", r, q);
165   ASSERT(r->op != ASIO_FREE);
166   ASSERT(!r->submitted);
167   if (r->op == ASIO_WRITE_BACK)
168     {
169       while (q->running_writebacks >= q->max_writebacks)
170         {
171           DBG("ASIO: Waiting for free writebacks");
172           if (!asio_raw_wait(q))
173             ASSERT(0);
174         }
175       q->running_writebacks++;
176     }
177   q->running_requests++;
178   r->submitted = 1;
179   r->work.go = asio_handler;
180   r->work.returned = NULL;
181   work_submit(&q->queue, &r->work);
182 }
183
184 struct asio_request *
185 asio_wait(struct asio_queue *q)
186 {
187   struct asio_request *r;
188   while (!(r = clist_head(&q->done_list)))
189     {
190       DBG("ASIO: Waiting on queue %p", q);
191       if (!asio_raw_wait(q))
192         return NULL;
193     }
194   clist_remove(&r->work.n);
195   DBG("ASIO: Done %p", r);
196   return r;
197 }
198
199 void
200 asio_put(struct asio_request *r)
201 {
202   struct asio_queue *q = r->queue;
203   DBG("ASIO: Put %p", r);
204   ASSERT(!r->submitted);
205   ASSERT(q->allocated_requests);
206   clist_add_tail(&q->idle_list, &r->work.n);
207   q->allocated_requests--;
208 }
209
210 void
211 asio_sync(struct asio_queue *q)
212 {
213   DBG("ASIO: Syncing queue %p", q);
214   while (q->running_requests)
215     if (!asio_raw_wait(q))
216       ASSERT(0);
217 }
218
219 #ifdef TEST
220
221 int main(void)
222 {
223   struct asio_queue q;
224   struct asio_request *r;
225
226   q.buffer_size = 4096;
227   q.max_writebacks = 2;
228   asio_init_queue(&q);
229
230 #if 0
231
232   for (;;)
233     {
234       r = asio_get(&q);
235       r->op = ASIO_READ;
236       r->fd = 0;
237       r->len = q.buffer_size;
238       asio_submit(r);
239       r = asio_wait(&q);
240       ASSERT(r);
241       if (r->status <= 0)
242         {
243           asio_put(r);
244           break;
245         }
246       r->op = ASIO_WRITE_BACK;
247       r->fd = 1;
248       r->len = r->status;
249       asio_submit(r);
250     }
251   asio_sync(&q);
252
253 #else
254
255   r = asio_get(&q);
256   r->op = ASIO_READ;
257   r->fd = 0;
258   r->len = 1;
259   asio_submit(r);
260   r = asio_wait(&q);
261   ASSERT(r);
262   asio_put(r);
263
264   for (uns i=0; i<10; i++)
265     {
266       r = asio_get(&q);
267       r->op = ASIO_WRITE_BACK;
268       r->fd = 1;
269       r->len = 1;
270       r->buffer[0] = 'A' + i;
271       asio_submit(r);
272     }
273   asio_sync(&q);
274
275   r = asio_get(&q);
276   r->op = ASIO_WRITE;
277   r->fd = 1;
278   r->len = 1;
279   r->buffer[0] = '\n';
280   asio_submit(r);
281   r = asio_wait(&q);
282   ASSERT(r);
283   asio_put(r);
284
285 #endif
286
287   asio_cleanup_queue(&q);
288   return 0;
289 }
290
291 #endif