#include <sys/file.h>
static int obuck_fd;
-static unsigned int obuck_remains, obuck_check_pad;
-static struct fastbuf *obuck_fb;
-static struct obuck_header obuck_hdr;
-static sh_off_t bucket_start, bucket_current;
+static struct obuck_header obuck_hdr, obuck_create_hdr;
+static sh_off_t bucket_find_pos;
+static struct fastbuf *obuck_write_fb;
/*** Configuration ***/
/*** Internal operations ***/
static void
-obuck_broken(char *msg)
+obuck_broken(char *msg, sh_off_t pos)
{
- die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start);
+ die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) pos);
}
/*
/*** FastIO emulation ***/
+struct fb_bucket {
+ struct fastbuf fb;
+ sh_off_t start_pos;
+ uns bucket_size;
+ byte buffer[0];
+};
+#define FB_BUCKET(f) ((struct fb_bucket *)(f)->is_fastbuf)
+
+static int obuck_fb_count;
+
+static void
+obuck_fb_close(struct fastbuf *f)
+{
+ obuck_fb_count--;
+ xfree(f);
+}
+
/* We need to use pread/pwrite since we work on fd's shared between processes */
static int
obuck_fb_refill(struct fastbuf *f)
{
- unsigned limit = (obuck_io_buflen < obuck_remains) ? obuck_io_buflen : obuck_remains;
- unsigned size = (limit == obuck_remains) ? (limit+obuck_check_pad+4) : limit;
- int l;
+ uns remains, bufsize, size, datasize;
- if (!limit)
+ remains = FB_BUCKET(f)->bucket_size - (uns)f->pos;
+ bufsize = f->bufend - f->buffer;
+ if (!remains)
return 0;
- l = sh_pread(obuck_fd, f->buffer, size, bucket_current);
+ sh_off_t start = FB_BUCKET(f)->start_pos;
+ sh_off_t pos = start + sizeof(struct obuck_header) + f->pos;
+ if (remains <= bufsize)
+ {
+ datasize = remains;
+ size = start + ALIGN(FB_BUCKET(f)->bucket_size + sizeof(struct obuck_header) + 4, OBUCK_ALIGN) - pos;
+ }
+ else
+ size = datasize = bufsize;
+ int l = sh_pread(obuck_fd, f->buffer, size, pos);
if (l < 0)
die("Error reading bucket: %m");
if ((unsigned) l != size)
- obuck_broken("Short read");
+ obuck_broken("Short read", FB_BUCKET(f)->start_pos);
f->bptr = f->buffer;
- f->bstop = f->buffer + limit;
- bucket_current += limit;
- f->pos = bucket_current - bucket_start - sizeof(obuck_hdr);
- obuck_remains -= limit;
- if (!obuck_remains) /* Should check the trailer */
+ f->bstop = f->buffer + datasize;
+ f->pos += datasize;
+ if (datasize < size)
{
if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER)
- obuck_broken("Missing trailer");
+ obuck_broken("Missing trailer", FB_BUCKET(f)->start_pos);
}
- return limit;
+ return datasize;
}
static void
while (l)
{
- int z = sh_pwrite(obuck_fd, c, l, bucket_current);
+ int z = sh_pwrite(obuck_fd, c, l, FB_BUCKET(f)->start_pos + sizeof(struct obuck_header) + f->pos);
if (z <= 0)
die("Error writing bucket: %m");
- bucket_current += z;
+ f->pos += z;
l -= z;
c += z;
}
f->bptr = f->buffer;
- f->pos = bucket_current - bucket_start - sizeof(obuck_hdr);
}
/*** Exported functions ***/
void
obuck_init(int writeable)
{
- struct fastbuf *b;
sh_off_t size;
obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666);
if (obuck_fd < 0)
die("Unable to open bucket file %s: %m", obuck_name);
- obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + obuck_io_buflen + OBUCK_ALIGN + 4);
- b->buffer = (char *)(b+1);
- b->bptr = b->bstop = b->buffer;
- b->bufend = b->buffer + obuck_io_buflen;
- b->name = "bucket";
- b->refill = obuck_fb_refill;
- b->spout = obuck_fb_spout;
obuck_lock_read();
size = sh_seek(obuck_fd, 0, SEEK_END);
if (size)
{
/* If the bucket pool is not empty, check consistency of its end */
u32 check;
- bucket_start = size - 4; /* for error reporting */
if (sh_pread(obuck_fd, &check, 4, size-4) != 4 ||
check != OBUCK_TRAILER)
- obuck_broken("Missing trailer of last object");
+ obuck_broken("Missing trailer of last object", size - 4);
}
obuck_unlock();
}
void
obuck_cleanup(void)
{
- bclose(obuck_fb);
close(obuck_fd);
- xfree(obuck_fb);
+ if (obuck_fb_count)
+ log(L_ERROR, "Bug: Unbalanced bucket opens/closes: %d streams remain", obuck_fb_count);
+ if (obuck_write_fb)
+ log(L_ERROR, "Bug: Forgot to close bucket write stream");
}
void
obuck_sync(void)
{
- bflush(obuck_fb);
+ if (obuck_write_fb)
+ bflush(obuck_write_fb);
fsync(obuck_fd);
}
static void
obuck_get(oid_t oid)
{
- struct fastbuf *b = obuck_fb;
-
- bucket_start = obuck_get_pos(oid);
- bflush(b);
- if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr))
- obuck_broken("Short header read");
- bucket_current = bucket_start + sizeof(obuck_hdr);
+ bucket_find_pos = obuck_get_pos(oid);
+ if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr))
+ obuck_broken("Short header read", bucket_find_pos);
if (obuck_hdr.magic != OBUCK_MAGIC)
- obuck_broken("Missing magic number");
+ obuck_broken("Missing magic number", bucket_find_pos);
if (obuck_hdr.oid == OBUCK_OID_DELETED)
- obuck_broken("Access to deleted bucket");
+ obuck_broken("Access to deleted bucket", bucket_find_pos);
if (obuck_hdr.oid != oid)
- obuck_broken("Invalid backlink");
+ obuck_broken("Invalid backlink", bucket_find_pos);
}
void
int
obuck_find_first(struct obuck_header *hdrp, int full)
{
- bucket_start = 0;
+ bucket_find_pos = 0;
obuck_hdr.magic = 0;
return obuck_find_next(hdrp, full);
}
obuck_find_next(struct obuck_header *hdrp, int full)
{
int c;
- struct fastbuf *b = obuck_fb;
for(;;)
{
if (obuck_hdr.magic)
- bucket_start = (bucket_start + sizeof(obuck_hdr) + obuck_hdr.length +
- 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
- bflush(b);
+ bucket_find_pos = (bucket_find_pos + sizeof(obuck_hdr) + obuck_hdr.length +
+ 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
obuck_lock_read();
- c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
+ c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
obuck_unlock();
if (!c)
return 0;
if (c != sizeof(obuck_hdr))
- obuck_broken("Short header read");
- bucket_current = bucket_start + sizeof(obuck_hdr);
+ obuck_broken("Short header read", bucket_find_pos);
if (obuck_hdr.magic != OBUCK_MAGIC)
- obuck_broken("Missing magic number");
+ obuck_broken("Missing magic number", bucket_find_pos);
if (obuck_hdr.oid != OBUCK_OID_DELETED || full)
{
memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
struct fastbuf *
obuck_fetch(void)
{
- obuck_fb->pos = 0;
- obuck_remains = obuck_hdr.length;
- obuck_check_pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
- return obuck_fb;
+ struct fastbuf *b;
+ uns official_buflen = ALIGN(MIN(obuck_hdr.length, obuck_io_buflen), OBUCK_ALIGN);
+ uns real_buflen = official_buflen + OBUCK_ALIGN;
+
+ b = xmalloc(sizeof(struct fb_bucket) + real_buflen);
+ b->buffer = b->bptr = b->bstop = FB_BUCKET(b)->buffer;
+ b->bufend = b->buffer + official_buflen;
+ b->name = "bucket-read";
+ b->pos = 0;
+ b->refill = obuck_fb_refill;
+ b->spout = NULL;
+ b->seek = NULL;
+ b->close = obuck_fb_close;
+ b->config = NULL;
+ FB_BUCKET(b)->start_pos = bucket_find_pos;
+ FB_BUCKET(b)->bucket_size = obuck_hdr.length;
+ obuck_fb_count++;
+ return b;
}
oid_t
obuck_predict_last_oid(void)
{
+ /* BEWARE: This is not fork-safe. */
sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END);
return size >> OBUCK_SHIFT;
}
struct fastbuf *
obuck_create(u32 type)
{
+ ASSERT(!obuck_write_fb);
+
obuck_lock_write();
- bflush(obuck_fb);
- bucket_start = sh_seek(obuck_fd, 0, SEEK_END);
- if (bucket_start & (OBUCK_ALIGN - 1))
- obuck_broken("Misaligned file");
- obuck_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
- obuck_hdr.oid = bucket_start >> OBUCK_SHIFT;
- obuck_hdr.length = 0;
- obuck_hdr.type = type;
- bucket_current = bucket_start;
- bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr));
- obuck_fb->pos = -sizeof(obuck_hdr);
- return obuck_fb;
+ sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
+ if (start & (OBUCK_ALIGN - 1))
+ obuck_broken("Misaligned file", start);
+ obuck_create_hdr.magic = OBUCK_INCOMPLETE_MAGIC;
+ obuck_create_hdr.oid = start >> OBUCK_SHIFT;
+ obuck_create_hdr.length = 0;
+ obuck_create_hdr.type = type;
+
+ struct fastbuf *b = xmalloc(sizeof(struct fb_bucket) + obuck_io_buflen);
+ obuck_write_fb = b;
+ b->buffer = FB_BUCKET(b)->buffer;
+ b->bptr = b->bstop = b->buffer;
+ b->bufend = b->buffer + obuck_io_buflen;
+ b->pos = -(int)sizeof(obuck_create_hdr);
+ b->name = "bucket-write";
+ b->refill = NULL;
+ b->spout = obuck_fb_spout;
+ b->seek = NULL;
+ b->close = NULL;
+ b->config = NULL;
+ FB_BUCKET(b)->start_pos = start;
+ FB_BUCKET(b)->bucket_size = 0;
+ bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr));
+
+ return b;
}
void
-obuck_create_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp)
+obuck_create_end(struct fastbuf *b, struct obuck_header *hdrp)
{
- int pad;
- obuck_hdr.magic = OBUCK_MAGIC;
- obuck_hdr.length = btell(obuck_fb);
- pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1);
+ ASSERT(b == obuck_write_fb);
+ obuck_write_fb = NULL;
+
+ obuck_create_hdr.magic = OBUCK_MAGIC;
+ obuck_create_hdr.length = btell(b);
+ int pad = (OBUCK_ALIGN - sizeof(obuck_create_hdr) - obuck_create_hdr.length - 4) & (OBUCK_ALIGN - 1);
while (pad--)
- bputc(obuck_fb, 0);
- bputl(obuck_fb, OBUCK_TRAILER);
- bflush(obuck_fb);
- ASSERT(!(bucket_current & (OBUCK_ALIGN - 1)));
- sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
+ bputc(b, 0);
+ bputl(b, OBUCK_TRAILER);
+ bflush(b);
+ ASSERT(!((FB_BUCKET(b)->start_pos + sizeof(obuck_create_hdr) + b->pos) & (OBUCK_ALIGN - 1)));
+ if (sh_pwrite(obuck_fd, &obuck_create_hdr, sizeof(obuck_create_hdr), FB_BUCKET(b)->start_pos) != sizeof(obuck_create_hdr))
+ die("Bucket header update failed: %m");
obuck_unlock();
- memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr));
+ memcpy(hdrp, &obuck_create_hdr, sizeof(obuck_create_hdr));
+ xfree(b);
}
void
obuck_lock_write();
obuck_get(oid);
obuck_hdr.oid = OBUCK_OID_DELETED;
- sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start);
+ sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos);
obuck_unlock();
}
/*** Fast reading of the whole pool ***/
static struct fastbuf *obuck_rpf;
+static uns slurp_remains;
+static sh_off_t slurp_start, slurp_current;
static int
obuck_slurp_refill(struct fastbuf *f)
{
uns l;
- if (!obuck_remains)
+ if (!slurp_remains)
return 0;
l = bdirect_read_prepare(obuck_rpf, &f->buffer);
if (!l)
- obuck_broken("Incomplete object");
- l = MIN(l, obuck_remains);
+ obuck_broken("Incomplete object", slurp_start);
+ l = MIN(l, slurp_remains);
bdirect_read_commit(obuck_rpf, f->buffer + l);
- obuck_remains -= l;
+ slurp_remains -= l;
f->bptr = f->buffer;
f->bufend = f->bstop = f->buffer + l;
return 1;
}
else
{
- bsetpos(obuck_rpf, bucket_current - 4);
+ bsetpos(obuck_rpf, slurp_current - 4);
if (bgetl(obuck_rpf) != OBUCK_TRAILER)
- obuck_broken("Missing trailer");
+ obuck_broken("Missing trailer", slurp_start);
}
- bucket_start = btell(obuck_rpf);
+ slurp_start = btell(obuck_rpf);
l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
if (!l)
{
return NULL;
}
if (l != sizeof(struct obuck_header))
- obuck_broken("Short header read");
+ obuck_broken("Short header read", slurp_start);
if (hdrp->magic != OBUCK_MAGIC)
- obuck_broken("Missing magic number");
- bucket_current = (bucket_start + sizeof(obuck_hdr) + hdrp->length +
- 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
+ obuck_broken("Missing magic number", slurp_start);
+ slurp_current = (slurp_start + sizeof(obuck_hdr) + hdrp->length +
+ 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1));
}
while (hdrp->oid == OBUCK_OID_DELETED);
- if (obuck_get_pos(hdrp->oid) != bucket_start)
- obuck_broken("Invalid backlink");
- obuck_remains = hdrp->length;
+ if (obuck_get_pos(hdrp->oid) != slurp_start)
+ obuck_broken("Invalid backlink", slurp_start);
+ slurp_remains = hdrp->length;
limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL;
limiter.name = "Bucket";
limiter.pos = 0;
obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck))
{
byte *rbuf, *wbuf, *msg;
- sh_off_t rstart, wstart, w_bucket_start;
+ sh_off_t rstart, wstart, r_bucket_start, w_bucket_start;
int roff, woff, rsize, l;
struct obuck_header *rhdr, *whdr;
for(;;)
{
- bucket_start = rstart + roff;
+ r_bucket_start = rstart + roff;
w_bucket_start = wstart + woff;
if (rsize - roff < OBUCK_ALIGN)
goto reread;
rhdr = (struct obuck_header *)(rbuf + roff);
if (rhdr->magic != OBUCK_MAGIC ||
- rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(bucket_start >> OBUCK_SHIFT))
+ rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT))
{
msg = "header mismatch";
goto broken;
msg = "bucket longer than ShakeBufSize";
goto broken;
}
- rstart = bucket_start + l;
+ rstart = r_bucket_start + l;
roff = 0;
rsize = 0;
goto reread;
{
if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1)))
{
- if (bucket_start == w_bucket_start)
+ if (r_bucket_start == w_bucket_start)
{
/* No copying needed now nor ever in the past, hence woff==0 */
wstart += l;
return;
broken:
- log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", msg, (long long) bucket_start, (uns)(bucket_start >> OBUCK_SHIFT));
+ log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT));
if (woff)
{
sh_pwrite(obuck_fd, wbuf, woff, wstart);
wstart += woff;
}
- while (wstart + OBUCK_ALIGN <= bucket_start)
+ while (wstart + OBUCK_ALIGN <= r_bucket_start)
{
u32 check = OBUCK_TRAILER;
obuck_hdr.magic = OBUCK_MAGIC;
obuck_hdr.oid = OBUCK_OID_DELETED;
- if (bucket_start - wstart < 0x40000000)
- obuck_hdr.length = bucket_start - wstart - sizeof(obuck_hdr) - 4;
+ if (r_bucket_start - wstart < 0x40000000)
+ obuck_hdr.length = r_bucket_start - wstart - sizeof(obuck_hdr) - 4;
else
obuck_hdr.length = 0x40000000 - sizeof(obuck_hdr) - 4;
sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), wstart);
obuck_init(1);
for(j=0; j<COUNT; j++)
{
- b = obuck_create();
+ b = obuck_create(BUCKET_TYPE_PLAIN);
for(i=0; i<LEN(j); i++)
bputc(b, (i+j) % 256);
obuck_create_end(b, &h);