]> mj.ucw.cz Git - libucw.git/blobdiff - lib/fb-atomic.c
Several bits of the new sorter.
[libucw.git] / lib / fb-atomic.c
index 29e7c3039f7c81749b51a3240ebfad0f8294899a..73dd8b13ff826e6686fc5ae901e44761e479c1e9 100644 (file)
@@ -24,7 +24,7 @@
  *     if the free space in the buffer falls below the expected maximum record
  *     length.
  *
  *     if the free space in the buffer falls below the expected maximum record
  *     length.
  *
- *     fbatomic_create() is called with the following parameters:
+ *     fbatomic_open() is called with the following parameters:
  *         name - name of the file to open
  *         master - fbatomic for the master thread or NULL if it's the first open
  *         bufsize - initial buffer size
  *         name - name of the file to open
  *         master - fbatomic for the master thread or NULL if it's the first open
  *         bufsize - initial buffer size
@@ -32,6 +32,8 @@
  *             or -(expected maximum record length) for variable-sized ones.
  */
 
  *             or -(expected maximum record length) for variable-sized ones.
  */
 
+#define LOCAL_DEBUG
+
 #include "lib/lib.h"
 #include "lib/fastbuf.h"
 #include "lib/lfs.h"
 #include "lib/lib.h"
 #include "lib/fastbuf.h"
 #include "lib/lfs.h"
 #include <fcntl.h>
 #include <unistd.h>
 
 #include <fcntl.h>
 #include <unistd.h>
 
-struct fb_atomic {
-  struct fastbuf fb;
-  int fd;                              /* File descriptor */
+struct fb_atomic_file {
+  int fd;
+  int use_count;
+  int record_len;
   uns locked;
   uns locked;
-  uns expected_max_len;
-  byte *expected_max_bptr;
+  byte name[1];
 };
 };
-#define FB_ATOMIC(f) ((struct fb_atomic *)(f)->is_fastbuf)
+
+void
+fbatomic_internal_write(struct fastbuf *f)
+{
+  struct fb_atomic_file *af = FB_ATOMIC(f)->af;
+  int size = f->bptr - f->buffer;
+  if (size)
+    {
+      ASSERT(af->record_len < 0 || !(size % af->record_len));
+      int res = write(af->fd, f->buffer, size);
+      if (res < 0)
+       die("Error writing %s: %m", f->name);
+      if (res != size)
+       die("Unexpected partial write to %s: written only %d bytes of %d", f->name, res, size);
+      f->bptr = f->buffer;
+    }
+}
+
+static void
+fbatomic_spout(struct fastbuf *f)
+{
+  if (f->bptr < f->bufend)             /* Explicit flushes should be ignored */
+    return;
+
+  struct fb_atomic *F = FB_ATOMIC(f);
+  if (F->af->locked)
+    {
+      uns written = f->bptr - f->buffer;
+      uns size = f->bufend - f->buffer + F->slack_size;
+      F->slack_size *= 2;
+      DBG("Reallocating buffer for atomic file %s with slack %d", f->name, F->slack_size);
+      f->buffer = xrealloc(f->buffer, size);
+      f->bufend = f->buffer + size;
+      f->bptr = f->buffer + written;
+      F->expected_max_bptr = f->bufend - F->slack_size;
+    }
+  else
+    fbatomic_internal_write(f);
+}
+
+static void
+fbatomic_close(struct fastbuf *f)
+{
+  struct fb_atomic_file *af = FB_ATOMIC(f)->af;
+  fbatomic_internal_write(f);  /* Need to flush explicitly, because the file can be locked */
+  if (!--af->use_count)
+    {
+      close(af->fd);
+      xfree(af);
+    }
+  xfree(f);
+}
+
+struct fastbuf *
+fbatomic_open(byte *name, struct fastbuf *master, uns bufsize, int record_len)
+{
+  struct fb_atomic *F = xmalloc_zero(sizeof(*F));
+  struct fastbuf *f = &F->fb;
+  struct fb_atomic_file *af;
+  if (master)
+    {
+      af = FB_ATOMIC(master)->af;
+      af->use_count++;
+      ASSERT(af->record_len == record_len);
+    }
+  else
+    {
+      af = xmalloc_zero(sizeof(*af) + strlen(name));
+      if ((af->fd = sh_open(name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0666)) < 0)
+       die("Cannot create %s: %m", name);
+      af->use_count = 1;
+      af->record_len = record_len;
+      af->locked = (record_len < 0);
+      strcpy(af->name, name);
+    }
+  F->af = af;
+  if (record_len > 0 && bufsize % record_len)
+    bufsize += record_len - (bufsize % record_len);
+  f->buffer = xmalloc(bufsize);
+  f->bufend = f->buffer + bufsize;
+  F->slack_size = (record_len < 0) ? -record_len : 0;
+  ASSERT(bufsize > F->slack_size);
+  F->expected_max_bptr = f->bufend - F->slack_size;
+  f->bptr = f->bstop = f->buffer;
+  f->name = af->name;
+  f->spout = fbatomic_spout;
+  f->close = fbatomic_close;
+  return f;
+}
+
+#ifdef TEST
+
+int main(int argc UNUSED, char **argv UNUSED)
+{
+  struct fastbuf *f, *g;
+
+  log(L_INFO, "Testing block writes");
+  f = fbatomic_open("test", NULL, 16, 4);
+  for (u32 i=0; i<17; i++)
+    bwrite(f, &i, 4);
+  bclose(f);
+
+  log(L_INFO, "Testing interleaved var-size writes");
+  f = fbatomic_open("test2", NULL, 23, -5);
+  g = fbatomic_open("test2", f, 23, -5);
+  for (int i=0; i<100; i++)
+    {
+      struct fastbuf *x = (i%2) ? g : f;
+      bprintf(x, "%c<%d>\n", "fg"[i%2], ((259309*i) % 1000000) >> (i % 8));
+      fbatomic_commit(x);
+    }
+  bclose(f);
+  bclose(g);
+
+  return 0;
+}
+
+#endif