]> mj.ucw.cz Git - libucw.git/blobdiff - lib/fb-direct.c
Merge with git+ssh://cvs.ucw.cz/projects/sherlock/GIT/sherlock.git#dev-sorter
[libucw.git] / lib / fb-direct.c
index a4277ec929506c760aead5f8f70dc733536c207d..865bbb44a4114aa276d034a66170d8a0eaeefc58 100644 (file)
@@ -1,7 +1,7 @@
 /*
  *     UCW Library -- Fast Buffered I/O on O_DIRECT Files
  *
- *     (c) 2006 Martin Mares <mj@ucw.cz>
+ *     (c) 2006--2007 Martin Mares <mj@ucw.cz>
  *
  *     This software may be freely distributed and used according to the terms
  *     of the GNU Lesser General Public License.
@@ -19,9 +19,8 @@
  *         and take care of locking.
  *
  *     FIXME: what if the OS doesn't support O_DIRECT?
- *     FIXME: doc: don't mix threads
  *     FIXME: unaligned seeks and partial writes?
- *     FIXME: merge with other file-oriented fastbufs
+ *     FIXME: append to unaligned file
  */
 
 #undef LOCAL_DEBUG
 #include "lib/lfs.h"
 #include "lib/asio.h"
 #include "lib/conf.h"
+#include "lib/threads.h"
 
 #include <string.h>
 #include <fcntl.h>
 #include <unistd.h>
-#include <pthread.h>
+#include <stdio.h>
 
-static uns fbdir_cheat;
-static uns fbdir_buffer_size = 65536;
-static uns fbdir_read_ahead = 1;
-static uns fbdir_write_back = 1;
+uns fbdir_cheat;
 
 static struct cf_section fbdir_cf = {
   CF_ITEMS {
     CF_UNS("Cheat", &fbdir_cheat),
-    CF_UNS("BufferSize", &fbdir_buffer_size),
-    CF_UNS("ReadAhead", &fbdir_read_ahead),
-    CF_UNS("WriteBack", &fbdir_write_back),
     CF_END
   }
 };
 
 #define FBDIR_ALIGN 512
 
