]> mj.ucw.cz Git - libucw.git/blobdiff - lib/bucket.c
BCONFIG_CAN_OVERWRITE is a read/write parameter, i.e. the used can
[libucw.git] / lib / bucket.c
index df9740bd5cd8dd5972a8bbd7d4c15556d1b83bd7..4a5533cfa6ef19e485e8a50d6d43429799b540f1 100644 (file)
@@ -2,6 +2,7 @@
  *     Sherlock Library -- Object Buckets
  *
  *     (c) 2001--2004 Martin Mares <mj@ucw.cz>
+ *     (c) 2004 Robert Spalek <robert@ucw.cz>
  *
  *     This software may be freely distributed and used according to the terms
  *     of the GNU Lesser General Public License.
@@ -59,22 +60,39 @@ obuck_broken(char *msg, sh_off_t pos)
 }
 
 /*
- *  Unfortunately we cannot use flock() here since it happily permits
- *  locking a shared fd (e.g., after fork()) multiple times. The fcntl
- *  locks are very ugly and they don't support 64-bit offsets, but we
- *  can work around the problem by always locking the first header
- *  in the file.
+ *  We need several types of locks:
+ *
+ *     Read lock       reading parts of bucket file
+ *     Write lock      any write operations
+ *     Append lock     appending to the end of the file
+ *     Scan lock       reading parts which we are certain they exist
+ *
+ *  Multiple read and scan locks can co-exist together.
+ *  Scan locks can co-exist with an append lock.
+ *  There can be at most one write/append lock at a time.
+ *
+ *  These lock types map to a pair of normal read-write locks which
+ *  we represent as fcntl() locks on the first and second byte of the
+ *  bucket file. [We cannot use flock() since it happily permits
+ *  locking a shared fd (e.g., after fork()) multiple times at it also
+ *  doesn't offer multiple locks on a single file.]
+ *
+ *                     byte0           byte1
+ *     Read            <read>          <read>
+ *     Write           <write>         <write>
+ *     Append          <write>         -
+ *     Scan            -               <read>
  */
 
 static inline void
-obuck_do_lock(int type)
+obuck_do_lock(int type, int start, int len)
 {
   struct flock fl;
 
   fl.l_type = type;
   fl.l_whence = SEEK_SET;
-  fl.l_start = 0;
-  fl.l_len = sizeof(struct obuck_header);
+  fl.l_start = start;
+  fl.l_len = len;
   if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
     die("fcntl lock: %m");
 }
@@ -82,25 +100,38 @@ obuck_do_lock(int type)
 inline void
 obuck_lock_read(void)
 {
-  obuck_do_lock(F_RDLCK);
+  obuck_do_lock(F_RDLCK, 0, 2);
 }
 
 inline void
 obuck_lock_write(void)
 {
-  obuck_do_lock(F_WRLCK);
+  obuck_do_lock(F_WRLCK, 0, 2);
+}
+
+static inline void
+obuck_lock_append(void)
+{
+  obuck_do_lock(F_WRLCK, 0, 1);
+}
+
+static inline void
+obuck_lock_read_to_scan(void)
+{
+  obuck_do_lock(F_UNLCK, 0, 1);
 }
 
 inline void
 obuck_unlock(void)
 {
-  obuck_do_lock(F_UNLCK);
+  obuck_do_lock(F_UNLCK, 0, 2);
 }
 
 /*** FastIO emulation ***/
 
 struct fb_bucket {
   struct fastbuf fb;
+  int can_overwrite;
   sh_off_t start_pos;
   uns bucket_size;
   byte buffer[0];
@@ -271,6 +302,21 @@ obuck_find_next(struct obuck_header *hdrp, int full)
     }
 }
 
+static int
+obuck_bconfig(struct fastbuf *f, uns item, int value)
+{
+  switch (item)
+    {
+    case BCONFIG_CAN_OVERWRITE: ;
+      int old_value = FB_BUCKET(f)->can_overwrite;
+      if (value >= 0 && value <= 2)
+       FB_BUCKET(f)->can_overwrite = value;
+      return old_value;
+    default:
+      return -1;
+    }
+}
+
 struct fastbuf *
 obuck_fetch(void)
 {
@@ -287,7 +333,7 @@ obuck_fetch(void)
   b->spout = NULL;
   b->seek = NULL;
   b->close = obuck_fb_close;
-  b->config = NULL;
+  b->config = obuck_bconfig;
   FB_BUCKET(b)->start_pos = bucket_find_pos;
   FB_BUCKET(b)->bucket_size = obuck_hdr.length;
   obuck_fb_count++;
@@ -297,11 +343,8 @@ obuck_fetch(void)
 oid_t
 obuck_predict_last_oid(void)
 {
-  obuck_lock_write();
   sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
-  oid_t ss = size >> OBUCK_SHIFT;
-  obuck_unlock();
-  return ss;
+  return (oid_t)(size >> OBUCK_SHIFT);
 }
 
 struct fastbuf *
@@ -309,7 +352,7 @@ obuck_create(u32 type)
 {
   ASSERT(!obuck_write_fb);
 
-  obuck_lock_write();
+  obuck_lock_append();
   sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
   if (start & (OBUCK_ALIGN - 1))
     obuck_broken("Misaligned file", start);
@@ -332,6 +375,7 @@ obuck_create(u32 type)
   b->config = NULL;
   FB_BUCKET(b)->start_pos = start;
   FB_BUCKET(b)->bucket_size = 0;
+  FB_BUCKET(b)->can_overwrite = 2;
   bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
 
   return b;
@@ -372,7 +416,7 @@ obuck_delete(oid_t oid)
 
 static struct fastbuf *obuck_rpf;
 static uns slurp_remains;
-static sh_off_t slurp_start, slurp_current;
+static sh_off_t slurp_start, slurp_current, slurp_end;
 
 static int
 obuck_slurp_refill(struct fastbuf *f)
@@ -404,6 +448,10 @@ obuck_slurp_pool(struct obuck_header *hdrp)
        {
          obuck_lock_read();
          obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
+         bseek(obuck_rpf, 0, SEEK_END);
+         slurp_end = btell(obuck_rpf);
+         bsetpos(obuck_rpf, 0);
+         obuck_lock_read_to_scan();
        }
       else
        {
@@ -412,7 +460,10 @@ obuck_slurp_pool(struct obuck_header *hdrp)
            obuck_broken("Missing trailer", slurp_start);
        }
       slurp_start = btell(obuck_rpf);
-      l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
+      if (slurp_start < slurp_end)
+       l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
+      else
+       l = 0;
       if (!l)
        {
          bclose(obuck_rpf);
@@ -609,20 +660,20 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
          int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1));
          if (status)
            {
+             int lnew = l;
              if (status > 1)
                {
                  /* Changed! Reconstruct the trailer. */
-                 int l2 = obuck_bucket_size(rhdr->length);
-                 ASSERT(l2 <= l);
-                 PUT_U32((byte *)rhdr + l2 - 4, OBUCK_TRAILER);
-                 l = l2;
+                 lnew = obuck_bucket_size(rhdr->length);
+                 ASSERT(lnew <= l);
+                 PUT_U32((byte *)rhdr + lnew - 4, OBUCK_TRAILER);
                  changed = 1;
                }
              whdr = (struct obuck_header *)(buf+woff);
              if (rhdr != whdr)
-               memmove(whdr, rhdr, l);
+               memmove(whdr, rhdr, lnew);
              whdr->oid = w_bucket_start >> OBUCK_SHIFT;
-             woff += l;
+             woff += lnew;
            }
          else
            changed = 1;