]> mj.ucw.cz Git - libucw.git/blob - lib/db.c
Simplified the asio module.
[libucw.git] / lib / db.c
1 /*
2  *      UCW Library -- Fast Database Management Routines
3  *
4  *      (c) 1999--2001 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 /*
11  *  This library uses the standard algorithm for external hashing (page directory
12  *  mapping topmost K bits of hash value to page address, directory splits and
13  *  so on). Peculiarities of this implementation (aka design decisions):
14  *
15  *   o  We allow both fixed and variable length keys and values (this includes
16  *      zero size values for cases you want to represent only a set of keys).
17  *   o  We assume that key_size + val_size < page_size.
18  *   o  We never shrink the directory nor free empty pages. (The reason is that
19  *      if the database was once large, it's likely it will again become large soon.)
20  *   o  The only pages which can be freed are those of the directory (during
21  *      directory split), so we keep only a simple 32-entry free block list
22  *      and we assume it's sorted.
23  *   o  All pointers are always given in pages from start of the file.
24  *      This gives us page_size*2^32 limit for file size which should be enough.
25  */
26
27 #include "lib/lib.h"
28 #include "lib/lfs.h"
29 #include "lib/pagecache.h"
30 #include "lib/db.h"
31 #include "lib/db_internal.h"
32
33 #include <stdio.h>
34 #include <string.h>
35 #include <fcntl.h>
36 #include <unistd.h>
37
38 #define GET_PAGE(d,x) pgc_get((d)->cache, (d)->fd, ((sh_off_t)(x)) << (d)->page_order)
39 #define GET_ZERO_PAGE(d,x) pgc_get_zero((d)->cache, (d)->fd, ((sh_off_t)(x)) << (d)->page_order)
40 #define READ_PAGE(d,x) pgc_read((d)->cache, (d)->fd, ((sh_off_t)(x)) << (d)->page_order)
41 #define READ_DIR(d,off) pgc_read((d)->cache, (d)->fd, (((sh_off_t)(d)->root->dir_start) << (d)->page_order) + (off))
42
43 struct sdbm *
44 sdbm_open(struct sdbm_options *o)
45 {
46   struct sdbm *d;
47   struct sdbm_root root, *r;
48   uns cache_size = o->cache_size ? o->cache_size : 16;
49
50   d = xmalloc_zero(sizeof(struct sdbm));
51   d->flags = o->flags;
52   d->fd = sh_open(o->name, ((d->flags & SDBM_WRITE) ? O_RDWR : O_RDONLY), 0666);
53   if (d->fd >= 0)                       /* Already exists, let's check it */
54     {
55       if (read(d->fd, &root, sizeof(root)) != sizeof(root))
56         goto bad;
57       if (root.magic != SDBM_MAGIC || root.version != SDBM_VERSION)
58         goto bad;
59       d->file_size = sh_seek(d->fd, 0, SEEK_END) >> root.page_order;
60       d->page_order = root.page_order;
61       d->page_size = 1 << root.page_order;
62       d->cache = pgc_open(d->page_size, cache_size);
63       d->root_page = pgc_read(d->cache, d->fd, 0);
64       d->root = (void *) d->root_page->data;
65     }
66   else if ((d->flags & SDBM_CREAT) && (d->fd = sh_open(o->name, O_RDWR | O_CREAT, 0666)) >= 0)
67     {
68       struct page *q;
69       uns page_order = o->page_order;
70       if (page_order < 10)
71         page_order = 10;
72       d->page_size = 1 << page_order;
73       d->cache = pgc_open(d->page_size, cache_size);
74       d->root_page = GET_ZERO_PAGE(d, 0);
75       r = d->root = (void *) d->root_page->data;                 /* Build root page */
76       r->magic = SDBM_MAGIC;
77       r->version = SDBM_VERSION;
78       r->page_order = d->page_order = page_order;
79       r->key_size = o->key_size;
80       r->val_size = o->val_size;
81       r->dir_start = 1;
82       r->dir_order = 0;
83       d->file_size = 3;
84       q = GET_ZERO_PAGE(d, 1);                                  /* Build page directory */
85       GET32(q->data, 0) = 2;
86       pgc_put(d->cache, q);
87       q = GET_ZERO_PAGE(d, 2);                                  /* Build single data page */
88       pgc_put(d->cache, q);
89     }
90   else
91     goto bad;
92   d->dir_size = 1 << d->root->dir_order;
93   d->dir_shift = 32 - d->root->dir_order;
94   d->page_mask = d->page_size - 1;
95   d->key_size = d->root->key_size;
96   d->val_size = d->root->val_size;
97   return d;
98
99 bad:
100   sdbm_close(d);
101   return NULL;
102 }
103
104 void
105 sdbm_close(struct sdbm *d)
106 {
107   if (d->root_page)
108     pgc_put(d->cache, d->root_page);
109   if (d->cache)
110     pgc_close(d->cache);
111   if (d->fd >= 0)
112     close(d->fd);
113   xfree(d);
114 }
115
116 static uns
117 sdbm_alloc_pages(struct sdbm *d, uns number)
118 {
119   uns where = d->file_size;
120   if (where + number < where)   /* Wrap around? */
121     die("SDB: Database file too large, giving up");
122   d->file_size += number;
123   return where;
124 }
125
126 static uns
127 sdbm_alloc_page(struct sdbm *d)
128 {
129   uns pos;
130
131   if (!d->root->free_pool[0].count)
132     return sdbm_alloc_pages(d, 1);
133   pos = d->root->free_pool[0].first;
134   d->root->free_pool[0].first++;
135   if (!--d->root->free_pool[0].count)
136     {
137       memmove(d->root->free_pool, d->root->free_pool+1, (SDBM_NUM_FREE_PAGE_POOLS-1) * sizeof(d->root->free_pool[0]));
138       d->root->free_pool[SDBM_NUM_FREE_PAGE_POOLS-1].count = 0;
139     }
140   pgc_mark_dirty(d->cache, d->root_page);
141   return pos;
142 }
143
144 static void
145 sdbm_free_pages(struct sdbm *d, uns start, uns number)
146 {
147   uns i = 0;
148
149   while (d->root->free_pool[i].count)
150     i++;
151   ASSERT(i < SDBM_NUM_FREE_PAGE_POOLS);
152   d->root->free_pool[i].first = start;
153   d->root->free_pool[i].count = number;
154   pgc_mark_dirty(d->cache, d->root_page);
155 }
156
157 u32
158 sdbm_hash(byte *key, uns keylen)
159 {
160   /*
161    *  This used to be the same hash function as GDBM uses,
162    *  but it turned out that it tends to give the same results
163    *  on similar keys. Damn it.
164    */
165   u32 value = 0x238F13AF * keylen;
166   while (keylen--)
167     value = 37*value + *key++;
168   return (1103515243 * value + 12345);
169 }
170
171 static int
172 sdbm_get_entry(struct sdbm *d, byte *pos, byte **key, uns *keylen, byte **val, uns *vallen)
173 {
174   byte *p = pos;
175
176   if (d->key_size >= 0)
177     *keylen = d->key_size;
178   else
179     {
180       *keylen = (p[0] << 8) | p[1];
181       p += 2;
182     }
183   *key = p;
184   p += *keylen;
185   if (d->val_size >= 0)
186     *vallen = d->val_size;
187   else
188     {
189       *vallen = (p[0] << 8) | p[1];
190       p += 2;
191     }
192   *val = p;
193   p += *vallen;
194   return p - pos;
195 }
196
197 static int
198 sdbm_entry_len(struct sdbm *d, uns keylen, uns vallen)
199 {
200   uns len = keylen + vallen;
201   if (d->key_size < 0)
202     len += 2;
203   if (d->val_size < 0)
204     len += 2;
205   return len;
206 }
207
208 static void
209 sdbm_store_entry(struct sdbm *d, byte *pos, byte *key, uns keylen, byte *val, uns vallen)
210 {
211   if (d->key_size < 0)
212     {
213       *pos++ = keylen >> 8;
214       *pos++ = keylen;
215     }
216   memmove(pos, key, keylen);
217   pos += keylen;
218   if (d->val_size < 0)
219     {
220       *pos++ = vallen >> 8;
221       *pos++ = vallen;
222     }
223   memmove(pos, val, vallen);
224 }
225
226 static uns
227 sdbm_page_rank(struct sdbm *d, uns dirpos)
228 {
229   struct page *b;
230   u32 pg, x;
231   uns l, r;
232   uns pm = d->page_mask;
233
234   b = READ_DIR(d, dirpos & ~pm);
235   pg = GET32(b->data, dirpos & pm);
236   l = dirpos;
237   while ((l & pm) && GET32(b->data, (l - 4) & pm) == pg)
238     l -= 4;
239   r = dirpos + 4;
240   /* We heavily depend on unused directory entries being zero */
241   while ((r & pm) && GET32(b->data, r & pm) == pg)
242     r += 4;
243   pgc_put(d->cache, b);
244
245   if (!(l & pm) && !(r & pm))
246     {
247       /* Note that if it spans page boundary, it must contain an integer number of pages */
248       while (l)
249         {
250           b = READ_DIR(d, (l - 4) & ~pm);
251           x = GET32(b->data, 0);
252           pgc_put(d->cache, b);
253           if (x != pg)
254             break;
255           l -= d->page_size;
256         }
257       while (r < 4*d->dir_size)
258         {
259           b = READ_DIR(d, r & ~pm);
260           x = GET32(b->data, 0);
261           pgc_put(d->cache, b);
262           if (x != pg)
263             break;
264           r += d->page_size;
265         }
266     }
267   return (r - l) >> 2;
268 }
269
270 static void
271 sdbm_expand_directory(struct sdbm *d)
272 {
273   struct page *b, *c;
274   int i, ent;
275   u32 *dir, *t;
276
277   if (d->root->dir_order >= 31)
278     die("SDB: Database directory too large, giving up");
279
280   if (4*d->dir_size < d->page_size)
281     {
282       /* It still fits within single page */
283       b = READ_DIR(d, 0);
284       dir = (u32 *) b->data;
285       for(i=d->dir_size-1; i>=0; i--)
286         dir[2*i] = dir[2*i+1] = dir[i];
287       pgc_mark_dirty(d->cache, b);
288       pgc_put(d->cache, b);
289     }
290   else
291     {
292       uns old_dir = d->root->dir_start;
293       uns old_dir_pages = 1 << (d->root->dir_order + 2 - d->page_order);
294       uns page, new_dir;
295       new_dir = d->root->dir_start = sdbm_alloc_pages(d, 2*old_dir_pages);
296       ent = 1 << (d->page_order - 3);
297       for(page=0; page < old_dir_pages; page++)
298         {
299           b = READ_PAGE(d, old_dir + page);
300           dir = (u32 *) b->data;
301           c = GET_PAGE(d, new_dir + 2*page);
302           t = (u32 *) c->data;
303           for(i=0; i<ent; i++)
304             t[2*i] = t[2*i+1] = dir[i];
305           pgc_put(d->cache, c);
306           c = GET_PAGE(d, new_dir + 2*page + 1);
307           t = (u32 *) c->data;
308           for(i=0; i<ent; i++)
309             t[2*i] = t[2*i+1] = dir[ent+i];
310           pgc_put(d->cache, c);
311           pgc_put(d->cache, b);
312         }
313       if (!(d->flags & SDBM_FAST))
314         {
315           /*
316            *  Unless in super-fast mode, fill old directory pages with zeroes.
317            *  This slows us down a bit, but allows database reconstruction after
318            *  the free list is lost.
319            */
320           for(page=0; page < old_dir_pages; page++)
321             {
322               b = GET_ZERO_PAGE(d, old_dir + page);
323               pgc_put(d->cache, b);
324             }
325         }
326       sdbm_free_pages(d, old_dir, old_dir_pages);
327     }
328
329   d->root->dir_order++;
330   d->dir_size = 1 << d->root->dir_order;
331   d->dir_shift = 32 - d->root->dir_order;
332   pgc_mark_dirty(d->cache, d->root_page);
333   if (!(d->flags & SDBM_FAST))
334     sdbm_sync(d);
335 }
336
337 static void
338 sdbm_split_data(struct sdbm *d, struct sdbm_bucket *s, struct sdbm_bucket *d0, struct sdbm_bucket *d1, uns sigbit)
339 {
340   byte *sp = s->data;
341   byte *dp[2] = { d0->data, d1->data };
342   byte *K, *D;
343   uns Kl, Dl, sz, i;
344
345   while (sp < s->data + s->used)
346     {
347       sz = sdbm_get_entry(d, sp, &K, &Kl, &D, &Dl);
348       sp += sz;
349       i = (sdbm_hash(K, Kl) & (1 << sigbit)) ? 1 : 0;
350       sdbm_store_entry(d, dp[i], K, Kl, D, Dl);
351       dp[i] += sz;
352     }
353   d0->used = dp[0] - d0->data;
354   d1->used = dp[1] - d1->data;
355 }
356
357 static void
358 sdbm_split_dir(struct sdbm *d, uns dirpos, uns count, uns pos)
359 {
360   struct page *b;
361   uns i;
362
363   count *= 4;
364   while (count)
365     {
366       b = READ_DIR(d, dirpos & ~d->page_mask);
367       i = d->page_size - (dirpos & d->page_mask);
368       if (i > count)
369         i = count;
370       count -= i;
371       while (i)
372         {
373           GET32(b->data, dirpos & d->page_mask) = pos;
374           dirpos += 4;
375           i -= 4;
376         }
377       pgc_mark_dirty(d->cache, b);
378       pgc_put(d->cache, b);
379     }
380 }
381
382 static inline uns
383 sdbm_dirpos(struct sdbm *d, uns hash)
384 {
385   if (d->dir_shift != 32)               /* avoid shifting by 32 bits */
386     return (hash >> d->dir_shift) << 2; /* offset in the directory */
387   else
388     return 0;
389 }
390
391 static struct page *
392 sdbm_split_page(struct sdbm *d, struct page *b, u32 hash)
393 {
394   struct page *p[2];
395   uns i, rank, sigbit, rank_log, dirpos, newpg;
396
397   dirpos = sdbm_dirpos(d, hash);
398   rank = sdbm_page_rank(d, dirpos);     /* rank = # of pointers to this page */
399   if (rank == 1)
400     {
401       sdbm_expand_directory(d);
402       rank = 2;
403       dirpos *= 2;
404     }
405   rank_log = 1;                         /* rank_log = log2(rank) */
406   while ((1U << rank_log) < rank)
407     rank_log++;
408   sigbit = d->dir_shift + rank_log - 1; /* sigbit = bit we split on */
409   p[0] = b;
410   newpg = sdbm_alloc_page(d);
411   p[1] = GET_PAGE(d, newpg);
412   sdbm_split_data(d, (void *) b->data, (void *) p[0]->data, (void *) p[1]->data, sigbit);
413   sdbm_split_dir(d, (dirpos & ~(4*rank - 1))+2*rank, rank/2, newpg);
414   pgc_mark_dirty(d->cache, p[0]);
415   i = (hash & (1 << sigbit)) ? 1 : 0;
416   pgc_put(d->cache, p[!i]);
417   return p[i];
418 }
419
420 static int
421 sdbm_put_user(byte *D, uns Dl, byte *val, uns *vallen)
422 {
423   if (vallen)
424     {
425       if (*vallen < Dl)
426         return 1;
427       *vallen = Dl;
428     }
429   if (val)
430     memcpy(val, D, Dl);
431   return 0;
432 }
433
434 static int
435 sdbm_access(struct sdbm *d, byte *key, uns keylen, byte *val, uns *vallen, uns mode)    /* 0=read, 1=store, 2=replace */
436 {
437   struct page *p, *q;
438   u32 hash, h, pos, size;
439   struct sdbm_bucket *b;
440   byte *c, *e;
441   int rc;
442
443   if ((d->key_size >= 0 && keylen != (uns) d->key_size) || keylen > 65535)
444     return SDBM_ERROR_BAD_KEY_SIZE;
445   if (val && ((d->val_size >= 0 && *vallen != (uns) d->val_size) || *vallen >= 65535) && mode)
446     return SDBM_ERROR_BAD_VAL_SIZE;
447   if (!mode && !(d->flags & SDBM_WRITE))
448     return SDBM_ERROR_READ_ONLY;
449   hash = sdbm_hash(key, keylen);
450   h = sdbm_dirpos(d, hash);
451   p = READ_DIR(d, h & ~d->page_mask);
452   pos = GET32(p->data, h & d->page_mask);
453   pgc_put(d->cache, p);
454   q = READ_PAGE(d, pos);
455   b = (void *) q->data;
456   c = b->data;
457   e = c + b->used;
458   while (c < e)
459     {
460       byte *K, *D;
461       uns Kl, Dl, s;
462       s = sdbm_get_entry(d, c, &K, &Kl, &D, &Dl);
463       if (Kl == keylen && !memcmp(K, key, Kl))
464         {
465           /* Gotcha! */
466           switch (mode)
467             {
468             case 0:                     /* fetch: found */
469               rc = sdbm_put_user(D, Dl, val, vallen);
470               pgc_put(d->cache, q);
471               return rc ? SDBM_ERROR_TOO_LARGE : 1;
472             case 1:                     /* store: already present */
473               pgc_put(d->cache, q);
474               return 0;
475             default:                    /* replace: delete the old one */
476               memmove(c, c+s, e-(c+s));
477               b->used -= s;
478               goto insert;
479             }
480         }
481       c += s;
482     }
483   if (!mode || !val)            /* fetch or delete: no success */
484     {
485       pgc_put(d->cache, q);
486       return 0;
487     }
488
489 insert:
490   if (val)
491     {
492       size = sdbm_entry_len(d, keylen, *vallen);
493       while (b->used + size > d->page_size - sizeof(struct sdbm_bucket))
494         {
495           /* Page overflow, need to split */
496           if (size >= d->page_size - sizeof(struct sdbm_bucket))
497             {
498               pgc_put(d->cache, q);
499               return SDBM_ERROR_GIANT;
500             }
501           q = sdbm_split_page(d, q, hash);
502           b = (void *) q->data;
503         }
504       sdbm_store_entry(d, b->data + b->used, key, keylen, val, *vallen);
505       b->used += size;
506     }
507   pgc_mark_dirty(d->cache, q);
508   pgc_put(d->cache, q);
509   if (d->flags & SDBM_SYNC)
510     sdbm_sync(d);
511   return 1;
512 }
513
514 int
515 sdbm_store(struct sdbm *d, byte *key, uns keylen, byte *val, uns vallen)
516 {
517   return sdbm_access(d, key, keylen, val, &vallen, 1);
518 }
519
520 int
521 sdbm_replace(struct sdbm *d, byte *key, uns keylen, byte *val, uns vallen)
522 {
523   return sdbm_access(d, key, keylen, val, &vallen, 2);
524 }
525
526 int
527 sdbm_delete(struct sdbm *d, byte *key, uns keylen)
528 {
529   return sdbm_access(d, key, keylen, NULL, NULL, 2);
530 }
531
532 int
533 sdbm_fetch(struct sdbm *d, byte *key, uns keylen, byte *val, uns *vallen)
534 {
535   return sdbm_access(d, key, keylen, val, vallen, 0);
536 }
537
538 void
539 sdbm_rewind(struct sdbm *d)
540 {
541   d->find_page = 1;
542   d->find_pos = 0;
543   d->find_free_list = 0;
544 }
545
546 int
547 sdbm_get_next(struct sdbm *d, byte *key, uns *keylen, byte *val, uns *vallen)
548 {
549   uns page = d->find_page;
550   uns pos = d->find_pos;
551   byte *K, *V;
552   uns c, Kl, Vl;
553   struct page *p;
554   struct sdbm_bucket *b;
555
556   for(;;)
557     {
558       if (!pos)
559         {
560           if (page >= d->file_size)
561             break;
562           if (page == d->root->dir_start)
563             page += (4*d->dir_size + d->page_size - 1) >> d->page_order;
564           else if (page == d->root->free_pool[d->find_free_list].first)
565             page += d->root->free_pool[d->find_free_list++].count;
566           else
567             pos = 4;
568           continue;
569         }
570       p = READ_PAGE(d, page);
571       b = (void *) p->data;
572       if (pos - 4 >= b->used)
573         {
574           pos = 0;
575           page++;
576           pgc_put(d->cache, p);
577           continue;
578         }
579       c = sdbm_get_entry(d, p->data + pos, &K, &Kl, &V, &Vl);
580       d->find_page = page;
581       d->find_pos = pos + c;
582       c = sdbm_put_user(K, Kl, key, keylen) ||
583           sdbm_put_user(V, Vl, val, vallen);
584       pgc_put(d->cache, p);
585       return c ? SDBM_ERROR_TOO_LARGE : 1;
586     }
587   d->find_page = page;
588   d->find_pos = pos;
589   return 0;
590 }
591
592 void
593 sdbm_sync(struct sdbm *d)
594 {
595   pgc_flush(d->cache);
596   if (d->flags & SDBM_FSYNC)
597     fsync(d->fd);
598 }