From 681b277c0aabea785a21e109dc07338847c0cb32 Mon Sep 17 00:00:00 2001 From: Pavel Charvat Date: Wed, 29 Oct 2008 13:10:19 +0100 Subject: [PATCH] Implemented first bits of transactions on fastbufs. --- ucw/fastbuf.c | 64 +++++++++++++++++++++++++++++++++++++++++-------- ucw/fastbuf.h | 18 ++++++++++++++ ucw/fb-buffer.c | 43 ++++++++++++++++----------------- ucw/fb-direct.c | 4 ++-- ucw/fb-file.c | 10 ++++---- ucw/fb-limfd.c | 2 +- ucw/fb-mem.c | 2 +- ucw/fb-mmap.c | 6 ++--- ucw/fb-param.c | 7 +++--- ucw/fb-temp.c | 2 +- ucw/trans.c | 1 + 11 files changed, 111 insertions(+), 48 deletions(-) diff --git a/ucw/fastbuf.c b/ucw/fastbuf.c index de38b277..fb2cc49f 100644 --- a/ucw/fastbuf.c +++ b/ucw/fastbuf.c @@ -10,6 +10,7 @@ #include "ucw/lib.h" #include "ucw/fastbuf.h" #include "ucw/respool.h" +#include "ucw/trans.h" #include #include @@ -18,7 +19,8 @@ void bclose(struct fastbuf *f) { if (f) { - bflush(f); + if (!(f->flags & FB_DEAD)) + bflush(f); if (f->close) f->close(f); if (f->res) @@ -26,10 +28,47 @@ void bclose(struct fastbuf *f) } } +void NONRET bthrow(struct fastbuf *f, const char *id, const char *fmt, ...) +{ + ASSERT(!(f->flags & FB_DEAD)); + f->flags |= FB_DEAD; + va_list args; + va_start(args, fmt); + trans_vthrow(id, f, fmt, args); +} + +int brefill(struct fastbuf *f, int allow_eof) +{ + ASSERT(f->bptr >= f->bstop); + if (!f->refill) + bthrow(f, "fb.read", "Stream not readable"); + if (f->refill(f)) + { + ASSERT(f->bptr < f->bstop); + return 1; + } + else + { + if (!allow_eof && (f->flags & FB_DIE_ON_EOF)) + bthrow(f, "fb.eof", "Unexpected EOF"); + ASSERT(f->bptr == f->bstop); + return 0; + } +} + +void bspout(struct fastbuf *f) +{ + ASSERT(f->bptr > f->bstop || f->bptr >= f->bufend); + if (!f->spout) + bthrow(f, "fb.write", "Stream not writeable"); + f->spout(f); + ASSERT(f->bptr < f->bufend); +} + void bflush(struct fastbuf *f) { if (f->bptr > f->bstop) - f->spout(f); + bspout(f); else if (f->bstop > f->buffer) f->bptr = f->bstop = f->buffer; } @@ -43,7 +82,7 @@ inline void bsetpos(struct fastbuf *f, ucw_off_t pos) { bflush(f); if (!f->seek || !f->seek(f, pos, SEEK_SET)) - die("bsetpos: stream not seekable"); + bthrow(f, "fb.seek", "Stream not seekable"); } } @@ -58,7 +97,7 @@ void bseek(struct fastbuf *f, ucw_off_t pos, int whence) case SEEK_END: bflush(f); if (!f->seek || !f->seek(f, pos, SEEK_END)) - die("bseek: stream not seekable"); + bthrow(f, "fb.seek", "Stream not seekable"); break; default: die("bseek: invalid whence=%d", whence); @@ -69,7 +108,7 @@ int bgetc_slow(struct fastbuf *f) { if (f->bptr < f->bstop) return *f->bptr++; - if (!f->refill(f)) + if (!brefill(f, 0)) return -1; return *f->bptr++; } @@ -78,15 +117,20 @@ int bpeekc_slow(struct fastbuf *f) { if (f->bptr < f->bstop) return *f->bptr; - if (!f->refill(f)) + if (!brefill(f, 0)) return -1; return *f->bptr; } +int beof_slow(struct fastbuf *f) +{ + return f->bptr >= f->bstop && !brefill(f, 1); +} + void bputc_slow(struct fastbuf *f, uns c) { if (f->bptr >= f->bufend) - f->spout(f); + bspout(f); *f->bptr++ = c; } @@ -99,7 +143,7 @@ uns bread_slow(struct fastbuf *f, void *b, uns l, uns check) if (!k) { - f->refill(f); + brefill(f, check); k = f->bstop - f->bptr; if (!k) break; @@ -113,7 +157,7 @@ uns bread_slow(struct fastbuf *f, void *b, uns l, uns check) total += k; } if (check && total && l) - die("breadb: short read"); + bthrow(f, "fb.read", "breadb: short read"); return total; } @@ -125,7 +169,7 @@ void bwrite_slow(struct fastbuf *f, const void *b, uns l) if (!k) { - f->spout(f); + bspout(f); k = f->bufend - f->bptr; } if (k > l) diff --git a/ucw/fastbuf.h b/ucw/fastbuf.h index 6e3b2626..90c7648e 100644 --- a/ucw/fastbuf.h +++ b/ucw/fastbuf.h @@ -133,6 +133,7 @@ struct fastbuf { byte *buffer, *bufend; /* Start and end of the buffer */ char *name; /* File name (used for error messages) */ ucw_off_t pos; /* Position of bstop in the file */ + uns flags; /* See enum fb_flags */ int (*refill)(struct fastbuf *); /* Get a buffer with new data, returns 0 on EOF */ void (*spout)(struct fastbuf *); /* Write buffer data to the file */ int (*seek)(struct fastbuf *, ucw_off_t, int);/* Slow path for @bseek(), buffer already flushed; returns success */ @@ -144,6 +145,14 @@ struct fastbuf { void fb_tie(struct fastbuf *b); /* Tie fastbuf to a resource if there is an active pool */ +/** + * Fastbuf flags + */ +enum fb_flags { + FB_DEAD = 0x1, /* Some fastbuf's method has thrown an exception */ + FB_DIE_ON_EOF = 0x2, /* Most of read operations throw "fb.eof" on EOF */ +}; + /*** * === Fastbuf on files [[fbparam]] * @@ -481,6 +490,9 @@ int bconfig(struct fastbuf *f, uns type, int data); /** Configure a fastbuf. Ret * Can not be used for fastbufs not returned from function (initialized in a parameter, for example the one from `fbbuf_init_read`). */ void bclose(struct fastbuf *f); +void bthrow(struct fastbuf *f, const char *id, const char *fmt, ...) FORMAT_CHECK(printf,3,4) NONRET; /** Throw exception on a given fastbuf **/ +int brefill(struct fastbuf *f, int allow_eof); +void bspout(struct fastbuf *f); void bflush(struct fastbuf *f); /** Write data (if it makes any sense, do not use for in-memory buffers). **/ void bseek(struct fastbuf *f, ucw_off_t pos, int whence); /** Seek in the buffer. See `man fseek` for description of @whence. Only for seekable fastbufs. **/ void bsetpos(struct fastbuf *f, ucw_off_t pos); /** Set position to @pos bytes from beginning. Only for seekable fastbufs. **/ @@ -504,6 +516,12 @@ static inline int bpeekc(struct fastbuf *f) /** Return next character from the return (f->bptr < f->bstop) ? (int) *f->bptr : bpeekc_slow(f); } +int beof_slow(struct fastbuf *f); +static inline int beof(struct fastbuf *f) /** Have I reached EOF? **/ +{ + return (f->bptr < f->bstop) ? 0 : beof_slow(f); +} + static inline void bungetc(struct fastbuf *f) /** Return last read character back. Only one back is guaranteed to work. **/ { f->bptr--; diff --git a/ucw/fb-buffer.c b/ucw/fb-buffer.c index 1888cd51..7fff1a1c 100644 --- a/ucw/fb-buffer.c +++ b/ucw/fb-buffer.c @@ -36,37 +36,36 @@ fbbuf_seek(struct fastbuf *f, ucw_off_t pos, int whence) void fbbuf_init_read(struct fastbuf *f, byte *buf, uns size, uns can_overwrite) { - f->buffer = f->bptr = buf; - f->bstop = f->bufend = buf + size; - f->name = "fbbuf-read"; - f->pos = size; - f->refill = fbbuf_refill; - f->spout = NULL; - f->seek = fbbuf_seek; - f->close = NULL; - f->config = NULL; - f->can_overwrite_buffer = can_overwrite; + *f = (struct fastbuf) { + .buffer = buf, + .bptr = buf, + .bstop = buf + size, + .bufend = buf + size, + .name = "fbbuf-read", + .pos = size, + .refill = fbbuf_refill, + .seek = fbbuf_seek, + .can_overwrite_buffer = can_overwrite }; } static void -fbbuf_spout(struct fastbuf *f UNUSED) +fbbuf_spout(struct fastbuf *f) { - die("fbbuf: buffer overflow on write"); + bthrow(f, "fb.write", "fbbuf: buffer overflow on write"); } void fbbuf_init_write(struct fastbuf *f, byte *buf, uns size) { - f->buffer = f->bstop = f->bptr = buf; - f->bufend = buf + size; - f->name = "fbbuf-write"; - f->pos = size; - f->refill = NULL; - f->spout = fbbuf_spout; - f->seek = NULL; - f->close = NULL; - f->config = NULL; - f->can_overwrite_buffer = 0; + *f = (struct fastbuf) { + .buffer = buf, + .bstop = buf, + .bptr = buf, + .bufend = buf + size, + .name = "fbbuf-write", + .pos = size, + .spout = fbbuf_spout, + }; } #ifdef TEST diff --git a/ucw/fb-direct.c b/ucw/fb-direct.c index 71e7351b..26490579 100644 --- a/ucw/fb-direct.c +++ b/ucw/fb-direct.c @@ -159,7 +159,7 @@ fbdir_refill(struct fastbuf *f) if (!r->status) return 0; if (r->status < 0) - die("Error reading %s: %s", f->name, strerror(r->returned_errno)); + bthrow(f, "fb.read", "Error reading %s: %s", f->name, strerror(r->returned_errno)); f->bptr = f->buffer = r->buffer; f->bstop = f->bufend = f->buffer + r->status; f->pos += r->status; @@ -193,7 +193,7 @@ fbdir_spout(struct fastbuf *f) asio_sync(F->io_queue); DBG("FB-DIRECT: Truncating at %llu", (long long)f->pos); if (ucw_ftruncate(F->fd, f->pos) < 0) - die("Error truncating %s: %m", f->name); + bthrow(f, "fb.write", "Error truncating %s: %m", f->name); } else asio_submit(r); diff --git a/ucw/fb-file.c b/ucw/fb-file.c index fff44378..46b7e71b 100644 --- a/ucw/fb-file.c +++ b/ucw/fb-file.c @@ -56,7 +56,7 @@ long_seek: int l = read(F->fd, f->buffer, MIN(skip, blen)); if (unlikely(l <= 0)) if (l < 0) - die("Error reading %s: %m", f->name); + bthrow(f, "fb.read", "Error reading %s: %m", f->name); else { F->wpos -= skip; @@ -114,14 +114,14 @@ seek: /* Do lseek() */ F->wpos = f->pos + (f->buffer - f->bptr); if (ucw_seek(F->fd, F->wpos, SEEK_SET) < 0) - die("Error seeking %s: %m", f->name); + bthrow(f, "fb.read", "Error seeking %s: %m", f->name); } /* Read (part of) buffer */ do { int l = read(F->fd, read_ptr, read_len); if (unlikely(l < 0)) - die("Error reading %s: %m", f->name); + bthrow(f, "fb.read", "Error reading %s: %m", f->name); if (!l) if (unlikely(read_ptr < f->bptr)) goto eof; @@ -149,7 +149,7 @@ bfd_spout(struct fastbuf *f) { /* Do delayed lseek() if needed */ if (FB_FILE(f)->wpos != f->pos && ucw_seek(FB_FILE(f)->fd, f->pos, SEEK_SET) < 0) - die("Error seeking %s: %m", f->name); + bthrow(f, "fb.write", "Error seeking %s: %m", f->name); int l = f->bptr - f->buffer; byte *c = f->buffer; @@ -161,7 +161,7 @@ bfd_spout(struct fastbuf *f) { int z = write(FB_FILE(f)->fd, c, l); if (z <= 0) - die("Error writing %s: %m", f->name); + bthrow(f, "fb.write", "Error writing %s: %m", f->name); l -= z; c += z; } diff --git a/ucw/fb-limfd.c b/ucw/fb-limfd.c index 4e7d406b..f909da3b 100644 --- a/ucw/fb-limfd.c +++ b/ucw/fb-limfd.c @@ -27,7 +27,7 @@ bfl_refill(struct fastbuf *f) int max = MIN(FB_LIMFD(f)->limit - f->pos, f->bufend - f->buffer); int l = read(FB_LIMFD(f)->fd, f->buffer, max); if (l < 0) - die("Error reading %s: %m", f->name); + bthrow(f, "fb.read", "Error reading %s: %m", f->name); f->bstop = f->buffer + l; f->pos += l; return l; diff --git a/ucw/fb-mem.c b/ucw/fb-mem.c index a100d481..2356e86b 100644 --- a/ucw/fb-mem.c +++ b/ucw/fb-mem.c @@ -128,7 +128,7 @@ fbmem_seek(struct fastbuf *f, ucw_off_t pos, int whence) FB_MEM(f)->block = NULL; return 1; } - die("fbmem_seek to invalid offset"); + bthrow(f, "fb.seek", "fbmem_seek to invalid offset"); } static void diff --git a/ucw/fb-mmap.c b/ucw/fb-mmap.c index 25209721..96d482aa 100644 --- a/ucw/fb-mmap.c +++ b/ucw/fb-mmap.c @@ -70,7 +70,7 @@ bfmm_map_window(struct fastbuf *f) else f->buffer = ucw_mmap(f->buffer, ll, prot, MAP_SHARED | MAP_FIXED, F->fd, pos0); if (f->buffer == (byte *) MAP_FAILED) - die("mmap(%s): %m", f->name); + bthrow(f, "fb.mmap", "mmap(%s): %m", f->name); #ifdef MADV_SEQUENTIAL if (ll > CPU_PAGE_SIZE) madvise(f->buffer, ll, MADV_SEQUENTIAL); @@ -115,7 +115,7 @@ bfmm_spout(struct fastbuf *f) { F->file_extend = ALIGN_TO(F->file_extend + mmap_extend_size, (ucw_off_t)CPU_PAGE_SIZE); if (ucw_ftruncate(F->fd, F->file_extend)) - die("ftruncate(%s): %m", f->name); + bthrow(f, "fb.write", "ftruncate(%s): %m", f->name); } bfmm_map_window(f); f->bstop = f->bptr; @@ -145,7 +145,7 @@ bfmm_close(struct fastbuf *f) munmap(f->buffer, F->window_size); if (F->file_extend > F->file_size && ucw_ftruncate(F->fd, F->file_size)) - die("ftruncate(%s): %m", f->name); + bthrow(f, "fb.write", "ftruncate(%s): %m", f->name); bclose_file_helper(f, F->fd, F->is_temp_file); xfree(f); } diff --git a/ucw/fb-param.c b/ucw/fb-param.c index 22f988c6..7543220f 100644 --- a/ucw/fb-param.c +++ b/ucw/fb-param.c @@ -12,6 +12,7 @@ #include "ucw/conf.h" #include "ucw/lfs.h" #include "ucw/fastbuf.h" +#include "ucw/trans.h" #include #include @@ -101,7 +102,7 @@ bopen_fd_internal(int fd, struct fb_params *params, uns mode, const char *name) return fb; case FB_MMAP: if (!~mode && (int)(mode = fcntl(fd, F_GETFL)) < 0) - die("Cannot get flags of fd %d: %m", fd); + trans_throw("fb.open", NULL, "Cannot get flags of fd %d: %m", fd); return bfmmopen_internal(fd, name, mode); default: ASSERT(0); @@ -124,7 +125,7 @@ bopen_file_internal(const char *name, int mode, struct fb_params *params, int tr if (try) return NULL; else - die("Unable to %s file %s: %m", (mode & O_CREAT) ? "create" : "open", name); + trans_throw("fb.open", NULL, "Unable to %s file %s: %m", (mode & O_CREAT) ? "create" : "open", name); struct fastbuf *fb = bopen_fd_internal(fd, params, mode, name); ASSERT(fb); if (mode & O_APPEND) @@ -162,7 +163,7 @@ bclose_file_helper(struct fastbuf *f, int fd, int is_temp_file) msg(L_ERROR, "unlink(%s): %m", f->name); case 0: if (close(fd)) - die("close(%s): %m", f->name); + msg(L_ERROR, "close(%s): %m", f->name); } } diff --git a/ucw/fb-temp.c b/ucw/fb-temp.c index 05fb655b..a68eb582 100644 --- a/ucw/fb-temp.c +++ b/ucw/fb-temp.c @@ -35,7 +35,7 @@ void bfix_tmp_file(struct fastbuf *fb, const char *name) int was_temp = bconfig(fb, BCONFIG_IS_TEMP_FILE, 0); ASSERT(was_temp == 1); if (rename(fb->name, name)) - die("Cannot rename %s to %s: %m", fb->name, name); + bthrow(fb, "fb.tmp", "Cannot rename %s to %s: %m", fb->name, name); bclose(fb); } diff --git a/ucw/trans.c b/ucw/trans.c index 67643b26..1888ad89 100644 --- a/ucw/trans.c +++ b/ucw/trans.c @@ -213,6 +213,7 @@ trans_throw(const char *id, void *object, const char *fmt, ...) void trans_vthrow(const char *id, void *object, const char *fmt, va_list args) { + trans_init(); struct mempool *mp = trans_get_pool(); struct exception *x = mp_alloc(mp, sizeof(*x)); x->id = id; -- 2.39.2