2 * UCW Library -- Asynchronous I/O
4 * (c) 2006 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
14 #include "lib/threads.h"
/* Presumably counts current users of the ASIO layer; only the declaration is visible here -- TODO confirm. */
20 static uns asio_num_users;
/* Worker pool shared by all queues: configured in asio_init_unlocked() and handed to work_queue_init() in asio_init_queue(). */
21 static struct worker_pool asio_wpool;
/*
 * Configure the shared worker pool (num_threads = 1, stack_size = 65536)
 * and initialize it with worker_pool_init().
 * NOTE(review): this view omits several lines of the function (return type,
 * braces and whatever precedes the pool setup) -- confirm against the full file.
 */
24 asio_init_unlocked(void)
30 asio_wpool.num_threads = 1;
31 asio_wpool.stack_size = 65536;
32 worker_pool_init(&asio_wpool);
/*
 * Release the shared worker pool with worker_pool_cleanup().
 * NOTE(review): lines between the header and the cleanup call are not visible
 * in this view; any condition guarding the cleanup must be checked in the full file.
 */
36 asio_cleanup_unlocked(void)
42 worker_pool_cleanup(&asio_wpool);
/*
 * Prepare an ASIO queue for use.
 *
 * q: queue to initialize; q->buffer_size must already be non-zero (asserted).
 *
 * Resets the request/writeback counters, initializes the idle and done lists
 * and attaches q->queue to the shared worker pool.
 * NOTE(review): the lines preceding the first DBG() are not visible in this view.
 */
46 asio_init_queue(struct asio_queue *q)
52 DBG("ASIO: New queue %p", q);
/* The caller has to choose the buffer size before initializing the queue. */
53 ASSERT(q->buffer_size);
54 q->allocated_requests = 0;
55 q->running_requests = 0;
56 q->running_writebacks = 0;
58 clist_init(&q->idle_list);
59 clist_init(&q->done_list);
60 work_queue_init(&asio_wpool, &q->queue);
/*
 * Tear down an ASIO queue.
 *
 * q: queue to destroy. The asserts require that nothing is running, no
 *    request is still allocated to the caller, and the done list is empty.
 *
 * Every request left on the idle list is unlinked and its buffer released
 * with big_free(); afterwards the work queue is cleaned up and
 * asio_cleanup_unlocked() is called.
 * NOTE(review): part of the loop body and the lines around the final call are
 * not visible in this view (e.g. how the request structure itself is freed).
 */
64 asio_cleanup_queue(struct asio_queue *q)
66 DBG("ASIO: Removing queue %p", q);
67 ASSERT(!q->running_requests);
68 ASSERT(!q->running_writebacks);
69 ASSERT(!q->allocated_requests);
70 ASSERT(clist_empty(&q->done_list));
72 struct asio_request *r;
/* Assignment in the condition is intentional: loop while the idle list still has a head. */
73 while (r = clist_head(&q->idle_list))
75 clist_remove(&r->work.n);
/* The buffer is released with the same size the queue is configured with. */
76 big_free(r->buffer, q->buffer_size);
80 work_queue_cleanup(&q->queue);
83 asio_cleanup_unlocked();
/*
 * Obtain a request structure belonging to queue q.
 *
 * Counts the request in q->allocated_requests and looks at the head of the
 * idle list. The visible lines show two paths: one allocates a zeroed request
 * plus a buffer of q->buffer_size ("new"), the other unlinks an existing
 * request from the idle list.
 * NOTE(review): the condition choosing between the paths, the remaining field
 * initialization and the return statement are not visible in this view.
 */
88 asio_get(struct asio_queue *q)
90 q->allocated_requests++;
91 struct asio_request *r = clist_head(&q->idle_list);
/* Path creating a brand-new request. */
94 r = xmalloc_zero(sizeof(*r));
96 r->buffer = big_alloc(q->buffer_size);
97 DBG("ASIO: Got %p (new)", r);
/* Path reusing a request taken from the idle list. */
101 clist_remove(&r->work.n);
102 DBG("ASIO: Got %p", r);
/* Start with -1 in returned_errno; asio_handler() overwrites it with errno. */
108 r->returned_errno = -1;
/*
 * Collect one finished request from q->queue via work_wait() and do the
 * queue-side bookkeeping for it.
 *
 * Decrements q->running_requests. For ASIO_WRITE_BACK requests the result is
 * checked here: the process dies on a failed or short write, and
 * q->running_writebacks is decremented. The visible lines also append a
 * request to q->done_list.
 * Callers in this file test the return value for zero.
 * NOTE(review): the check of work_wait()'s result, the condition guarding the
 * first die(), the branch structure around the done_list insertion and the
 * return statements are not visible in this view.
 */
