]> mj.ucw.cz Git - libucw.git/commitdiff
Implemented first bits of transactions on fastbufs.
authorPavel Charvat <pchar@ucw.cz>
Wed, 29 Oct 2008 12:10:19 +0000 (13:10 +0100)
committerMartin Mares <mj@ucw.cz>
Tue, 29 Mar 2011 10:55:05 +0000 (12:55 +0200)
ucw/fastbuf.c
ucw/fastbuf.h
ucw/fb-buffer.c
ucw/fb-direct.c
ucw/fb-file.c
ucw/fb-limfd.c
ucw/fb-mem.c
ucw/fb-mmap.c
ucw/fb-param.c
ucw/fb-temp.c
ucw/trans.c

index de38b277353b8a0e95369b4902136ec209fc9b2c..fb2cc49f88513f73e16d4035a0dc26c417baf5d2 100644 (file)
@@ -10,6 +10,7 @@
 #include "ucw/lib.h"
 #include "ucw/fastbuf.h"
 #include "ucw/respool.h"
+#include "ucw/trans.h"
 
 #include <stdio.h>
 #include <stdlib.h>
@@ -18,7 +19,8 @@ void bclose(struct fastbuf *f)
 {
   if (f)
     {
-      bflush(f);
+      if (!(f->flags & FB_DEAD))
+        bflush(f);
       if (f->close)
        f->close(f);
       if (f->res)
@@ -26,10 +28,47 @@ void bclose(struct fastbuf *f)
     }
 }
 
+void NONRET bthrow(struct fastbuf *f, const char *id, const char *fmt, ...)
+{
+  ASSERT(!(f->flags & FB_DEAD));
+  f->flags |= FB_DEAD;
+  va_list args;
+  va_start(args, fmt);
+  trans_vthrow(id, f, fmt, args);
+}
+
+int brefill(struct fastbuf *f, int allow_eof)
+{
+  ASSERT(f->bptr >= f->bstop);
+  if (!f->refill)
+    bthrow(f, "fb.read", "Stream not readable");
+  if (f->refill(f))
+    {
+      ASSERT(f->bptr < f->bstop);
+      return 1;
+    }
+  else
+    {
+      if (!allow_eof && (f->flags & FB_DIE_ON_EOF))
+       bthrow(f, "fb.eof", "Unexpected EOF");
+      ASSERT(f->bptr == f->bstop);
+      return 0;
+    }
+}
+
+void bspout(struct fastbuf *f)
+{
+  ASSERT(f->bptr > f->bstop || f->bptr >= f->bufend);
+  if (!f->spout)
+    bthrow(f, "fb.write", "Stream not writeable");
+  f->spout(f);
+  ASSERT(f->bptr < f->bufend);
+}
+
 void bflush(struct fastbuf *f)
 {
   if (f->bptr > f->bstop)
-    f->spout(f);
+    bspout(f);
   else if (f->bstop > f->buffer)
     f->bptr = f->bstop = f->buffer;
 }
@@ -43,7 +82,7 @@ inline void bsetpos(struct fastbuf *f, ucw_off_t pos)
     {
       bflush(f);
       if (!f->seek || !f->seek(f, pos, SEEK_SET))
-       die("bsetpos: stream not seekable");
+       bthrow(f, "fb.seek", "Stream not seekable");
     }
 }
 
@@ -58,7 +97,7 @@ void bseek(struct fastbuf *f, ucw_off_t pos, int whence)
     case SEEK_END:
       bflush(f);
       if (!f->seek || !f->seek(f, pos, SEEK_END))
-       die("bseek: stream not seekable");
+       bthrow(f, "fb.seek", "Stream not seekable");
       break;
     default:
       die("bseek: invalid whence=%d", whence);
@@ -69,7 +108,7 @@ int bgetc_slow(struct fastbuf *f)
 {
   if (f->bptr < f->bstop)
     return *f->bptr++;
-  if (!f->refill(f))
+  if (!brefill(f, 0))
     return -1;
   return *f->bptr++;
 }
