#include "ucw/lib.h"
#include "ucw/fastbuf.h"
#include "ucw/respool.h"
+#include "ucw/trans.h"
#include <stdio.h>
#include <stdlib.h>
{
if (f)
{
- bflush(f);
+ if (!(f->flags & FB_DEAD))
+ bflush(f);
if (f->close)
f->close(f);
if (f->res)
}
}
+void NONRET bthrow(struct fastbuf *f, const char *id, const char *fmt, ...)
+{
+ ASSERT(!(f->flags & FB_DEAD));
+ f->flags |= FB_DEAD;
+ va_list args;
+ va_start(args, fmt);
+ trans_vthrow(id, f, fmt, args);
+}
+
+int brefill(struct fastbuf *f, int allow_eof)
+{
+ ASSERT(f->bptr >= f->bstop);
+ if (!f->refill)
+ bthrow(f, "fb.read", "Stream not readable");
+ if (f->refill(f))
+ {
+ ASSERT(f->bptr < f->bstop);
+ return 1;
+ }
+ else
+ {
+ if (!allow_eof && (f->flags & FB_DIE_ON_EOF))
+ bthrow(f, "fb.eof", "Unexpected EOF");
+ ASSERT(f->bptr == f->bstop);
+ return 0;
+ }
+}
+
+void bspout(struct fastbuf *f)
+{
+ ASSERT(f->bptr > f->bstop || f->bptr >= f->bufend);
+ if (!f->spout)
+ bthrow(f, "fb.write", "Stream not writeable");
+ f->spout(f);
+ ASSERT(f->bptr < f->bufend);
+}
+
void bflush(struct fastbuf *f)
{
if (f->bptr > f->bstop)
- f->spout(f);
+ bspout(f);
else if (f->bstop > f->buffer)
f->bptr = f->bstop = f->buffer;
}
{
bflush(f);
if (!f->seek || !f->seek(f, pos, SEEK_SET))
- die("bsetpos: stream not seekable");
+ bthrow(f, "fb.seek", "Stream not seekable");
}
}
case SEEK_END:
bflush(f);
if (!f->seek || !f->seek(f, pos, SEEK_END))
- die("bseek: stream not seekable");
+ bthrow(f, "fb.seek", "Stream not seekable");
break;
default:
die("bseek: invalid whence=%d", whence);
{
if (f->bptr < f->bstop)
return *f->bptr++;
- if (!f->refill(f))
+ if (!brefill(f, 0))
return -1;
return *f->bptr++;
}
{
if (f->bptr < f->bstop)
return *f->bptr;
- if (!f->refill(f))
+ if (!brefill(f, 0))
return -1;
return *f->bptr;
}
+int beof_slow(struct fastbuf *f)
+{
+ return f->bptr >= f->bstop && !brefill(f, 1);
+}
+
void bputc_slow(struct fastbuf *f, uns c)
{
if (f->bptr >= f->bufend)
- f->spout(f);
+ bspout(f);
*f->bptr++ = c;
}
if (!k)
{
- f->refill(f);
+ brefill(f, check);
k = f->bstop - f->bptr;
if (!k)
break;
total += k;
}
if (check && total && l)
- die("breadb: short read");
+ bthrow(f, "fb.read", "breadb: short read");
return total;
}
if (!k)
{
- f->spout(f);
+ bspout(f);
k = f->bufend - f->bptr;
}
if (k > l)
byte *buffer, *bufend; /* Start and end of the buffer */
char *name; /* File name (used for error messages) */
ucw_off_t pos; /* Position of bstop in the file */
+ uns flags; /* See enum fb_flags */
int (*refill)(struct fastbuf *); /* Get a buffer with new data, returns 0 on EOF */
void (*spout)(struct fastbuf *); /* Write buffer data to the file */
int (*seek)(struct fastbuf *, ucw_off_t, int);/* Slow path for @bseek(), buffer already flushed; returns success */
void fb_tie(struct fastbuf *b); /* Tie fastbuf to a resource if there is an active pool */
+/**
+ * Fastbuf flags
+ */
+enum fb_flags {
+ FB_DEAD = 0x1, /* Some fastbuf's method has thrown an exception */
+ FB_DIE_ON_EOF = 0x2, /* Most of read operations throw "fb.eof" on EOF */
+};
+
/***
* === Fastbuf on files [[fbparam]]
*
* Can not be used for fastbufs not returned from function (initialized in a parameter, for example the one from `fbbuf_init_read`).
*/
void bclose(struct fastbuf *f);
+void bthrow(struct fastbuf *f, const char *id, const char *fmt, ...) FORMAT_CHECK(printf,3,4) NONRET; /** Throw exception on a given fastbuf **/
+int brefill(struct fastbuf *f, int allow_eof);
+void bspout(struct fastbuf *f);
void bflush(struct fastbuf *f); /** Write data (if it makes any sense, do not use for in-memory buffers). **/
void bseek(struct fastbuf *f, ucw_off_t pos, int whence); /** Seek in the buffer. See `man fseek` for description of @whence. Only for seekable fastbufs. **/
void bsetpos(struct fastbuf *f, ucw_off_t pos); /** Set position to @pos bytes from beginning. Only for seekable fastbufs. **/
return (f->bptr < f->bstop) ? (int) *f->bptr : bpeekc_slow(f);
}
+int beof_slow(struct fastbuf *f);
+static inline int beof(struct fastbuf *f) /** Have I reached EOF? **/
+{
+ return (f->bptr < f->bstop) ? 0 : beof_slow(f);
+}
+
static inline void bungetc(struct fastbuf *f) /** Return last read character back. Only one back is guaranteed to work. **/
{
f->bptr--;
void
fbbuf_init_read(struct fastbuf *f, byte *buf, uns size, uns can_overwrite)
{
- f->buffer = f->bptr = buf;
- f->bstop = f->bufend = buf + size;
- f->name = "fbbuf-read";
- f->pos = size;
- f->refill = fbbuf_refill;
- f->spout = NULL;
- f->seek = fbbuf_seek;
- f->close = NULL;
- f->config = NULL;
- f->can_overwrite_buffer = can_overwrite;
+ *f = (struct fastbuf) {
+ .buffer = buf,
+ .bptr = buf,
+ .bstop = buf + size,
+ .bufend = buf + size,
+ .name = "fbbuf-read",
+ .pos = size,
+ .refill = fbbuf_refill,
+ .seek = fbbuf_seek,
+ .can_overwrite_buffer = can_overwrite };
}
static void
-fbbuf_spout(struct fastbuf *f UNUSED)
+fbbuf_spout(struct fastbuf *f)
{
- die("fbbuf: buffer overflow on write");
+ bthrow(f, "fb.write", "fbbuf: buffer overflow on write");
}
void
fbbuf_init_write(struct fastbuf *f, byte *buf, uns size)
{
- f->buffer = f->bstop = f->bptr = buf;
- f->bufend = buf + size;
- f->name = "fbbuf-write";
- f->pos = size;
- f->refill = NULL;
- f->spout = fbbuf_spout;
- f->seek = NULL;
- f->close = NULL;
- f->config = NULL;
- f->can_overwrite_buffer = 0;
+ *f = (struct fastbuf) {
+ .buffer = buf,
+ .bstop = buf,
+ .bptr = buf,
+ .bufend = buf + size,
+ .name = "fbbuf-write",
+ .pos = size,
+ .spout = fbbuf_spout,
+ };
}
#ifdef TEST
if (!r->status)
return 0;
if (r->status < 0)
- die("Error reading %s: %s", f->name, strerror(r->returned_errno));
+ bthrow(f, "fb.read", "Error reading %s: %s", f->name, strerror(r->returned_errno));
f->bptr = f->buffer = r->buffer;
f->bstop = f->bufend = f->buffer + r->status;
f->pos += r->status;
asio_sync(F->io_queue);
DBG("FB-DIRECT: Truncating at %llu", (long long)f->pos);
if (ucw_ftruncate(F->fd, f->pos) < 0)
- die("Error truncating %s: %m", f->name);
+ bthrow(f, "fb.write", "Error truncating %s: %m", f->name);
}
else
asio_submit(r);
int l = read(F->fd, f->buffer, MIN(skip, blen));
if (unlikely(l <= 0))
if (l < 0)
- die("Error reading %s: %m", f->name);
+ bthrow(f, "fb.read", "Error reading %s: %m", f->name);
else
{
F->wpos -= skip;
/* Do lseek() */
F->wpos = f->pos + (f->buffer - f->bptr);
if (ucw_seek(F->fd, F->wpos, SEEK_SET) < 0)
- die("Error seeking %s: %m", f->name);
+ bthrow(f, "fb.read", "Error seeking %s: %m", f->name);
}
/* Read (part of) buffer */
do
{
int l = read(F->fd, read_ptr, read_len);
if (unlikely(l < 0))
- die("Error reading %s: %m", f->name);
+ bthrow(f, "fb.read", "Error reading %s: %m", f->name);
if (!l)
if (unlikely(read_ptr < f->bptr))
goto eof;
{
/* Do delayed lseek() if needed */
if (FB_FILE(f)->wpos != f->pos && ucw_seek(FB_FILE(f)->fd, f->pos, SEEK_SET) < 0)
- die("Error seeking %s: %m", f->name);
+ bthrow(f, "fb.write", "Error seeking %s: %m", f->name);
int l = f->bptr - f->buffer;
byte *c = f->buffer;
{
int z = write(FB_FILE(f)->fd, c, l);
if (z <= 0)
- die("Error writing %s: %m", f->name);
+ bthrow(f, "fb.write", "Error writing %s: %m", f->name);
l -= z;
c += z;
}
int max = MIN(FB_LIMFD(f)->limit - f->pos, f->bufend - f->buffer);
int l = read(FB_LIMFD(f)->fd, f->buffer, max);
if (l < 0)
- die("Error reading %s: %m", f->name);
+ bthrow(f, "fb.read", "Error reading %s: %m", f->name);
f->bstop = f->buffer + l;
f->pos += l;
return l;
FB_MEM(f)->block = NULL;
return 1;
}
- die("fbmem_seek to invalid offset");
+ bthrow(f, "fb.seek", "fbmem_seek to invalid offset");
}
static void
else
f->buffer = ucw_mmap(f->buffer, ll, prot, MAP_SHARED | MAP_FIXED, F->fd, pos0);
if (f->buffer == (byte *) MAP_FAILED)
- die("mmap(%s): %m", f->name);
+ bthrow(f, "fb.mmap", "mmap(%s): %m", f->name);
#ifdef MADV_SEQUENTIAL
if (ll > CPU_PAGE_SIZE)
madvise(f->buffer, ll, MADV_SEQUENTIAL);
{
F->file_extend = ALIGN_TO(F->file_extend + mmap_extend_size, (ucw_off_t)CPU_PAGE_SIZE);
if (ucw_ftruncate(F->fd, F->file_extend))
- die("ftruncate(%s): %m", f->name);
+ bthrow(f, "fb.write", "ftruncate(%s): %m", f->name);
}
bfmm_map_window(f);
f->bstop = f->bptr;
munmap(f->buffer, F->window_size);
if (F->file_extend > F->file_size &&
ucw_ftruncate(F->fd, F->file_size))
- die("ftruncate(%s): %m", f->name);
+ bthrow(f, "fb.write", "ftruncate(%s): %m", f->name);
bclose_file_helper(f, F->fd, F->is_temp_file);
xfree(f);
}
#include "ucw/conf.h"
#include "ucw/lfs.h"
#include "ucw/fastbuf.h"
+#include "ucw/trans.h"
#include <fcntl.h>
#include <stdio.h>
return fb;
case FB_MMAP:
if (!~mode && (int)(mode = fcntl(fd, F_GETFL)) < 0)
- die("Cannot get flags of fd %d: %m", fd);
+ trans_throw("fb.open", NULL, "Cannot get flags of fd %d: %m", fd);
return bfmmopen_internal(fd, name, mode);
default:
ASSERT(0);
if (try)
return NULL;
else
- die("Unable to %s file %s: %m", (mode & O_CREAT) ? "create" : "open", name);
+ trans_throw("fb.open", NULL, "Unable to %s file %s: %m", (mode & O_CREAT) ? "create" : "open", name);
struct fastbuf *fb = bopen_fd_internal(fd, params, mode, name);
ASSERT(fb);
if (mode & O_APPEND)
msg(L_ERROR, "unlink(%s): %m", f->name);
case 0:
if (close(fd))
- die("close(%s): %m", f->name);
+ msg(L_ERROR, "close(%s): %m", f->name);
}
}
int was_temp = bconfig(fb, BCONFIG_IS_TEMP_FILE, 0);
ASSERT(was_temp == 1);
if (rename(fb->name, name))
- die("Cannot rename %s to %s: %m", fb->name, name);
+ bthrow(fb, "fb.tmp", "Cannot rename %s to %s: %m", fb->name, name);
bclose(fb);
}
void
trans_vthrow(const char *id, void *object, const char *fmt, va_list args)
{
+ trans_init();
struct mempool *mp = trans_get_pool();
struct exception *x = mp_alloc(mp, sizeof(*x));
x->id = id;