2 * UCW Library -- Asynchronous I/O
4 * (c) 2006 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
19 static uns asio_num_users;
20 static struct worker_pool asio_wpool;
29 asio_wpool.num_threads = 1;
30 asio_wpool.stack_size = 65536;
31 worker_pool_init(&asio_wpool);
41 worker_pool_cleanup(&asio_wpool);
45 asio_init_queue(struct asio_queue *q)
49 DBG("ASIO: New queue %p", q);
50 ASSERT(q->buffer_size);
51 q->allocated_requests = 0;
52 q->allocated_writebacks = 0;
53 q->running_requests = 0;
54 clist_init(&q->idle_list);
55 clist_init(&q->done_list);
56 work_queue_init(&asio_wpool, &q->queue);
60 asio_cleanup_queue(struct asio_queue *q)
62 DBG("ASIO: Removing queue %p", q);
63 ASSERT(!q->running_requests);
64 ASSERT(!q->allocated_requests);
65 ASSERT(!q->allocated_writebacks);
66 ASSERT(clist_empty(&q->done_list));
68 struct asio_request *r;
69 while (r = clist_head(&q->idle_list))
71 clist_remove(&r->work.n);
72 big_free(r->buffer, q->buffer_size);
76 work_queue_cleanup(&q->queue);
81 asio_get(struct asio_queue *q)
83 q->allocated_requests++;
84 struct asio_request *r = clist_head(&q->idle_list);
87 r = xmalloc_zero(sizeof(*r));
89 r->buffer = big_alloc(q->buffer_size);
90 DBG("ASIO: Got %p (new)", r);
94 clist_remove(&r->work.n);
95 DBG("ASIO: Got %p", r);
102 asio_raw_wait(struct asio_queue *q)
104 struct asio_request *r = (struct asio_request *) work_wait(&q->queue);
107 q->running_requests--;
108 if (r->op == ASIO_WRITE_BACK)
110 DBG("ASIO: Finished writeback %p", r);
112 die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno));
113 if (r->status != (int)r->len)
114 die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len);
115 q->allocated_writebacks--;
119 clist_add_tail(&q->done_list, &r->work.n);
123 struct asio_request *
124 asio_get_writeback(struct asio_queue *q)
126 while (q->allocated_writebacks >= q->max_writebacks)
128 DBG("ASIO: Waiting for free writeback request");
129 if (!asio_raw_wait(q))
132 q->allocated_writebacks++;
133 struct asio_request *r = asio_get(q);
134 r->op = ASIO_WRITE_BACK;
139 asio_turn_to_writeback(struct asio_request *r)
141 struct asio_queue *q = r->queue;
142 ASSERT(r->op != ASIO_WRITE_BACK);
143 while (q->allocated_writebacks >= q->max_writebacks)
145 DBG("ASIO: Waiting for free writeback request");
146 if (!asio_raw_wait(q))
149 q->allocated_writebacks++;
150 r->op = ASIO_WRITE_BACK;
154 asio_handler(struct worker_thread *t UNUSED, struct work *w)
156 struct asio_request *r = (struct asio_request *) w;
158 DBG("ASIO: Servicing %p (%s on fd=%d, len=%d)", r,
159 (char*[]) { "?", "READ", "WRITE", "WRITEBACK" }[r->op], r->fd, r->len);
164 r->status = read(r->fd, r->buffer, r->len);
167 case ASIO_WRITE_BACK:
168 r->status = write(r->fd, r->buffer, r->len);
171 die("ASIO: Got unknown request type %d", r->op);
173 r->returned_errno = errno;
174 DBG("ASIO: Finished %p (status=%d, errno=%d)", r, r->status, r->returned_errno);
178 asio_submit(struct asio_request *r)
180 struct asio_queue *q = r->queue;
181 DBG("ASIO: Submitting %p on queue %p", r, q);
182 ASSERT(r->op != ASIO_FREE);
183 q->running_requests++;
184 r->work.go = asio_handler;
185 r->work.returned = NULL;
186 work_submit(&q->queue, &r->work);
189 struct asio_request *
190 asio_wait(struct asio_queue *q)
192 struct asio_request *r;
193 while (!(r = clist_head(&q->done_list)))
195 DBG("ASIO: Waiting on queue %p", q);
196 if (!asio_raw_wait(q))
199 clist_remove(&r->work.n);
200 DBG("ASIO: Done %p", r);
205 asio_put(struct asio_request *r)
207 struct asio_queue *q = r->queue;
208 DBG("ASIO: Put %p", r);
209 ASSERT(q->allocated_requests);
210 clist_add_tail(&q->idle_list, &r->work.n);
211 q->allocated_requests--;
215 asio_sync(struct asio_queue *q)
217 DBG("ASIO: Syncing queue %p", q);
218 while (q->running_requests)
219 if (!asio_raw_wait(q))
228 struct asio_request *r;
230 q.buffer_size = 4096;
231 q.max_writebacks = 2;
241 r->len = q.buffer_size;
250 asio_turn_to_writeback(r);
268 for (uns i=0; i<10; i++)
270 r = asio_get_writeback(&q);
273 r->buffer[0] = 'A' + i;
290 asio_cleanup_queue(&q);