X-Git-Url: http://mj.ucw.cz/gitweb/?a=blobdiff_plain;f=lib%2Fbucket.c;h=4a5533cfa6ef19e485e8a50d6d43429799b540f1;hb=61da975c346c50ccf9bb9cb7271ef538dc15ab32;hp=3516e85126eaa3b0d914245b0cc3dbcc940d236d;hpb=f50cb528043f2b11b073445965368ce1dbdae0ad;p=libucw.git diff --git a/lib/bucket.c b/lib/bucket.c index 3516e851..4a5533cf 100644 --- a/lib/bucket.c +++ b/lib/bucket.c @@ -2,6 +2,7 @@ * Sherlock Library -- Object Buckets * * (c) 2001--2004 Martin Mares + * (c) 2004 Robert Spalek * * This software may be freely distributed and used according to the terms * of the GNU Lesser General Public License. @@ -59,22 +60,39 @@ obuck_broken(char *msg, sh_off_t pos) } /* - * Unfortunately we cannot use flock() here since it happily permits - * locking a shared fd (e.g., after fork()) multiple times. The fcntl - * locks are very ugly and they don't support 64-bit offsets, but we - * can work around the problem by always locking the first header - * in the file. + * We need several types of locks: + * + * Read lock reading parts of bucket file + * Write lock any write operations + * Append lock appending to the end of the file + * Scan lock reading parts which we are certain they exist + * + * Multiple read and scan locks can co-exist together. + * Scan locks can co-exist with an append lock. + * There can be at most one write/append lock at a time. + * + * These lock types map to a pair of normal read-write locks which + * we represent as fcntl() locks on the first and second byte of the + * bucket file. [We cannot use flock() since it happily permits + * locking a shared fd (e.g., after fork()) multiple times at it also + * doesn't offer multiple locks on a single file.] + * + * byte0 byte1 + * Read + * Write + * Append - + * Scan - */ static inline void -obuck_do_lock(int type) +obuck_do_lock(int type, int start, int len) { struct flock fl; fl.l_type = type; fl.l_whence = SEEK_SET; - fl.l_start = 0; - fl.l_len = sizeof(struct obuck_header); + fl.l_start = start; + fl.l_len = len; if (fcntl(obuck_fd, F_SETLKW, &fl) < 0) die("fcntl lock: %m"); } @@ -82,25 +100,38 @@ obuck_do_lock(int type) inline void obuck_lock_read(void) { - obuck_do_lock(F_RDLCK); + obuck_do_lock(F_RDLCK, 0, 2); } inline void obuck_lock_write(void) { - obuck_do_lock(F_WRLCK); + obuck_do_lock(F_WRLCK, 0, 2); +} + +static inline void +obuck_lock_append(void) +{ + obuck_do_lock(F_WRLCK, 0, 1); +} + +static inline void +obuck_lock_read_to_scan(void) +{ + obuck_do_lock(F_UNLCK, 0, 1); } inline void obuck_unlock(void) { - obuck_do_lock(F_UNLCK); + obuck_do_lock(F_UNLCK, 0, 2); } /*** FastIO emulation ***/ struct fb_bucket { struct fastbuf fb; + int can_overwrite; sh_off_t start_pos; uns bucket_size; byte buffer[0]; @@ -271,6 +302,21 @@ obuck_find_next(struct obuck_header *hdrp, int full) } } +static int +obuck_bconfig(struct fastbuf *f, uns item, int value) +{ + switch (item) + { + case BCONFIG_CAN_OVERWRITE: ; + int old_value = FB_BUCKET(f)->can_overwrite; + if (value >= 0 && value <= 2) + FB_BUCKET(f)->can_overwrite = value; + return old_value; + default: + return -1; + } +} + struct fastbuf * obuck_fetch(void) { @@ -287,7 +333,7 @@ obuck_fetch(void) b->spout = NULL; b->seek = NULL; b->close = obuck_fb_close; - b->config = NULL; + b->config = obuck_bconfig; FB_BUCKET(b)->start_pos = bucket_find_pos; FB_BUCKET(b)->bucket_size = obuck_hdr.length; obuck_fb_count++; @@ -306,7 +352,7 @@ obuck_create(u32 type) { ASSERT(!obuck_write_fb); - obuck_lock_write(); + obuck_lock_append(); sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END); if (start & (OBUCK_ALIGN - 1)) obuck_broken("Misaligned file", start); @@ -329,6 +375,7 @@ obuck_create(u32 type) b->config = NULL; FB_BUCKET(b)->start_pos = start; FB_BUCKET(b)->bucket_size = 0; + FB_BUCKET(b)->can_overwrite = 2; bwrite(b, &obuck_create_hdr, sizeof(obuck_create_hdr)); return b; @@ -369,7 +416,7 @@ obuck_delete(oid_t oid) static struct fastbuf *obuck_rpf; static uns slurp_remains; -static sh_off_t slurp_start, slurp_current; +static sh_off_t slurp_start, slurp_current, slurp_end; static int obuck_slurp_refill(struct fastbuf *f) @@ -401,6 +448,10 @@ obuck_slurp_pool(struct obuck_header *hdrp) { obuck_lock_read(); obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen); + bseek(obuck_rpf, 0, SEEK_END); + slurp_end = btell(obuck_rpf); + bsetpos(obuck_rpf, 0); + obuck_lock_read_to_scan(); } else { @@ -409,7 +460,10 @@ obuck_slurp_pool(struct obuck_header *hdrp) obuck_broken("Missing trailer", slurp_start); } slurp_start = btell(obuck_rpf); - l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header)); + if (slurp_start < slurp_end) + l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header)); + else + l = 0; if (!l) { bclose(obuck_rpf);