]> mj.ucw.cz Git - libucw.git/commitdiff
* dev-threads merged to dev-img
authorPavel Charvat <pavel.charvat@netcentrum.cz>
Wed, 4 Oct 2006 12:36:48 +0000 (14:36 +0200)
committerPavel Charvat <pavel.charvat@netcentrum.cz>
Wed, 4 Oct 2006 12:36:48 +0000 (14:36 +0200)
* scanner generates only one file for image duplicates search
  (I will optimize this later... it has been disabled anyway)
* parallel imagesig analyser

(only tried to compile... not tested yet)

Merge with git+ssh://git.ucw.cz/projects/sherlock/GIT/sherlock.git#dev-threads

lib/Makefile
lib/THREADS [new file with mode: 0644]
lib/fastbuf.h
lib/fb-atomic.c [new file with mode: 0644]
lib/ff-string.c
lib/gbuf.h
lib/lib.h

index 554bfad940253d1f5e2241732483211d77247cdf..15dc0a246d60df199676c55244b9cce182ae0052 100644 (file)
@@ -15,7 +15,7 @@ LIBUCW_MODS= \
        ipaccess \
        profile \
        fastbuf ff-binary ff-string ff-printf ff-utf8 \
-       fb-file carefulio fb-mem fb-temp fb-mmap fb-limfd fb-buffer fb-grow \
+       fb-file carefulio fb-mem fb-temp fb-mmap fb-limfd fb-buffer fb-grow fb-atomic \
        str_ctype str_upper str_lower unicode-utf8 stkstring \
        wildmatch wordsplit ctmatch patimatch patmatch regex \
        prime primetable random timer randomkey \
diff --git a/lib/THREADS b/lib/THREADS
new file mode 100644 (file)
index 0000000..1cb77b6
--- /dev/null
@@ -0,0 +1,10 @@
+Generally, functions in the UCW library are reentrant as long as you call them
+on different data. Calling on the same object is not, unless otherwise told,
+which also includes functions acting on any kind of global state.
+
+There are some exceptions:
+
+- logging functions are safe as long as you don't switch log files
+- setproctitle() is not safe, it modifies global state
+- handle_signal() is not safe, it modifies global state
+- stk_printf() is not safe, it currently uses a global buffer (FIXME!)
index 6dbacce5dcebf52c73b5fdeac47cfd8511ceda23..922798ff183515a3d102191c6e614569373088e1 100644 (file)
@@ -19,7 +19,6 @@
 #include <alloca.h>
 
 #include "lib/unaligned.h"
-#include "lib/bbuf.h"
 
 /*
  *  Generic buffered I/O. You supply hooks to be called for low-level operations
@@ -118,6 +117,26 @@ struct fastbuf *fbgrow_create(unsigned basic_size);
 void fbgrow_reset(struct fastbuf *b);                  /* Reset stream and prepare for writing */
 void fbgrow_rewind(struct fastbuf *b);                 /* Prepare for reading */
 
+/* FastO with atomic writes for multi-threaded programs */
+
+struct fb_atomic {
+  struct fastbuf fb;
+  struct fb_atomic_file *af;
+  byte *expected_max_bptr;
+  uns slack_size;
+};
+#define FB_ATOMIC(f) ((struct fb_atomic *)(f)->is_fastbuf)
+
+struct fastbuf *fbatomic_open(byte *name, struct fastbuf *master, uns bufsize, int record_len);
+void fbatomic_internal_write(struct fastbuf *b);
+
+static inline void
+fbatomic_commit(struct fastbuf *b)
+{
+  if (b->bptr >= ((struct fb_atomic *)b)->expected_max_bptr)
+    fbatomic_internal_write(b);
+}
+
 /* Configuring stream parameters */
 
 int bconfig(struct fastbuf *f, uns type, int data);
@@ -322,7 +341,8 @@ int bgets_nodie(struct fastbuf *f, byte *b, uns l);
 byte *bgets0(struct fastbuf *f, byte *b, uns l);
 
 struct mempool;
