mj.ucw.cz Git - libucw.git/commitdiff
Improved and cleaned up the bucket library. The original "single operation
author Martin Mares <mj@ucw.cz>
Sun, 7 Dec 2003 14:23:58 +0000 (14:23 +0000)
committer Martin Mares <mj@ucw.cz>
Sun, 7 Dec 2003 14:23:58 +0000 (14:23 +0000)
pending per process" invariant was no longer feasible (and it caused several
problems in Shepherd).

Reading and writing of buckets now use dynamically allocated fastbufs, and
there can be any number of readers at any time, but only a single writer
(otherwise a deadlock would occur). Read streams are seekable; write streams
are at least btell()-able.

Also removed the omnipresent global variables for the start of the current
bucket etc.; each part (Find, Slurp, Create, Shakedown, ...) now has its own
state variables.

Added some more sanity checks.

lib/bucket.c
lib/bucket.h

index 51ca9ccc36f30756bf56fd5833fc52e4271630d0..a513c7cf614b6b4f3aa9cab2b984179d2edbbb74 100644 (file)
 #include <sys/file.h>
 
 static int obuck_fd;
-static unsigned int obuck_remains, obuck_check_pad;
-static struct fastbuf *obuck_fb;
-static struct obuck_header obuck_hdr;
-static sh_off_t bucket_start, bucket_current;
+static struct obuck_header obuck_hdr, obuck_create_hdr;
+static sh_off_t bucket_find_pos;
+static struct fastbuf *obuck_write_fb;
 
 /*** Configuration ***/
 
@@ -49,9 +48,9 @@ static void CONSTRUCTOR obuck_init_config(void)
 /*** Internal operations ***/
 
 static void
-obuck_broken(char *msg)
+obuck_broken(char *msg, sh_off_t pos)
 {
-  die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start);
+  die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) pos);
 }
 
 /*
@@ -95,33 +94,57 @@ obuck_unlock(void)
 
 /*** FastIO emulation ***/
 
+struct fb_bucket {
+  struct fastbuf fb;
+  sh_off_t start_pos;
+  uns bucket_size;
+  byte buffer[0];
+};
+#define FB_BUCKET(f) ((struct fb_bucket *)(f)->is_fastbuf)
+
+static int obuck_fb_count;
+
+static void
+obuck_fb_close(struct fastbuf *f)
+{
+  obuck_fb_count--;
+  xfree(f);
+}
+
 /* We need to use pread/pwrite since we work on fd's shared between processes */
 
 static int
 obuck_fb_refill(struct fastbuf *f)
 {
-  unsigned limit = (obuck_io_buflen < obuck_remains) ? obuck_io_buflen : obuck_remains;
-  unsigned size = (limit == obuck_remains) ? (limit+obuck_check_pad+4) : limit;
-  int l;
+  uns remains, bufsize, size, datasize;
 
-  if (!limit)
+  remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
+  bufsize = f->bufend - f->buffer;
+  if (!remains)
     return 0;
-  l = sh_pread(obuck_fd, f->buffer, size, bucket_current);
+  sh_off_t start = FB_BUCKET(f)->start_pos;
+  sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
+  if (remains <= bufsize)
+    {
+      datasize = remains;
+      size = start + ALIGN(FB_BUCKET(f)->bucket_size + sizeof(struct obuck_header) + 4, OBUCK_ALIGN) - pos;
+    }
+  else
+    size = datasize = bufsize;
+  int l = sh_pread(obuck_fd, f->buffer, size, pos);
   if (l < 0)
     die("Error reading bucket: %m");
   if ((unsigned) l != size)
-    obuck_broken("Short read");
+    obuck_broken("Short read", FB_BUCKET(f)->start_pos);
   f->bptr = f->buffer;
-  f->bstop = f->buffer + limit;
-  bucket_current += limit;
-  f->pos = bucket_current - bucket_start - sizeof(obuck_hdr);
-  obuck_remains -= limit;
-  if (!obuck_remains)  /* Should check the trailer */
+  f->bstop = f->buffer + datasize;
+  f->pos += datasize;
+  if (datasize < size)
     {
       if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
-       obuck_broken("Missing trailer");
+       obuck_broken("Missing trailer", FB_BUCKET(f)->start_pos);
     }
-  return limit;
+  return datasize;
 }
 
 static void
