]> mj.ucw.cz Git - libucw.git/blob - ucw/fb-atomic.c
03f0a65f8c893837305e9d5120e37ebaa57fbd67
[libucw.git] / ucw / fb-atomic.c
1 /*
2  *      UCW Library -- Atomic Buffered Write to Files
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include <ucw/lib.h>
11 #include <ucw/fastbuf.h>
12 #include <ucw/io.h>
13 #include <ucw/conf.h>
14 #include <ucw/trans.h>
15
16 #include <string.h>
17 #include <fcntl.h>
18 #include <unistd.h>
19
20 static uns trace;
21
22 #ifndef TEST
23
24 static struct cf_section fbatomic_config = {
25   CF_ITEMS {
26     CF_UNS("Trace", &trace),
27     CF_END
28   }
29 };
30
31 static void CONSTRUCTOR fbatomic_init_config(void)
32 {
33   cf_declare_section("FBAtomic", &fbatomic_config, 1);
34 }
35
36 #endif
37
38 #define FB_ATOMIC(f) ((struct fb_atomic *)(f))
39 #define TRACE(m...) do { if(trace) msg(L_DEBUG, "FB_ATOMIC: " m); } while(0)
40
41 struct fb_atomic_file {
42   int fd;
43   int use_count;
44   int record_len;
45   uns locked;
46   byte name[1];
47 };
48
49 void
50 fbatomic_internal_write(struct fastbuf *f)
51 {
52   struct fb_atomic_file *af = FB_ATOMIC(f)->af;
53   int size = f->bptr - f->buffer;
54   if (size)
55     {
56       ASSERT(af->record_len < 0 || !(size % af->record_len));
57       int res = write(af->fd, f->buffer, size);
58       if (res < 0)
59         bthrow(f, "write", "Error writing %s: %m", f->name);
60       if (res != size)
61         bthrow(f, "write", "Unexpected partial write to %s: written only %d bytes of %d", f->name, res, size);
62       f->bptr = f->buffer;
63     }
64 }
65
66 static void
67 fbatomic_spout(struct fastbuf *f)
68 {
69   if (f->bptr < f->bufend)              /* Explicit flushes should be ignored */
70     return;
71
72   struct fb_atomic *F = FB_ATOMIC(f);
73   if (F->af->locked)
74     {
75       uns written = f->bptr - f->buffer;
76       uns size = f->bufend - f->buffer + F->slack_size;
77       F->slack_size *= 2;
78       TRACE("Reallocating buffer for atomic file %s with slack %d", f->name, F->slack_size);
79       f->buffer = xrealloc(f->buffer, size);
80       f->bufend = f->buffer + size;
81       f->bptr = f->buffer + written;
82       f->bstop = f->buffer;
83       F->expected_max_bptr = f->bufend - F->slack_size;
84     }
85   else
86     fbatomic_internal_write(f);
87 }
88
89 static void
90 fbatomic_close(struct fastbuf *f)
91 {
92   struct fb_atomic_file *af = FB_ATOMIC(f)->af;
93   if (!(f->flags & FB_DEAD))
94     fbatomic_internal_write(f); /* Need to flush explicitly, because the file can be locked */
95   if (!--af->use_count)
96     {
97       close(af->fd);
98       xfree(af);
99     }
100   xfree(f);
101 }
102
103 struct fastbuf *
104 fbatomic_open(const char *name, struct fastbuf *master, uns bufsize, int record_len)
105 {
106   struct fb_atomic *F = xmalloc_zero(sizeof(*F));
107   struct fastbuf *f = &F->fb;
108   struct fb_atomic_file *af;
109   if (master)
110     {
111       af = FB_ATOMIC(master)->af;
112       af->use_count++;
113       ASSERT(af->record_len == record_len);
114     }
115   else
116     {
117       int fd = ucw_open(name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0666);
118       if (fd < 0)
119         trans_throw("ucw.fb.open", NULL, "Cannot create %s: %m", name);
120       af = xmalloc_zero(sizeof(*af) + strlen(name));
121       af->fd = fd;
122       af->use_count = 1;
123       af->record_len = record_len;
124       af->locked = (record_len < 0);
125       strcpy(af->name, name);
126     }
127   F->af = af;
128   if (record_len > 0 && bufsize % record_len)
129     bufsize += record_len - (bufsize % record_len);
130   f->buffer = xmalloc(bufsize);
131   f->bufend = f->buffer + bufsize;
132   F->slack_size = (record_len < 0) ? -record_len : 0;
133   ASSERT(bufsize > F->slack_size);
134   F->expected_max_bptr = f->bufend - F->slack_size;
135   f->bptr = f->bstop = f->buffer;
136   f->name = af->name;
137   f->spout = fbatomic_spout;
138   f->close = fbatomic_close;
139   return f;
140 }
141
142 #ifdef TEST
143
144 int main(int argc UNUSED, char **argv UNUSED)
145 {
146   struct fastbuf *f, *g;
147
148   // Always trace in the test
149   trace = 1;
150
151   msg(L_INFO, "Testing block writes");
152   f = fbatomic_open("test", NULL, 16, 4);
153   for (u32 i=0; i<17; i++)
154     bwrite(f, &i, 4);
155   bclose(f);
156
157   msg(L_INFO, "Testing interleaved var-size writes");
158   f = fbatomic_open("test2", NULL, 23, -5);
159   g = fbatomic_open("test2", f, 23, -5);
160   for (int i=0; i<100; i++)
161     {
162       struct fastbuf *x = (i%2) ? g : f;
163       bprintf(x, "%c<%d>\n", "fg"[i%2], ((259309*i) % 1000000) >> (i % 8));
164       fbatomic_commit(x);
165     }
166   bclose(f);
167   bclose(g);
168
169   return 0;
170 }
171
172 #endif