2 * UCW Library -- Asynchronous I/O
4 * (c) 2006 Martin Mares <mj@ucw.cz>
6 * This software may be freely distributed and used according to the terms
7 * of the GNU Lesser General Public License.
19 static uns asio_num_users;
20 static struct worker_pool asio_wpool;
21 static pthread_mutex_t asio_init_lock;
23 static void CONSTRUCTOR
24 asio_global_init(void)
26 pthread_mutex_init(&asio_init_lock, NULL);
30 asio_init_unlocked(void)
36 asio_wpool.num_threads = 1;
37 asio_wpool.stack_size = 65536;
38 worker_pool_init(&asio_wpool);
42 asio_cleanup_unlocked(void)
48 worker_pool_cleanup(&asio_wpool);
52 asio_init_queue(struct asio_queue *q)
54 pthread_mutex_lock(&asio_init_lock);
56 pthread_mutex_unlock(&asio_init_lock);
58 DBG("ASIO: New queue %p", q);
59 ASSERT(q->buffer_size);
60 q->allocated_requests = 0;
61 q->running_requests = 0;
62 q->running_writebacks = 0;
64 clist_init(&q->idle_list);
65 clist_init(&q->done_list);
66 work_queue_init(&asio_wpool, &q->queue);
70 asio_cleanup_queue(struct asio_queue *q)
72 DBG("ASIO: Removing queue %p", q);
73 ASSERT(!q->running_requests);
74 ASSERT(!q->running_writebacks);
75 ASSERT(!q->allocated_requests);
76 ASSERT(clist_empty(&q->done_list));
78 struct asio_request *r;
79 while (r = clist_head(&q->idle_list))
81 clist_remove(&r->work.n);
82 big_free(r->buffer, q->buffer_size);
86 work_queue_cleanup(&q->queue);
88 pthread_mutex_lock(&asio_init_lock);
89 asio_cleanup_unlocked();
90 pthread_mutex_unlock(&asio_init_lock);
94 asio_get(struct asio_queue *q)
96 q->allocated_requests++;
97 struct asio_request *r = clist_head(&q->idle_list);
100 r = xmalloc_zero(sizeof(*r));
102 r->buffer = big_alloc(q->buffer_size);
103 DBG("ASIO: Got %p (new)", r);
107 clist_remove(&r->work.n);
108 DBG("ASIO: Got %p", r);
114 r->returned_errno = -1;
120 asio_raw_wait(struct asio_queue *q)
122 struct asio_request *r = (struct asio_request *) work_wait(&q->queue);
126 q->running_requests--;
127 if (r->op == ASIO_WRITE_BACK)
129 DBG("ASIO: Finished writeback %p", r);
131 die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno));
132 if (r->status != (int)r->len)
133 die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len);
134 q->running_writebacks--;
138 clist_add_tail(&q->done_list, &r->work.n);
143 asio_handler(struct worker_thread *t UNUSED, struct work *w)
145 struct asio_request *r = (struct asio_request *) w;
147 DBG("ASIO: Servicing %p (%s on fd=%d, len=%d)", r,
148 (char*[]) { "?", "READ", "WRITE", "WRITEBACK" }[r->op], r->fd, r->len);
153 r->status = read(r->fd, r->buffer, r->len);
156 case ASIO_WRITE_BACK:
157 r->status = write(r->fd, r->buffer, r->len);
160 die("ASIO: Got unknown request type %d", r->op);
162 r->returned_errno = errno;
163 DBG("ASIO: Finished %p (status=%d, errno=%d)", r, r->status, r->returned_errno);
167 asio_submit(struct asio_request *r)
169 struct asio_queue *q = r->queue;
170 DBG("ASIO: Submitting %p on queue %p", r, q);
171 ASSERT(r->op != ASIO_FREE);
172 ASSERT(!r->submitted);
173 if (r->op == ASIO_WRITE_BACK)
175 while (q->running_writebacks >= q->max_writebacks)
177 DBG("ASIO: Waiting for free writebacks");
178 if (!asio_raw_wait(q))
181 q->running_writebacks++;
183 q->running_requests++;
185 r->work.go = asio_handler;
186 r->work.returned = NULL;
187 work_submit(&q->queue, &r->work);
190 struct asio_request *
191 asio_wait(struct asio_queue *q)
193 struct asio_request *r;
194 while (!(r = clist_head(&q->done_list)))
196 DBG("ASIO: Waiting on queue %p", q);
197 if (!asio_raw_wait(q))
200 clist_remove(&r->work.n);
201 DBG("ASIO: Done %p", r);
206 asio_put(struct asio_request *r)
208 struct asio_queue *q = r->queue;
209 DBG("ASIO: Put %p", r);
210 ASSERT(!r->submitted);
211 ASSERT(q->allocated_requests);
212 clist_add_tail(&q->idle_list, &r->work.n);
213 q->allocated_requests--;
217 asio_sync(struct asio_queue *q)
219 DBG("ASIO: Syncing queue %p", q);
220 while (q->running_requests)
221 if (!asio_raw_wait(q))
230 struct asio_request *r;
232 q.buffer_size = 4096;
233 q.max_writebacks = 2;
243 r->len = q.buffer_size;
252 r->op = ASIO_WRITE_BACK;
270 for (uns i=0; i<10; i++)
273 r->op = ASIO_WRITE_BACK;
276 r->buffer[0] = 'A' + i;
293 asio_cleanup_queue(&q);