* Sherlock Library -- Object Buckets
*
* (c) 2001--2004 Martin Mares <mj@ucw.cz>
+ * (c) 2004 Robert Spalek <robert@ucw.cz>
*
* This software may be freely distributed and used according to the terms
* of the GNU Lesser General Public License.
}
/*
- * Unfortunately we cannot use flock() here since it happily permits
- * locking a shared fd (e.g., after fork()) multiple times. The fcntl
- * locks are very ugly and they don't support 64-bit offsets, but we
- * can work around the problem by always locking the first header
- * in the file.
+ * We need several types of locks:
+ *
+ * Read lock reading parts of bucket file
+ * Write lock any write operations
+ * Append lock appending to the end of the file
+ * Scan lock reading parts which we are certain they exist
+ *
+ * Multiple read and scan locks can co-exist together.
+ * Scan locks can co-exist with an append lock.
+ * There can be at most one write/append lock at a time.
+ *
+ * These lock types map to a pair of normal read-write locks which
+ * we represent as fcntl() locks on the first and second byte of the
+ * bucket file. [We cannot use flock() since it happily permits
+ * locking a shared fd (e.g., after fork()) multiple times at it also
+ * doesn't offer multiple locks on a single file.]
+ *
+ * byte0 byte1
+ * Read <read> <read>
+ * Write <write> <write>
+ * Append <write> -
+ * Scan - <read>
*/
static inline void
-obuck_do_lock(int type)
+obuck_do_lock(int type, int start, int len)
{
struct flock fl;
fl.l_type = type;
fl.l_whence = SEEK_SET;
- fl.l_start = 0;
- fl.l_len = sizeof(struct obuck_header);
+ fl.l_start = start;
+ fl.l_len = len;
if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
die("fcntl lock: %m");
}
inline void
obuck_lock_read(void)
{
- obuck_do_lock(F_RDLCK);
+ obuck_do_lock(F_RDLCK, 0, 2);
}
inline void
obuck_lock_write(void)
{
- obuck_do_lock(F_WRLCK);
+ obuck_do_lock(F_WRLCK, 0, 2);
+}
+
+static inline void
+obuck_lock_append(void)
+{
+ obuck_do_lock(F_WRLCK, 0, 1);
+}
+
+static inline void
+obuck_lock_read_to_scan(void)
+{
+ obuck_do_lock(F_UNLCK, 0, 1);
}
inline void
obuck_unlock(void)
{
- obuck_do_lock(F_UNLCK);
+ obuck_do_lock(F_UNLCK, 0, 2);
}
/*** FastIO emulation ***/
uns remains, bufsize, size, datasize;
remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
- bufsize = f->bufend - f->buffer;
if (!remains)
return 0;
+ f->buffer = FB_BUCKET(f)->buffer; /* Could have been trimmed by bdirect_read_commit_modified() */
+ bufsize = f->bufend - f->buffer;
sh_off_t start = FB_BUCKET(f)->start_pos;
sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
if (remains <= bufsize)
{
datasize = remains;
- size = start + ALIGN(FB_BUCKET(f)->bucket_size + sizeof(struct obuck_header) + 4, OBUCK_ALIGN) - pos;
+ size = start + obuck_bucket_size(FB_BUCKET(f)->bucket_size) - pos;
}
else
size = datasize = bufsize;
for(;;)
{
if (obuck_hdr.magic)
- bucket_find_pos = (bucket_find_pos + sizeof(obuck_hdr) + obuck_hdr.length +
- 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
+ bucket_find_pos += obuck_bucket_size(obuck_hdr.length);
obuck_lock_read();
c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
obuck_unlock();
b->seek = NULL;
b->close = obuck_fb_close;
b->config = NULL;
+ b->can_overwrite_buffer = 2;
FB_BUCKET(b)->start_pos = bucket_find_pos;
FB_BUCKET(b)->bucket_size = obuck_hdr.length;
obuck_fb_count++;
oid_t
obuck_predict_last_oid(void)
{
- obuck_lock_write();
sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
- oid_t ss = size >> OBUCK_SHIFT;
- obuck_unlock();
- return ss;
+ return (oid_t)(size >> OBUCK_SHIFT);
}
struct fastbuf *
{
ASSERT(!obuck_write_fb);
- obuck_lock_write();
+ obuck_lock_append();
sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
if (start & (OBUCK_ALIGN - 1))
obuck_broken("Misaligned file", start);
b->seek = NULL;
b->close = NULL;
b->config = NULL;
+ b->can_overwrite_buffer = 0;
FB_BUCKET(b)->start_pos = start;
FB_BUCKET(b)->bucket_size = 0;
bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
static struct fastbuf *obuck_rpf;
static uns slurp_remains;
-static sh_off_t slurp_start, slurp_current;
+static sh_off_t slurp_start, slurp_current, slurp_end;
static int
obuck_slurp_refill(struct fastbuf *f)
{
- uns l;
-
if (!slurp_remains)
return 0;
- l = bdirect_read_prepare(obuck_rpf, &f->buffer);
+ uns l = bdirect_read_prepare(obuck_rpf, &f->buffer);
if (!l)
obuck_broken("Incomplete object", slurp_start);
l = MIN(l, slurp_remains);
+ /* XXX: This probably should be bdirect_read_commit_modified() in some cases,
+ * but it doesn't hurt since we aren't going to seek.
+ */
bdirect_read_commit(obuck_rpf, f->buffer + l);
slurp_remains -= l;
f->bptr = f->buffer;
{
obuck_lock_read();
obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
+ slurp_end = bfilesize(obuck_rpf);
+ obuck_lock_read_to_scan();
}
else
{
obuck_broken("Missing trailer", slurp_start);
}
slurp_start = btell(obuck_rpf);
- l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
+ if (slurp_start < slurp_end)
+ l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
+ else
+ l = 0;
if (!l)
{
bclose(obuck_rpf);
obuck_broken("Short header read", slurp_start);
if (hdrp->magic != OBUCK_MAGIC)
obuck_broken("Missing magic number", slurp_start);
- slurp_current = (slurp_start + sizeof(obuck_hdr) + hdrp->length +
- 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
+ slurp_current = slurp_start + obuck_bucket_size(hdrp->length);
}
while (hdrp->oid == OBUCK_OID_DELETED);
if (obuck_get_pos(hdrp->oid) != slurp_start)
limiter.name = "Bucket";
limiter.pos = 0;
limiter.refill = obuck_slurp_refill;
+ limiter.can_overwrite_buffer = obuck_rpf->can_overwrite_buffer;
return &limiter;
}
/* This needn't be optimized for speed. */
bhdr = (struct obuck_header *) (norm_buf + boff);
ASSERT(bhdr->magic == OBUCK_MAGIC);
- l = ALIGN(sizeof(struct obuck_header) + bhdr->length + 4, OBUCK_ALIGN);
+ l = obuck_bucket_size(bhdr->length);
old_oid = bhdr->oid;
bhdr->oid = bpos >> OBUCK_SHIFT;
shake_write(bhdr, l, bpos);
msg = "header mismatch";
goto broken;
}
- l = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN);
+ l = obuck_bucket_size(rhdr->length);
if (l > buflen)
{
if (rhdr->oid != OBUCK_OID_DELETED)
int status = kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1));
if (status)
{
+ int lnew = l;
if (status > 1)
{
/* Changed! Reconstruct the trailer. */
- int l2 = ALIGN(sizeof(struct obuck_header) + rhdr->length + 4, OBUCK_ALIGN);
- ASSERT(l2 <= l);
- PUT_U32((byte *)rhdr + l2 - 4, OBUCK_TRAILER);
- l = l2;
+ lnew = obuck_bucket_size(rhdr->length);
+ ASSERT(lnew <= l);
+ PUT_U32((byte *)rhdr + lnew - 4, OBUCK_TRAILER);
changed = 1;
}
whdr = (struct obuck_header *)(buf+woff);
if (rhdr != whdr)
- memmove(whdr, rhdr, l);
+ memmove(whdr, rhdr, lnew);
whdr->oid = w_bucket_start >> OBUCK_SHIFT;
- woff += l;
+ woff += lnew;
}
else
changed = 1;