114 asio_raw_wait(struct asio_queue *q)
116 struct asio_request *r = (struct asio_request *) work_wait(&q->queue);
120 q->running_requests--;
121 if (r->op == ASIO_WRITE_BACK)
123 DBG("ASIO: Finished writeback %p", r);
125 die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno));
/* A write that transferred a different number of bytes than requested is fatal as well. */
126 if (r->status != (int)r->len)
127 die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len);
128 q->running_writebacks--;
132 clist_add_tail(&q->done_list, &r->work.n);
/*
 * Work callback installed by asio_submit() as r->work.go; performs the
 * actual I/O for one request.
 *
 * t: worker thread (unused).
 * w: the work item, cast back to the struct asio_request it belongs to.
 *
 * Calls read() or write() on r->fd with r->buffer and r->len, stores the
 * result in r->status and copies errno into r->returned_errno. An
 * unrecognized r->op is fatal.
 * NOTE(review): the switch header and some case labels are not visible in
 * this view, so the exact mapping of operations to read()/write() should be
 * confirmed in the full file.
 */
137 asio_handler(struct worker_thread *t UNUSED, struct work *w)
139 struct asio_request *r = (struct asio_request *) w;
/* Debug output indexes a small name table directly by r->op. */
141 DBG("ASIO: Servicing %p (%s on fd=%d, len=%d)", r,
142 (char*[]) { "?", "READ", "WRITE", "WRITEBACK" }[r->op], r->fd, r->len);
147 r->status = read(r->fd, r->buffer, r->len);
150 case ASIO_WRITE_BACK:
151 r->status = write(r->fd, r->buffer, r->len);
154 die("ASIO: Got unknown request type %d", r->op);
/* Keep errno with the request so asio_raw_wait() can report it later. */
156 r->returned_errno = errno;
157 DBG("ASIO: Finished %p (status=%d, errno=%d)", r, r->status, r->returned_errno);
/*
 * Hand request r over to the worker pool of its queue (r->queue).
 *
 * r: request to submit; r->op must not be ASIO_FREE and the request must not
 *    already be submitted (both asserted).
 *
 * Write-back requests are throttled: while q->running_writebacks has reached
 * q->max_writebacks, finished requests are collected with asio_raw_wait().
 * Then the counters are bumped, asio_handler() is installed as the work
 * callback and the work item is passed to work_submit().
 * NOTE(review): the statement executed when asio_raw_wait() returns zero and
 * the line that marks the request as submitted are not visible in this view.
 */
161 asio_submit(struct asio_request *r)
163 struct asio_queue *q = r->queue;
164 DBG("ASIO: Submitting %p on queue %p", r, q);
165 ASSERT(r->op != ASIO_FREE);
166 ASSERT(!r->submitted);
167 if (r->op == ASIO_WRITE_BACK)
/* Limit the number of write-backs in flight to q->max_writebacks. */
169 while (q->running_writebacks >= q->max_writebacks)
171 DBG("ASIO: Waiting for free writebacks");
172 if (!asio_raw_wait(q))
175 q->running_writebacks++;
177 q->running_requests++;
179 r->work.go = asio_handler;
180 r->work.returned = NULL;
181 work_submit(&q->queue, &r->work);
/*
 * Fetch the next completed request from q->done_list.
 *
 * While the done list is empty, asio_raw_wait() is called to collect finished
 * work. The request found at the head of the list is unlinked from it.
 * NOTE(review): the statement executed when asio_raw_wait() returns zero and
 * the final return are not visible in this view.
 */
184 struct asio_request *
185 asio_wait(struct asio_queue *q)
187 struct asio_request *r;
188 while (!(r = clist_head(&q->done_list)))
190 DBG("ASIO: Waiting on queue %p", q);
191 if (!asio_raw_wait(q))
194 clist_remove(&r->work.n);
195 DBG("ASIO: Done %p", r);
/*
 * Give request r back to its queue for later reuse.
 *
 * r: request to return; it must not be submitted, and the queue must have at
 *    least one allocated request outstanding (both asserted).
 *
 * The request is appended to the queue's idle list and
 * q->allocated_requests is decremented.
 */
200 asio_put(struct asio_request *r)
202 struct asio_queue *q = r->queue;
203 DBG("ASIO: Put %p", r);
204 ASSERT(!r->submitted);
205 ASSERT(q->allocated_requests);
206 clist_add_tail(&q->idle_list, &r->work.n);
207 q->allocated_requests--;
/*
 * Wait until queue q has no running requests left.
 *
 * Repeatedly calls asio_raw_wait() while q->running_requests is non-zero.
 * NOTE(review): the statement executed when asio_raw_wait() returns zero is
 * not visible in this view.
 */
211 asio_sync(struct asio_queue *q)
213 DBG("ASIO: Syncing queue %p", q);
214 while (q->running_requests)
215 if (!asio_raw_wait(q))
/*
 * NOTE(review): fragments of what looks like a self-test driver; the
 * enclosing function header and most of its statements are not visible in
 * this view, so only the visible lines are described.
 */
224 struct asio_request *r;
/* Queue parameters used by this code: buffer_size 4096, at most 2 write-backs. */
226 q.buffer_size = 4096;
227 q.max_writebacks = 2;
/* Request length is set to the full buffer size. */
237 r->len = q.buffer_size;
246 r->op = ASIO_WRITE_BACK;
/* Ten iterations; the visible lines mark each request as a write-back and set its first buffer byte to 'A' + i. */
264 for (uns i=0; i<10; i++)
267 r->op = ASIO_WRITE_BACK;
270 r->buffer[0] = 'A' + i;
287 asio_cleanup_queue(&q);