]> mj.ucw.cz Git - libucw.git/blobdiff - lib/bucket.c
taken much faster implementation of Adler32 and put into a separate source-code
[libucw.git] / lib / bucket.c
index df9740bd5cd8dd5972a8bbd7d4c15556d1b83bd7..f81f03e8d3f5444d0af9d3176edf932f0fe1b5a3 100644 (file)
@@ -2,6 +2,7 @@
  *     Sherlock Library -- Object Buckets
  *
  *     (c) 2001--2004 Martin Mares <mj@ucw.cz>
  *     Sherlock Library -- Object Buckets
  *
  *     (c) 2001--2004 Martin Mares <mj@ucw.cz>
+ *     (c) 2004 Robert Spalek <robert@ucw.cz>
  *
  *     This software may be freely distributed and used according to the terms
  *     of the GNU Lesser General Public License.
  *
  *     This software may be freely distributed and used according to the terms
  *     of the GNU Lesser General Public License.
@@ -59,22 +60,39 @@ obuck_broken(char *msg, sh_off_t pos)
 }
 
 /*
 }
 
 /*
- *  Unfortunately we cannot use flock() here since it happily permits
- *  locking a shared fd (e.g., after fork()) multiple times. The fcntl
- *  locks are very ugly and they don't support 64-bit offsets, but we
- *  can work around the problem by always locking the first header
- *  in the file.
+ *  We need several types of locks:
+ *
+ *     Read lock       reading parts of bucket file
+ *     Write lock      any write operations
+ *     Append lock     appending to the end of the file
+ *     Scan lock       reading parts which we are certain they exist
+ *
+ *  Multiple read and scan locks can co-exist together.
+ *  Scan locks can co-exist with an append lock.
+ *  There can be at most one write/append lock at a time.
+ *
+ *  These lock types map to a pair of normal read-write locks which
+ *  we represent as fcntl() locks on the first and second byte of the
+ *  bucket file. [We cannot use flock() since it happily permits
+ *  locking a shared fd (e.g., after fork()) multiple times at it also
+ *  doesn't offer multiple locks on a single file.]
+ *
+ *                     byte0           byte1
+ *     Read            <read>          <read>
+ *     Write           <write>         <write>
+ *     Append          <write>         -
+ *     Scan            -               <read>
  */
 
 static inline void
  */
 
 static inline void
-obuck_do_lock(int type)
+obuck_do_lock(int type, int start, int len)
 {
   struct flock fl;
 
   fl.l_type = type;
   fl.l_whence = SEEK_SET;
 {
   struct flock fl;
 
   fl.l_type = type;
   fl.l_whence = SEEK_SET;
-  fl.l_start = 0;
-  fl.l_len = sizeof(struct obuck_header);
+  fl.l_start = start;
+  fl.l_len = len;
   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
     die("fcntl lock: %m");
 }
   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
     die("fcntl lock: %m");
 }
@@ -82,19 +100,31 @@ obuck_do_lock(int type)
 inline void
 obuck_lock_read(void)
 {
 inline void
 obuck_lock_read(void)
 {
-  obuck_do_lock(F_RDLCK);
+  obuck_do_lock(F_RDLCK, 0, 2);
 }
 
 inline void
 obuck_lock_write(void)
 {
 }
 
 inline void
 obuck_lock_write(void)
 {
-  obuck_do_lock(F_WRLCK);
+  obuck_do_lock(F_WRLCK, 0, 2);
+}
+
+static inline void
+obuck_lock_append(void)
+{
+  obuck_do_lock(F_WRLCK, 0, 1);
+}
+
+static inline void
+obuck_lock_read_to_scan(void)
+{
+  obuck_do_lock(F_UNLCK, 0, 1);
 }
 
 inline void
 obuck_unlock(void)
 {
 }
 
 inline void
 obuck_unlock(void)
 {
-  obuck_do_lock(F_UNLCK);
+  obuck_do_lock(F_UNLCK, 0, 2);
 }
 
 /*** FastIO emulation ***/
 }
 
 /*** FastIO emulation ***/
@@ -124,9 +154,10 @@ obuck_fb_refill(struct fastbuf *f)
   uns remains, bufsize, size, datasize;
 
   remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
   uns remains, bufsize, size, datasize;
 
   remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
-  bufsize = f->bufend - f->buffer;
   if (!remains)
     return 0;
   if (!remains)
     return 0;
+  f->buffer = FB_BUCKET(f)->buffer;    /* Could have been trimmed by bdirect_read_commit_modified() */
+  bufsize = f->bufend - f->buffer;
   sh_off_t start = FB_BUCKET(f)->start_pos;
   sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
   if (remains <= bufsize)
   sh_off_t start = FB_BUCKET(f)->start_pos;
   sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
   if (remains <= bufsize)
@@ -288,6 +319,7 @@ obuck_fetch(void)
   b->seek = NULL;
   b->close = obuck_fb_close;
   b->config = NULL;
   b->seek = NULL;
   b->close = obuck_fb_close;
   b->config = NULL;
+  b->can_overwrite_buffer = 2;
   FB_BUCKET(b)->start_pos = bucket_find_pos;
   FB_BUCKET(b)->bucket_size = obuck_hdr.length;
   obuck_fb_count++;
   FB_BUCKET(b)->start_pos = bucket_find_pos;
   FB_BUCKET(b)->bucket_size = obuck_hdr.length;
   obuck_fb_count++;
