]> mj.ucw.cz Git - libucw.git/blob - ucw/fb-atomic.c
Packages: install-ucw-sorter-api make target moved into install-libucw-api.
[libucw.git] / ucw / fb-atomic.c
1 /*
2  *      UCW Library -- Atomic Buffered Write to Files
3  *
4  *      (c) 2006 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include <ucw/lib.h>
11 #include <ucw/fastbuf.h>
12 #include <ucw/io.h>
13 #include <ucw/conf.h>
14 #include <ucw/trans.h>
15
16 #include <string.h>
17 #include <fcntl.h>
18 #include <unistd.h>
19
20 static uns trace;
21
22 #ifndef TEST
23
24 static struct cf_section fbatomic_config = {
25   CF_ITEMS {
26     CF_UNS("Trace", &trace)
27   }
28 };
29
30 static void CONSTRUCTOR fbatomic_init_config(void)
31 {
32   cf_declare_section("FBAtomic", &fbatomic_config, 1);
33 }
34
35 #endif
36
37 #define FB_ATOMIC(f) ((struct fb_atomic *)(f))
38 #define TRACE(m...) do { if(trace) msg(L_DEBUG, "FB_ATOMIC: " m); } while(0)
39
40 struct fb_atomic_file {
41   int fd;
42   int use_count;
43   int record_len;
44   uns locked;
45   byte name[1];
46 };
47
48 void
49 fbatomic_internal_write(struct fastbuf *f)
50 {
51   struct fb_atomic_file *af = FB_ATOMIC(f)->af;
52   int size = f->bptr - f->buffer;
53   if (size)
54     {
55       ASSERT(af->record_len < 0 || !(size % af->record_len));
56       int res = write(af->fd, f->buffer, size);
57       if (res < 0)
58         bthrow(f, "write", "Error writing %s: %m", f->name);
59       if (res != size)
60         bthrow(f, "write", "Unexpected partial write to %s: written only %d bytes of %d", f->name, res, size);
61       f->bptr = f->buffer;
62     }
63 }
64
65 static void
66 fbatomic_spout(struct fastbuf *f)
67 {
68   if (f->bptr < f->bufend)              /* Explicit flushes should be ignored */
69     return;
70
71   struct fb_atomic *F = FB_ATOMIC(f);
72   if (F->af->locked)
73     {
74       uns written = f->bptr - f->buffer;
75       uns size = f->bufend - f->buffer + F->slack_size;
76       F->slack_size *= 2;
77       TRACE("Reallocating buffer for atomic file %s with slack %d", f->name, F->slack_size);
78       f->buffer = xrealloc(f->buffer, size);
79       f->bufend = f->buffer + size;
80       f->bptr = f->buffer + written;
81       f->bstop = f->buffer;
82       F->expected_max_bptr = f->bufend - F->slack_size;
83     }
84   else
85     fbatomic_internal_write(f);
86 }
87
88 static void
89 fbatomic_close(struct fastbuf *f)
90 {
91   struct fb_atomic_file *af = FB_ATOMIC(f)->af;
92   if (!(f->flags & FB_DEAD))
93     fbatomic_internal_write(f); /* Need to flush explicitly, because the file can be locked */
94   if (!--af->use_count)
95     {
96       close(af->fd);
97       xfree(af);
98     }
99   xfree(f);
100 }
101
102 struct fastbuf *
103 fbatomic_open(const char *name, struct fastbuf *master, uns bufsize, int record_len)
104 {
105   struct fb_atomic *F = xmalloc_zero(sizeof(*F));
106   struct fastbuf *f = &F->fb;
107   struct fb_atomic_file *af;
108   if (master)
109     {
110       af = FB_ATOMIC(master)->af;
111       af->use_count++;
112       ASSERT(af->record_len == record_len);
113     }
114   else
115     {
116       int fd = ucw_open(name, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND, 0666);
117       if (fd < 0)
118         trans_throw("ucw.fb.open", NULL, "Cannot create %s: %m", name);
119       af = xmalloc_zero(sizeof(*af) + strlen(name));
120       af->fd = fd;
121       af->use_count = 1;
122       af->record_len = record_len;
123       af->locked = (record_len < 0);
124       strcpy(af->name, name);
125     }
126   F->af = af;
127   if (record_len > 0 && bufsize % record_len)
128     bufsize += record_len - (bufsize % record_len);
129   f->buffer = xmalloc(bufsize);
130   f->bufend = f->buffer + bufsize;
131   F->slack_size = (record_len < 0) ? -record_len : 0;
132   ASSERT(bufsize > F->slack_size);
133   F->expected_max_bptr = f->bufend - F->slack_size;
134   f->bptr = f->bstop = f->buffer;
135   f->name = af->name;
136   f->spout = fbatomic_spout;
137   f->close = fbatomic_close;
138   return f;
139 }
140
141 #ifdef TEST
142
143 int main(int argc UNUSED, char **argv UNUSED)
144 {
145   struct fastbuf *f, *g;
146
147   // Always trace in the test
148   trace = 1;
149
150   msg(L_INFO, "Testing block writes");
151   f = fbatomic_open("test", NULL, 16, 4);
152   for (u32 i=0; i<17; i++)
153     bwrite(f, &i, 4);
154   bclose(f);
155
156   msg(L_INFO, "Testing interleaved var-size writes");
157   f = fbatomic_open("test2", NULL, 23, -5);
158   g = fbatomic_open("test2", f, 23, -5);
159   for (int i=0; i<100; i++)
160     {
161       struct fastbuf *x = (i%2) ? g : f;
162       bprintf(x, "%c<%d>\n", "fg"[i%2], ((259309*i) % 1000000) >> (i % 8));
163       fbatomic_commit(x);
164     }
165   bclose(f);
166   bclose(g);
167
168   return 0;
169 }
170
171 #endif