From 535e4b8371c566d0542be2188d06a5af59bbfdb8 Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Fri, 5 Jan 2001 23:22:16 +0000 Subject: [PATCH] Rewrote to use pread/pwrite() and to avoid messing up with internals of the fb-file module. It should work with file descriptors shared between processes (no need to re-open the file after fork()) and should be also somewhat faster. Basic functions already work, it remains to create some utilities and implement compression. make obj/lib/bucket-t for a small test program. --- lib/bucket.c | 327 ++++++++++++++++++++++++++++++++++++--------------- lib/bucket.h | 28 +++-- 2 files changed, 250 insertions(+), 105 deletions(-) diff --git a/lib/bucket.c b/lib/bucket.c index 2e6db98b..c5acea70 100644 --- a/lib/bucket.c +++ b/lib/bucket.c @@ -2,13 +2,12 @@ * Sherlock Library -- Object Buckets * * (c) 2001 Martin Mares - * - * Warning: Touches internals of the fb-file module! */ #include "lib/lib.h" #include "lib/bucket.h" #include "lib/fastbuf.h" +#include "lib/lfs.h" #include #include @@ -16,28 +15,18 @@ #include static int obuck_fd; +static unsigned int obuck_remains, obuck_check_pad; static struct fastbuf *obuck_fb; static struct obuck_header obuck_hdr; -static sh_off_t start_of_this, start_of_next; +static sh_off_t bucket_start; static char *obuck_name = "db/objects"; /* FIXME */ -void -obuck_init(int writeable) -{ - obuck_fb = bopen(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 65536); - obuck_fd = obuck_fb->fd; -} - -void -obuck_cleanup(void) -{ - bclose(obuck_fb); -} +/*** Internal operations ***/ static void obuck_broken(char *msg) { - die("Object pool corrupted: %s", msg); /* FIXME */ + die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start); /* FIXME */ } static inline void @@ -58,12 +47,122 @@ obuck_unlock(void) flock(obuck_fd, LOCK_UN); } +/*** FastIO emulation ***/ + +/* We need to use pread/pwrite since we work on fd's shared between processes */ + +static int +obuck_fb_refill(struct fastbuf *f) +{ + unsigned limit = (f->buflen < obuck_remains) ? f->buflen : obuck_remains; + unsigned size = (limit == obuck_remains) ? (limit+obuck_check_pad+4) : limit; + int l; + + if (!limit) + return 0; + l = pread(f->fd, f->buffer, size, f->fdpos); + if (l < 0) + die("Error reading bucket: %m"); + if ((unsigned) l != size) + obuck_broken("Short read"); + f->bptr = f->buffer; + f->bstop = f->buffer + limit; + f->pos = f->fdpos; + f->fdpos += limit; + obuck_remains -= limit; + if (!obuck_remains) /* Should check the trailer */ + { + u32 check; + memcpy(&check, f->buffer + size - 4, 4); + if (check != OBUCK_TRAILER) + obuck_broken("Missing trailer"); + } + return limit; +} + static void -obuck_fetch_header(oid_t oid) +obuck_fb_spout(struct fastbuf *f) { - start_of_this = ((sh_off_t) oid) << OBUCK_SHIFT; - bsetpos(obuck_fb, start_of_this); - bread(obuck_fb, &obuck_hdr, sizeof(obuck_hdr)); + int l = f->bptr - f->buffer; + char *c = f->buffer; + + while (l) + { + int z = pwrite(f->fd, c, l, f->fdpos); + if (z <= 0) + die("Error writing bucket: %m"); + f->fdpos += z; + l -= z; + c += z; + } + f->bptr = f->buffer; + f->pos = f->fdpos; +} + +static void +obuck_fb_close(struct fastbuf *f) +{ + close(f->fd); +} + +/*** Exported functions ***/ + +void +obuck_init(int writeable) +{ + struct fastbuf *b; + int buflen = 65536; + sh_off_t size; + + obuck_fd = open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666); + obuck_fb = b = xmalloc(sizeof(struct fastbuf) + buflen + OBUCK_ALIGN + 4); + bzero(b, sizeof(struct fastbuf)); + b->buflen = buflen; + b->buffer = (char *)(b+1); + b->bptr = b->bstop = b->buffer; + b->bufend = b->buffer + buflen; + b->name = "bucket"; + b->fd = obuck_fd; + b->refill = obuck_fb_refill; + b->spout = obuck_fb_spout; + b->close = obuck_fb_close; + obuck_lock_read(); + size = sh_seek(obuck_fd, 0, SEEK_END); + if (size) + { + /* If the bucket pool is not empty, check consistency of its end */ + u32 check; + bucket_start = size - 4; /* for error reporting */ + if (pread(obuck_fd, &check, 4, size-4) != 4 || + check != OBUCK_TRAILER) + obuck_broken("Missing trailer of last object"); + } + obuck_unlock(); +} + +void +obuck_cleanup(void) +{ + bclose(obuck_fb); +} + +void /* FIXME: Call somewhere :) */ +obuck_sync(void) +{ + bflush(obuck_fb); + fsync(obuck_fd); +} + +static void +obuck_get(oid_t oid) +{ + struct fastbuf *b = obuck_fb; + + bucket_start = ((sh_off_t) oid) << OBUCK_SHIFT; + bflush(b); + if (pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr)) + obuck_broken("Short header read"); + b->fdpos = bucket_start + sizeof(obuck_hdr); if (obuck_hdr.magic != OBUCK_MAGIC) obuck_broken("Missing magic number"); if (obuck_hdr.oid == OBUCK_OID_DELETED) @@ -72,59 +171,97 @@ obuck_fetch_header(oid_t oid) obuck_broken("Invalid backlink"); } -struct fastbuf * -obuck_fetch(struct obuck_header *hdrp) +void +obuck_find_by_oid(struct obuck_header *hdrp) { + oid_t oid = hdrp->oid; + obuck_lock_read(); - obuck_fetch_header(hdrp->oid); + obuck_get(oid); + obuck_unlock(); memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr)); - return obuck_fb; } -void -obuck_fetch_abort(struct fastbuf *b UNUSED) +int +obuck_find_first(struct obuck_header *hdrp) { - obuck_unlock(); + bucket_start = 0; + obuck_hdr.magic = 0; + return obuck_find_next(hdrp); +} + +int +obuck_find_next(struct obuck_header *hdrp) +{ + int c; + struct fastbuf *b = obuck_fb; + + for(;;) + { + if (obuck_hdr.magic) + bucket_start = (bucket_start + sizeof(obuck_hdr) + obuck_hdr.length + + 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1)); + bflush(b); + obuck_lock_read(); + c = pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start); + obuck_unlock(); + if (!c) + return 0; + if (c != sizeof(obuck_hdr)) + obuck_broken("Short header read"); + b->fdpos = bucket_start + sizeof(obuck_hdr); + if (obuck_hdr.magic != OBUCK_MAGIC) + obuck_broken("Missing magic number"); + if (obuck_hdr.oid != OBUCK_OID_DELETED) + { + memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr)); + return 1; + } + } +} + +struct fastbuf * +obuck_fetch(void) +{ + obuck_remains = obuck_hdr.length; + obuck_check_pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1); + return obuck_fb; } void obuck_fetch_end(struct fastbuf *b UNUSED) { - if (bgetl(b) != OBUCK_TRAILER) - obuck_broken("Corrupted trailer"); - obuck_unlock(); } struct fastbuf * -obuck_write(void) +obuck_create(void) { obuck_lock_write(); - bseek(obuck_fb, 0, SEEK_END); - start_of_this = btell(obuck_fb); - if (start_of_this & (OBUCK_ALIGN - 1)) + bflush(obuck_fb); + bucket_start = sh_seek(obuck_fd, 0, SEEK_END); + if (bucket_start & (OBUCK_ALIGN - 1)) obuck_broken("Misaligned file"); obuck_hdr.magic = 0; - obuck_hdr.oid = start_of_this >> OBUCK_SHIFT; + obuck_hdr.oid = bucket_start >> OBUCK_SHIFT; obuck_hdr.length = obuck_hdr.orig_length = 0; + obuck_fb->fdpos = obuck_fb->pos = bucket_start; bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr)); return obuck_fb; } void -obuck_write_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp) +obuck_create_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp) { int pad; obuck_hdr.magic = OBUCK_MAGIC; - obuck_hdr.length = obuck_hdr.orig_length = btell(obuck_fb) - start_of_this - sizeof(obuck_hdr); - bputl(obuck_fb, OBUCK_TRAILER); + obuck_hdr.length = obuck_hdr.orig_length = btell(obuck_fb) - bucket_start - sizeof(obuck_hdr); pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1); while (pad--) bputc(obuck_fb, 0); + bputl(obuck_fb, OBUCK_TRAILER); bflush(obuck_fb); - bsetpos(obuck_fb, start_of_this); - /* FIXME: Can be replaced with single pwrite */ - bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr)); - bflush(obuck_fb); + ASSERT(!(btell(obuck_fb) & (OBUCK_ALIGN - 1))); + pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start); obuck_unlock(); memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr)); } @@ -133,75 +270,73 @@ void obuck_delete(oid_t oid) { obuck_lock_write(); - obuck_fetch_header(oid); + obuck_get(oid); obuck_hdr.oid = OBUCK_OID_DELETED; - bflush(obuck_fb); - bsetpos(obuck_fb, start_of_this); - bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr)); - bflush(obuck_fb); + pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start); obuck_unlock(); } -struct fastbuf * -obuck_walk_init(void) -{ - start_of_this = start_of_next = 0; - obuck_lock_read(); - return obuck_fb; -} - -struct fastbuf * -obuck_walk_next(struct fastbuf *b, struct obuck_header *hdrp) -{ - int c; +/*** Testing ***/ -restart: - start_of_this = start_of_next; - bsetpos(b, start_of_this); - c = bgetc(b); - if (c < 0) - return NULL; - bungetc(b, c); - bread(b, &obuck_hdr, sizeof(obuck_hdr)); - if (obuck_hdr.magic != OBUCK_MAGIC) - obuck_broken("Missing magic number"); - start_of_next = (start_of_this + sizeof(obuck_hdr) + obuck_hdr.orig_length + - 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1)); - if (obuck_hdr.oid == OBUCK_OID_DELETED) - goto restart; - memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr)); - return b; -} +#ifdef TEST -void -obuck_walk_end(struct fastbuf *b UNUSED) -{ - obuck_unlock(); -} +#define COUNT 100 +#define MAXLEN 10000 +#define KILLPERC 13 +#define LEN(i) ((259309*(i))%MAXLEN) -#ifdef TEST int main(void) { - int i, j; + int ids[COUNT]; + unsigned int i, j, cnt; struct obuck_header h; struct fastbuf *b; + unlink(obuck_name); obuck_init(1); - for(j=0; j<100; j++) + for(j=0; j %d\n", h.oid, h.orig_length, h.length); + ids[j] = h.oid; } - obuck_delete(0); - b = obuck_walk_init(); - while (b = obuck_walk_next(b, &h)) - { - printf("<<< %08x\t%d\n", h.oid, h.orig_length); - } - obuck_walk_end(b); + for(j=0; j= KILLPERC) + { + cnt++; + h.oid = ids[j]; + obuck_find_by_oid(&h); + b = obuck_fetch(); + printf("Reading %08x %d -> %d\n", h.oid, h.orig_length, h.length); + if (h.orig_length != LEN(j)) + die("Invalid length"); + for(i=0; i