@@ -297,11 +329,8 @@ obuck_fetch(void)
 oid_t
 obuck_predict_last_oid(void)
 {
 oid_t
 obuck_predict_last_oid(void)
 {
-  obuck_lock_write();
   sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
   sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
-  oid_t ss = size >> OBUCK_SHIFT;
-  obuck_unlock();
-  return ss;
+  return (oid_t)(size >> OBUCK_SHIFT);
 }
 
 struct fastbuf *
 }
 
 struct fastbuf *
@@ -309,7 +338,7 @@ obuck_create(u32 type)
 {
   ASSERT(!obuck_write_fb);
 
 {
   ASSERT(!obuck_write_fb);
 
-  obuck_lock_write();
+  obuck_lock_append();
   sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
   if (start & (OBUCK_ALIGN - 1))
     obuck_broken("Misaligned file", start);
   sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
   if (start & (OBUCK_ALIGN - 1))
     obuck_broken("Misaligned file", start);
@@ -330,6 +359,7 @@ obuck_create(u32 type)
   b->seek = NULL;
   b->close = NULL;
   b->config = NULL;
   b->seek = NULL;
   b->close = NULL;
   b->config = NULL;
+  b->can_overwrite_buffer = 0;
   FB_BUCKET(b)->start_pos = start;
   FB_BUCKET(b)->bucket_size = 0;
   bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
   FB_BUCKET(b)->start_pos = start;
   FB_BUCKET(b)->bucket_size = 0;
   bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
@@ -372,19 +402,20 @@ obuck_delete(oid_t oid)
 
 static struct fastbuf *obuck_rpf;
 static uns slurp_remains;
 
 static struct fastbuf *obuck_rpf;
 static uns slurp_remains;
-static sh_off_t slurp_start, slurp_current;
+static sh_off_t slurp_start, slurp_current, slurp_end;
 
 static int
 obuck_slurp_refill(struct fastbuf *f)
 {
 
 static int
 obuck_slurp_refill(struct fastbuf *f)
 {
-  uns l;
-
   if (!slurp_remains)
     return 0;
   if (!slurp_remains)
     return 0;
-  l = bdirect_read_prepare(obuck_rpf, &f->buffer);
+  uns l = bdirect_read_prepare(obuck_rpf, &f->buffer);
   if (!l)
     obuck_broken("Incomplete object", slurp_start);
   l = MIN(l, slurp_remains);
   if (!l)
     obuck_broken("Incomplete object", slurp_start);
   l = MIN(l, slurp_remains);
+  /* XXX: This probably should be bdirect_read_commit_modified() in some cases,
+   *      but it doesn't hurt since we aren't going to seek.
+   */
   bdirect_read_commit(obuck_rpf, f->buffer + l);
   slurp_remains -= l;
   f->bptr = f->buffer;
   bdirect_read_commit(obuck_rpf, f->buffer + l);
   slurp_remains -= l;
   f->bptr = f->buffer;
@@ -404,6 +435,8 @@ obuck_slurp_pool(struct obuck_header *hdrp)
        {
          obuck_lock_read();
          obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
        {
          obuck_lock_read();
          obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
+         slurp_end = bfilesize(obuck_rpf);
+         obuck_lock_read_to_scan();
        }
       else
        {
        }
       else
        {
@@ -412,7 +445,10 @@ obuck_slurp_pool(struct obuck_header *hdrp)
            obuck_broken("Missing trailer", slurp_start);
        }
       slurp_start = btell(obuck_rpf);
            obuck_broken("Missing trailer", slurp_start);
        }
       slurp_start = btell(obuck_rpf);
-      l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
+      if (slurp_start < slurp_end)
+       l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
+      else
+       l = 0;
       if (!l)
        {
          bclose(obuck_rpf);
       if (!l)
        {
          bclose(obuck_rpf);
@@ -434,6 +470,7 @@ obuck_slurp_pool(struct obuck_header *hdrp)
   limiter.name = "Bucket";
   limiter.pos = 0;
   limiter.refill = obuck_slurp_refill;
   limiter.name = "Bucket";
   limiter.pos = 0;
   limiter.refill = obuck_slurp_refill;
+  limiter.can_overwrite_buffer = obuck_rpf->can_overwrite_buffer;
   return &limiter;
 }
 
   return &limiter;
 }
 
@@ -609,20 +646,20 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
          int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1));
          if (status)
            {
          int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1));
          if (status)
            {
+             int lnew = l;
              if (status > 1)
                {
                  /* Changed! Reconstruct the trailer. */
              if (status > 1)
                {
                  /* Changed! Reconstruct the trailer. */
-                 int l2 = obuck_bucket_size(rhdr->length);
-                 ASSERT(l2 <= l);
-                 PUT_U32((byte *)rhdr + l2 - 4, OBUCK_TRAILER);
-                 l = l2;
+                 lnew = obuck_bucket_size(rhdr->length);
+                 ASSERT(lnew <= l);
+                 PUT_U32((byte *)rhdr + lnew - 4, OBUCK_TRAILER);
                  changed = 1;
                }
              whdr = (struct obuck_header *)(buf+woff);
              if (rhdr != whdr)
                  changed = 1;
                }
              whdr = (struct obuck_header *)(buf+woff);
              if (rhdr != whdr)
-               memmove(whdr, rhdr, l);
+               memmove(whdr, rhdr, lnew);
              whdr->oid = w_bucket_start >> OBUCK_SHIFT;
              whdr->oid = w_bucket_start >> OBUCK_SHIFT;
-             woff += l;
+             woff += lnew;
            }
          else
            changed = 1;
            }
          else
            changed = 1;