-uns bgets_bb(struct fastbuf *f, bb_t *b, uns limit);
+struct bb_t;
+uns bgets_bb(struct fastbuf *f, struct bb_t *b, uns limit);
 byte *bgets_mp(struct fastbuf *f, struct mempool *mp);
 
 struct bgets_stk_struct {
diff --git a/lib/fb-atomic.c b/lib/fb-atomic.c
new file mode 100644 (file)
index 0000000..73dd8b1
--- /dev/null
@@ -0,0 +1,169 @@
+/*
+ *     UCW Library -- Atomic Buffered Write to Files
+ *
+ *     (c) 2006 Martin Mares <mj@ucw.cz>
+ *
+ *     This software may be freely distributed and used according to the terms
+ *     of the GNU Lesser General Public License.
+ */
+
+/*
+ *     This fastbuf backend is intended for cases where several threads
+ *     of a single program append records to a single file and while the
+ *     record can mix in an arbitrary way, the bytes inside a single
+ *     record must remain uninterrupted.
+ *
+ *     In case of files with fixed record size, we just allocate the
+ *     buffer to hold a whole number of records and take advantage
+ *     of the atomicity of the write() system call.
+ *
+ *     With variable-sized records, we need another solution: when
+ *     writing a record, we keep the fastbuf in a locked state, which
+ *     prevents buffer flushing (and if the buffer becomes full, we extend it),
+ *     and we wait for an explicit commit operation which write()s the buffer
+ *     if the free space in the buffer falls below the expected maximum record
+ *     length.
+ *
+ *     fbatomic_open() is called with the following parameters:
+ *         name - name of the file to open
+ *         master - fbatomic for the master thread or NULL if it's the first open
+ *         bufsize - initial buffer size
+ *         record_len - record length for fixed-size records;
+ *             or -(expected maximum record length) for variable-sized ones.
+ */
+
+#define LOCAL_DEBUG
+
+#include "lib/lib.h"
+#include "lib/fastbuf.h"
+#include "lib/lfs.h"
+
+#include <string.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+struct fb_atomic_file {
+  int fd;
+  int use_count;
+  int record_len;
+  uns locked;
+  byte name[1];
+};
+
+void
+fbatomic_internal_write(struct fastbuf *f)
+{
+  struct fb_atomic_file *af = FB_ATOMIC(f)->af;
+  int size = f->bptr - f->buffer;
+  if (size)
+    {
+      ASSERT(af->record_len < 0 || !(size % af->record_len));
+      int res = write(af->fd, f->buffer, size);
+      if (res < 0)
+       die("Error writing %s: %m", f->name);
+      if (res != size)
+       die("Unexpected partial write to %s: written only %d bytes of %d", f->name, res, size);
+      f->bptr = f->buffer;
+    }
+}
+
+static void
+fbatomic_spout(struct fastbuf *f)
+{
+  if (f->bptr < f->bufend)             /* Explicit flushes should be ignored */
+    return;
+
+  struct fb_atomic *F = FB_ATOMIC(f);
+  if (F->af->locked)
+    {
+      uns written = f->bptr - f->buffer;
+      uns size = f->bufend - f->buffer + F->slack_size;
+      F->slack_size *= 2;
+      DBG("Reallocating buffer for atomic file %s with slack %d", f->name, F->slack_size);
+      f->buffer = xrealloc(f->buffer, size);
+      f->bufend = f->buffer + size;
+      f->bptr = f->buffer + written;
+      F->expected_max_bptr = f->bufend - F->slack_size;
+    }
+  else
+    fbatomic_internal_write(f);
+}
+
+static void
+fbatomic_close(struct fastbuf *f)
+{
+  struct fb_atomic_file *af = FB_ATOMIC(f)->af;
+  fbatomic_internal_write(f);  /* Need to flush explicitly, because the file can be locked */
+  if (!--af->use_count)
+    {
+      close(af->fd);
+      xfree(af);
+    }
+  xfree(f);
+}
+
+struct fastbuf *
+fbatomic_open(byte *name, struct fastbuf *master, uns bufsize, int record_len)
+{
+  struct fb_atomic *F = xmalloc_zero(sizeof(*F));
+  struct fastbuf *f = &F->fb;
+  struct fb_atomic_file *af;
+  if (master)
+    {
+      af = FB_ATOMIC(master)->af;
+      af->use_count++;
+      ASSERT(af->record_len == record_len);
+    }
+  else
+    {
+      af = xmalloc_zero(sizeof(*af) + strlen(name));
+      if ((af->fd = sh_open(name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0666)) < 0)
+       die("Cannot create %s: %m", name);
+      af->use_count = 1;
+      af->record_len = record_len;
+      af->locked = (record_len < 0);
+      strcpy(af->name, name);
+    }
+  F->af = af;
+  if (record_len > 0 && bufsize % record_len)
+    bufsize += record_len - (bufsize % record_len);
+  f->buffer = xmalloc(bufsize);
+  f->bufend = f->buffer + bufsize;
+  F->slack_size = (record_len < 0) ? -record_len : 0;
+  ASSERT(bufsize > F->slack_size);
+  F->expected_max_bptr = f->bufend - F->slack_size;
+  f->bptr = f->bstop = f->buffer;
+  f->name = af->name;
+  f->spout = fbatomic_spout;
+  f->close = fbatomic_close;
+  return f;
+}
+
+#ifdef TEST
+
+int main(int argc UNUSED, char **argv UNUSED)
+{
+  struct fastbuf *f, *g;
+
+  log(L_INFO, "Testing block writes");
+  f = fbatomic_open("test", NULL, 16, 4);
+  for (u32 i=0; i<17; i++)
+    bwrite(f, &i, 4);
+  bclose(f);
+
+  log(L_INFO, "Testing interleaved var-size writes");
+  f = fbatomic_open("test2", NULL, 23, -5);
+  g = fbatomic_open("test2", f, 23, -5);
+  for (int i=0; i<100; i++)
+    {
+      struct fastbuf *x = (i%2) ? g : f;
+      bprintf(x, "%c<%d>\n", "fg"[i%2], ((259309*i) % 1000000) >> (i % 8));
+      fbatomic_commit(x);
+    }
+  bclose(f);
+  bclose(g);
+
+  return 0;
+}
+
+#endif
index cb41b8c7c0f5ffa73a23f1b229f7f9874af433e9..c8fa0fb35b48cc06a4bc44c6ee4280af5587f2ff 100644 (file)
@@ -11,6 +11,7 @@
 #include "lib/lib.h"
 #include "lib/fastbuf.h"
 #include "lib/mempool.h"
