]> mj.ucw.cz Git - libucw.git/blob - ucw/asio.c
UCW doc: Fixed a typo.
[libucw.git] / ucw / asio.c
1 /*
2  *      UCW Library -- Asynchronous I/O
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "ucw/lib.h"
13 #include "ucw/asio.h"
14 #include "ucw/threads.h"
15
16 #include <string.h>
17 #include <unistd.h>
18 #include <errno.h>
19
20 static uns asio_num_users;
21 static struct worker_pool asio_wpool;
22
23 static void
24 asio_init_unlocked(void)
25 {
26   if (asio_num_users++)
27     return;
28
29   DBG("ASIO: INIT");
30   asio_wpool.num_threads = 1;
31   worker_pool_init(&asio_wpool);
32 }
33
34 static void
35 asio_cleanup_unlocked(void)
36 {
37   if (--asio_num_users)
38     return;
39
40   DBG("ASIO: CLEANUP");
41   worker_pool_cleanup(&asio_wpool);
42 }
43
44 void
45 asio_init_queue(struct asio_queue *q)
46 {
47   ucwlib_lock();
48   asio_init_unlocked();
49   ucwlib_unlock();
50
51   DBG("ASIO: New queue %p", q);
52   ASSERT(q->buffer_size);
53   q->allocated_requests = 0;
54   q->running_requests = 0;
55   q->running_writebacks = 0;
56   q->use_count = 0;
57   clist_init(&q->idle_list);
58   clist_init(&q->done_list);
59   work_queue_init(&asio_wpool, &q->queue);
60 }
61
62 void
63 asio_cleanup_queue(struct asio_queue *q)
64 {
65   DBG("ASIO: Removing queue %p", q);
66   ASSERT(!q->running_requests);
67   ASSERT(!q->running_writebacks);
68   ASSERT(!q->allocated_requests);
69   ASSERT(clist_empty(&q->done_list));
70
71   struct asio_request *r;
72   while (r = clist_remove_head(&q->idle_list))
73     {
74       big_free(r->buffer, q->buffer_size);
75       xfree(r);
76     }
77
78   work_queue_cleanup(&q->queue);
79
80   ucwlib_lock();
81   asio_cleanup_unlocked();
82   ucwlib_unlock();
83 }
84
85 struct asio_request *
86 asio_get(struct asio_queue *q)
87 {
88   q->allocated_requests++;
89   struct asio_request *r = clist_head(&q->idle_list);
90   if (!r)
91     {
92       r = xmalloc_zero(sizeof(*r));
93       r->queue = q;
94       r->buffer = big_alloc(q->buffer_size);
95       DBG("ASIO: Got %p (new)", r);
96     }
97   else
98     {
99       clist_remove(&r->work.n);
100       DBG("ASIO: Got %p", r);
101     }
102   r->op = ASIO_FREE;
103   r->fd = -1;
104   r->len = 0;
105   r->status = -1;
106   r->returned_errno = -1;
107   r->submitted = 0;
108   return r;
109 }
110
111 static int
112 asio_raw_wait(struct asio_queue *q)
113 {
114   struct asio_request *r = (struct asio_request *) work_wait(&q->queue);
115   if (!r)
116     return 0;
117   r->submitted = 0;
118   q->running_requests--;
119   if (r->op == ASIO_WRITE_BACK)
120     {
121       DBG("ASIO: Finished writeback %p", r);
122       if (r->status < 0)
123         die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno));
124       if (r->status != (int)r->len)
125         die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len);
126       q->running_writebacks--;
127       asio_put(r);
128     }
129   else
130     clist_add_tail(&q->done_list, &r->work.n);
131   return 1;
132 }
133
134 static void
135 asio_handler(struct worker_thread *t UNUSED, struct work *w)
136 {
137   struct asio_request *r = (struct asio_request *) w;
138
139   DBG("ASIO: Servicing %p (%s on fd=%d, len=%d)", r,
140       (char*[]) { "?", "READ", "WRITE", "WRITEBACK" }[r->op], r->fd, r->len);
141   errno = 0;
142   switch (r->op)
143     {
144     case ASIO_READ:
145       r->status = read(r->fd, r->buffer, r->len);
146       break;
147     case ASIO_WRITE:
148     case ASIO_WRITE_BACK:
149       r->status = write(r->fd, r->buffer, r->len);
150       break;
151     default:
152       die("ASIO: Got unknown request type %d", r->op);
153     }
154   r->returned_errno = errno;
155   DBG("ASIO: Finished %p (status=%d, errno=%d)", r, r->status, r->returned_errno);
156 }
157
158 void
159 asio_submit(struct asio_request *r)
160 {
161   struct asio_queue *q = r->queue;
162   DBG("ASIO: Submitting %p on queue %p", r, q);
163   ASSERT(r->op != ASIO_FREE);
164   ASSERT(!r->submitted);
165   if (r->op == ASIO_WRITE_BACK)
166     {
167       while (q->running_writebacks >= q->max_writebacks)
168         {
169           DBG("ASIO: Waiting for free writebacks");
170           if (!asio_raw_wait(q))
171             ASSERT(0);
172         }
173       q->running_writebacks++;
174     }
175   q->running_requests++;
176   r->submitted = 1;
177   r->work.go = asio_handler;
178   r->work.priority = 0;
179   work_submit(&q->queue, &r->work);
180 }
181
182 struct asio_request *
183 asio_wait(struct asio_queue *q)
184 {
185   struct asio_request *r;
186   while (!(r = clist_head(&q->done_list)))
187     {
188       DBG("ASIO: Waiting on queue %p", q);
189       if (!asio_raw_wait(q))
190         return NULL;
191     }
192   clist_remove(&r->work.n);
193   DBG("ASIO: Done %p", r);
194   return r;
195 }
196
197 void
198 asio_put(struct asio_request *r)
199 {
200   struct asio_queue *q = r->queue;
201   DBG("ASIO: Put %p", r);
202   ASSERT(!r->submitted);
203   ASSERT(q->allocated_requests);
204   clist_add_tail(&q->idle_list, &r->work.n);
205   q->allocated_requests--;
206 }
207
208 void
209 asio_sync(struct asio_queue *q)
210 {
211   DBG("ASIO: Syncing queue %p", q);
212   while (q->running_requests)
213     if (!asio_raw_wait(q))
214       ASSERT(0);
215 }
216
217 #ifdef TEST
218
219 int main(void)
220 {
221   struct asio_queue q;
222   struct asio_request *r;
223
224   q.buffer_size = 4096;
225   q.max_writebacks = 2;
226   asio_init_queue(&q);
227
228 #if 0
229
230   for (;;)
231     {
232       r = asio_get(&q);
233       r->op = ASIO_READ;
234       r->fd = 0;
235       r->len = q.buffer_size;
236       asio_submit(r);
237       r = asio_wait(&q);
238       ASSERT(r);
239       if (r->status <= 0)
240         {
241           asio_put(r);
242           break;
243         }
244       r->op = ASIO_WRITE_BACK;
245       r->fd = 1;
246       r->len = r->status;
247       asio_submit(r);
248     }
249   asio_sync(&q);
250
251 #else
252
253   r = asio_get(&q);
254   r->op = ASIO_READ;
255   r->fd = 0;
256   r->len = 1;
257   asio_submit(r);
258   r = asio_wait(&q);
259   ASSERT(r);
260   asio_put(r);
261
262   for (uns i=0; i<10; i++)
263     {
264       r = asio_get(&q);
265       r->op = ASIO_WRITE_BACK;
266       r->fd = 1;
267       r->len = 1;
268       r->buffer[0] = 'A' + i;
269       asio_submit(r);
270     }
271   asio_sync(&q);
272
273   r = asio_get(&q);
274   r->op = ASIO_WRITE;
275   r->fd = 1;
276   r->len = 1;
277   r->buffer[0] = '\n';
278   asio_submit(r);
279   r = asio_wait(&q);
280   ASSERT(r);
281   asio_put(r);
282
283 #endif
284
285   asio_cleanup_queue(&q);
286   return 0;
287 }
288
289 #endif