X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=lib%2Fbucket.c;h=af6b46bf1ffcd6a40ca9624ea481bfa97d548943;hb=f5be42dd4a3346059819fdba379e7abac9549625;hp=5e974eb24e448407157cfe426e8920893d690b7a;hpb=1f22635601383f5dc6763340435c7247590c8f01;p=libucw.git diff --git a/lib/bucket.c b/lib/bucket.c index 5e974eb2..af6b46bf 100644 --- a/lib/bucket.c +++ b/lib/bucket.c @@ -1,7 +1,7 @@ /* * Sherlock Library -- Object Buckets * - * (c) 2001 Martin Mares + * (c) 2001--2003 Martin Mares * * This software may be freely distributed and used according to the terms * of the GNU Lesser General Public License. @@ -23,19 +23,21 @@ static int obuck_fd; static unsigned int obuck_remains, obuck_check_pad; static struct fastbuf *obuck_fb; static struct obuck_header obuck_hdr; -static sh_off_t bucket_start; +static sh_off_t bucket_start, bucket_current; /*** Configuration ***/ byte *obuck_name = "not/configured"; -static int obuck_io_buflen = 65536; +static uns obuck_io_buflen = 65536; static int obuck_shake_buflen = 1048576; +static uns obuck_slurp_buflen = 65536; static struct cfitem obuck_config[] = { { "Buckets", CT_SECTION, NULL }, { "BucketFile", CT_STRING, &obuck_name }, { "BufSize", CT_INT, &obuck_io_buflen }, { "ShakeBufSize", CT_INT, &obuck_shake_buflen }, + { "SlurpBufSize", CT_INT, &obuck_slurp_buflen }, { NULL, CT_STOP, NULL } }; @@ -73,19 +75,19 @@ obuck_do_lock(int type) die("fcntl lock: %m"); } -static inline void +inline void obuck_lock_read(void) { obuck_do_lock(F_RDLCK); } -static inline void +inline void obuck_lock_write(void) { obuck_do_lock(F_WRLCK); } -static inline void +inline void obuck_unlock(void) { obuck_do_lock(F_UNLCK); @@ -98,21 +100,21 @@ obuck_unlock(void) static int obuck_fb_refill(struct fastbuf *f) { - unsigned limit = (f->buflen < obuck_remains) ? f->buflen : obuck_remains; + unsigned limit = (obuck_io_buflen < obuck_remains) ? obuck_io_buflen : obuck_remains; unsigned size = (limit == obuck_remains) ? (limit+obuck_check_pad+4) : limit; int l; if (!limit) return 0; - l = sh_pread(f->fd, f->buffer, size, f->fdpos); + l = sh_pread(obuck_fd, f->buffer, size, bucket_current); if (l < 0) die("Error reading bucket: %m"); if ((unsigned) l != size) obuck_broken("Short read"); f->bptr = f->buffer; f->bstop = f->buffer + limit; - f->pos = f->fdpos; - f->fdpos += limit; + bucket_current += limit; + f->pos = bucket_current - bucket_start - sizeof(obuck_hdr); obuck_remains -= limit; if (!obuck_remains) /* Should check the trailer */ { @@ -130,21 +132,15 @@ obuck_fb_spout(struct fastbuf *f) while (l) { - int z = sh_pwrite(f->fd, c, l, f->fdpos); + int z = sh_pwrite(obuck_fd, c, l, bucket_current); if (z <= 0) die("Error writing bucket: %m"); - f->fdpos += z; + bucket_current += z; l -= z; c += z; } f->bptr = f->buffer; - f->pos = f->fdpos; -} - -static void -obuck_fb_close(struct fastbuf *f) -{ - close(f->fd); + f->pos = bucket_current - bucket_start - sizeof(obuck_hdr); } /*** Exported functions ***/ @@ -159,15 +155,12 @@ obuck_init(int writeable) if (obuck_fd < 0) die("Unable to open bucket file %s: %m", obuck_name); obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + obuck_io_buflen + OBUCK_ALIGN + 4); - b->buflen = obuck_io_buflen; b->buffer = (char *)(b+1); b->bptr = b->bstop = b->buffer; b->bufend = b->buffer + obuck_io_buflen; b->name = "bucket"; - b->fd = obuck_fd; b->refill = obuck_fb_refill; b->spout = obuck_fb_spout; - b->close = obuck_fb_close; obuck_lock_read(); size = sh_seek(obuck_fd, 0, SEEK_END); if (size) @@ -186,6 +179,8 @@ void obuck_cleanup(void) { bclose(obuck_fb); + close(obuck_fd); + xfree(obuck_fb); } void @@ -204,7 +199,7 @@ obuck_get(oid_t oid) bflush(b); if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr)) obuck_broken("Short header read"); - b->fdpos = bucket_start + sizeof(obuck_hdr); + bucket_current = bucket_start + sizeof(obuck_hdr); if (obuck_hdr.magic != OBUCK_MAGIC) obuck_broken("Missing magic number"); if (obuck_hdr.oid == OBUCK_OID_DELETED) @@ -252,7 +247,7 @@ obuck_find_next(struct obuck_header *hdrp, int full) return 0; if (c != sizeof(obuck_hdr)) obuck_broken("Short header read"); - b->fdpos = bucket_start + sizeof(obuck_hdr); + bucket_current = bucket_start + sizeof(obuck_hdr); if (obuck_hdr.magic != OBUCK_MAGIC) obuck_broken("Missing magic number"); if (obuck_hdr.oid != OBUCK_OID_DELETED || full) @@ -266,6 +261,7 @@ obuck_find_next(struct obuck_header *hdrp, int full) struct fastbuf * obuck_fetch(void) { + obuck_fb->pos = 0; obuck_remains = obuck_hdr.length; obuck_check_pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1); return obuck_fb; @@ -276,8 +272,15 @@ obuck_fetch_end(struct fastbuf *b UNUSED) { } +oid_t +obuck_predict_last_oid(void) +{ + sh_off_t size = sh_seek(obuck_fd, 0, SEEK_END); + return size >> OBUCK_SHIFT; +} + struct fastbuf * -obuck_create(void) +obuck_create(u32 type) { obuck_lock_write(); bflush(obuck_fb); @@ -286,9 +289,11 @@ obuck_create(void) obuck_broken("Misaligned file"); obuck_hdr.magic = OBUCK_INCOMPLETE_MAGIC; obuck_hdr.oid = bucket_start >> OBUCK_SHIFT; - obuck_hdr.length = obuck_hdr.orig_length = 0; - obuck_fb->fdpos = obuck_fb->pos = bucket_start; + obuck_hdr.length = 0; + obuck_hdr.type = type; + bucket_current = bucket_start; bwrite(obuck_fb, &obuck_hdr, sizeof(obuck_hdr)); + obuck_fb->pos = -sizeof(obuck_hdr); return obuck_fb; } @@ -297,13 +302,13 @@ obuck_create_end(struct fastbuf *b UNUSED, struct obuck_header *hdrp) { int pad; obuck_hdr.magic = OBUCK_MAGIC; - obuck_hdr.length = obuck_hdr.orig_length = btell(obuck_fb) - bucket_start - sizeof(obuck_hdr); + obuck_hdr.length = btell(obuck_fb); pad = (OBUCK_ALIGN - sizeof(obuck_hdr) - obuck_hdr.length - 4) & (OBUCK_ALIGN - 1); while (pad--) bputc(obuck_fb, 0); bputl(obuck_fb, OBUCK_TRAILER); bflush(obuck_fb); - ASSERT(!(btell(obuck_fb) & (OBUCK_ALIGN - 1))); + ASSERT(!(bucket_current & (OBUCK_ALIGN - 1))); sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start); obuck_unlock(); memcpy(hdrp, &obuck_hdr, sizeof(obuck_hdr)); @@ -319,6 +324,74 @@ obuck_delete(oid_t oid) obuck_unlock(); } +/*** Fast reading of the whole pool ***/ + +static struct fastbuf *obuck_rpf; + +static int +obuck_slurp_refill(struct fastbuf *f) +{ + uns l; + + if (!obuck_remains) + return 0; + l = bdirect_read_prepare(obuck_rpf, &f->buffer); + if (!l) + obuck_broken("Incomplete object"); + l = MIN(l, obuck_remains); + bdirect_read_commit(obuck_rpf, f->buffer + l); + obuck_remains -= l; + f->bptr = f->buffer; + f->bufend = f->bstop = f->buffer + l; + return 1; +} + +struct fastbuf * +obuck_slurp_pool(struct obuck_header *hdrp) +{ + static struct fastbuf limiter; + uns l; + + do + { + if (!obuck_rpf) + { + obuck_lock_read(); + obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen); + } + else + { + bsetpos(obuck_rpf, bucket_current - 4); + if (bgetl(obuck_rpf) != OBUCK_TRAILER) + obuck_broken("Missing trailer"); + } + bucket_start = btell(obuck_rpf); + l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header)); + if (!l) + { + bclose(obuck_rpf); + obuck_rpf = NULL; + obuck_unlock(); + return NULL; + } + if (l != sizeof(struct obuck_header)) + obuck_broken("Short header read"); + if (hdrp->magic != OBUCK_MAGIC) + obuck_broken("Missing magic number"); + bucket_current = (bucket_start + sizeof(obuck_hdr) + hdrp->length + + 4 + OBUCK_ALIGN - 1) & ~((sh_off_t)(OBUCK_ALIGN - 1)); + } + while (hdrp->oid == OBUCK_OID_DELETED); + if (obuck_get_pos(hdrp->oid) != bucket_start) + obuck_broken("Invalid backlink"); + obuck_remains = hdrp->length; + limiter.bptr = limiter.bstop = limiter.buffer = limiter.bufend = NULL; + limiter.name = "Bucket"; + limiter.pos = 0; + limiter.refill = obuck_slurp_refill; + return &limiter; +} + /*** Shakedown ***/ void @@ -351,6 +424,18 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) goto broken; } l = (sizeof(struct obuck_header) + rhdr->length + 4 + OBUCK_ALIGN - 1) & ~(OBUCK_ALIGN-1); + if (l > obuck_shake_buflen) + { + if (rhdr->oid != OBUCK_OID_DELETED) + { + msg = "bucket longer than ShakeBufSize"; + goto broken; + } + rstart = bucket_start + l; + roff = 0; + rsize = 0; + goto reread; + } if (rsize - roff < l) goto reread; if (GET_U32(rbuf + roff + l - 4) != OBUCK_TRAILER) @@ -422,7 +507,7 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) return; broken: - log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld), gathering debris", msg, (long long) bucket_start); + log(L_ERROR, "Error during object pool shakedown: %s (pos=%Ld, id=%x), gathering debris", msg, (long long) bucket_start, (uns)(bucket_start >> OBUCK_SHIFT)); if (woff) { sh_pwrite(obuck_fd, wbuf, woff, wstart); @@ -437,7 +522,6 @@ obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) obuck_hdr.length = bucket_start - wstart - sizeof(obuck_hdr) - 4; else obuck_hdr.length = 0x40000000 - sizeof(obuck_hdr) - 4; - obuck_hdr.orig_length = obuck_hdr.length; sh_pwrite(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), wstart); wstart += sizeof(obuck_hdr) + obuck_hdr.length + 4; sh_pwrite(obuck_fd, &check, 4, wstart-4); @@ -477,7 +561,7 @@ int main(int argc, char **argv) for(i=0; i %d\n", h.oid, h.orig_length, h.length); + printf("Writing %08x %d\n", h.oid, h.length); ids[j] = h.oid; } for(j=0; j %d\n", h.oid, h.orig_length, h.length); - if (h.orig_length != LEN(j)) + printf("Reading %08x %d\n", h.oid, h.length); + if (h.length != LEN(j)) die("Invalid length"); - for(i=0; i