]> mj.ucw.cz Git - libucw.git/commitdiff
Implemented an asynchronous I/O library module.
authorMartin Mares <mj@ucw.cz>
Sat, 25 Nov 2006 22:35:55 +0000 (23:35 +0100)
committerMartin Mares <mj@ucw.cz>
Sat, 25 Nov 2006 22:35:55 +0000 (23:35 +0100)
It's built at the top of the work queue module.

lib/Makefile
lib/asio.c [new file with mode: 0644]
lib/asio.h [new file with mode: 0644]
lib/asio.t [new file with mode: 0644]
lib/asyncio.h [deleted file]

index 53343a17603a0e69675d70a83bca22896ccdd1cd..d26b9c25d0206aa807e33355e59012214c5888c6 100644 (file)
@@ -30,7 +30,7 @@ LIBUCW_MODS= \
        qache \
        string \
        bbuf \
-       workqueue
+       workqueue asio
 
 LIBUCW_INCLUDES= \
        lib.h config.h math.h \
@@ -53,7 +53,8 @@ LIBUCW_INCLUDES= \
        lizard.h \
        md5.h \
        base64.h base224.h \
-       qache.h
+       qache.h \
+       workqueue.h asio.h
 
 ifdef CONFIG_OWN_REGEX
 include $(s)/lib/regex/Makefile
@@ -83,8 +84,9 @@ $(o)/lib/kmp-test: $(o)/lib/kmp-test.o $(LIBUCW) $(LIBCHARSET)
 $(o)/lib/ipaccess-test: $(o)/lib/ipaccess-test.o $(LIBUCW)
 
 $(o)/lib/workqueue-t: LIBS+=-lpthread  # FIXME
+$(o)/lib/asio-t: LIBS+=-lpthread  # FIXME
 
-TESTS+=$(addprefix $(o)/lib/,regex.test unicode-utf8.test hash-test.test mempool.test stkstring.test slists.test kmp-test.test bbuf.test)
+TESTS+=$(addprefix $(o)/lib/,regex.test unicode-utf8.test hash-test.test mempool.test stkstring.test slists.test kmp-test.test bbuf.test asio.test)
 $(o)/lib/regex.test: $(o)/lib/regex-t
 $(o)/lib/unicode-utf8.test: $(o)/lib/unicode-utf8-t
 $(o)/lib/hash-test.test: $(o)/lib/hash-test
@@ -94,6 +96,7 @@ $(o)/lib/bitops.test: $(o)/lib/bit-ffs-t $(o)/lib/bit-fls-t
 $(o)/lib/slists.test: $(o)/lib/slists-t
 $(o)/lib/kmp-test.test: $(o)/lib/kmp-test
 $(o)/lib/bbuf.test: $(o)/lib/bbuf-t
+$(o)/lib/asio.test: $(o)/lib/asio-t
 
 INCLUDES+=$(o)/lib/.include-stamp
 $(o)/lib/.include-stamp: $(addprefix $(s)/lib/,$(LIBUCW_INCLUDES))
