]> mj.ucw.cz Git - libucw.git/blobdiff - lib/fb-direct.c
Let bconfig() return the original value.
[libucw.git] / lib / fb-direct.c
index a4277ec929506c760aead5f8f70dc733536c207d..2fa6ec7f7e16e12037f6a98d017d3c48fcced9e3 100644 (file)
@@ -19,7 +19,6 @@
  *         and take care of locking.
  *
  *     FIXME: what if the OS doesn't support O_DIRECT?
- *     FIXME: doc: don't mix threads
  *     FIXME: unaligned seeks and partial writes?
  *     FIXME: merge with other file-oriented fastbufs
  */
 #include "lib/lfs.h"
 #include "lib/asio.h"
 #include "lib/conf.h"
+#include "lib/threads.h"
 
 #include <string.h>
 #include <fcntl.h>
 #include <unistd.h>
-#include <pthread.h>
+#include <stdio.h>
 
-static uns fbdir_cheat;
-static uns fbdir_buffer_size = 65536;
-static uns fbdir_read_ahead = 1;
-static uns fbdir_write_back = 1;
+uns fbdir_cheat;
 
 static struct cf_section fbdir_cf = {
   CF_ITEMS {
     CF_UNS("Cheat", &fbdir_cheat),
-    CF_UNS("BufferSize", &fbdir_buffer_size),
-    CF_UNS("ReadAhead", &fbdir_read_ahead),
-    CF_UNS("WriteBack", &fbdir_write_back),
     CF_END
   }
 };
 
 #define FBDIR_ALIGN 512
 
-static pthread_key_t fbdir_queue_key;
-
 enum fbdir_mode {                              // Current operating mode
     M_NULL,
     M_READ,
@@ -80,8 +72,6 @@ static void CONSTRUCTOR
 fbdir_global_init(void)
 {
   cf_declare_section("FBDirect", &fbdir_cf, 0);
-  if (pthread_key_create(&fbdir_queue_key, NULL) < 0)
-    die("Cannot create fbdir_queue_key: %m");
 }
 
 static void
@@ -199,7 +189,7 @@ fbdir_spout(struct fastbuf *f)
          r->len = ALIGN_TO(r->len, FBDIR_ALIGN);
          asio_submit(r);
          asio_sync(F->io_queue);
-         DBG("FB-DIRECT: Truncating at %Ld", (long long)f->pos);
+         DBG("FB-DIRECT: Truncating at %llu", (long long)f->pos);
          if (sh_ftruncate(F->fd, f->pos) < 0)
            die("Error truncating %s: %m", f->name);
        }
@@ -214,32 +204,34 @@ fbdir_spout(struct fastbuf *f)
   F->active_buffer = r;
 }
 
-static void
+static int
 fbdir_seek(struct fastbuf *f, sh_off_t pos, int whence)
 {
-  DBG("FB-DIRECT: Seek %Ld %d", (long long)pos, whence);
+  DBG("FB-DIRECT: Seek %llu %d", (long long)pos, whence);
 
   if (whence == SEEK_SET && pos == f->pos)
-    return;
+    return 1;
 
   fbdir_change_mode(FB_DIRECT(f), M_NULL);                     // Wait for all async requests to finish
   sh_off_t l = sh_seek(FB_DIRECT(f)->fd, pos, whence);
   if (l < 0)
-    die("lseek on %s: %m", f->name);
+    return 0;
   f->pos = l;
+  return 1;
 }
 
 static struct asio_queue *
