}
/*
- * Unfortunately we cannot use flock() here since it happily permits
- * locking a shared fd (e.g., after fork()) multiple times. The fcntl
- * locks are very ugly and they don't support 64-bit offsets, but we
- * can work around the problem by always locking the first header
- * in the file.
+ * We need several types of locks:
+ *
+ * Read lock reading parts of bucket file
+ * Write lock any write operations
+ * Append lock appending to the end of the file
+ * Scan lock reading parts which we are certain they exist
+ *
+ * Multiple read and scan locks can co-exist together.
+ * Scan locks can co-exist with an append lock.
+ * There can be at most one write/append lock at a time.
+ *
+ * These lock types map to a pair of normal read-write locks which
+ * we represent as fcntl() locks on the first and second byte of the
+ * bucket file. [We cannot use flock() since it happily permits
+ * locking a shared fd (e.g., after fork()) multiple times at it also
+ * doesn't offer multiple locks on a single file.]
+ *
+ * byte0 byte1
+ * Read <read> <read>
+ * Write <write> <write>
+ * Append - <write>
+ * Scan <read> -
*/
static inline void
-obuck_do_lock(int type)
+obuck_do_lock(int type, int start, int len)
{
struct flock fl;
fl.l_type = type;
fl.l_whence = SEEK_SET;
- fl.l_start = 0;
- fl.l_len = sizeof(struct obuck_header);
+ fl.l_start = start;
+ fl.l_len = len;
if (fcntl(obuck_fd, F_SETLKW, &fl) < 0)
die("fcntl lock: %m");
}
inline void
obuck_lock_read(void)
{
- obuck_do_lock(F_RDLCK);
+ obuck_do_lock(F_RDLCK, 0, 2);
}
inline void
obuck_lock_write(void)
{
- obuck_do_lock(F_WRLCK);
+ obuck_do_lock(F_WRLCK, 0, 2);
+}
+
+static inline void
+obuck_lock_append(void)
+{
+ obuck_do_lock(F_WRLCK, 0, 1);
+}
+
+static inline void
+obuck_lock_read_to_scan(void)
+{
+ obuck_do_lock(F_UNLCK, 0, 1);
}
inline void
obuck_unlock(void)
{
- obuck_do_lock(F_UNLCK);
+ obuck_do_lock(F_UNLCK, 0, 2);
}
/*** FastIO emulation ***/
{
ASSERT(!obuck_write_fb);
- obuck_lock_write();
+ obuck_lock_append();
sh_off_t start = sh_seek(obuck_fd, 0, SEEK_END);
if (start & (OBUCK_ALIGN - 1))
obuck_broken("Misaligned file", start);
static struct fastbuf *obuck_rpf;
static uns slurp_remains;
-static sh_off_t slurp_start, slurp_current;
+static sh_off_t slurp_start, slurp_current, slurp_end;
static int
obuck_slurp_refill(struct fastbuf *f)
{
obuck_lock_read();
obuck_rpf = bopen(obuck_name, O_RDONLY, obuck_slurp_buflen);
+ bseek(obuck_rpf, 0, SEEK_END);
+ slurp_end = btell(obuck_rpf);
+ bsetpos(obuck_rpf, 0);
+ obuck_lock_read_to_scan();
}
else
{
obuck_broken("Missing trailer", slurp_start);
}
slurp_start = btell(obuck_rpf);
- l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
+ if (slurp_start < slurp_end)
+ l = bread(obuck_rpf, hdrp, sizeof(struct obuck_header));
+ else
+ l = 0;
if (!l)
{
bclose(obuck_rpf);