]> mj.ucw.cz Git - libucw.git/blob - ucw/fb-atomic.c
Libucw: Cleaned up logging in fb-atomic.
[libucw.git] / ucw / fb-atomic.c
1 /*
2  *      UCW Library -- Atomic Buffered Write to Files
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 /*
11  *      This fastbuf backend is intended for cases where several threads
12  *      of a single program append records to a single file and while the
13  *      record can mix in an arbitrary way, the bytes inside a single
14  *      record must remain uninterrupted.
15  *
16  *      In case of files with fixed record size, we just allocate the
17  *      buffer to hold a whole number of records and take advantage
18  *      of the atomicity of the write() system call.
19  *
20  *      With variable-sized records, we need another solution: when
21  *      writing a record, we keep the fastbuf in a locked state, which
22  *      prevents buffer flushing (and if the buffer becomes full, we extend it),
23  *      and we wait for an explicit commit operation which write()s the buffer
24  *      if the free space in the buffer falls below the expected maximum record
25  *      length.
26  *
27  *      fbatomic_open() is called with the following parameters:
28  *          name - name of the file to open
29  *          master - fbatomic for the master thread or NULL if it's the first open
30  *          bufsize - initial buffer size
31  *          record_len - record length for fixed-size records;
32  *              or -(expected maximum record length) for variable-sized ones.
33  */
34
35 #include "ucw/lib.h"
36 #include "ucw/fastbuf.h"
37 #include "ucw/lfs.h"
38 #include "ucw/conf.h"
39
40 #include <string.h>
41 #include <fcntl.h>
42 #include <unistd.h>
43
44 static uns trace;
45
46 static struct cf_section fbatomic_config = {
47   CF_ITEMS {
48     CF_UNS("Trace", &trace)
49   }
50 };
51
52 static void CONSTRUCTOR fbatomic_init_config(void)
53 {
54   cf_declare_section("FBAtomic", &fbatomic_config, 1);
55 }
56
57 #define TRACE(m...) do { if(trace) msg(L_DEBUG, "FB_ATOMIC: " m); } while(0)
58
59 struct fb_atomic_file {
60   int fd;
61   int use_count;
62   int record_len;
63   uns locked;
64   byte name[1];
65 };
66
67 void
68 fbatomic_internal_write(struct fastbuf *f)
69 {
70   struct fb_atomic_file *af = FB_ATOMIC(f)->af;
71   int size = f->bptr - f->buffer;
72   if (size)
73     {
74       ASSERT(af->record_len < 0 || !(size % af->record_len));
75       int res = write(af->fd, f->buffer, size);
76       if (res < 0)
77         die("Error writing %s: %m", f->name);
78       if (res != size)
79         die("Unexpected partial write to %s: written only %d bytes of %d", f->name, res, size);
80       f->bptr = f->buffer;
81     }
82 }
83
84 static void
85 fbatomic_spout(struct fastbuf *f)
86 {
87   if (f->bptr < f->bufend)              /* Explicit flushes should be ignored */
88     return;
89
90   struct fb_atomic *F = FB_ATOMIC(f);
91   if (F->af->locked)
92     {
93       uns written = f->bptr - f->buffer;
94       uns size = f->bufend - f->buffer + F->slack_size;
95       F->slack_size *= 2;
96       TRACE("Reallocating buffer for atomic file %s with slack %d", f->name, F->slack_size);
97       f->buffer = xrealloc(f->buffer, size);
98       f->bufend = f->buffer + size;
99       f->bptr = f->buffer + written;
100       F->expected_max_bptr = f->bufend - F->slack_size;
101     }
102   else
103     fbatomic_internal_write(f);
104 }
105
106 static void
107 fbatomic_close(struct fastbuf *f)
108 {
109   struct fb_atomic_file *af = FB_ATOMIC(f)->af;
110   fbatomic_internal_write(f);   /* Need to flush explicitly, because the file can be locked */
111   if (!--af->use_count)
112     {
113       close(af->fd);
114       xfree(af);
115     }
116   xfree(f);
117 }
118
119 struct fastbuf *
120 fbatomic_open(const char *name, struct fastbuf *master, uns bufsize, int record_len)
121 {
122   struct fb_atomic *F = xmalloc_zero(sizeof(*F));
123   struct fastbuf *f = &F->fb;
124   struct fb_atomic_file *af;
125   if (master)
126     {
127       af = FB_ATOMIC(master)->af;
128       af->use_count++;
129       ASSERT(af->record_len == record_len);
130     }
131   else
132     {
133       af = xmalloc_zero(sizeof(*af) + strlen(name));
134       if ((af->fd = ucw_open(name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0666)) < 0)
135         die("Cannot create %s: %m", name);
136       af->use_count = 1;
137       af->record_len = record_len;
138       af->locked = (record_len < 0);
139       strcpy(af->name, name);
140     }
141   F->af = af;
142   if (record_len > 0 && bufsize % record_len)
143     bufsize += record_len - (bufsize % record_len);
144   f->buffer = xmalloc(bufsize);
145   f->bufend = f->buffer + bufsize;
146   F->slack_size = (record_len < 0) ? -record_len : 0;
147   ASSERT(bufsize > F->slack_size);
148   F->expected_max_bptr = f->bufend - F->slack_size;
149   f->bptr = f->bstop = f->buffer;
150   f->name = af->name;
151   f->spout = fbatomic_spout;
152   f->close = fbatomic_close;
153   return f;
154 }
155
156 #ifdef TEST
157
158 int main(int argc UNUSED, char **argv UNUSED)
159 {
160   struct fastbuf *f, *g;
161
162   // Always trace in the test
163   trace = 1;
164
165   msg(L_INFO, "Testing block writes");
166   f = fbatomic_open("test", NULL, 16, 4);
167   for (u32 i=0; i<17; i++)
168     bwrite(f, &i, 4);
169   bclose(f);
170
171   msg(L_INFO, "Testing interleaved var-size writes");
172   f = fbatomic_open("test2", NULL, 23, -5);
173   g = fbatomic_open("test2", f, 23, -5);
174   for (int i=0; i<100; i++)
175     {
176       struct fastbuf *x = (i%2) ? g : f;
177       bprintf(x, "%c<%d>\n", "fg"[i%2], ((259309*i) % 1000000) >> (i % 8));
178       fbatomic_commit(x);
179     }
180   bclose(f);
181   bclose(g);
182
183   return 0;
184 }
185
186 #endif