]> mj.ucw.cz Git - libucw.git/blobdiff - ucw/fb-atomic.c
Heap: Interface cleanup
[libucw.git] / ucw / fb-atomic.c
index b5fb6223a01fec4c5d74554a465cdced73c5f28f..cd0217468279090b61ea69a19f816715254eecd9 100644 (file)
@@ -7,41 +7,36 @@
  *     of the GNU Lesser General Public License.
  */
 
-/*
- *     This fastbuf backend is intended for cases where several threads
- *     of a single program append records to a single file and while the
- *     record can mix in an arbitrary way, the bytes inside a single
- *     record must remain uninterrupted.
- *
- *     In case of files with fixed record size, we just allocate the
- *     buffer to hold a whole number of records and take advantage
- *     of the atomicity of the write() system call.
- *
- *     With variable-sized records, we need another solution: when
- *     writing a record, we keep the fastbuf in a locked state, which
- *     prevents buffer flushing (and if the buffer becomes full, we extend it),
- *     and we wait for an explicit commit operation which write()s the buffer
- *     if the free space in the buffer falls below the expected maximum record
- *     length.
- *
- *     fbatomic_open() is called with the following parameters:
- *         name - name of the file to open
- *         master - fbatomic for the master thread or NULL if it's the first open
- *         bufsize - initial buffer size
- *         record_len - record length for fixed-size records;
- *             or -(expected maximum record length) for variable-sized ones.
- */
-
-#define LOCAL_DEBUG
-
-#include "ucw/lib.h"
-#include "ucw/fastbuf.h"
-#include "ucw/lfs.h"
+#include <ucw/lib.h>
+#include <ucw/fastbuf.h>
+#include <ucw/io.h>
+#include <ucw/conf.h>
+#include <ucw/trans.h>
 
 #include <string.h>
 #include <fcntl.h>
 #include <unistd.h>
 
+static uns trace;
+
+#ifndef TEST
+
+static struct cf_section fbatomic_config = {
+  CF_ITEMS {
+    CF_UNS("Trace", &trace)
+  }
+};
+
+static void CONSTRUCTOR fbatomic_init_config(void)
+{
+  cf_declare_section("FBAtomic", &fbatomic_config, 1);
+}
+
+#endif
+
+#define FB_ATOMIC(f) ((struct fb_atomic *)(f))
+#define TRACE(m...) do { if(trace) msg(L_DEBUG, "FB_ATOMIC: " m); } while(0)
+
 struct fb_atomic_file {
   int fd;
   int use_count;
@@ -60,9 +55,9 @@ fbatomic_internal_write(struct fastbuf *f)
       ASSERT(af->record_len < 0 || !(size % af->record_len));
       int res = write(af->fd, f->buffer, size);
       if (res < 0)
-       die("Error writing %s: %m", f->name);
+       bthrow(f, "write", "Error writing %s: %m", f->name);
       if (res != size)
-       die("Unexpected partial write to %s: written only %d bytes of %d", f->name, res, size);
+       bthrow(f, "write", "Unexpected partial write to %s: written only %d bytes of %d", f->name, res, size);
       f->bptr = f->buffer;
     }
 }
@@ -79,10 +74,11 @@ fbatomic_spout(struct fastbuf *f)
       uns written = f->bptr - f->buffer;
       uns size = f->bufend - f->buffer + F->slack_size;
       F->slack_size *= 2;
-      DBG("Reallocating buffer for atomic file %s with slack %d", f->name, F->slack_size);
+      TRACE("Reallocating buffer for atomic file %s with slack %d", f->name, F->slack_size);
       f->buffer = xrealloc(f->buffer, size);
       f->bufend = f->buffer + size;
       f->bptr = f->buffer + written;
+      f->bstop = f->buffer;
       F->expected_max_bptr = f->bufend - F->slack_size;
     }
   else
@@ -93,7 +89,8 @@ static void
 fbatomic_close(struct fastbuf *f)
 {
   struct fb_atomic_file *af = FB_ATOMIC(f)->af;
-  fbatomic_internal_write(f);  /* Need to flush explicitly, because the file can be locked */
+  if (!(f->flags & FB_DEAD))
+    fbatomic_internal_write(f);        /* Need to flush explicitly, because the file can be locked */
   if (!--af->use_count)
     {
       close(af->fd);
@@ -116,9 +113,11 @@ fbatomic_open(const char *name, struct fastbuf *master, uns bufsize, int record_
     }
   else
     {
+      int fd = ucw_open(name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0666);
+      if (fd < 0)
+       trans_throw("ucw.fb.open", NULL, "Cannot create %s: %m", name);
       af = xmalloc_zero(sizeof(*af) + strlen(name));
-      if ((af->fd = ucw_open(name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0666)) < 0)
-       die("Cannot create %s: %m", name);
+      af->fd = fd;
       af->use_count = 1;
       af->record_len = record_len;
       af->locked = (record_len < 0);
@@ -145,13 +144,16 @@ int main(int argc UNUSED, char **argv UNUSED)
 {
   struct fastbuf *f, *g;
 
-  log(L_INFO, "Testing block writes");
+  // Always trace in the test
+  trace = 1;
+
+  msg(L_INFO, "Testing block writes");
   f = fbatomic_open("test", NULL, 16, 4);
   for (u32 i=0; i<17; i++)
     bwrite(f, &i, 4);
   bclose(f);
 
-  log(L_INFO, "Testing interleaved var-size writes");
+  msg(L_INFO, "Testing interleaved var-size writes");
   f = fbatomic_open("test2", NULL, 23, -5);
   g = fbatomic_open("test2", f, 23, -5);
   for (int i=0; i<100; i++)