diff --git a/lib/asio.c b/lib/asio.c
new file mode 100644 (file)
index 0000000..f1996aa
--- /dev/null
@@ -0,0 +1,294 @@
+/*
+ *     UCW Library -- Asynchronous I/O
+ *
+ *     (c) 2006 Martin Mares <mj@ucw.cz>
+ *
+ *     This software may be freely distributed and used according to the terms
+ *     of the GNU Lesser General Public License.
+ */
+
+#undef LOCAL_DEBUG
+
+#include "lib/lib.h"
+#include "lib/asio.h"
+
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+
+static uns asio_num_users;
+static struct worker_pool asio_wpool;
+
+static void
+asio_init(void)
+{
+  if (asio_num_users++)
+    return;
+
+  DBG("ASIO: INIT");
+  asio_wpool.num_threads = 1;
+  asio_wpool.stack_size = 65536;
+  worker_pool_init(&asio_wpool);
+}
+
+static void
+asio_cleanup(void)
+{
+  if (--asio_num_users)
+    return;
+
+  DBG("ASIO: CLEANUP");
+  worker_pool_cleanup(&asio_wpool);
+}
+
+void
+asio_init_queue(struct asio_queue *q)
+{
+  asio_init();
+
+  DBG("ASIO: New queue %p", q);
+  ASSERT(q->buffer_size);
+  q->allocated_requests = 0;
+  q->allocated_writebacks = 0;
+  q->running_requests = 0;
+  clist_init(&q->idle_list);
+  clist_init(&q->done_list);
+  work_queue_init(&asio_wpool, &q->queue);
+}
+
+void
+asio_cleanup_queue(struct asio_queue *q)
+{
+  DBG("ASIO: Removing queue %p", q);
+  ASSERT(!q->running_requests);
+  ASSERT(!q->allocated_requests);
+  ASSERT(!q->allocated_writebacks);
+  ASSERT(clist_empty(&q->done_list));
+
+  struct asio_request *r;
+  while (r = clist_head(&q->idle_list))
+    {
+      clist_remove(&r->work.n);
+      big_free(r->buffer, q->buffer_size);
+      xfree(r);
+    }
+
+  work_queue_cleanup(&q->queue);
+  asio_cleanup();
+}
+
+struct asio_request *
+asio_get(struct asio_queue *q)
+{
+  q->allocated_requests++;
+  struct asio_request *r = clist_head(&q->idle_list);
+  if (!r)
+    {
+      r = xmalloc_zero(sizeof(*r));
+      r->queue = q;
+      r->buffer = big_alloc(q->buffer_size);
+      DBG("ASIO: Got %p (new)", r);
+    }
+  else
+    {
+      clist_remove(&r->work.n);
+      DBG("ASIO: Got %p", r);
+    }
+  r->op = ASIO_FREE;
+  return r;
+}
+
+static int
+asio_raw_wait(struct asio_queue *q)
+{
+  struct asio_request *r = (struct asio_request *) work_wait(&q->queue);
+  if (!r)
+    return 0;
+  q->running_requests--;
+  if (r->op == ASIO_WRITE_BACK)
+    {
+      DBG("ASIO: Finished writeback %p", r);
+      if (r->status < 0)
+       die("Asynchronous write to fd %d failed: %s", r->fd, strerror(r->returned_errno));
+      if (r->status != (int)r->len)
+       die("Asynchronous write to fd %d wrote only %d bytes out of %d", r->fd, r->status, r->len);
+      q->allocated_writebacks--;
+      asio_put(r);
+    }
+  else
+    clist_add_tail(&q->done_list, &r->work.n);
+  return 1;
+}
+
+struct asio_request *
+asio_get_writeback(struct asio_queue *q)
+{
+  while (q->allocated_writebacks >= q->max_writebacks)
+    {
+      DBG("ASIO: Waiting for free writeback request");
+      if (!asio_raw_wait(q))
+       ASSERT(0);
+    }
+  q->allocated_writebacks++;
+  struct asio_request *r = asio_get(q);
+  r->op = ASIO_WRITE_BACK;
+  return r;
+}
+
+void
+asio_turn_to_writeback(struct asio_request *r)
+{
+  struct asio_queue *q = r->queue;
+  ASSERT(r->op != ASIO_WRITE_BACK);
+  while (q->allocated_writebacks >= q->max_writebacks)
+    {
+      DBG("ASIO: Waiting for free writeback request");
+      if (!asio_raw_wait(q))
+       ASSERT(0);
+    }
+  q->allocated_writebacks++;
+  r->op = ASIO_WRITE_BACK;
+}
+
+static void
+asio_handler(struct worker_thread *t UNUSED, struct work *w)
+{
+  struct asio_request *r = (struct asio_request *) w;
+
+  DBG("ASIO: Servicing %p (%s on fd=%d, len=%d)", r,
+      (char*[]) { "?", "READ", "WRITE", "WRITEBACK" }[r->op], r->fd, r->len);
+  errno = 0;
+  switch (r->op)
+    {
+    case ASIO_READ:
+      r->status = read(r->fd, r->buffer, r->len);
+      break;
+    case ASIO_WRITE:
+    case ASIO_WRITE_BACK:
+      r->status = write(r->fd, r->buffer, r->len);
+      break;
+    default:
+      die("ASIO: Got unknown request type %d", r->op);
+    }
+  r->returned_errno = errno;
+  DBG("ASIO: Finished %p (status=%d, errno=%d)", r, r->status, r->returned_errno);
+}
+
+void
+asio_submit(struct asio_request *r)
+{
+  struct asio_queue *q = r->queue;
+  DBG("ASIO: Submitting %p on queue %p", r, q);
+  ASSERT(r->op != ASIO_FREE);
+  q->running_requests++;
+  r->work.go = asio_handler;
+  r->work.returned = NULL;
+  work_submit(&q->queue, &r->work);
+}
+
+struct asio_request *
+asio_wait(struct asio_queue *q)
+{
+  struct asio_request *r;
+  while (!(r = clist_head(&q->done_list)))
+    {
+      DBG("ASIO: Waiting on queue %p", q);
+      if (!asio_raw_wait(q))
+       return NULL;
+    }
+  clist_remove(&r->work.n);
+  DBG("ASIO: Done %p", r);
+  return r;
+}
+
+void
+asio_put(struct asio_request *r)
+{
+  struct asio_queue *q = r->queue;
+  DBG("ASIO: Put %p", r);
+  ASSERT(q->allocated_requests);
+  clist_add_tail(&q->idle_list, &r->work.n);
+  q->allocated_requests--;
+}
+
+void
+asio_sync(struct asio_queue *q)
+{
+  DBG("ASIO: Syncing queue %p", q);
+  while (q->running_requests)
+    if (!asio_raw_wait(q))
+      ASSERT(0);
+}
+
+#ifdef TEST
+
+int main(void)
+{
+  struct asio_queue q;
+  struct asio_request *r;
+
+  q.buffer_size = 4096;
+  q.max_writebacks = 2;
+  asio_init_queue(&q);
+
+#if 0
+
+  for (;;)
+    {
+      r = asio_get(&q);
+      r->op = ASIO_READ;
+      r->fd = 0;
+      r->len = q.buffer_size;
+      asio_submit(r);
+      r = asio_wait(&q);
+      ASSERT(r);
+      if (r->status <= 0)
+       {
+         asio_put(r);
+         break;
+       }
+      asio_turn_to_writeback(r);
+      r->fd = 1;
+      r->len = r->status;
+      asio_submit(r);
+    }
+  asio_sync(&q);
+
+#else
+
+  r = asio_get(&q);
+  r->op = ASIO_READ;
+  r->fd = 0;
+  r->len = 1;
+  asio_submit(r);
+  r = asio_wait(&q);
+  ASSERT(r);
+  asio_put(r);
+
+  for (uns i=0; i<10; i++)
+    {
+      r = asio_get_writeback(&q);
+      r->fd = 1;
+      r->len = 1;
+      r->buffer[0] = 'A' + i;
+      asio_submit(r);
+    }
+  asio_sync(&q);
+
+  r = asio_get(&q);
+  r->op = ASIO_WRITE;
+  r->fd = 1;
+  r->len = 1;
+  r->buffer[0] = '\n';
+  asio_submit(r);
+  r = asio_wait(&q);
+  ASSERT(r);
+  asio_put(r);
+
+#endif
+
+  asio_cleanup_queue(&q);
+  return 0;
+}
+
+#endif
diff --git a/lib/asio.h b/lib/asio.h
new file mode 100644 (file)
index 0000000..31c1f5a
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ *     UCW Library -- Asynchronous I/O
+ *
+ *     (c) 2006 Martin Mares <mj@ucw.cz>
+ *
+ *     This software may be freely distributed and used according to the terms
+ *     of the GNU Lesser General Public License.
+ */
+
+#ifndef _UCW_ASIO_H
+#define _UCW_ASIO_H
+
+#include "lib/workqueue.h"
+#include "lib/clists.h"
+
+/* FIXME: Comment, especially on request ordering */
+
+struct asio_queue {
+  uns buffer_size;                     // How large buffers do we use [user-settable]
+  uns max_writebacks;                  // Maximum number of writeback requests active [user-settable]
+  uns allocated_requests;
+  uns allocated_writebacks;
+  uns running_requests;
+  clist idle_list;                     // Recycled requests waiting for get
+  clist done_list;                     // Finished requests
+  struct work_queue queue;
+};
+
+enum asio_op {
+  ASIO_FREE,
+  ASIO_READ,
+  ASIO_WRITE,
+  ASIO_WRITE_BACK,                     // Write with no success notification
+};
+
+struct asio_request {
+  struct work work;                    // asio_requests are internally just work nodes
+  struct asio_queue *queue;
+  byte *buffer;
+  int fd;
+  enum asio_op op;
+  uns len;
+  int status;
+  int returned_errno;
+};
+
+void asio_init_queue(struct asio_queue *q);                    // Initialize a new queue
+void asio_cleanup_queue(struct asio_queue *q);
+struct asio_request *asio_get(struct asio_queue *q);           // Get an empty request (not for writeback)
+struct asio_request *asio_get_writeback(struct asio_queue *q); // Get an empty writeback request
+void asio_turn_to_writeback(struct asio_request *r);           // Convert a request allocated as non-writeback to writeback
+void asio_submit(struct asio_request *r);                      // Submit the request
+struct asio_request *asio_wait(struct asio_queue *q);          // Wait for the first finished request, NULL if no more
+void asio_put(struct asio_request *r);                         // Return a finished request for recycling
+void asio_sync(struct asio_queue *q);                          // Wait until all requests are finished
+
+#endif /* !_UCW_ASIO_H */
diff --git a/lib/asio.t b/lib/asio.t
new file mode 100644 (file)
index 0000000..4382107
--- /dev/null
@@ -0,0 +1,4 @@
+# Tests for asynchronous I/O
+
+Run:   echo y | obj/lib/asio-t
+Out:   ABCDEFGHIJ
diff --git a/lib/asyncio.h b/lib/asyncio.h
deleted file mode 100644 (file)
index d230f68..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *     UCW Library -- Asynchronous I/O
- *
- *     (c) 2006 Martin Mares <mj@ucw.cz>
- *
- *     This software may be freely distributed and used according to the terms
- *     of the GNU Lesser General Public License.
- */
-
-#ifndef _UCW_ASYNCIO_H
-#define _UCW_ASYNCIO_H
-
-#include "lib/semaphore.h"
-#include "lib/clists.h"
-
-/* FIXME: Comment, especially on request ordering */
-
-struct asio_queue {
-  uns max_requests;                    // Maximum number of requests allowed on this queue [user-settable]
-  uns buffer_size;                     // How large buffers do we use [user-settable]
-  uns allocated_requests;
-  uns running_requests;
-  clist idle_list;                     // Recycled requests waiting for get
-  clist done_list;                     // Requests returned from the worker threads
-  sem_t *done_sem;                     // ... and how many of them
-  clist wait_list;                     // Requests available for wait()
-};
-
-enum asio_op {
-  ASIO_FREE,
-  ASIO_READ,
-  ASIO_WRITE,
-  ASIO_BACKGROUND_WRITE,               // Write with no success notification
-};
-
-struct asio_request {
-  cnode n;
-  struct asio_queue *queue;
-  byte *buffer;
-  int fd;
-  enum asio_op op;
-  uns len;
-  int status;
-};
-
-void asio_init(void);
-void asio_cleanup(void);
-
-void asio_init_queue(struct asio_queue *q);                    // Initialize a new queue
-struct asio_request *asio_get(struct asio_queue *q);           // Get an empty request
-void asio_submit(struct asio_request *r);                      // Submit the request
-struct asio_request *asio_wait(struct asio_queue *q);          // Wait for the first finished request, NULL if no more
-void asio_put(struct asio_request *r);                         // Return a finished request for recycling
-struct asio_request *asio_get_bg(struct asio_queue *q);                // Get and if there are no free requests, wait for background writes to finish
-void asio_sync(struct asio_queue *q);                          // Wait for all requests to finish
-
-#endif /* !_UCW_ASYNCIO_H */