@@ -78,15 +117,20 @@ int bpeekc_slow(struct fastbuf *f)
 {
   if (f->bptr < f->bstop)
     return *f->bptr;
-  if (!f->refill(f))
+  if (!brefill(f, 0))
     return -1;
   return *f->bptr;
 }
 
+int beof_slow(struct fastbuf *f)
+{
+  return f->bptr >= f->bstop && !brefill(f, 1);
+}
+
 void bputc_slow(struct fastbuf *f, uns c)
 {
   if (f->bptr >= f->bufend)
-    f->spout(f);
+    bspout(f);
   *f->bptr++ = c;
 }
 
@@ -99,7 +143,7 @@ uns bread_slow(struct fastbuf *f, void *b, uns l, uns check)
 
       if (!k)
        {
-         f->refill(f);
+         brefill(f, check);
          k = f->bstop - f->bptr;
          if (!k)
            break;
@@ -113,7 +157,7 @@ uns bread_slow(struct fastbuf *f, void *b, uns l, uns check)
       total += k;
     }
   if (check && total && l)
-    die("breadb: short read");
+    bthrow(f, "fb.read", "breadb: short read");
   return total;
 }
 
@@ -125,7 +169,7 @@ void bwrite_slow(struct fastbuf *f, const void *b, uns l)
 
       if (!k)
        {
-         f->spout(f);
+         bspout(f);
          k = f->bufend - f->bptr;
        }
       if (k > l)