-static pthread_key_t fbdir_queue_key;
-
 enum fbdir_mode {                              // Current operating mode
     M_NULL,
     M_READ,
@@ -65,7 +57,7 @@ enum fbdir_mode {                             // Current operating mode
 struct fb_direct {
   struct fastbuf fb;
   int fd;                                      // File descriptor
-  int is_temp_file;                            // 0=normal file, 1=temporary file, delete on close, -1=shared FD
+  int is_temp_file;
   struct asio_queue *io_queue;                 // I/O queue to use
   struct asio_queue *user_queue;               // If io_queue was supplied by the user
   struct asio_request *pending_read;
@@ -80,8 +72,6 @@ static void CONSTRUCTOR
 fbdir_global_init(void)
 {
   cf_declare_section("FBDirect", &fbdir_cf, 0);
-  if (pthread_key_create(&fbdir_queue_key, NULL) < 0)
-    die("Cannot create fbdir_queue_key: %m");
 }
 
 static void
@@ -199,7 +189,7 @@ fbdir_spout(struct fastbuf *f)
          r->len = ALIGN_TO(r->len, FBDIR_ALIGN);
          asio_submit(r);
          asio_sync(F->io_queue);
-         DBG("FB-DIRECT: Truncating at %Ld", (long long)f->pos);
+         DBG("FB-DIRECT: Truncating at %llu", (long long)f->pos);
          if (sh_ftruncate(F->fd, f->pos) < 0)
            die("Error truncating %s: %m", f->name);
        }
@@ -214,32 +204,34 @@ fbdir_spout(struct fastbuf *f)
   F->active_buffer = r;
 }
 
-static void
+static int
 fbdir_seek(struct fastbuf *f, sh_off_t pos, int whence)
 {
-  DBG("FB-DIRECT: Seek %Ld %d", (long long)pos, whence);
+  DBG("FB-DIRECT: Seek %llu %d", (long long)pos, whence);
 
   if (whence == SEEK_SET && pos == f->pos)
-    return;
+    return 1;
 
   fbdir_change_mode(FB_DIRECT(f), M_NULL);                     // Wait for all async requests to finish
   sh_off_t l = sh_seek(FB_DIRECT(f)->fd, pos, whence);
   if (l < 0)
-    die("lseek on %s: %m", f->name);
+    return 0;
   f->pos = l;
+  return 1;
 }
 
 static struct asio_queue *
-fbdir_get_io_queue(void)
+fbdir_get_io_queue(uns buffer_size, uns write_back)
 {
-  struct asio_queue *q = pthread_getspecific(fbdir_queue_key);
+  struct ucwlib_context *ctx = ucwlib_thread_context();
+  struct asio_queue *q = ctx->io_queue;
   if (!q)
     {
       q = xmalloc_zero(sizeof(struct asio_queue));
-      q->buffer_size = fbdir_buffer_size;
-      q->max_writebacks = fbdir_write_back;
+      q->buffer_size = buffer_size;
+      q->max_writebacks = write_back;
       asio_init_queue(q);
-      pthread_setspecific(fbdir_queue_key, q);
+      ctx->io_queue = q;
     }
   q->use_count++;
   DBG("FB-DIRECT: Got I/O queue, uc=%d", q->use_count);
@@ -249,14 +241,15 @@ fbdir_get_io_queue(void)
 static void
 fbdir_put_io_queue(void)
 {
-  struct asio_queue *q = pthread_getspecific(fbdir_queue_key);
+  struct ucwlib_context *ctx = ucwlib_thread_context();
+  struct asio_queue *q = ctx->io_queue;
   ASSERT(q);
   DBG("FB-DIRECT: Put I/O queue, uc=%d", q->use_count);
   if (!--q->use_count)
     {
       asio_cleanup_queue(q);
       xfree(q);
-      pthread_setspecific(fbdir_queue_key, NULL);
+      ctx->io_queue = NULL;
     }
 }
 
@@ -271,33 +264,28 @@ fbdir_close(struct fastbuf *f)
   if (!F->user_queue)
     fbdir_put_io_queue();
 
-  switch (F->is_temp_file)
-    {
-    case 1:
-      if (unlink(f->name) < 0)
-       log(L_ERROR, "unlink(%s): %m", f->name);
-    case 0:
-      close(F->fd);
-    }
-
+  bclose_file_helper(f, F->fd, F->is_temp_file);
   xfree(f);
 }
 
 static int
 fbdir_config(struct fastbuf *f, uns item, int value)
 {
+  int orig;
+
   switch (item)
     {
     case BCONFIG_IS_TEMP_FILE:
+      orig = FB_DIRECT(f)->is_temp_file;
       FB_DIRECT(f)->is_temp_file = value;
-      return 0;
+      return orig;
     default:
       return -1;
     }
 }
 
-static struct fastbuf *
-fbdir_open_internal(byte *name, int fd, struct asio_queue *q)
+struct fastbuf *
+fbdir_open_fd_internal(int fd, const char *name, struct asio_queue *q, uns buffer_size, uns read_ahead UNUSED, uns write_back)
 {
   int namelen = strlen(name) + 1;
   struct fb_direct *F = xmalloc(sizeof(struct fb_direct) + namelen);
@@ -311,7 +299,7 @@ fbdir_open_internal(byte *name, int fd, struct asio_queue *q)
   if (q)
     F->io_queue = F->user_queue = q;
   else
-    F->io_queue = fbdir_get_io_queue();
+    F->io_queue = fbdir_get_io_queue(buffer_size, write_back);
   f->refill = fbdir_refill;
   f->spout = fbdir_spout;
   f->seek = fbdir_seek;
@@ -321,54 +309,20 @@ fbdir_open_internal(byte *name, int fd, struct asio_queue *q)
   return f;
 }
 
-struct fastbuf *
-fbdir_open_try(byte *name, uns mode, struct asio_queue *q)
-{
-  if (!fbdir_cheat)
-    mode |= O_DIRECT;
-  int fd = sh_open(name, mode, 0666);
-  if (fd < 0)
-    return NULL;
-  struct fastbuf *b = fbdir_open_internal(name, fd, q);
-  if (mode & O_APPEND)
-    fbdir_seek(b, 0, SEEK_END);
-  return b;
-}
-
-struct fastbuf *
-fbdir_open(byte *name, uns mode, struct asio_queue *q)
-{
-  struct fastbuf *b = fbdir_open_try(name, mode, q);
-  if (!b)
-    die("Unable to %s file %s: %m",
-       (mode & O_CREAT) ? "create" : "open", name);
-  return b;
-}
-
-struct fastbuf *
-fbdir_open_fd(int fd, struct asio_queue *q)
-{
-  byte x[32];
-
-  sprintf(x, "fd%d", fd);
-  if (!fbdir_cheat && fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_DIRECT) < 0)
-    log(L_WARN, "Cannot set O_DIRECT on fd %d: %m", fd);
-  return fbdir_open_internal(x, fd, q);
-}
-
 #ifdef TEST
 
 #include "lib/getopt.h"
 
 int main(int argc, char **argv)
 {
+  struct fb_params par = { .type = FB_DIRECT };
   struct fastbuf *f, *t;
 
   log_init(NULL);
   if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0)
     die("Hey, whaddya want?");
-  f = (optind < argc) ? fbdir_open(argv[optind++], O_RDONLY, NULL) : fbdir_open_fd(0, NULL);
-  t = (optind < argc) ? fbdir_open(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, NULL) : fbdir_open_fd(1, NULL);
+  f = (optind < argc) ? bopen_file(argv[optind++], O_RDONLY, &par) : bopen_fd(0, &par);
+  t = (optind < argc) ? bopen_file(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, &par) : bopen_fd(1, &par);
 
   bbcopy(f, t, ~0U);
   ASSERT(btell(f) == btell(t));