@@ -132,15 +155,14 @@ obuck_fb_spout(struct fastbuf *f)
 
   while (l)
     {
-      int z = sh_pwrite(obuck_fd, c, l, bucket_current);
+      int z = sh_pwrite(obuck_fd, c, l, FB_BUCKET(f)->start_pos + sizeof(struct obuck_header) + f->pos);
       if (z <= 0)
        die("Error writing bucket: %m");
-      bucket_current += z;
+      f->pos += z;
       l -= z;
       c += z;
     }
   f->bptr = f->buffer;
-  f->pos = bucket_current - bucket_start - sizeof(obuck_hdr);
 }
 
 /*** Exported functions ***/
@@ -148,29 +170,20 @@ obuck_fb_spout(struct fastbuf *f)
 void
 obuck_init(int writeable)
 {
-  struct fastbuf *b;
   sh_off_t size;
 
   obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
   if (obuck_fd < 0)
     die("Unable to open bucket file %s: %m", obuck_name);
-  obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + obuck_io_buflen + OBUCK_ALIGN + 4);
-  b->buffer = (char *)(b+1);
-  b->bptr = b->bstop = b->buffer;
-  b->bufend = b->buffer + obuck_io_buflen;
-  b->name = "bucket";
-  b->refill = obuck_fb_refill;
-  b->spout = obuck_fb_spout;
   obuck_lock_read();
   size = sh_seek(obuck_fd, 0, SEEK_END);
   if (size)
     {
       /* If the bucket pool is not empty, check consistency of its end */
       u32 check;
-      bucket_start = size - 4; /* for error reporting */
       if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
          check != OBUCK_TRAILER)
-       obuck_broken("Missing trailer of last object");
+       obuck_broken("Missing trailer of last object", size - 4);
     }
   obuck_unlock();
 }
@@ -178,34 +191,33 @@ obuck_init(int writeable)
 void
 obuck_cleanup(void)
 {
-  bclose(obuck_fb);
   close(obuck_fd);
-  xfree(obuck_fb);
+  if (obuck_fb_count)
+    log(L_ERROR, "Bug: Unbalanced bucket opens/closes: %d streams remain", obuck_fb_count);
+  if (obuck_write_fb)
+    log(L_ERROR, "Bug: Forgot to close bucket write stream");
 }
 
 void
 obuck_sync(void)
 {
-  bflush(obuck_fb);
+  if (obuck_write_fb)
+    bflush(obuck_write_fb);
   fsync(obuck_fd);
 }
 
 static void
 obuck_get(oid_t oid)
 {
-  struct fastbuf *b = obuck_fb;
-
-  bucket_start = obuck_get_pos(oid);
-  bflush(b);
-  if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr))
-    obuck_broken("Short header read");
-  bucket_current = bucket_start + sizeof(obuck_hdr);
+  bucket_find_pos = obuck_get_pos(oid);
+  if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr))
+    obuck_broken("Short header read", bucket_find_pos);
   if (obuck_hdr.magic != OBUCK_MAGIC)
-    obuck_broken("Missing magic number");
+    obuck_broken("Missing magic number", bucket_find_pos);
   if (obuck_hdr.oid == OBUCK_OID_DELETED)
-    obuck_broken("Access to deleted bucket");
+    obuck_broken("Access to deleted bucket", bucket_find_pos);
   if (obuck_hdr.oid != oid)
-    obuck_broken("Invalid backlink");
+    obuck_broken("Invalid backlink", bucket_find_pos);
 }
 
 void
@@ -223,7 +235,7 @@ obuck_find_by_oid(struct obuck_header *hdrp)
 int
 obuck_find_first(struct obuck_header *hdrp, int full)
 {
-  bucket_start = 0;
+  bucket_find_pos = 0;
   obuck_hdr.magic = 0;
   return obuck_find_next(hdrp, full);
 }