index 6e3b262636729ff71da5b4fe6401aabf02463eab..90c7648eafeac8ff28509a07892a198d7fec5e80 100644 (file)
@@ -133,6 +133,7 @@ struct fastbuf {
   byte *buffer, *bufend;                       /* Start and end of the buffer */
   char *name;                                  /* File name (used for error messages) */
   ucw_off_t pos;                               /* Position of bstop in the file */
+  uns flags;                                   /* See enum fb_flags */
   int (*refill)(struct fastbuf *);             /* Get a buffer with new data, returns 0 on EOF */
   void (*spout)(struct fastbuf *);             /* Write buffer data to the file */
   int (*seek)(struct fastbuf *, ucw_off_t, int);/* Slow path for @bseek(), buffer already flushed; returns success */
@@ -144,6 +145,14 @@ struct fastbuf {
 
 void fb_tie(struct fastbuf *b);                        /* Tie fastbuf to a resource if there is an active pool */
 
+/**
+ * Fastbuf flags
+ */
+enum fb_flags {
+  FB_DEAD = 0x1,                               /* Some fastbuf's method has thrown an exception */
+  FB_DIE_ON_EOF = 0x2,                         /* Most of read operations throw "fb.eof" on EOF */
+};
+
 /***
  * === Fastbuf on files [[fbparam]]
  *
@@ -481,6 +490,9 @@ int bconfig(struct fastbuf *f, uns type, int data); /** Configure a fastbuf. Ret
  * Can not be used for fastbufs not returned from function (initialized in a parameter, for example the one from `fbbuf_init_read`).
  */
 void bclose(struct fastbuf *f);
+void bthrow(struct fastbuf *f, const char *id, const char *fmt, ...) FORMAT_CHECK(printf,3,4) NONRET;  /** Throw exception on a given fastbuf **/
+int brefill(struct fastbuf *f, int allow_eof);
+void bspout(struct fastbuf *f);
 void bflush(struct fastbuf *f);                                        /** Write data (if it makes any sense, do not use for in-memory buffers). **/
 void bseek(struct fastbuf *f, ucw_off_t pos, int whence);      /** Seek in the buffer. See `man fseek` for description of @whence. Only for seekable fastbufs. **/
 void bsetpos(struct fastbuf *f, ucw_off_t pos);                        /** Set position to @pos bytes from beginning. Only for seekable fastbufs. **/
@@ -504,6 +516,12 @@ static inline int bpeekc(struct fastbuf *f)                        /** Return next character from the
   return (f->bptr < f->bstop) ? (int) *f->bptr : bpeekc_slow(f);
 }
 
+int beof_slow(struct fastbuf *f);
+static inline int beof(struct fastbuf *f)                      /** Have I reached EOF? **/
+{
+  return (f->bptr < f->bstop) ? 0 : beof_slow(f);
+}
+
 static inline void bungetc(struct fastbuf *f)                  /** Return last read character back. Only one back is guaranteed to work. **/
 {
   f->bptr--;
index 1888cd516b0d9bdced19b85cfe60d2c5f07d27f9..7fff1a1cfd94574bf4c1ce7f554c2cc0e827f471 100644 (file)
@@ -36,37 +36,36 @@ fbbuf_seek(struct fastbuf *f, ucw_off_t pos, int whence)
 void
 fbbuf_init_read(struct fastbuf *f, byte *buf, uns size, uns can_overwrite)
 {
-  f->buffer = f->bptr = buf;
-  f->bstop = f->bufend = buf + size;
-  f->name = "fbbuf-read";
-  f->pos = size;
-  f->refill = fbbuf_refill;
-  f->spout = NULL;
-  f->seek = fbbuf_seek;
-  f->close = NULL;
-  f->config = NULL;
-  f->can_overwrite_buffer = can_overwrite;
+  *f = (struct fastbuf) {
+    .buffer = buf,
+    .bptr = buf,
+    .bstop = buf + size,
+    .bufend = buf + size,
+    .name = "fbbuf-read",
+    .pos = size,
+    .refill = fbbuf_refill,
+    .seek = fbbuf_seek,
+    .can_overwrite_buffer = can_overwrite };
 }
 
 static void
-fbbuf_spout(struct fastbuf *f UNUSED)
+fbbuf_spout(struct fastbuf *f)
 {
-  die("fbbuf: buffer overflow on write");
+  bthrow(f, "fb.write", "fbbuf: buffer overflow on write");
 }
 
 void
 fbbuf_init_write(struct fastbuf *f, byte *buf, uns size)
 {
-  f->buffer = f->bstop = f->bptr = buf;
-  f->bufend = buf + size;
-  f->name = "fbbuf-write";
-  f->pos = size;
-  f->refill = NULL;
-  f->spout = fbbuf_spout;
-  f->seek = NULL;
-  f->close = NULL;
-  f->config = NULL;
-  f->can_overwrite_buffer = 0;
+  *f = (struct fastbuf) {
+    .buffer = buf,
+    .bstop = buf,
+    .bptr = buf,
+    .bufend = buf + size,
+    .name = "fbbuf-write",
+    .pos = size,
+    .spout = fbbuf_spout,
+  };
 }
 
 #ifdef TEST
index 71e7351b449d6ab51414a217a8f8f9538e357fc5..264905792acdf4b7e59aee727be86dc7890199a4 100644 (file)
@@ -159,7 +159,7 @@ fbdir_refill(struct fastbuf *f)
   if (!r->status)
     return 0;
   if (r->status < 0)
-    die("Error reading %s: %s", f->name, strerror(r->returned_errno));
+    bthrow(f, "fb.read", "Error reading %s: %s", f->name, strerror(r->returned_errno));
   f->bptr = f->buffer = r->buffer;
   f->bstop = f->bufend = f->buffer + r->status;
   f->pos += r->status;
@@ -193,7 +193,7 @@ fbdir_spout(struct fastbuf *f)
          asio_sync(F->io_queue);
          DBG("FB-DIRECT: Truncating at %llu", (long long)f->pos);
          if (ucw_ftruncate(F->fd, f->pos) < 0)
-           die("Error truncating %s: %m", f->name);
+           bthrow(f, "fb.write", "Error truncating %s: %m", f->name);
        }
       else
        asio_submit(r);
index fff443786b7623f3881e18b631f567a6c355ded0..46b7e71bc256d6689243798a66b120cbd235e9f7 100644 (file)
@@ -56,7 +56,7 @@ long_seek:
              int l = read(F->fd, f->buffer, MIN(skip, blen));
              if (unlikely(l <= 0))
                if (l < 0)
-                 die("Error reading %s: %m", f->name);
+                 bthrow(f, "fb.read", "Error reading %s: %m", f->name);
                else
                  {
                    F->wpos -= skip;
@@ -114,14 +114,14 @@ seek:
       /* Do lseek() */
       F->wpos = f->pos + (f->buffer - f->bptr);
       if (ucw_seek(F->fd, F->wpos, SEEK_SET) < 0)
-       die("Error seeking %s: %m", f->name);
+       bthrow(f, "fb.read", "Error seeking %s: %m", f->name);
     }
   /* Read (part of) buffer */
   do
     {
       int l = read(F->fd, read_ptr, read_len);
       if (unlikely(l < 0))
-       die("Error reading %s: %m", f->name);
+       bthrow(f, "fb.read", "Error reading %s: %m", f->name);
       if (!l)
        if (unlikely(read_ptr < f->bptr))
          goto eof;
@@ -149,7 +149,7 @@ bfd_spout(struct fastbuf *f)
 {
   /* Do delayed lseek() if needed */
   if (FB_FILE(f)->wpos != f->pos && ucw_seek(FB_FILE(f)->fd, f->pos, SEEK_SET) < 0)
-    die("Error seeking %s: %m", f->name);
+    bthrow(f, "fb.write", "Error seeking %s: %m", f->name);
 
   int l = f->bptr - f->buffer;
   byte *c = f->buffer;
@@ -161,7 +161,7 @@ bfd_spout(struct fastbuf *f)
     {
       int z = write(FB_FILE(f)->fd, c, l);
       if (z <= 0)
-       die("Error writing %s: %m", f->name);
+       bthrow(f, "fb.write", "Error writing %s: %m", f->name);
       l -= z;
       c += z;
     }
index 4e7d406b4824507ea05d049e0329cf7e562602f3..f909da3bb027f77f733f1835a302c319975ebeb7 100644 (file)
@@ -27,7 +27,7 @@ bfl_refill(struct fastbuf *f)
   int max = MIN(FB_LIMFD(f)->limit - f->pos, f->bufend - f->buffer);
   int l = read(FB_LIMFD(f)->fd, f->buffer, max);
   if (l < 0)
-    die("Error reading %s: %m", f->name);
+    bthrow(f, "fb.read", "Error reading %s: %m", f->name);
   f->bstop = f->buffer + l;
   f->pos += l;
   return l;
index a100d481f05016f720295ea1183499804f635e1c..2356e86bd128729175fe4c95b62babbfa5379ad0 100644 (file)
@@ -128,7 +128,7 @@ fbmem_seek(struct fastbuf *f, ucw_off_t pos, int whence)
       FB_MEM(f)->block = NULL;
       return 1;
     }
-  die("fbmem_seek to invalid offset");
+  bthrow(f, "fb.seek", "fbmem_seek to invalid offset");
 }
 
 static void
index 25209721a4039cc06f604e546c6183ae8b64dd34..96d482aa3cf65f87e9bd9765231af1c107963074 100644 (file)
@@ -70,7 +70,7 @@ bfmm_map_window(struct fastbuf *f)
   else
     f->buffer = ucw_mmap(f->buffer, ll, prot, MAP_SHARED | MAP_FIXED, F->fd, pos0);
   if (f->buffer == (byte *) MAP_FAILED)
-    die("mmap(%s): %m", f->name);
+    bthrow(f, "fb.mmap", "mmap(%s): %m", f->name);
 #ifdef MADV_SEQUENTIAL
   if (ll > CPU_PAGE_SIZE)
     madvise(f->buffer, ll, MADV_SEQUENTIAL);
@@ -115,7 +115,7 @@ bfmm_spout(struct fastbuf *f)
     {
       F->file_extend = ALIGN_TO(F->file_extend + mmap_extend_size, (ucw_off_t)CPU_PAGE_SIZE);
       if (ucw_ftruncate(F->fd, F->file_extend))
-       die("ftruncate(%s): %m", f->name);
+       bthrow(f, "fb.write", "ftruncate(%s): %m", f->name);
     }
   bfmm_map_window(f);
   f->bstop = f->bptr;
@@ -145,7 +145,7 @@ bfmm_close(struct fastbuf *f)
     munmap(f->buffer, F->window_size);
   if (F->file_extend > F->file_size &&
       ucw_ftruncate(F->fd, F->file_size))
-    die("ftruncate(%s): %m", f->name);
+    bthrow(f, "fb.write", "ftruncate(%s): %m", f->name);
   bclose_file_helper(f, F->fd, F->is_temp_file);
   xfree(f);
 }
index 22f988c6d42d3a03effd32c04704893a96d7ba6f..7543220fa0e89a5853626945415453d0f449f2da 100644 (file)
@@ -12,6 +12,7 @@
 #include "ucw/conf.h"
 #include "ucw/lfs.h"
 #include "ucw/fastbuf.h"
+#include "ucw/trans.h"
 
 #include <fcntl.h>
 #include <stdio.h>
@@ -101,7 +102,7 @@ bopen_fd_internal(int fd, struct fb_params *params, uns mode, const char *name)
        return fb;
       case FB_MMAP:
        if (!~mode && (int)(mode = fcntl(fd, F_GETFL)) < 0)
-          die("Cannot get flags of fd %d: %m", fd);
+          trans_throw("fb.open", NULL, "Cannot get flags of fd %d: %m", fd);
        return bfmmopen_internal(fd, name, mode);
       default:
        ASSERT(0);
@@ -124,7 +125,7 @@ bopen_file_internal(const char *name, int mode, struct fb_params *params, int tr
     if (try)
       return NULL;
     else
-      die("Unable to %s file %s: %m", (mode & O_CREAT) ? "create" : "open", name);
+      trans_throw("fb.open", NULL, "Unable to %s file %s: %m", (mode & O_CREAT) ? "create" : "open", name);
   struct fastbuf *fb = bopen_fd_internal(fd, params, mode, name);
   ASSERT(fb);
   if (mode & O_APPEND)
@@ -162,7 +163,7 @@ bclose_file_helper(struct fastbuf *f, int fd, int is_temp_file)
        msg(L_ERROR, "unlink(%s): %m", f->name);
     case 0:
       if (close(fd))
-       die("close(%s): %m", f->name);
+       msg(L_ERROR, "close(%s): %m", f->name);
     }
 }
 
index 05fb655b3f65934afa6a1d40c59ad75e6992d20b..a68eb5824d86cfd11c4a3151c0962896de793314 100644 (file)
@@ -35,7 +35,7 @@ void bfix_tmp_file(struct fastbuf *fb, const char *name)
   int was_temp = bconfig(fb, BCONFIG_IS_TEMP_FILE, 0);
   ASSERT(was_temp == 1);
   if (rename(fb->name, name))
-    die("Cannot rename %s to %s: %m", fb->name, name);
+    bthrow(fb, "fb.tmp", "Cannot rename %s to %s: %m", fb->name, name);
   bclose(fb);
 }
 
index 67643b26fc3e6e737c7b2ea9f58d5325770757d7..1888ad89e6a2abbfdb72ea5f99bbf234fa68fd53 100644 (file)
@@ -213,6 +213,7 @@ trans_throw(const char *id, void *object, const char *fmt, ...)
 void
 trans_vthrow(const char *id, void *object, const char *fmt, va_list args)
 {
+  trans_init();
   struct mempool *mp = trans_get_pool();
   struct exception *x = mp_alloc(mp, sizeof(*x));
   x->id = id;