X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=lib%2Fbucket.c;h=f81f03e8d3f5444d0af9d3176edf932f0fe1b5a3;hb=0d56b467cc2184f00a4e390f95596704d819dfa3;hp=3516e85126eaa3b0d914245b0cc3dbcc940d236d;hpb=f50cb528043f2b11b073445965368ce1dbdae0ad;p=libucw.git diff --git a/lib/bucket.c b/lib/bucket.c index 3516e851..f81f03e8 100644 --- a/lib/bucket.c +++ b/lib/bucket.c @@ -2,6 +2,7 @@ * Sherlock Library -- Object Buckets * * (c) 2001--2004 Martin Mares + * (c) 2004 Robert Spalek * * This software may be freely distributed and used according to the terms * of the GNU Lesser General Public License. @@ -59,22 +60,39 @@ obuck_broken(char *msg, sh_off_t pos) } /* - * Unfortunately we cannot use flock() here since it happily permits - * locking a shared fd (e.g., after fork()) multiple times. The fcntl - * locks are very ugly and they don't support 64-bit offsets, but we - * can work around the problem by always locking the first header - * in the file. + * We need several types of locks: + * + * Read lock reading parts of bucket file + * Write lock any write operations + * Append lock appending to the end of the file + * Scan lock reading parts which we are certain they exist + * + * Multiple read and scan locks can co-exist together. + * Scan locks can co-exist with an append lock. + * There can be at most one write/append lock at a time. + * + * These lock types map to a pair of normal read-write locks which + * we represent as fcntl() locks on the first and second byte of the + * bucket file. [We cannot use flock() since it happily permits + * locking a shared fd (e.g., after fork()) multiple times at it also + * doesn't offer multiple locks on a single file.] + * + * byte0 byte1 + * Read + * Write + * Append - + * Scan - */ static inline void -obuck_do_lock(int type) +obuck_do_lock(int type, int start, int len) { struct flock fl; fl.l_type = type; fl.l_whence = SEEK_SET; - fl.l_start = 0; - fl.l_len = sizeof(struct obuck_header); + fl.l_start = start; + fl.l_len = len; if (fcntl(obuck_fd, F_SETLKW, &fl) < 0) die("fcntl lock: %m"); } @@ -82,19 +100,31 @@ obuck_do_lock(int type) inline void obuck_lock_read(void) { - obuck_do_lock(F_RDLCK); + obuck_do_lock(F_RDLCK, 0, 2); } inline void obuck_lock_write(void) { - obuck_do_lock(F_WRLCK); + obuck_do_lock(F_WRLCK, 0, 2); +} + +static inline void +obuck_lock_append(void) +{ + obuck_do_lock(F_WRLCK, 0, 1); +} + +static inline void +obuck_lock_read_to_scan(void) +{ + obuck_do_lock(F_UNLCK, 0, 1); } inline void obuck_unlock(void) { - obuck_do_lock(F_UNLCK); + obuck_do_lock(F_UNLCK, 0, 2); } /*** FastIO emulation ***/ @@ -124,9 +154,10 @@ obuck_fb_refill(struct fastbuf *f) uns remains, bufsize, size, datasize; remains = FB_BUCKET(f)->bucket_size - (uns)f->pos; - bufsize = f->bufend - f->buffer; if (!remains) return 0; + f->buffer = FB_BUCKET(f)->buffer; /* Could have been trimmed by bdirect_read_commit_modified() */ + bufsize = f->bufend - f->buffer; sh_off_t start = FB_BUCKET(f)->start_pos; sh_off_t pos = start + sizeof(struct obuck_header) + f->pos; if (remains <= bufsize) @@ -288,6 +319,7 @@ obuck_fetch(void) b->seek = NULL; b->close = obuck_fb_close; b->config = NULL; + b->can_overwrite_buffer = 2; FB_BUCKET(b)->start_pos = bucket_find_pos; FB_BUCKET(b)->bucket_size = obuck_hdr.length; obuck_fb_count++; @@ -306,7 +338,7 @@ obuck_create(u32 type) { ASSERT(!obuck_write_fb); - obuck_lock_write(); + obuck_lock_append(); sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END); if (start & (OBUCK_ALIGN - 1)) obuck_broken("Misaligned file", start); @@ -327,6 +359,7 @@ obuck_create(u32 type) b->seek = NULL; b->close = NULL; b->config = NULL; + b->can_overwrite_buffer = 0; FB_BUCKET(b)->start_pos = start; FB_BUCKET(b)->bucket_size = 0; bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr)); @@ -369,19 +402,20 @@ obuck_delete(oid_t oid) static struct fastbuf *obuck_rpf; static uns slurp_remains; -static sh_off_t slurp_start, slurp_current; +static sh_off_t slurp_start, slurp_current, slurp_end; static int obuck_slurp_refill(struct fastbuf *f) { - uns l; - if (!slurp_remains) return 0; - l = bdirect_read_prepare(obuck_rpf, &f->buffer); + uns l = bdirect_read_prepare(obuck_rpf, &f->buffer); if (!l) obuck_broken("Incomplete object", slurp_start); l = MIN(l, slurp_remains); + /* XXX: This probably should be bdirect_read_commit_modified() in some cases, + * but it doesn't hurt since we aren't going to seek. + */ bdirect_read_commit(obuck_rpf, f->buffer + l); slurp_remains -= l; f->bptr = f->buffer; @@ -401,6 +435,8 @@ obuck_slurp_pool(struct obuck_header *hdrp) { obuck_lock_read(); obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen); + slurp_end = bfilesize(obuck_rpf); + obuck_lock_read_to_scan(); } else { @@ -409,7 +445,10 @@ obuck_slurp_pool(struct obuck_header *hdrp) obuck_broken("Missing trailer", slurp_start); } slurp_start = btell(obuck_rpf); - l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header)); + if (slurp_start < slurp_end) + l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header)); + else + l = 0; if (!l) { bclose(obuck_rpf); @@ -431,6 +470,7 @@ obuck_slurp_pool(struct obuck_header *hdrp) limiter.name = "Bucket"; limiter.pos = 0; limiter.refill = obuck_slurp_refill; + limiter.can_overwrite_buffer = obuck_rpf->can_overwrite_buffer; return &limiter; }