@@ -232,24 +244,21 @@ int
 obuck_find_next(struct obuck_header *hdrp, int full)
 {
   int c;
-  struct fastbuf *b = obuck_fb;
 
   for(;;)
     {
       if (obuck_hdr.magic)
-       bucket_start = (bucket_start + sizeof(obuck_hdr) + obuck_hdr.length +
-                       4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
-      bflush(b);
+       bucket_find_pos = (bucket_find_pos + sizeof(obuck_hdr) + obuck_hdr.length +
+                          4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
       obuck_lock_read();
-      c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
+      c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
       obuck_unlock();
       if (!c)
        return 0;
       if (c != sizeof(obuck_hdr))
-       obuck_broken("Short header read");
-      bucket_current = bucket_start + sizeof(obuck_hdr);
+       obuck_broken("Short header read", bucket_find_pos);
       if (obuck_hdr.magic != OBUCK_MAGIC)
-       obuck_broken("Missing magic number");
+       obuck_broken("Missing magic number", bucket_find_pos);
       if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
        {
          memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
@@ -261,15 +270,30 @@ obuck_find_next(struct obuck_header *hdrp, int full)
 struct fastbuf *
 obuck_fetch(void)
 {
-  obuck_fb->pos = 0;
-  obuck_remains = obuck_hdr.length;
-  obuck_check_pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
-  return obuck_fb;
+  struct fastbuf *b;
+  uns official_buflen = ALIGN(MIN(obuck_hdr.length, obuck_io_buflen), OBUCK_ALIGN);
+  uns real_buflen = official_buflen + OBUCK_ALIGN;
+
+  b = xmalloc(sizeof(struct fb_bucket) + real_buflen);
+  b->buffer = b->bptr = b->bstop = FB_BUCKET(b)->buffer;
+  b->bufend = b->buffer + official_buflen;
+  b->name = "bucket-read";
+  b->pos = 0;
+  b->refill = obuck_fb_refill;
+  b->spout = NULL;
+  b->seek = NULL;
+  b->close = obuck_fb_close;
+  b->config = NULL;
+  FB_BUCKET(b)->start_pos = bucket_find_pos;
+  FB_BUCKET(b)->bucket_size = obuck_hdr.length;
+  obuck_fb_count++;
+  return b;
 }
 
 oid_t
 obuck_predict_last_oid(void)
 {
+  /* BEWARE: This is not fork-safe. */
   sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
   return size >> OBUCK_SHIFT;
 }
@@ -277,36 +301,55 @@ obuck_predict_last_oid(void)
 struct fastbuf *
 obuck_create(u32 type)
 {
+  ASSERT(!obuck_write_fb);
+
   obuck_lock_write();
-  bflush(obuck_fb);
-  bucket_start = sh_seek(obuck_fd, 0, SEEK_END);
-  if (bucket_start & (OBUCK_ALIGN - 1))
-    obuck_broken("Misaligned file");
-  obuck_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
-  obuck_hdr.oid = bucket_start >> OBUCK_SHIFT;
-  obuck_hdr.length = 0;
-  obuck_hdr.type = type;
-  bucket_current = bucket_start;
-  bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr));
-  obuck_fb->pos = -sizeof(obuck_hdr);
-  return obuck_fb;
+  sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
+  if (start & (OBUCK_ALIGN - 1))
+    obuck_broken("Misaligned file", start);
+  obuck_create_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
+  obuck_create_hdr.oid = start >> OBUCK_SHIFT;
+  obuck_create_hdr.length = 0;
+  obuck_create_hdr.type = type;
+
+  struct fastbuf *b = xmalloc(sizeof(struct fb_bucket) + obuck_io_buflen);
+  obuck_write_fb = b;
+  b->buffer = FB_BUCKET(b)->buffer;
+  b->bptr = b->bstop = b->buffer;
+  b->bufend = b->buffer + obuck_io_buflen;
+  b->pos = -(int)sizeof(obuck_create_hdr);
+  b->name = "bucket-write";
+  b->refill = NULL;
+  b->spout = obuck_fb_spout;
+  b->seek = NULL;
+  b->close = NULL;
+  b->config = NULL;
+  FB_BUCKET(b)->start_pos = start;
+  FB_BUCKET(b)->bucket_size = 0;
+  bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
+
+  return b;
 }
 
 void
-obuck_create_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp)
+obuck_create_end(struct fastbuf *b, struct obuck_header *hdrp)
 {
-  int pad;
-  obuck_hdr.magic = OBUCK_MAGIC;
-  obuck_hdr.length = btell(obuck_fb);
-  pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
+  ASSERT(b == obuck_write_fb);
+  obuck_write_fb = NULL;
+
+  obuck_create_hdr.magic = OBUCK_MAGIC;
+  obuck_create_hdr.length = btell(b);
+  int pad = (OBUCK_ALIGN - sizeof(obuck_create_hdr) - obuck_create_hdr.length - 4) & (OBUCK_ALIGN - 1);
   while (pad--)
-    bputc(obuck_fb, 0);
-  bputl(obuck_fb, OBUCK_TRAILER);
-  bflush(obuck_fb);
-  ASSERT(!(bucket_current & (OBUCK_ALIGN - 1)));
-  sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
+    bputc(b, 0);
+  bputl(b, OBUCK_TRAILER);
+  bflush(b);
+  ASSERT(!((FB_BUCKET(b)->start_pos + sizeof(obuck_create_hdr) + b->pos) & (OBUCK_ALIGN - 1)));
+  if (sh_pwrite(obuck_fd, &obuck_create_hdr, sizeof(obuck_create_hdr), FB_BUCKET(b)->start_pos) != sizeof(obuck_create_hdr))
+    die("Bucket header update failed: %m");
   obuck_unlock();
-  memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
+  memcpy(hdrp, &obuck_create_hdr, sizeof(obuck_create_hdr));
+  xfree(b);
 }
 
 void
@@ -315,27 +358,29 @@ obuck_delete(oid_t oid)
   obuck_lock_write();
   obuck_get(oid);
   obuck_hdr.oid = OBUCK_OID_DELETED;
-  sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
+  sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
   obuck_unlock();
 }
 
 /*** Fast reading of the whole pool ***/
 
 static struct fastbuf *obuck_rpf;
+static uns slurp_remains;
+static sh_off_t slurp_start, slurp_current;
 
 static int
 obuck_slurp_refill(struct fastbuf *f)
 {
   uns l;
 
-  if (!obuck_remains)
+  if (!slurp_remains)
     return 0;
   l = bdirect_read_prepare(obuck_rpf, &f->buffer);
   if (!l)
-    obuck_broken("Incomplete object");
-  l = MIN(l, obuck_remains);
+    obuck_broken("Incomplete object", slurp_start);
+  l = MIN(l, slurp_remains);
   bdirect_read_commit(obuck_rpf, f->buffer + l);
-  obuck_remains -= l;
+  slurp_remains -= l;
   f->bptr = f->buffer;
   f->bufend = f->bstop = f->buffer + l;
   return 1;
@@ -356,11 +401,11 @@ obuck_slurp_pool(struct obuck_header *hdrp)
        }
       else
        {
-         bsetpos(obuck_rpf, bucket_current - 4);
+         bsetpos(obuck_rpf, slurp_current - 4);
          if (bgetl(obuck_rpf) != OBUCK_TRAILER)
-           obuck_broken("Missing trailer");
+           obuck_broken("Missing trailer", slurp_start);
        }
-      bucket_start = btell(obuck_rpf);
+      slurp_start = btell(obuck_rpf);
       l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
       if (!l)
        {
@@ -370,16 +415,16 @@ obuck_slurp_pool(struct obuck_header *hdrp)
          return NULL;
        }
       if (l != sizeof(struct obuck_header))
-       obuck_broken("Short header read");
+       obuck_broken("Short header read", slurp_start);
       if (hdrp->magic != OBUCK_MAGIC)
-       obuck_broken("Missing magic number");
-      bucket_current = (bucket_start + sizeof(obuck_hdr) + hdrp->length +
-                       4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
+       obuck_broken("Missing magic number", slurp_start);
+      slurp_current = (slurp_start + sizeof(obuck_hdr) + hdrp->length +
+                      4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
     }
   while (hdrp->oid == OBUCK_OID_DELETED);
-  if (obuck_get_pos(hdrp->oid) != bucket_start)
-    obuck_broken("Invalid backlink");
-  obuck_remains = hdrp->length;
+  if (obuck_get_pos(hdrp->oid) != slurp_start)
+    obuck_broken("Invalid backlink", slurp_start);
+  slurp_remains = hdrp->length;
   limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL;
   limiter.name = "Bucket";
   limiter.pos = 0;
@@ -393,7 +438,7 @@ void
 obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
 {
   byte *rbuf, *wbuf, *msg;
-  sh_off_t rstart, wstart, w_bucket_start;
+  sh_off_t rstart, wstart, r_bucket_start, w_bucket_start;
   int roff, woff, rsize, l;
   struct obuck_header *rhdr, *whdr;
 
@@ -407,13 +452,13 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
 
   for(;;)
     {
-      bucket_start = rstart + roff;
+      r_bucket_start = rstart + roff;
       w_bucket_start = wstart + woff;
       if (rsize - roff < OBUCK_ALIGN)
        goto reread;
       rhdr = (struct obuck_header *)(rbuf + roff);
       if (rhdr->magic != OBUCK_MAGIC ||
-         rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(bucket_start >> OBUCK_SHIFT))
+         rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT))
        {
          msg = "header mismatch";
          goto broken;
@@ -426,7 +471,7 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
              msg = "bucket longer than ShakeBufSize";
              goto broken;
            }
-         rstart = bucket_start + l;
+         rstart = r_bucket_start + l;
          roff = 0;
          rsize = 0;
          goto reread;
@@ -442,7 +487,7 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
        {
          if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1)))
            {
-             if (bucket_start == w_bucket_start)
+             if (r_bucket_start == w_bucket_start)
                {
                  /* No copying needed now nor ever in the past, hence woff==0 */
                  wstart += l;
@@ -502,19 +547,19 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
   return;
 
  broken:
-  log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", msg, (long long) bucket_start, (uns)(bucket_start >> OBUCK_SHIFT));
+  log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT));
   if (woff)
     {
       sh_pwrite(obuck_fd, wbuf, woff, wstart);
       wstart += woff;
     }
