From: Pavel Charvat Date: Wed, 4 Oct 2006 12:36:48 +0000 (+0200) Subject: * dev-threads merged to dev-img X-Git-Tag: holmes-import~512 X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=4142115bf20293bef998d1f683ac7ce3008178fb;hp=e4c695d49529d03352d241872493a5161e646288;p=libucw.git * dev-threads merged to dev-img * scanner generates only one file for image duplicates search (I will optimize this later... it has been disabled anyway) * parallel imagesig analyser (only tried to compile... not tested yet) Merge with git+ssh://git.ucw.cz/projects/sherlock/GIT/sherlock.git#dev-threads --- diff --git a/lib/Makefile b/lib/Makefile index 554bfad9..15dc0a24 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -15,7 +15,7 @@ LIBUCW_MODS= \ ipaccess \ profile \ fastbuf ff-binary ff-string ff-printf ff-utf8 \ - fb-file carefulio fb-mem fb-temp fb-mmap fb-limfd fb-buffer fb-grow \ + fb-file carefulio fb-mem fb-temp fb-mmap fb-limfd fb-buffer fb-grow fb-atomic \ str_ctype str_upper str_lower unicode-utf8 stkstring \ wildmatch wordsplit ctmatch patimatch patmatch regex \ prime primetable random timer randomkey \ diff --git a/lib/THREADS b/lib/THREADS new file mode 100644 index 00000000..1cb77b65 --- /dev/null +++ b/lib/THREADS @@ -0,0 +1,10 @@ +Generally, functions in the UCW library are reentrant as long as you call them +on different data. Calling on the same object is not, unless otherwise told, +which also includes functions acting on any kind of global state. + +There are some exceptions: + +- logging functions are safe as long as you don't switch log files +- setproctitle() is not safe, it modifies global state +- handle_signal() is not safe, it modifies global state +- stk_printf() is not safe, it currently uses a global buffer (FIXME!) diff --git a/lib/fastbuf.h b/lib/fastbuf.h index 6dbacce5..922798ff 100644 --- a/lib/fastbuf.h +++ b/lib/fastbuf.h @@ -19,7 +19,6 @@ #include #include "lib/unaligned.h" -#include "lib/bbuf.h" /* * Generic buffered I/O. You supply hooks to be called for low-level operations @@ -118,6 +117,26 @@ struct fastbuf *fbgrow_create(unsigned basic_size); void fbgrow_reset(struct fastbuf *b); /* Reset stream and prepare for writing */ void fbgrow_rewind(struct fastbuf *b); /* Prepare for reading */ +/* FastO with atomic writes for multi-threaded programs */ + +struct fb_atomic { + struct fastbuf fb; + struct fb_atomic_file *af; + byte *expected_max_bptr; + uns slack_size; +}; +#define FB_ATOMIC(f) ((struct fb_atomic *)(f)->is_fastbuf) + +struct fastbuf *fbatomic_open(byte *name, struct fastbuf *master, uns bufsize, int record_len); +void fbatomic_internal_write(struct fastbuf *b); + +static inline void +fbatomic_commit(struct fastbuf *b) +{ + if (b->bptr >= ((struct fb_atomic *)b)->expected_max_bptr) + fbatomic_internal_write(b); +} + /* Configuring stream parameters */ int bconfig(struct fastbuf *f, uns type, int data); @@ -322,7 +341,8 @@ int bgets_nodie(struct fastbuf *f, byte *b, uns l); byte *bgets0(struct fastbuf *f, byte *b, uns l); struct mempool; -uns bgets_bb(struct fastbuf *f, bb_t *b, uns limit); +struct bb_t; +uns bgets_bb(struct fastbuf *f, struct bb_t *b, uns limit); byte *bgets_mp(struct fastbuf *f, struct mempool *mp); struct bgets_stk_struct { diff --git a/lib/fb-atomic.c b/lib/fb-atomic.c new file mode 100644 index 00000000..73dd8b13 --- /dev/null +++ b/lib/fb-atomic.c @@ -0,0 +1,169 @@ +/* + * UCW Library -- Atomic Buffered Write to Files + * + * (c) 2006 Martin Mares + * + * This software may be freely distributed and used according to the terms + * of the GNU Lesser General Public License. + */ + +/* + * This fastbuf backend is intended for cases where several threads + * of a single program append records to a single file and while the + * record can mix in an arbitrary way, the bytes inside a single + * record must remain uninterrupted. + * + * In case of files with fixed record size, we just allocate the + * buffer to hold a whole number of records and take advantage + * of the atomicity of the write() system call. + * + * With variable-sized records, we need another solution: when + * writing a record, we keep the fastbuf in a locked state, which + * prevents buffer flushing (and if the buffer becomes full, we extend it), + * and we wait for an explicit commit operation which write()s the buffer + * if the free space in the buffer falls below the expected maximum record + * length. + * + * fbatomic_open() is called with the following parameters: + * name - name of the file to open + * master - fbatomic for the master thread or NULL if it's the first open + * bufsize - initial buffer size + * record_len - record length for fixed-size records; + * or -(expected maximum record length) for variable-sized ones. + */ + +#define LOCAL_DEBUG + +#include "lib/lib.h" +#include "lib/fastbuf.h" +#include "lib/lfs.h" + +#include +#include +#include + +struct fb_atomic_file { + int fd; + int use_count; + int record_len; + uns locked; + byte name[1]; +}; + +void +fbatomic_internal_write(struct fastbuf *f) +{ + struct fb_atomic_file *af = FB_ATOMIC(f)->af; + int size = f->bptr - f->buffer; + if (size) + { + ASSERT(af->record_len < 0 || !(size % af->record_len)); + int res = write(af->fd, f->buffer, size); + if (res < 0) + die("Error writing %s: %m", f->name); + if (res != size) + die("Unexpected partial write to %s: written only %d bytes of %d", f->name, res, size); + f->bptr = f->buffer; + } +} + +static void +fbatomic_spout(struct fastbuf *f) +{ + if (f->bptr < f->bufend) /* Explicit flushes should be ignored */ + return; + + struct fb_atomic *F = FB_ATOMIC(f); + if (F->af->locked) + { + uns written = f->bptr - f->buffer; + uns size = f->bufend - f->buffer + F->slack_size; + F->slack_size *= 2; + DBG("Reallocating buffer for atomic file %s with slack %d", f->name, F->slack_size); + f->buffer = xrealloc(f->buffer, size); + f->bufend = f->buffer + size; + f->bptr = f->buffer + written; + F->expected_max_bptr = f->bufend - F->slack_size; + } + else + fbatomic_internal_write(f); +} + +static void +fbatomic_close(struct fastbuf *f) +{ + struct fb_atomic_file *af = FB_ATOMIC(f)->af; + fbatomic_internal_write(f); /* Need to flush explicitly, because the file can be locked */ + if (!--af->use_count) + { + close(af->fd); + xfree(af); + } + xfree(f); +} + +struct fastbuf * +fbatomic_open(byte *name, struct fastbuf *master, uns bufsize, int record_len) +{ + struct fb_atomic *F = xmalloc_zero(sizeof(*F)); + struct fastbuf *f = &F->fb; + struct fb_atomic_file *af; + if (master) + { + af = FB_ATOMIC(master)->af; + af->use_count++; + ASSERT(af->record_len == record_len); + } + else + { + af = xmalloc_zero(sizeof(*af) + strlen(name)); + if ((af->fd = sh_open(name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0666)) < 0) + die("Cannot create %s: %m", name); + af->use_count = 1; + af->record_len = record_len; + af->locked = (record_len < 0); + strcpy(af->name, name); + } + F->af = af; + if (record_len > 0 && bufsize % record_len) + bufsize += record_len - (bufsize % record_len); + f->buffer = xmalloc(bufsize); + f->bufend = f->buffer + bufsize; + F->slack_size = (record_len < 0) ? -record_len : 0; + ASSERT(bufsize > F->slack_size); + F->expected_max_bptr = f->bufend - F->slack_size; + f->bptr = f->bstop = f->buffer; + f->name = af->name; + f->spout = fbatomic_spout; + f->close = fbatomic_close; + return f; +} + +#ifdef TEST + +int main(int argc UNUSED, char **argv UNUSED) +{ + struct fastbuf *f, *g; + + log(L_INFO, "Testing block writes"); + f = fbatomic_open("test", NULL, 16, 4); + for (u32 i=0; i<17; i++) + bwrite(f, &i, 4); + bclose(f); + + log(L_INFO, "Testing interleaved var-size writes"); + f = fbatomic_open("test2", NULL, 23, -5); + g = fbatomic_open("test2", f, 23, -5); + for (int i=0; i<100; i++) + { + struct fastbuf *x = (i%2) ? g : f; + bprintf(x, "%c<%d>\n", "fg"[i%2], ((259309*i) % 1000000) >> (i % 8)); + fbatomic_commit(x); + } + bclose(f); + bclose(g); + + return 0; +} + +#endif diff --git a/lib/ff-string.c b/lib/ff-string.c index cb41b8c7..c8fa0fb3 100644 --- a/lib/ff-string.c +++ b/lib/ff-string.c @@ -11,6 +11,7 @@ #include "lib/lib.h" #include "lib/fastbuf.h" #include "lib/mempool.h" +#include "lib/bbuf.h" byte * /* Non-standard */ bgets(struct fastbuf *f, byte *b, uns l) @@ -79,7 +80,7 @@ exit: } uns -bgets_bb(struct fastbuf *f, bb_t *bb, uns limit) +bgets_bb(struct fastbuf *f, struct bb_t *bb, uns limit) { ASSERT(limit); byte *src; diff --git a/lib/gbuf.h b/lib/gbuf.h index f0e618bf..daf0bfbb 100644 --- a/lib/gbuf.h +++ b/lib/gbuf.h @@ -16,7 +16,7 @@ #define BUF_T GBUF_PREFIX(t) -typedef struct +typedef struct BUF_T { uns len; GBUF_TYPE *ptr; diff --git a/lib/lib.h b/lib/lib.h index 5ab134f0..ec9d58b9 100644 --- a/lib/lib.h +++ b/lib/lib.h @@ -65,6 +65,13 @@ #define likely(x) __builtin_expect((x),1) #define unlikely(x) __builtin_expect((x),0) +#if __GNUC__ >= 4 || __GNUC__ == 3 && __GNUC_MINOR__ >= 3 +#define ALWAYS_INLINE inline __attribute__((always_inline)) +#define NO_INLINE __attribute__((noinline)) +#else +#define ALWAYS_INLINE inline +#endif + #if __GNUC__ >= 4 #define LIKE_MALLOC __attribute__((malloc)) #define SENTINEL_CHECK __attribute__((sentinel))