-fbdir_get_io_queue(void)
+fbdir_get_io_queue(uns buffer_size, uns write_back)
 {
-  struct asio_queue *q = pthread_getspecific(fbdir_queue_key);
+  struct ucwlib_context *ctx = ucwlib_thread_context();
+  struct asio_queue *q = ctx->io_queue;
   if (!q)
     {
       q = xmalloc_zero(sizeof(struct asio_queue));
-      q->buffer_size = fbdir_buffer_size;
-      q->max_writebacks = fbdir_write_back;
+      q->buffer_size = buffer_size;
+      q->max_writebacks = write_back;
       asio_init_queue(q);
-      pthread_setspecific(fbdir_queue_key, q);
+      ctx->io_queue = q;
     }
   q->use_count++;
   DBG("FB-DIRECT: Got I/O queue, uc=%d", q->use_count);
@@ -249,14 +241,15 @@ fbdir_get_io_queue(void)
 static void
 fbdir_put_io_queue(void)
 {
-  struct asio_queue *q = pthread_getspecific(fbdir_queue_key);
+  struct ucwlib_context *ctx = ucwlib_thread_context();
+  struct asio_queue *q = ctx->io_queue;
   ASSERT(q);
   DBG("FB-DIRECT: Put I/O queue, uc=%d", q->use_count);
   if (!--q->use_count)
     {
       asio_cleanup_queue(q);
       xfree(q);
-      pthread_setspecific(fbdir_queue_key, NULL);
+      ctx->io_queue = NULL;
     }
 }
 
@@ -275,7 +268,7 @@ fbdir_close(struct fastbuf *f)
     {
     case 1:
       if (unlink(f->name) < 0)
-       log(L_ERROR, "unlink(%s): %m", f->name);
+       msg(L_ERROR, "unlink(%s): %m", f->name);
     case 0:
       close(F->fd);
     }
@@ -286,18 +279,21 @@ fbdir_close(struct fastbuf *f)
 static int
 fbdir_config(struct fastbuf *f, uns item, int value)
 {
+  int orig;
+
   switch (item)
     {
     case BCONFIG_IS_TEMP_FILE:
+      orig = FB_DIRECT(f)->is_temp_file;
       FB_DIRECT(f)->is_temp_file = value;
-      return 0;
+      return orig;
     default:
       return -1;
     }
 }
 
-static struct fastbuf *
-fbdir_open_internal(byte *name, int fd, struct asio_queue *q)
+struct fastbuf *
+fbdir_open_fd_internal(int fd, const char *name, struct asio_queue *q, uns buffer_size, uns read_ahead UNUSED, uns write_back)
 {
   int namelen = strlen(name) + 1;
   struct fb_direct *F = xmalloc(sizeof(struct fb_direct) + namelen);
@@ -311,7 +307,7 @@ fbdir_open_internal(byte *name, int fd, struct asio_queue *q)
   if (q)
     F->io_queue = F->user_queue = q;
   else
-    F->io_queue = fbdir_get_io_queue();
+    F->io_queue = fbdir_get_io_queue(buffer_size, write_back);
   f->refill = fbdir_refill;
   f->spout = fbdir_spout;
   f->seek = fbdir_seek;
@@ -321,41 +317,6 @@ fbdir_open_internal(byte *name, int fd, struct asio_queue *q)
   return f;
 }
 
-struct fastbuf *
-fbdir_open_try(byte *name, uns mode, struct asio_queue *q)
-{
-  if (!fbdir_cheat)
-    mode |= O_DIRECT;
-  int fd = sh_open(name, mode, 0666);
-  if (fd < 0)
-    return NULL;
-  struct fastbuf *b = fbdir_open_internal(name, fd, q);
-  if (mode & O_APPEND)
-    fbdir_seek(b, 0, SEEK_END);
-  return b;
-}
-
-struct fastbuf *
-fbdir_open(byte *name, uns mode, struct asio_queue *q)
-{
-  struct fastbuf *b = fbdir_open_try(name, mode, q);
-  if (!b)
-    die("Unable to %s file %s: %m",
-       (mode & O_CREAT) ? "create" : "open", name);
-  return b;
-}
-
-struct fastbuf *
-fbdir_open_fd(int fd, struct asio_queue *q)
-{
-  byte x[32];
-
-  sprintf(x, "fd%d", fd);
-  if (!fbdir_cheat && fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_DIRECT) < 0)
-    log(L_WARN, "Cannot set O_DIRECT on fd %d: %m", fd);
-  return fbdir_open_internal(x, fd, q);
-}
-
 #ifdef TEST
 
 #include "lib/getopt.h"