]> mj.ucw.cz Git - libucw.git/blob - lib/sorter/govern.c
d325a0298ad06c82ea3a1049432dd859d9a43dad
[libucw.git] / lib / sorter / govern.c
1 /*
2  *      UCW Library -- Universal Sorter: Governing Routines
3  *
4  *      (c) 2007 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "lib/lib.h"
11 #include "lib/fastbuf.h"
12 #include "lib/mempool.h"
13 #include "lib/sorter/common.h"
14
15 #include <fcntl.h>
16
17 void *
18 sorter_alloc(struct sort_context *ctx, uns size)
19 {
20   return mp_alloc_zero(ctx->pool, size);
21 }
22
23 struct sort_bucket *
24 sbuck_new(struct sort_context *ctx)
25 {
26   struct sort_bucket *b = sorter_alloc(ctx, sizeof(struct sort_bucket));
27   b->ctx = ctx;
28   return b;
29 }
30
31 void
32 sbuck_drop(struct sort_bucket *b)
33 {
34   if (b)
35     {
36       ASSERT(!(b->flags & SBF_DESTROYED));
37       if (b->n.prev)
38         clist_remove(&b->n);
39       bclose(b->fb);
40       bzero(b, sizeof(*b));
41       b->flags = SBF_DESTROYED;
42     }
43 }
44
45 sh_off_t
46 sbuck_size(struct sort_bucket *b)
47 {
48   if ((b->flags & SBF_OPEN_WRITE) && !(b->flags & SBF_SWAPPED_OUT))
49     return btell(b->fb);
50   else
51     return b->size;
52 }
53
54 int
55 sbuck_have(struct sort_bucket *b)
56 {
57   return b && sbuck_size(b);
58 }
59
60 static int
61 sbuck_has_file(struct sort_bucket *b)
62 {
63   return (b->fb || (b->flags & SBF_SWAPPED_OUT));
64 }
65
66 static void
67 sbuck_swap_in(struct sort_bucket *b)
68 {
69   if (b->flags & SBF_SWAPPED_OUT)
70     {
71       b->fb = bopen(b->filename, O_RDWR, sorter_stream_bufsize);
72       if (b->flags & SBF_OPEN_WRITE)
73         bseek(b->fb, 0, SEEK_END);
74       bconfig(b->fb, BCONFIG_IS_TEMP_FILE, 1);
75       b->flags &= ~SBF_SWAPPED_OUT;
76       SORT_XTRACE("Swapped in %s", b->filename);
77     }
78 }
79
80 struct fastbuf *
81 sbuck_read(struct sort_bucket *b)
82 {
83   sbuck_swap_in(b);
84   if (b->flags & SBF_OPEN_READ)
85     return b->fb;
86   else if (b->flags & SBF_OPEN_WRITE)
87     {
88       b->size = btell(b->fb);
89       b->flags = (b->flags & ~SBF_OPEN_WRITE) | SBF_OPEN_READ;
90       brewind(b->fb);
91       return b->fb;
92     }
93   else
94     ASSERT(0);
95 }
96
97 struct fastbuf *
98 sbuck_write(struct sort_bucket *b)
99 {
100   sbuck_swap_in(b);
101   if (b->flags & SBF_OPEN_WRITE)
102     ASSERT(b->fb);
103   else
104     {
105       ASSERT(!(b->flags & (SBF_OPEN_READ | SBF_DESTROYED)));
106       b->fb = bopen_tmp(sorter_stream_bufsize);
107       b->flags |= SBF_OPEN_WRITE;
108       b->filename = mp_strdup(b->ctx->pool, b->fb->name);
109     }
110   return b->fb;
111 }
112
113 void
114 sbuck_swap_out(struct sort_bucket *b)
115 {
116   if ((b->flags & (SBF_OPEN_READ | SBF_OPEN_WRITE)) && b->fb)
117     {
118       if (b->flags & SBF_OPEN_WRITE)
119         b->size = btell(b->fb);
120       bconfig(b->fb, BCONFIG_IS_TEMP_FILE, 0);
121       bclose(b->fb);
122       b->fb = NULL;
123       b->flags |= SBF_SWAPPED_OUT;
124       SORT_XTRACE("Swapped out %s", b->filename);
125     }
126 }
127
128 void
129 sorter_alloc_buf(struct sort_context *ctx)
130 {
131   if (ctx->big_buf)
132     return;
133   u64 bs = MAX(sorter_bufsize/2, 1);
134   bs = ALIGN_TO(bs, (u64)CPU_PAGE_SIZE);
135   ctx->big_buf = big_alloc(2*bs);
136   ctx->big_buf_size = 2*bs;
137   ctx->big_buf_half = ((byte*) ctx->big_buf) + bs;
138   ctx->big_buf_half_size = bs;
139   SORT_XTRACE("Allocated sorting buffer (%jd bytes)", (uintmax_t) bs);
140 }
141
142 void
143 sorter_free_buf(struct sort_context *ctx)
144 {
145   if (!ctx->big_buf)
146     return;
147   big_free(ctx->big_buf, ctx->big_buf_size);
148   ctx->big_buf = NULL;
149   SORT_XTRACE("Freed sorting buffer");
150 }
151
152 static int sorter_presort(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only)
153 {
154   sorter_alloc_buf(ctx);
155   if (in->flags & SBF_CUSTOM_PRESORT)
156     {
157       struct fastbuf *f = sbuck_write(out);
158       return ctx->custom_presort(f, ctx->big_buf, ctx->big_buf_size);   // FIXME: out_only optimization?
159     }
160   return ctx->internal_sort(ctx, in, out, out_only);
161 }
162
163 static inline struct sort_bucket *
164 sbuck_join_to(struct sort_bucket *b)
165 {
166   if (sorter_debug & SORT_DEBUG_NO_JOIN)
167     return NULL;
168
169   struct sort_bucket *out = (struct sort_bucket *) b->n.prev;   // Such bucket is guaranteed to exist
170   return (out->flags & SBF_FINAL) ? out : NULL;
171 }
172
173 static void
174 sorter_join(struct sort_bucket *b)
175 {
176   struct sort_bucket *join = (struct sort_bucket *) b->n.prev;
177   ASSERT(join->flags & SBF_FINAL);
178   ASSERT(b->runs == 1);
179
180   if (!sbuck_has_file(join))
181     {
182       // The final bucket doesn't have any file associated yet, so replace
183       // it with the new bucket.
184       SORT_XTRACE("Replaced final bucket");
185       b->flags |= SBF_FINAL;
186       sbuck_drop(join);
187     }
188   else
189     {
190       SORT_TRACE("Copying %jd bytes to output file", (uintmax_t) sbuck_size(b));
191       struct fastbuf *src = sbuck_read(b);
192       struct fastbuf *dest = sbuck_write(join);
193       bbcopy(src, dest, ~0U);
194       sbuck_drop(b);
195     }
196 }
197
198 static void
199 sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
200 {
201   struct sort_bucket *ins[3] = { NULL }, *outs[3] = { NULL };
202   cnode *list_pos = b->n.prev;
203   struct sort_bucket *join = sbuck_join_to(b);
204
205   if (!(sorter_debug & SORT_DEBUG_NO_PRESORT) || (b->flags & SBF_CUSTOM_PRESORT))
206     {
207       SORT_TRACE("Presorting");
208       ins[0] = sbuck_new(ctx);
209       if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
210         {
211           SORT_XTRACE("Sorted in memory");
212           if (join)
213             sbuck_drop(ins[0]);
214           else
215             clist_insert_after(&ins[0]->n, list_pos);
216           sbuck_drop(b);
217           return;
218         }
219
220       ins[1] = sbuck_new(ctx);
221       int i = 1;
222       while (sorter_presort(ctx, b, ins[i], ins[i]))
223         i = 1-i;
224       sbuck_drop(b);
225     }
226   else
227     {
228       SORT_TRACE("Skipped presorting");
229       ins[0] = b;
230     }
231
232   SORT_TRACE("Main sorting");
233   do {
234     if (ins[0]->runs == 1 && ins[1]->runs == 1 && join)
235       {
236         // This is guaranteed to produce a single run, so join if possible
237         outs[0] = join;
238         outs[1] = NULL;
239         ctx->twoway_merge(ctx, ins, outs);
240         ASSERT(outs[0]->runs == 2);
241         outs[0]->runs--;
242         SORT_TRACE("Pass done (joined final run)");
243         sbuck_drop(ins[0]);
244         sbuck_drop(ins[1]);
245         return;
246       }
247     outs[0] = sbuck_new(ctx);
248     outs[1] = sbuck_new(ctx);
249     outs[2] = NULL;
250     ctx->twoway_merge(ctx, ins, outs);
251     SORT_TRACE("Pass done (%d+%d runs, %jd+%jd bytes)", outs[0]->runs, outs[1]->runs, (uintmax_t) sbuck_size(outs[0]), (uintmax_t) sbuck_size(outs[1]));
252     sbuck_drop(ins[0]);
253     sbuck_drop(ins[1]);
254     memcpy(ins, outs, 3*sizeof(struct sort_bucket *));
255   } while (sbuck_have(ins[1]));
256
257   sbuck_drop(ins[1]);
258   clist_insert_after(&ins[0]->n, list_pos);
259 }
260
261 void
262 sorter_run(struct sort_context *ctx)
263 {
264   ctx->pool = mp_new(4096);
265   clist_init(&ctx->bucket_list);
266
267   /* FIXME: There should be a way how to detect size of the input file */
268   /* FIXME: Remember to test sorting of empty files */
269
270   // Create bucket containing the source
271   struct sort_bucket *bin = sbuck_new(ctx);
272   bin->flags = SBF_SOURCE | SBF_OPEN_READ;
273   if (ctx->custom_presort)
274     bin->flags |= SBF_CUSTOM_PRESORT;
275   else
276     bin->fb = ctx->in_fb;
277   bin->ident = "in";
278   bin->size = ~(u64)0;
279   bin->hash_bits = ctx->hash_bits;
280   clist_add_tail(&ctx->bucket_list, &bin->n);
281
282   // Create bucket for the output
283   struct sort_bucket *bout = sbuck_new(ctx);
284   bout->flags = SBF_FINAL;
285   bout->fb = ctx->out_fb;
286   bout->ident = "out";
287   bout->runs = 1;
288   clist_add_head(&ctx->bucket_list, &bout->n);
289
290   struct sort_bucket *b;
291   while (bout = clist_head(&ctx->bucket_list), b = clist_next(&ctx->bucket_list, &bout->n))
292     {
293       if (!sbuck_have(b))
294         sbuck_drop(b);
295       else if (b->runs == 1)
296         sorter_join(b);
297       else
298         sorter_twoway(ctx, b);
299     }
300
301   sorter_free_buf(ctx);
302   sbuck_write(bout);            // Force empty bucket to a file
303   SORT_XTRACE("Final size: %jd", (uintmax_t) sbuck_size(bout));
304   ctx->out_fb = sbuck_read(bout);
305 }