From ed7c7b20afed72df0d4195548ec5346f1daac1d1 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Sun, 7 Dec 2003 14:23:58 +0000 Subject: [PATCH] Improved and cleaned up the bucket library. The original "single operation pending per process" invariant was no longer feasible (and it caused several problems in Shepherd). Reading and writing of buckets now uses dynamically allocated fastbufs and there can be any number of readers at any time, but only a single writer (otherwise a deadlock would occur). Read streams are seekable, write streams at least btell()-able. Also removed the omnipresent global variables for start of current bucket etc., each part (Find, Slurp, Create, Shakedown, ...) has its own state variables. Added some more sanity checks. --- lib/bucket.c | 257 ++++++++++++++++++++++++++++++--------------------- lib/bucket.h | 4 +- 2 files changed, 154 insertions(+), 107 deletions(-) diff --git a/lib/bucket.c b/lib/bucket.c index 51ca9ccc..a513c7cf 100644 --- a/lib/bucket.c +++ b/lib/bucket.c @@ -20,10 +20,9 @@ #include static int obuck_fd; -static unsigned int obuck_remains, obuck_check_pad; -static struct fastbuf *obuck_fb; -static struct obuck_header obuck_hdr; -static sh_off_t bucket_start, bucket_current; +static struct obuck_header obuck_hdr, obuck_create_hdr; +static sh_off_t bucket_find_pos; +static struct fastbuf *obuck_write_fb; /*** Configuration ***/ @@ -49,9 +48,9 @@ static void CONSTRUCTOR obuck_init_config(void) /*** Internal operations ***/ static void -obuck_broken(char *msg) +obuck_broken(char *msg, sh_off_t pos) { - die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start); + die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) pos); } /* @@ -95,33 +94,57 @@ obuck_unlock(void) /*** FastIO emulation ***/ +struct fb_bucket { + struct fastbuf fb; + sh_off_t start_pos; + uns bucket_size; + byte buffer[0]; +}; +#define FB_BUCKET(f) ((struct fb_bucket *)(f)->is_fastbuf) + +static int obuck_fb_count; + +static void +obuck_fb_close(struct fastbuf *f) +{ + obuck_fb_count--; + xfree(f); +} + /* We need to use pread/pwrite since we work on fd's shared between processes */ static int obuck_fb_refill(struct fastbuf *f) { - unsigned limit = (obuck_io_buflen < obuck_remains) ? obuck_io_buflen : obuck_remains; - unsigned size = (limit == obuck_remains) ? (limit+obuck_check_pad+4) : limit; - int l; + uns remains, bufsize, size, datasize; - if (!limit) + remains = FB_BUCKET(f)->bucket_size - (uns)f->pos; + bufsize = f->bufend - f->buffer; + if (!remains) return 0; - l = sh_pread(obuck_fd, f->buffer, size, bucket_current); + sh_off_t start = FB_BUCKET(f)->start_pos; + sh_off_t pos = start + sizeof(struct obuck_header) + f->pos; + if (remains <= bufsize) + { + datasize = remains; + size = start + ALIGN(FB_BUCKET(f)->bucket_size + sizeof(struct obuck_header) + 4, OBUCK_ALIGN) - pos; + } + else + size = datasize = bufsize; + int l = sh_pread(obuck_fd, f->buffer, size, pos); if (l < 0) die("Error reading bucket: %m"); if ((unsigned) l != size) - obuck_broken("Short read"); + obuck_broken("Short read", FB_BUCKET(f)->start_pos); f->bptr = f->buffer; - f->bstop = f->buffer + limit; - bucket_current += limit; - f->pos = bucket_current - bucket_start - sizeof(obuck_hdr); - obuck_remains -= limit; - if (!obuck_remains) /* Should check the trailer */ + f->bstop = f->buffer + datasize; + f->pos += datasize; + if (datasize < size) { if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER) - obuck_broken("Missing trailer"); + obuck_broken("Missing trailer", FB_BUCKET(f)->start_pos); } - return limit; + return datasize; } static void @@ -132,15 +155,14 @@ obuck_fb_spout(struct fastbuf *f) while (l) { - int z = sh_pwrite(obuck_fd, c, l, bucket_current); + int z = sh_pwrite(obuck_fd, c, l, FB_BUCKET(f)->start_pos + sizeof(struct obuck_header) + f->pos); if (z <= 0) die("Error writing bucket: %m"); - bucket_current += z; + f->pos += z; l -= z; c += z; } f->bptr = f->buffer; - f->pos = bucket_current - bucket_start - sizeof(obuck_hdr); } /*** Exported functions ***/ @@ -148,29 +170,20 @@ obuck_fb_spout(struct fastbuf *f) void obuck_init(int writeable) { - struct fastbuf *b; sh_off_t size; obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666); if (obuck_fd < 0) die("Unable to open bucket file %s: %m", obuck_name); - obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + obuck_io_buflen + OBUCK_ALIGN + 4); - b->buffer = (char *)(b+1); - b->bptr = b->bstop = b->buffer; - b->bufend = b->buffer + obuck_io_buflen; - b->name = "bucket"; - b->refill = obuck_fb_refill; - b->spout = obuck_fb_spout; obuck_lock_read(); size = sh_seek(obuck_fd, 0, SEEK_END); if (size) { /* If the bucket pool is not empty, check consistency of its end */ u32 check; - bucket_start = size - 4; /* for error reporting */ if (sh_pread(obuck_fd, &check, 4, size-4) != 4 || check != OBUCK_TRAILER) - obuck_broken("Missing trailer of last object"); + obuck_broken("Missing trailer of last object", size - 4); } obuck_unlock(); } @@ -178,34 +191,33 @@ obuck_init(int writeable) void obuck_cleanup(void) { - bclose(obuck_fb); close(obuck_fd); - xfree(obuck_fb); + if (obuck_fb_count) + log(L_ERROR, "Bug: Unbalanced bucket opens/closes: %d streams remain", obuck_fb_count); + if (obuck_write_fb) + log(L_ERROR, "Bug: Forgot to close bucket write stream"); } void obuck_sync(void) { - bflush(obuck_fb); + if (obuck_write_fb) + bflush(obuck_write_fb); fsync(obuck_fd); } static void obuck_get(oid_t oid) { - struct fastbuf *b = obuck_fb; - - bucket_start = obuck_get_pos(oid); - bflush(b); - if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr)) - obuck_broken("Short header read"); - bucket_current = bucket_start + sizeof(obuck_hdr); + bucket_find_pos = obuck_get_pos(oid); + if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos) != sizeof(obuck_hdr)) + obuck_broken("Short header read", bucket_find_pos); if (obuck_hdr.magic != OBUCK_MAGIC) - obuck_broken("Missing magic number"); + obuck_broken("Missing magic number", bucket_find_pos); if (obuck_hdr.oid == OBUCK_OID_DELETED) - obuck_broken("Access to deleted bucket"); + obuck_broken("Access to deleted bucket", bucket_find_pos); if (obuck_hdr.oid != oid) - obuck_broken("Invalid backlink"); + obuck_broken("Invalid backlink", bucket_find_pos); } void @@ -223,7 +235,7 @@ obuck_find_by_oid(struct obuck_header *hdrp) int obuck_find_first(struct obuck_header *hdrp, int full) { - bucket_start = 0; + bucket_find_pos = 0; obuck_hdr.magic = 0; return obuck_find_next(hdrp, full); } @@ -232,24 +244,21 @@ int obuck_find_next(struct obuck_header *hdrp, int full) { int c; - struct fastbuf *b = obuck_fb; for(;;) { if (obuck_hdr.magic) - bucket_start = (bucket_start + sizeof(obuck_hdr) + obuck_hdr.length + - 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1)); - bflush(b); + bucket_find_pos = (bucket_find_pos + sizeof(obuck_hdr) + obuck_hdr.length + + 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1)); obuck_lock_read(); - c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start); + c = sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos); obuck_unlock(); if (!c) return 0; if (c != sizeof(obuck_hdr)) - obuck_broken("Short header read"); - bucket_current = bucket_start + sizeof(obuck_hdr); + obuck_broken("Short header read", bucket_find_pos); if (obuck_hdr.magic != OBUCK_MAGIC) - obuck_broken("Missing magic number"); + obuck_broken("Missing magic number", bucket_find_pos); if (obuck_hdr.oid != OBUCK_OID_DELETED || full) { memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr)); @@ -261,15 +270,30 @@ obuck_find_next(struct obuck_header *hdrp, int full) struct fastbuf * obuck_fetch(void) { - obuck_fb->pos = 0; - obuck_remains = obuck_hdr.length; - obuck_check_pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1); - return obuck_fb; + struct fastbuf *b; + uns official_buflen = ALIGN(MIN(obuck_hdr.length, obuck_io_buflen), OBUCK_ALIGN); + uns real_buflen = official_buflen + OBUCK_ALIGN; + + b = xmalloc(sizeof(struct fb_bucket) + real_buflen); + b->buffer = b->bptr = b->bstop = FB_BUCKET(b)->buffer; + b->bufend = b->buffer + official_buflen; + b->name = "bucket-read"; + b->pos = 0; + b->refill = obuck_fb_refill; + b->spout = NULL; + b->seek = NULL; + b->close = obuck_fb_close; + b->config = NULL; + FB_BUCKET(b)->start_pos = bucket_find_pos; + FB_BUCKET(b)->bucket_size = obuck_hdr.length; + obuck_fb_count++; + return b; } oid_t obuck_predict_last_oid(void) { + /* BEWARE: This is not fork-safe. */ sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END); return size >> OBUCK_SHIFT; } @@ -277,36 +301,55 @@ obuck_predict_last_oid(void) struct fastbuf * obuck_create(u32 type) { + ASSERT(!obuck_write_fb); + obuck_lock_write(); - bflush(obuck_fb); - bucket_start = sh_seek(obuck_fd, 0, SEEK_END); - if (bucket_start & (OBUCK_ALIGN - 1)) - obuck_broken("Misaligned file"); - obuck_hdr.magic = OBUCK_INCOMPLETE_MAGIC; - obuck_hdr.oid = bucket_start >> OBUCK_SHIFT; - obuck_hdr.length = 0; - obuck_hdr.type = type; - bucket_current = bucket_start; - bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr)); - obuck_fb->pos = -sizeof(obuck_hdr); - return obuck_fb; + sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END); + if (start & (OBUCK_ALIGN - 1)) + obuck_broken("Misaligned file", start); + obuck_create_hdr.magic = OBUCK_INCOMPLETE_MAGIC; + obuck_create_hdr.oid = start >> OBUCK_SHIFT; + obuck_create_hdr.length = 0; + obuck_create_hdr.type = type; + + struct fastbuf *b = xmalloc(sizeof(struct fb_bucket) + obuck_io_buflen); + obuck_write_fb = b; + b->buffer = FB_BUCKET(b)->buffer; + b->bptr = b->bstop = b->buffer; + b->bufend = b->buffer + obuck_io_buflen; + b->pos = -(int)sizeof(obuck_create_hdr); + b->name = "bucket-write"; + b->refill = NULL; + b->spout = obuck_fb_spout; + b->seek = NULL; + b->close = NULL; + b->config = NULL; + FB_BUCKET(b)->start_pos = start; + FB_BUCKET(b)->bucket_size = 0; + bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr)); + + return b; } void -obuck_create_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp) +obuck_create_end(struct fastbuf *b, struct obuck_header *hdrp) { - int pad; - obuck_hdr.magic = OBUCK_MAGIC; - obuck_hdr.length = btell(obuck_fb); - pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1); + ASSERT(b == obuck_write_fb); + obuck_write_fb = NULL; + + obuck_create_hdr.magic = OBUCK_MAGIC; + obuck_create_hdr.length = btell(b); + int pad = (OBUCK_ALIGN - sizeof(obuck_create_hdr) - obuck_create_hdr.length - 4) & (OBUCK_ALIGN - 1); while (pad--) - bputc(obuck_fb, 0); - bputl(obuck_fb, OBUCK_TRAILER); - bflush(obuck_fb); - ASSERT(!(bucket_current & (OBUCK_ALIGN - 1))); - sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start); + bputc(b, 0); + bputl(b, OBUCK_TRAILER); + bflush(b); + ASSERT(!((FB_BUCKET(b)->start_pos + sizeof(obuck_create_hdr) + b->pos) & (OBUCK_ALIGN - 1))); + if (sh_pwrite(obuck_fd, &obuck_create_hdr, sizeof(obuck_create_hdr), FB_BUCKET(b)->start_pos) != sizeof(obuck_create_hdr)) + die("Bucket header update failed: %m"); obuck_unlock(); - memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr)); + memcpy(hdrp, &obuck_create_hdr, sizeof(obuck_create_hdr)); + xfree(b); } void @@ -315,27 +358,29 @@ obuck_delete(oid_t oid) obuck_lock_write(); obuck_get(oid); obuck_hdr.oid = OBUCK_OID_DELETED; - sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start); + sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_find_pos); obuck_unlock(); } /*** Fast reading of the whole pool ***/ static struct fastbuf *obuck_rpf; +static uns slurp_remains; +static sh_off_t slurp_start, slurp_current; static int obuck_slurp_refill(struct fastbuf *f) { uns l; - if (!obuck_remains) + if (!slurp_remains) return 0; l = bdirect_read_prepare(obuck_rpf, &f->buffer); if (!l) - obuck_broken("Incomplete object"); - l = MIN(l, obuck_remains); + obuck_broken("Incomplete object", slurp_start); + l = MIN(l, slurp_remains); bdirect_read_commit(obuck_rpf, f->buffer + l); - obuck_remains -= l; + slurp_remains -= l; f->bptr = f->buffer; f->bufend = f->bstop = f->buffer + l; return 1; @@ -356,11 +401,11 @@ obuck_slurp_pool(struct obuck_header *hdrp) } else { - bsetpos(obuck_rpf, bucket_current - 4); + bsetpos(obuck_rpf, slurp_current - 4); if (bgetl(obuck_rpf) != OBUCK_TRAILER) - obuck_broken("Missing trailer"); + obuck_broken("Missing trailer", slurp_start); } - bucket_start = btell(obuck_rpf); + slurp_start = btell(obuck_rpf); l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header)); if (!l) { @@ -370,16 +415,16 @@ obuck_slurp_pool(struct obuck_header *hdrp) return NULL; } if (l != sizeof(struct obuck_header)) - obuck_broken("Short header read"); + obuck_broken("Short header read", slurp_start); if (hdrp->magic != OBUCK_MAGIC) - obuck_broken("Missing magic number"); - bucket_current = (bucket_start + sizeof(obuck_hdr) + hdrp->length + - 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1)); + obuck_broken("Missing magic number", slurp_start); + slurp_current = (slurp_start + sizeof(obuck_hdr) + hdrp->length + + 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1)); } while (hdrp->oid == OBUCK_OID_DELETED); - if (obuck_get_pos(hdrp->oid) != bucket_start) - obuck_broken("Invalid backlink"); - obuck_remains = hdrp->length; + if (obuck_get_pos(hdrp->oid) != slurp_start) + obuck_broken("Invalid backlink", slurp_start); + slurp_remains = hdrp->length; limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL; limiter.name = "Bucket"; limiter.pos = 0; @@ -393,7 +438,7 @@ void obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) { byte *rbuf, *wbuf, *msg; - sh_off_t rstart, wstart, w_bucket_start; + sh_off_t rstart, wstart, r_bucket_start, w_bucket_start; int roff, woff, rsize, l; struct obuck_header *rhdr, *whdr; @@ -407,13 +452,13 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) for(;;) { - bucket_start = rstart + roff; + r_bucket_start = rstart + roff; w_bucket_start = wstart + woff; if (rsize - roff < OBUCK_ALIGN) goto reread; rhdr = (struct obuck_header *)(rbuf + roff); if (rhdr->magic != OBUCK_MAGIC || - rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(bucket_start >> OBUCK_SHIFT)) + rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (oid_t)(r_bucket_start >> OBUCK_SHIFT)) { msg = "header mismatch"; goto broken; @@ -426,7 +471,7 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) msg = "bucket longer than ShakeBufSize"; goto broken; } - rstart = bucket_start + l; + rstart = r_bucket_start + l; roff = 0; rsize = 0; goto reread; @@ -442,7 +487,7 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) { if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1))) { - if (bucket_start == w_bucket_start) + if (r_bucket_start == w_bucket_start) { /* No copying needed now nor ever in the past, hence woff==0 */ wstart += l; @@ -502,19 +547,19 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) return; broken: - log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", msg, (long long) bucket_start, (uns)(bucket_start >> OBUCK_SHIFT)); + log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", msg, (long long) r_bucket_start, (uns)(r_bucket_start >> OBUCK_SHIFT)); if (woff) { sh_pwrite(obuck_fd, wbuf, woff, wstart); wstart += woff; } - while (wstart + OBUCK_ALIGN <= bucket_start) + while (wstart + OBUCK_ALIGN <= r_bucket_start) { u32 check = OBUCK_TRAILER; obuck_hdr.magic = OBUCK_MAGIC; obuck_hdr.oid = OBUCK_OID_DELETED; - if (bucket_start - wstart < 0x40000000) - obuck_hdr.length = bucket_start - wstart - sizeof(obuck_hdr) - 4; + if (r_bucket_start - wstart < 0x40000000) + obuck_hdr.length = r_bucket_start - wstart - sizeof(obuck_hdr) - 4; else obuck_hdr.length = 0x40000000 - sizeof(obuck_hdr) - 4; sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), wstart); @@ -552,7 +597,7 @@ int main(int argc, char **argv) obuck_init(1); for(j=0; j