+#include "lib/bbuf.h"
 
 byte *                                 /* Non-standard */
 bgets(struct fastbuf *f, byte *b, uns l)
@@ -79,7 +80,7 @@ exit:
 }
 
 uns
-bgets_bb(struct fastbuf *f, bb_t *bb, uns limit)
+bgets_bb(struct fastbuf *f, struct bb_t *bb, uns limit)
 {
   ASSERT(limit);
   byte *src;
index f0e618bf616d40e7a688b58461cb67d05fe28bc0..daf0bfbb6b74be7729c3aa34b56e31d46cae1772 100644 (file)
@@ -16,7 +16,7 @@
 
 #define        BUF_T   GBUF_PREFIX(t)
 
-typedef struct
+typedef struct BUF_T
 {
   uns len;
   GBUF_TYPE *ptr;
index 5ab134f0ccbe2d0f3b9756f18942bef6c5413983..ec9d58b9a9d5e80cb1ed7e8581f6ca50d0b09c90 100644 (file)
--- a/lib/lib.h
+++ b/lib/lib.h
 #define likely(x) __builtin_expect((x),1)
 #define unlikely(x) __builtin_expect((x),0)
 
+#if __GNUC__ >= 4 || __GNUC__ == 3 && __GNUC_MINOR__ >= 3
+#define ALWAYS_INLINE inline __attribute__((always_inline))
+#define NO_INLINE __attribute__((noinline))
+#else
+#define ALWAYS_INLINE inline
+#endif
+
 #if __GNUC__ >= 4
 #define LIKE_MALLOC __attribute__((malloc))
 #define SENTINEL_CHECK __attribute__((sentinel))