X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=lib%2Fbucket.c;h=e35f99f8cc011805f6a000ad724eeb3aa6a07ff4;hb=ae9721feabb6690778d9bc6751f6a6f434cdbeaf;hp=6fec2a59eea69cb42766111d06e76a0136be2978;hpb=e9aa499507f33db9fac50f3864609a6b292685e0;p=libucw.git diff --git a/lib/bucket.c b/lib/bucket.c index 6fec2a59..e35f99f8 100644 --- a/lib/bucket.c +++ b/lib/bucket.c @@ -8,8 +8,10 @@ #include "lib/bucket.h" #include "lib/fastbuf.h" #include "lib/lfs.h" +#include "lib/conf.h" #include +#include #include #include #include @@ -19,32 +21,71 @@ static unsigned int obuck_remains, obuck_check_pad; static struct fastbuf *obuck_fb; static struct obuck_header obuck_hdr; static sh_off_t bucket_start; -byte *obuck_name = "db/objects"; /* FIXME */ + +/*** Configuration ***/ + +byte *obuck_name = "not/configured"; +static int obuck_io_buflen = 65536; +static int obuck_shake_buflen = 1048576; + +static struct cfitem obuck_config[] = { + { "Buckets", CT_SECTION, NULL }, + { "BucketFile", CT_STRING, &obuck_name }, + { "BufSize", CT_INT, &obuck_io_buflen }, + { "ShakeBufSize", CT_INT, &obuck_shake_buflen }, + { NULL, CT_STOP, NULL } +}; + +static void CONSTRUCTOR obuck_init_config(void) +{ + cf_register(obuck_config); +} /*** Internal operations ***/ static void obuck_broken(char *msg) { - die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start); /* FIXME */ + die("Object pool corrupted: %s (pos=%Lx)", msg, (long long) bucket_start); +} + +/* + * Unfortunately we cannot use flock() here since it happily permits + * locking a shared fd (e.g., after fork()) multiple times. The fcntl + * locks are very ugly and they don't support 64-bit offsets, but we + * can work around the problem by always locking the first header + * in the file. + */ + +static inline void +obuck_do_lock(int type) +{ + struct flock fl; + + fl.l_type = type; + fl.l_whence = SEEK_SET; + fl.l_start = 0; + fl.l_len = sizeof(struct obuck_header); + if (fcntl(obuck_fd, F_SETLKW, &fl) < 0) + die("fcntl lock: %m"); } static inline void obuck_lock_read(void) { - flock(obuck_fd, LOCK_SH); + obuck_do_lock(F_RDLCK); } static inline void obuck_lock_write(void) { - flock(obuck_fd, LOCK_EX); + obuck_do_lock(F_WRLCK); } static inline void obuck_unlock(void) { - flock(obuck_fd, LOCK_UN); + obuck_do_lock(F_UNLCK); } /*** FastIO emulation ***/ @@ -72,9 +113,7 @@ obuck_fb_refill(struct fastbuf *f) obuck_remains -= limit; if (!obuck_remains) /* Should check the trailer */ { - u32 check; - memcpy(&check, f->buffer + size - 4, 4); - if (check != OBUCK_TRAILER) + if (GET_U32(f->buffer + size - 4) != OBUCK_TRAILER) obuck_broken("Missing trailer"); } return limit; @@ -111,15 +150,16 @@ void obuck_init(int writeable) { struct fastbuf *b; - int buflen = 65536; sh_off_t size; - obuck_fd = open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666); - obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + buflen + OBUCK_ALIGN + 4); - b->buflen = buflen; + obuck_fd = sh_open(obuck_name, (writeable ? O_RDWR | O_CREAT : O_RDONLY), 0666); + if (obuck_fd < 0) + die("Unable to open bucket file %s: %m", obuck_name); + obuck_fb = b = xmalloc_zero(sizeof(struct fastbuf) + obuck_io_buflen + OBUCK_ALIGN + 4); + b->buflen = obuck_io_buflen; b->buffer = (char *)(b+1); b->bptr = b->bstop = b->buffer; - b->bufend = b->buffer + buflen; + b->bufend = b->buffer + obuck_io_buflen; b->name = "bucket"; b->fd = obuck_fd; b->refill = obuck_fb_refill; @@ -145,7 +185,7 @@ obuck_cleanup(void) bclose(obuck_fb); } -void /* FIXME: Call somewhere :) */ +void obuck_sync(void) { bflush(obuck_fb); @@ -157,7 +197,7 @@ obuck_get(oid_t oid) { struct fastbuf *b = obuck_fb; - bucket_start = ((sh_off_t) oid) << OBUCK_SHIFT; + bucket_start = obuck_get_pos(oid); bflush(b); if (sh_pread(obuck_fd, &obuck_hdr, sizeof(obuck_hdr), bucket_start) != sizeof(obuck_hdr)) obuck_broken("Short header read"); @@ -175,6 +215,7 @@ obuck_find_by_oid(struct obuck_header *hdrp) { oid_t oid = hdrp->oid; + ASSERT(oid < OBUCK_OID_FIRST_SPECIAL); obuck_lock_read(); obuck_get(oid); obuck_unlock(); @@ -275,21 +316,125 @@ obuck_delete(oid_t oid) obuck_unlock(); } +/*** Shakedown ***/ + +void +obuck_shakedown(int (*kibitz)(struct obuck_header *old, oid_t new, byte *buck)) +{ + byte *rbuf, *wbuf; + sh_off_t rstart, wstart, w_bucket_start; + int roff, woff, rsize, l; + struct obuck_header *rhdr, *whdr; + + rbuf = xmalloc(obuck_shake_buflen); + wbuf = xmalloc(obuck_shake_buflen); + rstart = wstart = 0; + roff = woff = rsize = 0; + + /* We need to be the only accessor, all the object ID's are becoming invalid */ + obuck_lock_write(); + + for(;;) + { + bucket_start = rstart + roff; + w_bucket_start = wstart + woff; + if (rsize - roff < OBUCK_ALIGN) + goto reread; + rhdr = (struct obuck_header *)(rbuf + roff); + if (rhdr->magic != OBUCK_MAGIC || + rhdr->oid != OBUCK_OID_DELETED && rhdr->oid != (bucket_start >> OBUCK_SHIFT)) + obuck_broken("header mismatch during shakedown"); + l = (sizeof(struct obuck_header) + rhdr->length + 4 + OBUCK_ALIGN - 1) & ~(OBUCK_ALIGN-1); + if (rsize - roff < l) + goto reread; + if (GET_U32(rbuf + roff + l - 4) != OBUCK_TRAILER) + obuck_broken("missing trailer during shakedown"); + if (rhdr->oid != OBUCK_OID_DELETED) + { + if (kibitz(rhdr, w_bucket_start >> OBUCK_SHIFT, (byte *)(rhdr+1))) + { + if (bucket_start == w_bucket_start) + { + /* No copying needed now nor ever in the past, hence woff==0 */ + wstart += l; + } + else + { + if (obuck_shake_buflen - woff < l) + { + if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff) + die("obuck_shakedown write failed: %m"); + wstart += woff; + woff = 0; + } + whdr = (struct obuck_header *)(wbuf+woff); + memcpy(whdr, rhdr, l); + whdr->oid = w_bucket_start >> OBUCK_SHIFT; + woff += l; + } + } + } + else + kibitz(rhdr, OBUCK_OID_DELETED, NULL); + roff += l; + continue; + + reread: + if (roff) + { + memmove(rbuf, rbuf+roff, rsize-roff); + rsize -= roff; + rstart += roff; + roff = 0; + } + l = sh_pread(obuck_fd, rbuf+rsize, obuck_shake_buflen-rsize, rstart+rsize); + if (l < 0) + die("obuck_shakedown read error: %m"); + if (!l) + { + if (!rsize) + break; + obuck_broken("unexpected EOF during shakedown"); + } + rsize += l; + } + if (woff) + { + if (sh_pwrite(obuck_fd, wbuf, woff, wstart) != woff) + die("obuck_shakedown write failed: %m"); + wstart += woff; + } + sh_ftruncate(obuck_fd, wstart); + + obuck_unlock(); + xfree(rbuf); + xfree(wbuf); +} + /*** Testing ***/ #ifdef TEST -#define COUNT 100 +#define COUNT 5000 #define MAXLEN 10000 #define KILLPERC 13 #define LEN(i) ((259309*(i))%MAXLEN) -int main(void) +int main(int argc, char **argv) { int ids[COUNT]; unsigned int i, j, cnt; struct obuck_header h; struct fastbuf *b; + + log_init(NULL); + if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0 || + optind < argc) + { + fputs("This program supports only the following command-line arguments:\n" CF_USAGE, stderr); + exit(1); + } + unlink(obuck_name); obuck_init(1); for(j=0; j