/* FastO with atomic writes for multi-threaded programs */
-struct fastbuf *fbatomic_create(byte *name, struct fastbuf *master, uns bufsize, int record_len);
-void fbatomic_checkpoint(struct fastbuf *b);
+struct fb_atomic {
+ struct fastbuf fb;
+ struct fb_atomic_file *af;
+ byte *expected_max_bptr;
+ uns slack_size;
+};
+#define FB_ATOMIC(f) ((struct fb_atomic *)(f)->is_fastbuf)
+
+struct fastbuf *fbatomic_open(byte *name, struct fastbuf *master, uns bufsize, int record_len);
+void fbatomic_internal_write(struct fastbuf *b);
+
+static inline void
+fbatomic_commit(struct fastbuf *b)
+{
+ if (b->bptr >= ((struct fb_atomic *)b)->expected_max_bptr)
+ fbatomic_internal_write(b);
+}
/* Configuring stream parameters */
* if the free space in the buffer falls below the expected maximum record
* length.
*
- * fbatomic_create() is called with the following parameters:
+ * fbatomic_open() is called with the following parameters:
* name - name of the file to open
* master - fbatomic for the master thread or NULL if it's the first open
* bufsize - initial buffer size
* or -(expected maximum record length) for variable-sized ones.
*/
+#define LOCAL_DEBUG
+
#include "lib/lib.h"
#include "lib/fastbuf.h"
#include "lib/lfs.h"
#include <fcntl.h>
#include <unistd.h>
-struct fb_atomic {
- struct fastbuf fb;
- int fd; /* File descriptor */
+struct fb_atomic_file {
+ int fd;
+ int use_count;
+ int record_len;
uns locked;
- uns expected_max_len;
- byte *expected_max_bptr;
+ byte name[1];
};
-#define FB_ATOMIC(f) ((struct fb_atomic *)(f)->is_fastbuf)
+
+void
+fbatomic_internal_write(struct fastbuf *f)
+{
+ struct fb_atomic_file *af = FB_ATOMIC(f)->af;
+ int size = f->bptr - f->buffer;
+ if (size)
+ {
+ ASSERT(af->record_len < 0 || !(size % af->record_len));
+ int res = write(af->fd, f->buffer, size);
+ if (res < 0)
+ die("Error writing %s: %m", f->name);
+ if (res != size)
+ die("Unexpected partial write to %s: written only %d bytes of %d", f->name, res, size);
+ f->bptr = f->buffer;
+ }
+}
+
+static void
+fbatomic_spout(struct fastbuf *f)
+{
+ if (f->bptr < f->bufend) /* Explicit flushes should be ignored */
+ return;
+
+ struct fb_atomic *F = FB_ATOMIC(f);
+ if (F->af->locked)
+ {
+ uns written = f->bptr - f->buffer;
+ uns size = f->bufend - f->buffer + F->slack_size;
+ F->slack_size *= 2;
+ DBG("Reallocating buffer for atomic file %s with slack %d", f->name, F->slack_size);
+ f->buffer = xrealloc(f->buffer, size);
+ f->bufend = f->buffer + size;
+ f->bptr = f->buffer + written;
+ F->expected_max_bptr = f->bufend - F->slack_size;
+ }
+ else
+ fbatomic_internal_write(f);
+}
+
+static void
+fbatomic_close(struct fastbuf *f)
+{
+ struct fb_atomic_file *af = FB_ATOMIC(f)->af;
+ fbatomic_internal_write(f); /* Need to flush explicitly, because the file can be locked */
+ if (!--af->use_count)
+ {
+ close(af->fd);
+ xfree(af);
+ }
+ xfree(f);
+}
+
+struct fastbuf *
+fbatomic_open(byte *name, struct fastbuf *master, uns bufsize, int record_len)
+{
+ struct fb_atomic *F = xmalloc_zero(sizeof(*F));
+ struct fastbuf *f = &F->fb;
+ struct fb_atomic_file *af;
+ if (master)
+ {
+ af = FB_ATOMIC(master)->af;
+ af->use_count++;
+ ASSERT(af->record_len == record_len);
+ }
+ else
+ {
+ af = xmalloc_zero(sizeof(*af) + strlen(name));
+ if ((af->fd = sh_open(name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0666)) < 0)
+ die("Cannot create %s: %m", name);
+ af->use_count = 1;
+ af->record_len = record_len;
+ af->locked = (record_len < 0);
+ strcpy(af->name, name);
+ }
+ F->af = af;
+ if (record_len > 0 && bufsize % record_len)
+ bufsize += record_len - (bufsize % record_len);
+ f->buffer = xmalloc(bufsize);
+ f->bufend = f->buffer + bufsize;
+ F->slack_size = (record_len < 0) ? -record_len : 0;
+ ASSERT(bufsize > F->slack_size);
+ F->expected_max_bptr = f->bufend - F->slack_size;
+ f->bptr = f->bstop = f->buffer;
+ f->name = af->name;
+ f->spout = fbatomic_spout;
+ f->close = fbatomic_close;
+ return f;
+}
+
+#ifdef TEST
+
+int main(int argc UNUSED, char **argv UNUSED)
+{
+ struct fastbuf *f, *g;
+
+ log(L_INFO, "Testing block writes");
+ f = fbatomic_open("test", NULL, 16, 4);
+ for (u32 i=0; i<17; i++)
+ bwrite(f, &i, 4);
+ bclose(f);
+
+ log(L_INFO, "Testing interleaved var-size writes");
+ f = fbatomic_open("test2", NULL, 23, -5);
+ g = fbatomic_open("test2", f, 23, -5);
+ for (int i=0; i<100; i++)
+ {
+ struct fastbuf *x = (i%2) ? g : f;
+ bprintf(x, "%c<%d>\n", "fg"[i%2], ((259309*i) % 1000000) >> (i % 8));
+ fbatomic_commit(x);
+ }
+ bclose(f);
+ bclose(g);
+
+ return 0;
+}
+
+#endif