-  while (wstart + OBUCK_ALIGN <= bucket_start)
+  while (wstart + OBUCK_ALIGN <= r_bucket_start)
     {
       u32 check = OBUCK_TRAILER;
       obuck_hdr.magic = OBUCK_MAGIC;
       obuck_hdr.oid = OBUCK_OID_DELETED;
-      if (bucket_start - wstart < 0x40000000)
-       obuck_hdr.length = bucket_start - wstart - sizeof(obuck_hdr) - 4;
+      if (r_bucket_start - wstart < 0x40000000)
+       obuck_hdr.length = r_bucket_start - wstart - sizeof(obuck_hdr) - 4;
       else
        obuck_hdr.length = 0x40000000 - sizeof(obuck_hdr) - 4;
       sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), wstart);
@@ -552,7 +597,7 @@ int main(int argc, char **argv)
   obuck_init(1);
   for(j=0; j<COUNT; j++)
     {
-      b = obuck_create();
+      b = obuck_create(BUCKET_TYPE_PLAIN);
       for(i=0; i<LEN(j); i++)
         bputc(b, (i+j) % 256);
       obuck_create_end(b, &h);
index 815213ca8ea793e19205e8e45c997d2878503d60..b493305d20a30c800268dcc04ee1883f0e4bcadd 100644 (file)
@@ -17,7 +17,9 @@
  *
  * Locking: Each operation on the pool is protected by a flock.
  *
- * The buckets emulate non-seekable fastbuf streams.
+ * The buckets emulate fastbuf streams. Read streams act as normal files,
+ * but there can be only one write stream which is non-seekable and you
+ * also shouldn't open new read streams when writing.
  *
  * fork()'ing if you don't have any bucket open is safe.
  */