]> mj.ucw.cz Git - libucw.git/blob - lib/asio.c
Implemented an asynchronous I/O library module.
[libucw.git] / lib / asio.c
1 /*
2  *      UCW Library -- Asynchronous I/O
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "lib/lib.h"
13 #include "lib/asio.h"
14
15 #include <string.h>
16 #include <unistd.h>
17 #include <errno.h>
18
19 static uns asio_num_users;
20 static struct worker_pool asio_wpool;
21
22 static void
23 asio_init(void)
24 {
25   if (asio_num_users++)
26     return;
27
28   DBG("ASIO: INIT");
29   asio_wpool.num_threads = 1;
30   asio_wpool.stack_size = 65536;
31   worker_pool_init(&asio_wpool);
32 }
33
34 static void
35 asio_cleanup(void)
36 {
37   if (--asio_num_users)
38     return;
39
40   DBG("ASIO: CLEANUP");
41   worker_pool_cleanup(&asio_wpool);
42 }
43
44 void
45 asio_init_queue(struct asio_queue *q)
46 {
47   asio_init();
48
49   DBG("ASIO: New queue %p", q);
50   ASSERT(q->buffer_size);
51   q->allocated_requests = 0;
52   q->allocated_writebacks = 0;
53   q->running_requests = 0;
54   clist_init(&q->idle_list);
55   clist_init(&q->done_list);
56   work_queue_init(&asio_wpool, &q->queue);
57 }
58
59 void
60 asio_cleanup_queue(struct asio_queue *q)
61 {
62   DBG("ASIO: Removing queue %p", q);
63   ASSERT(!q->running_requests);
64   ASSERT(!q->allocated_requests);
65   ASSERT(!q->allocated_writebacks);
66   ASSERT(clist_empty(&q->done_list));
67
68   struct asio_request *r;
69   while (r = clist_head(&q->idle_list))
70     {
71       clist_remove(&r->work.n);
72       big_free(r->buffer, q->buffer_size);
73       xfree(r);
74     }
75
76   work_queue_cleanup(&q->queue);
77   asio_cleanup();
78 }
79
80 struct asio_request *
81 asio_get(struct asio_queue *q)
82 {
83   q->allocated_requests++;
84   struct asio_request *r = clist_head(&q->idle_list);
85   if (!r)
86     {
87       r = xmalloc_zero(sizeof(*r));
88       r->queue = q;
89       r->buffer = big_alloc(q->buffer_size);
90       DBG("ASIO: Got %p (new)", r);
91     }
92   else
93     {
94       clist_remove(&r->work.n);
95       DBG("ASIO: Got %p", r);
96     }
97   r->op = ASIO_FREE;
98   return r;
99 }
100
101 static int
102 asio_raw_wait(struct asio_queue *q)
103 {
104   struct asio_request *r = (struct asio_request *) work_wait(&q->queue);
105   if (!r)
106     return 0;
107   q->running_requests--;
108   if (r->op == ASIO_WRITE_BACK)
109     {
110       DBG("ASIO: Finished writeback %p", r);
111       if (r->status < 0)
112         die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno));
113       if (r->status != (int)r->len)
114         die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len);
115       q->allocated_writebacks--;
116       asio_put(r);
117     }
118   else
119     clist_add_tail(&q->done_list, &r->work.n);
120   return 1;
121 }
122
123 struct asio_request *
124 asio_get_writeback(struct asio_queue *q)
125 {
126   while (q->allocated_writebacks >= q->max_writebacks)
127     {
128       DBG("ASIO: Waiting for free writeback request");
129       if (!asio_raw_wait(q))
130         ASSERT(0);
131     }
132   q->allocated_writebacks++;
133   struct asio_request *r = asio_get(q);
134   r->op = ASIO_WRITE_BACK;
135   return r;
136 }
137
138 void
139 asio_turn_to_writeback(struct asio_request *r)
140 {
141   struct asio_queue *q = r->queue;
142   ASSERT(r->op != ASIO_WRITE_BACK);
143   while (q->allocated_writebacks >= q->max_writebacks)
144     {
145       DBG("ASIO: Waiting for free writeback request");
146       if (!asio_raw_wait(q))
147         ASSERT(0);
148     }
149   q->allocated_writebacks++;
150   r->op = ASIO_WRITE_BACK;
151 }
152
153 static void
154 asio_handler(struct worker_thread *t UNUSED, struct work *w)
155 {
156   struct asio_request *r = (struct asio_request *) w;
157
158   DBG("ASIO: Servicing %p (%s on fd=%d, len=%d)", r,
159       (char*[]) { "?", "READ", "WRITE", "WRITEBACK" }[r->op], r->fd, r->len);
160   errno = 0;
161   switch (r->op)
162     {
163     case ASIO_READ:
164       r->status = read(r->fd, r->buffer, r->len);
165       break;
166     case ASIO_WRITE:
167     case ASIO_WRITE_BACK:
168       r->status = write(r->fd, r->buffer, r->len);
169       break;
170     default:
171       die("ASIO: Got unknown request type %d", r->op);
172     }
173   r->returned_errno = errno;
174   DBG("ASIO: Finished %p (status=%d, errno=%d)", r, r->status, r->returned_errno);
175 }
176
177 void
178 asio_submit(struct asio_request *r)
179 {
180   struct asio_queue *q = r->queue;
181   DBG("ASIO: Submitting %p on queue %p", r, q);
182   ASSERT(r->op != ASIO_FREE);
183   q->running_requests++;
184   r->work.go = asio_handler;
185   r->work.returned = NULL;
186   work_submit(&q->queue, &r->work);
187 }
188
189 struct asio_request *
190 asio_wait(struct asio_queue *q)
191 {
192   struct asio_request *r;
193   while (!(r = clist_head(&q->done_list)))
194     {
195       DBG("ASIO: Waiting on queue %p", q);
196       if (!asio_raw_wait(q))
197         return NULL;
198     }
199   clist_remove(&r->work.n);
200   DBG("ASIO: Done %p", r);
201   return r;
202 }
203
204 void
205 asio_put(struct asio_request *r)
206 {
207   struct asio_queue *q = r->queue;
208   DBG("ASIO: Put %p", r);
209   ASSERT(q->allocated_requests);
210   clist_add_tail(&q->idle_list, &r->work.n);
211   q->allocated_requests--;
212 }
213
214 void
215 asio_sync(struct asio_queue *q)
216 {
217   DBG("ASIO: Syncing queue %p", q);
218   while (q->running_requests)
219     if (!asio_raw_wait(q))
220       ASSERT(0);
221 }
222
223 #ifdef TEST
224
225 int main(void)
226 {
227   struct asio_queue q;
228   struct asio_request *r;
229
230   q.buffer_size = 4096;
231   q.max_writebacks = 2;
232   asio_init_queue(&q);
233
234 #if 0
235
236   for (;;)
237     {
238       r = asio_get(&q);
239       r->op = ASIO_READ;
240       r->fd = 0;
241       r->len = q.buffer_size;
242       asio_submit(r);
243       r = asio_wait(&q);
244       ASSERT(r);
245       if (r->status <= 0)
246         {
247           asio_put(r);
248           break;
249         }
250       asio_turn_to_writeback(r);
251       r->fd = 1;
252       r->len = r->status;
253       asio_submit(r);
254     }
255   asio_sync(&q);
256
257 #else
258
259   r = asio_get(&q);
260   r->op = ASIO_READ;
261   r->fd = 0;
262   r->len = 1;
263   asio_submit(r);
264   r = asio_wait(&q);
265   ASSERT(r);
266   asio_put(r);
267
268   for (uns i=0; i<10; i++)
269     {
270       r = asio_get_writeback(&q);
271       r->fd = 1;
272       r->len = 1;
273       r->buffer[0] = 'A' + i;
274       asio_submit(r);
275     }
276   asio_sync(&q);
277
278   r = asio_get(&q);
279   r->op = ASIO_WRITE;
280   r->fd = 1;
281   r->len = 1;
282   r->buffer[0] = '\n';
283   asio_submit(r);
284   r = asio_wait(&q);
285   ASSERT(r);
286   asio_put(r);
287
288 #endif
289
290   asio_cleanup_queue(&q);
291   return 0;
292 }
293
294 #endif