X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=lib%2Fasio.c;h=f8249515eb82c9f4cdc616396762eaeb83b4fffd;hb=a0040bd045e62e1f47852b1e1285fc0f0b34f78d;hp=f1996aaeb9d04c614370184f07f205a21529bff0;hpb=0f2719c181f3a8a9ba88ef76901f2115dee88c48;p=libucw.git diff --git a/lib/asio.c b/lib/asio.c index f1996aae..f8249515 100644 --- a/lib/asio.c +++ b/lib/asio.c @@ -11,6 +11,7 @@ #include "lib/lib.h" #include "lib/asio.h" +#include "lib/threads.h" #include #include @@ -20,19 +21,18 @@ static uns asio_num_users; static struct worker_pool asio_wpool; static void -asio_init(void) +asio_init_unlocked(void) { if (asio_num_users++) return; DBG("ASIO: INIT"); asio_wpool.num_threads = 1; - asio_wpool.stack_size = 65536; worker_pool_init(&asio_wpool); } static void -asio_cleanup(void) +asio_cleanup_unlocked(void) { if (--asio_num_users) return; @@ -44,13 +44,16 @@ asio_cleanup(void) void asio_init_queue(struct asio_queue *q) { - asio_init(); + ucwlib_lock(); + asio_init_unlocked(); + ucwlib_unlock(); DBG("ASIO: New queue %p", q); ASSERT(q->buffer_size); q->allocated_requests = 0; - q->allocated_writebacks = 0; q->running_requests = 0; + q->running_writebacks = 0; + q->use_count = 0; clist_init(&q->idle_list); clist_init(&q->done_list); work_queue_init(&asio_wpool, &q->queue); @@ -61,8 +64,8 @@ asio_cleanup_queue(struct asio_queue *q) { DBG("ASIO: Removing queue %p", q); ASSERT(!q->running_requests); + ASSERT(!q->running_writebacks); ASSERT(!q->allocated_requests); - ASSERT(!q->allocated_writebacks); ASSERT(clist_empty(&q->done_list)); struct asio_request *r; @@ -74,7 +77,10 @@ asio_cleanup_queue(struct asio_queue *q) } work_queue_cleanup(&q->queue); - asio_cleanup(); + + ucwlib_lock(); + asio_cleanup_unlocked(); + ucwlib_unlock(); } struct asio_request * @@ -95,6 +101,11 @@ asio_get(struct asio_queue *q) DBG("ASIO: Got %p", r); } r->op = ASIO_FREE; + r->fd = -1; + r->len = 0; + r->status = -1; + r->returned_errno = -1; + r->submitted = 0; return r; } @@ -104,6 +115,7 @@ asio_raw_wait(struct asio_queue *q) struct asio_request *r = (struct asio_request *) work_wait(&q->queue); if (!r) return 0; + r->submitted = 0; q->running_requests--; if (r->op == ASIO_WRITE_BACK) { @@ -112,7 +124,7 @@ asio_raw_wait(struct asio_queue *q) die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno)); if (r->status != (int)r->len) die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len); - q->allocated_writebacks--; + q->running_writebacks--; asio_put(r); } else @@ -120,36 +132,6 @@ asio_raw_wait(struct asio_queue *q) return 1; } -struct asio_request * -asio_get_writeback(struct asio_queue *q) -{ - while (q->allocated_writebacks >= q->max_writebacks) - { - DBG("ASIO: Waiting for free writeback request"); - if (!asio_raw_wait(q)) - ASSERT(0); - } - q->allocated_writebacks++; - struct asio_request *r = asio_get(q); - r->op = ASIO_WRITE_BACK; - return r; -} - -void -asio_turn_to_writeback(struct asio_request *r) -{ - struct asio_queue *q = r->queue; - ASSERT(r->op != ASIO_WRITE_BACK); - while (q->allocated_writebacks >= q->max_writebacks) - { - DBG("ASIO: Waiting for free writeback request"); - if (!asio_raw_wait(q)) - ASSERT(0); - } - q->allocated_writebacks++; - r->op = ASIO_WRITE_BACK; -} - static void asio_handler(struct worker_thread *t UNUSED, struct work *w) { @@ -180,9 +162,21 @@ asio_submit(struct asio_request *r) struct asio_queue *q = r->queue; DBG("ASIO: Submitting %p on queue %p", r, q); ASSERT(r->op != ASIO_FREE); + ASSERT(!r->submitted); + if (r->op == ASIO_WRITE_BACK) + { + while (q->running_writebacks >= q->max_writebacks) + { + DBG("ASIO: Waiting for free writebacks"); + if (!asio_raw_wait(q)) + ASSERT(0); + } + q->running_writebacks++; + } q->running_requests++; + r->submitted = 1; r->work.go = asio_handler; - r->work.returned = NULL; + r->work.priority = 0; work_submit(&q->queue, &r->work); } @@ -206,6 +200,7 @@ asio_put(struct asio_request *r) { struct asio_queue *q = r->queue; DBG("ASIO: Put %p", r); + ASSERT(!r->submitted); ASSERT(q->allocated_requests); clist_add_tail(&q->idle_list, &r->work.n); q->allocated_requests--; @@ -247,7 +242,7 @@ int main(void) asio_put(r); break; } - asio_turn_to_writeback(r); + r->op = ASIO_WRITE_BACK; r->fd = 1; r->len = r->status; asio_submit(r); @@ -267,7 +262,8 @@ int main(void) for (uns i=0; i<10; i++) { - r = asio_get_writeback(&q); + r = asio_get(&q); + r->op = ASIO_WRITE_BACK; r->fd = 1; r->len = 1; r->buffer[0] = 'A' + i;