]> mj.ucw.cz Git - libucw.git/blob - lib/asio.c
Let eltpools maintain the number of allocated items. The overhead
[libucw.git] / lib / asio.c
1 /*
2  *      UCW Library -- Asynchronous I/O
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #undef LOCAL_DEBUG
11
12 #include "lib/lib.h"
13 #include "lib/asio.h"
14 #include "lib/threads.h"
15
16 #include <string.h>
17 #include <unistd.h>
18 #include <errno.h>
19
20 static uns asio_num_users;
21 static struct worker_pool asio_wpool;
22
23 static void
24 asio_init_unlocked(void)
25 {
26   if (asio_num_users++)
27     return;
28
29   DBG("ASIO: INIT");
30   asio_wpool.num_threads = 1;
31   worker_pool_init(&asio_wpool);
32 }
33
34 static void
35 asio_cleanup_unlocked(void)
36 {
37   if (--asio_num_users)
38     return;
39
40   DBG("ASIO: CLEANUP");
41   worker_pool_cleanup(&asio_wpool);
42 }
43
44 void
45 asio_init_queue(struct asio_queue *q)
46 {
47   ucwlib_lock();
48   asio_init_unlocked();
49   ucwlib_unlock();
50
51   DBG("ASIO: New queue %p", q);
52   ASSERT(q->buffer_size);
53   q->allocated_requests = 0;
54   q->running_requests = 0;
55   q->running_writebacks = 0;
56   q->use_count = 0;
57   clist_init(&q->idle_list);
58   clist_init(&q->done_list);
59   work_queue_init(&asio_wpool, &q->queue);
60 }
61
62 void
63 asio_cleanup_queue(struct asio_queue *q)
64 {
65   DBG("ASIO: Removing queue %p", q);
66   ASSERT(!q->running_requests);
67   ASSERT(!q->running_writebacks);
68   ASSERT(!q->allocated_requests);
69   ASSERT(clist_empty(&q->done_list));
70
71   struct asio_request *r;
72   while (r = clist_head(&q->idle_list))
73     {
74       clist_remove(&r->work.n);
75       big_free(r->buffer, q->buffer_size);
76       xfree(r);
77     }
78
79   work_queue_cleanup(&q->queue);
80
81   ucwlib_lock();
82   asio_cleanup_unlocked();
83   ucwlib_unlock();
84 }
85
86 struct asio_request *
87 asio_get(struct asio_queue *q)
88 {
89   q->allocated_requests++;
90   struct asio_request *r = clist_head(&q->idle_list);
91   if (!r)
92     {
93       r = xmalloc_zero(sizeof(*r));
94       r->queue = q;
95       r->buffer = big_alloc(q->buffer_size);
96       DBG("ASIO: Got %p (new)", r);
97     }
98   else
99     {
100       clist_remove(&r->work.n);
101       DBG("ASIO: Got %p", r);
102     }
103   r->op = ASIO_FREE;
104   r->fd = -1;
105   r->len = 0;
106   r->status = -1;
107   r->returned_errno = -1;
108   r->submitted = 0;
109   return r;
110 }
111
112 static int
113 asio_raw_wait(struct asio_queue *q)
114 {
115   struct asio_request *r = (struct asio_request *) work_wait(&q->queue);
116   if (!r)
117     return 0;
118   r->submitted = 0;
119   q->running_requests--;
120   if (r->op == ASIO_WRITE_BACK)
121     {
122       DBG("ASIO: Finished writeback %p", r);
123       if (r->status < 0)
124         die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno));
125       if (r->status != (int)r->len)
126         die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len);
127       q->running_writebacks--;
128       asio_put(r);
129     }
130   else
131     clist_add_tail(&q->done_list, &r->work.n);
132   return 1;
133 }
134
135 static void
136 asio_handler(struct worker_thread *t UNUSED, struct work *w)
137 {
138   struct asio_request *r = (struct asio_request *) w;
139
140   DBG("ASIO: Servicing %p (%s on fd=%d, len=%d)", r,
141       (char*[]) { "?", "READ", "WRITE", "WRITEBACK" }[r->op], r->fd, r->len);
142   errno = 0;
143   switch (r->op)
144     {
145     case ASIO_READ:
146       r->status = read(r->fd, r->buffer, r->len);
147       break;
148     case ASIO_WRITE:
149     case ASIO_WRITE_BACK:
150       r->status = write(r->fd, r->buffer, r->len);
151       break;
152     default:
153       die("ASIO: Got unknown request type %d", r->op);
154     }
155   r->returned_errno = errno;
156   DBG("ASIO: Finished %p (status=%d, errno=%d)", r, r->status, r->returned_errno);
157 }
158
159 void
160 asio_submit(struct asio_request *r)
161 {
162   struct asio_queue *q = r->queue;
163   DBG("ASIO: Submitting %p on queue %p", r, q);
164   ASSERT(r->op != ASIO_FREE);
165   ASSERT(!r->submitted);
166   if (r->op == ASIO_WRITE_BACK)
167     {
168       while (q->running_writebacks >= q->max_writebacks)
169         {
170           DBG("ASIO: Waiting for free writebacks");
171           if (!asio_raw_wait(q))
172             ASSERT(0);
173         }
174       q->running_writebacks++;
175     }
176   q->running_requests++;
177   r->submitted = 1;
178   r->work.go = asio_handler;
179   r->work.priority = 0;
180   work_submit(&q->queue, &r->work);
181 }
182
183 struct asio_request *
184 asio_wait(struct asio_queue *q)
185 {
186   struct asio_request *r;
187   while (!(r = clist_head(&q->done_list)))
188     {
189       DBG("ASIO: Waiting on queue %p", q);
190       if (!asio_raw_wait(q))
191         return NULL;
192     }
193   clist_remove(&r->work.n);
194   DBG("ASIO: Done %p", r);
195   return r;
196 }
197
198 void
199 asio_put(struct asio_request *r)
200 {
201   struct asio_queue *q = r->queue;
202   DBG("ASIO: Put %p", r);
203   ASSERT(!r->submitted);
204   ASSERT(q->allocated_requests);
205   clist_add_tail(&q->idle_list, &r->work.n);
206   q->allocated_requests--;
207 }
208
209 void
210 asio_sync(struct asio_queue *q)
211 {
212   DBG("ASIO: Syncing queue %p", q);
213   while (q->running_requests)
214     if (!asio_raw_wait(q))
215       ASSERT(0);
216 }
217
218 #ifdef TEST
219
220 int main(void)
221 {
222   struct asio_queue q;
223   struct asio_request *r;
224
225   q.buffer_size = 4096;
226   q.max_writebacks = 2;
227   asio_init_queue(&q);
228
229 #if 0
230
231   for (;;)
232     {
233       r = asio_get(&q);
234       r->op = ASIO_READ;
235       r->fd = 0;
236       r->len = q.buffer_size;
237       asio_submit(r);
238       r = asio_wait(&q);
239       ASSERT(r);
240       if (r->status <= 0)
241         {
242           asio_put(r);
243           break;
244         }
245       r->op = ASIO_WRITE_BACK;
246       r->fd = 1;
247       r->len = r->status;
248       asio_submit(r);
249     }
250   asio_sync(&q);
251
252 #else
253
254   r = asio_get(&q);
255   r->op = ASIO_READ;
256   r->fd = 0;
257   r->len = 1;
258   asio_submit(r);
259   r = asio_wait(&q);
260   ASSERT(r);
261   asio_put(r);
262
263   for (uns i=0; i<10; i++)
264     {
265       r = asio_get(&q);
266       r->op = ASIO_WRITE_BACK;
267       r->fd = 1;
268       r->len = 1;
269       r->buffer[0] = 'A' + i;
270       asio_submit(r);
271     }
272   asio_sync(&q);
273
274   r = asio_get(&q);
275   r->op = ASIO_WRITE;
276   r->fd = 1;
277   r->len = 1;
278   r->buffer[0] = '\n';
279   asio_submit(r);
280   r = asio_wait(&q);
281   ASSERT(r);
282   asio_put(r);
283
284 #endif
285
286   asio_cleanup_queue(&q);
287   return 0;
288 }
289
290 #endif