]> mj.ucw.cz Git - libucw.git/blob - ucw/fb-atomic.c
Fastbuf: Get rid of ->is_fastbuf typechecking hack
[libucw.git] / ucw / fb-atomic.c
1 /*
2  *      UCW Library -- Atomic Buffered Write to Files
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "ucw/lib.h"
11 #include "ucw/fastbuf.h"
12 #include "ucw/io.h"
13 #include "ucw/conf.h"
14 #include "ucw/trans.h"
15
16 #include <string.h>
17 #include <fcntl.h>
18 #include <unistd.h>
19
20 static uns trace;
21
22 #ifndef TEST
23
24 static struct cf_section fbatomic_config = {
25   CF_ITEMS {
26     CF_UNS("Trace", &trace)
27   }
28 };
29
30 static void CONSTRUCTOR fbatomic_init_config(void)
31 {
32   cf_declare_section("FBAtomic", &fbatomic_config, 1);
33 }
34
35 #endif
36
37 #define FB_ATOMIC(f) ((struct fb_atomic *)(f))
38 #define TRACE(m...) do { if(trace) msg(L_DEBUG, "FB_ATOMIC: " m); } while(0)
39
40 struct fb_atomic_file {
41   int fd;
42   int use_count;
43   int record_len;
44   uns locked;
45   byte name[1];
46 };
47
48 void
49 fbatomic_internal_write(struct fastbuf *f)
50 {
51   struct fb_atomic_file *af = FB_ATOMIC(f)->af;
52   int size = f->bptr - f->buffer;
53   if (size)
54     {
55       ASSERT(af->record_len < 0 || !(size % af->record_len));
56       int res = write(af->fd, f->buffer, size);
57       if (res < 0)
58         bthrow(f, "write", "Error writing %s: %m", f->name);
59       if (res != size)
60         bthrow(f, "write", "Unexpected partial write to %s: written only %d bytes of %d", f->name, res, size);
61       f->bptr = f->buffer;
62     }
63 }
64
65 static void
66 fbatomic_spout(struct fastbuf *f)
67 {
68   if (f->bptr < f->bufend)              /* Explicit flushes should be ignored */
69     return;
70
71   struct fb_atomic *F = FB_ATOMIC(f);
72   if (F->af->locked)
73     {
74       uns written = f->bptr - f->buffer;
75       uns size = f->bufend - f->buffer + F->slack_size;
76       F->slack_size *= 2;
77       TRACE("Reallocating buffer for atomic file %s with slack %d", f->name, F->slack_size);
78       f->buffer = xrealloc(f->buffer, size);
79       f->bufend = f->buffer + size;
80       f->bptr = f->buffer + written;
81       F->expected_max_bptr = f->bufend - F->slack_size;
82     }
83   else
84     fbatomic_internal_write(f);
85 }
86
87 static void
88 fbatomic_close(struct fastbuf *f)
89 {
90   struct fb_atomic_file *af = FB_ATOMIC(f)->af;
91   if (!(f->flags & FB_DEAD))
92     fbatomic_internal_write(f); /* Need to flush explicitly, because the file can be locked */
93   if (!--af->use_count)
94     {
95       close(af->fd);
96       xfree(af);
97     }
98   xfree(f);
99 }
100
101 struct fastbuf *
102 fbatomic_open(const char *name, struct fastbuf *master, uns bufsize, int record_len)
103 {
104   struct fb_atomic *F = xmalloc_zero(sizeof(*F));
105   struct fastbuf *f = &F->fb;
106   struct fb_atomic_file *af;
107   if (master)
108     {
109       af = FB_ATOMIC(master)->af;
110       af->use_count++;
111       ASSERT(af->record_len == record_len);
112     }
113   else
114     {
115       int fd = ucw_open(name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0666);
116       if (fd < 0)
117         trans_throw("ucw.fb.open", NULL, "Cannot create %s: %m", name);
118       af = xmalloc_zero(sizeof(*af) + strlen(name));
119       af->fd = fd;
120       af->use_count = 1;
121       af->record_len = record_len;
122       af->locked = (record_len < 0);
123       strcpy(af->name, name);
124     }
125   F->af = af;
126   if (record_len > 0 && bufsize % record_len)
127     bufsize += record_len - (bufsize % record_len);
128   f->buffer = xmalloc(bufsize);
129   f->bufend = f->buffer + bufsize;
130   F->slack_size = (record_len < 0) ? -record_len : 0;
131   ASSERT(bufsize > F->slack_size);
132   F->expected_max_bptr = f->bufend - F->slack_size;
133   f->bptr = f->bstop = f->buffer;
134   f->name = af->name;
135   f->spout = fbatomic_spout;
136   f->close = fbatomic_close;
137   return f;
138 }
139
140 #ifdef TEST
141
142 int main(int argc UNUSED, char **argv UNUSED)
143 {
144   struct fastbuf *f, *g;
145
146   // Always trace in the test
147   trace = 1;
148
149   msg(L_INFO, "Testing block writes");
150   f = fbatomic_open("test", NULL, 16, 4);
151   for (u32 i=0; i<17; i++)
152     bwrite(f, &i, 4);
153   bclose(f);
154
155   msg(L_INFO, "Testing interleaved var-size writes");
156   f = fbatomic_open("test2", NULL, 23, -5);
157   g = fbatomic_open("test2", f, 23, -5);
158   for (int i=0; i<100; i++)
159     {
160       struct fastbuf *x = (i%2) ? g : f;
161       bprintf(x, "%c<%d>\n", "fg"[i%2], ((259309*i) % 1000000) >> (i % 8));
162       fbatomic_commit(x);
163     }
164   bclose(f);
165   bclose(g);
166
167   return 0;
168 }
169
170 #endif