]> mj.ucw.cz Git - libucw.git/blob - lib/sorter/govern.c
f962810b3ba0ce72a47d8d63c4f3f8b4b54329eb
[libucw.git] / lib / sorter / govern.c
1 /*
2  *      UCW Library -- Universal Sorter: Governing Routines
3  *
4  *      (c) 2007 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "lib/lib.h"
11 #include "lib/fastbuf.h"
12 #include "lib/mempool.h"
13 #include "lib/sorter/common.h"
14
15 void *
16 sorter_alloc(struct sort_context *ctx, uns size)
17 {
18   return mp_alloc_zero(ctx->pool, size);
19 }
20
21 struct sort_bucket *
22 sbuck_new(struct sort_context *ctx)
23 {
24   return sorter_alloc(ctx, sizeof(struct sort_bucket));
25 }
26
27 void
28 sbuck_drop(struct sort_bucket *b)
29 {
30   if (b)
31     {
32       ASSERT(!(b->flags & SBF_DESTROYED));
33       if (b->n.prev)
34         clist_remove(&b->n);
35       bclose(b->fb);
36       bzero(b, sizeof(*b));
37       b->flags = SBF_DESTROYED;
38     }
39 }
40
41 sh_off_t
42 sbuck_size(struct sort_bucket *b)
43 {
44   if (b->flags & SBF_OPEN_WRITE)
45     return btell(b->fb);
46   else
47     return b->size;
48 }
49
50 int
51 sbuck_have(struct sort_bucket *b)
52 {
53   return b && sbuck_size(b);
54 }
55
56 struct fastbuf *
57 sbuck_read(struct sort_bucket *b)
58 {
59   /* FIXME: These functions should handle buckets with no fb and only name. */
60   ASSERT(b->fb);
61   if (b->flags & SBF_OPEN_READ)
62     return b->fb;
63   else if (b->flags & SBF_OPEN_WRITE)
64     {
65       b->size = btell(b->fb);
66       b->flags = (b->flags & ~SBF_OPEN_WRITE) | SBF_OPEN_READ;
67       brewind(b->fb);
68       return b->fb;
69     }
70   else
71     ASSERT(0);
72 }
73
74 struct fastbuf *
75 sbuck_write(struct sort_bucket *b)
76 {
77   if (b->flags & SBF_OPEN_WRITE)
78     ASSERT(b->fb);
79   else
80     {
81       ASSERT(!(b->flags & (SBF_OPEN_READ | SBF_DESTROYED)));
82       b->fb = bopen_tmp(sorter_stream_bufsize);
83       b->flags |= SBF_OPEN_WRITE;
84     }
85   return b->fb;
86 }
87
88 void
89 sorter_alloc_buf(struct sort_context *ctx)
90 {
91   if (ctx->big_buf)
92     return;
93   u64 bs = MAX(sorter_bufsize/2, 1);
94   bs = ALIGN_TO(bs, (u64)CPU_PAGE_SIZE);
95   ctx->big_buf = big_alloc(2*bs);
96   ctx->big_buf_size = 2*bs;
97   ctx->big_buf_half = ((byte*) ctx->big_buf) + bs;
98   ctx->big_buf_half_size = bs;
99   SORT_XTRACE("Allocated sorting buffer (%jd bytes)", (uintmax_t) bs);
100 }
101
102 void
103 sorter_free_buf(struct sort_context *ctx)
104 {
105   if (!ctx->big_buf)
106     return;
107   big_free(ctx->big_buf, ctx->big_buf_size);
108   ctx->big_buf = NULL;
109   SORT_XTRACE("Freed sorting buffer");
110 }
111
112 static int sorter_presort(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only)
113 {
114   /* FIXME: Mode with no presorting (mostly for debugging) */
115   sorter_alloc_buf(ctx);
116   if (in->flags & SBF_CUSTOM_PRESORT)
117     {
118       struct fastbuf *f = sbuck_write(out);
119       return ctx->custom_presort(f, ctx->big_buf, ctx->big_buf_size);   // FIXME: out_only optimization?
120     }
121   return ctx->internal_sort(ctx, in, out, out_only);
122 }
123
124 static inline struct sort_bucket *
125 sbuck_join_to(struct sort_bucket *b)
126 {
127   struct sort_bucket *out = (struct sort_bucket *) b->n.prev;   // Such bucket is guaranteed to exist
128   return (out->flags & SBF_FINAL) ? out : NULL;
129 }
130
131 static void
132 sorter_join(struct sort_bucket *b)
133 {
134   struct sort_bucket *join = sbuck_join_to(b);
135   ASSERT(join);
136
137   // FIXME: What if the final bucket doesn't contain any file yet?
138
139   SORT_TRACE("Copying %jd bytes to output file", (uintmax_t) sbuck_size(b));
140   struct fastbuf *src = sbuck_read(b);
141   struct fastbuf *dest = sbuck_write(join);
142   bbcopy(src, dest, ~0U);
143   sbuck_drop(b);
144 }
145
146 static void
147 sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
148 {
149   struct sort_bucket *ins[3] = { NULL }, *outs[3] = { NULL };
150   cnode *list_pos = b->n.prev;
151   struct sort_bucket *join = sbuck_join_to(b);
152   if (sorter_debug & SORT_DEBUG_NO_JOIN)
153     join = NULL;
154
155   if (!(sorter_debug & SORT_DEBUG_NO_PRESORT) || (b->flags & SBF_CUSTOM_PRESORT))
156     {
157       SORT_TRACE("Presorting");
158       ins[0] = sbuck_new(ctx);
159       sbuck_read(b);
160       if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
161         {
162           if (join)
163             sbuck_drop(ins[0]);
164           else
165             clist_insert_after(&ins[0]->n, list_pos);
166           sbuck_drop(b);
167           return;
168         }
169
170       ins[1] = sbuck_new(ctx);
171       int i = 1;
172       while (sorter_presort(ctx, b, ins[i], ins[i]))
173         i = 1-i;
174       sbuck_drop(b);
175     }
176   else
177     {
178       SORT_TRACE("Skipped presorting");
179       ins[0] = b;
180     }
181
182   SORT_TRACE("Main sorting");
183   do {
184     if (ins[0]->runs == 1 && ins[1]->runs == 1 && join)
185       {
186         // This is guaranteed to produce a single run, so join if possible
187         outs[0] = join;
188         outs[1] = NULL;
189         ctx->twoway_merge(ctx, ins, outs);
190         ASSERT(outs[0]->runs == 2);
191         outs[0]->runs--;
192         SORT_TRACE("Pass done (joined final run)");
193         sbuck_drop(ins[0]);
194         sbuck_drop(ins[1]);
195         return;
196       }
197     outs[0] = sbuck_new(ctx);
198     outs[1] = sbuck_new(ctx);
199     outs[2] = NULL;
200     ctx->twoway_merge(ctx, ins, outs);
201     SORT_TRACE("Pass done (%d+%d runs, %jd+%jd bytes)", outs[0]->runs, outs[1]->runs, (uintmax_t) sbuck_size(outs[0]), (uintmax_t) sbuck_size(outs[1]));
202     sbuck_drop(ins[0]);
203     sbuck_drop(ins[1]);
204     memcpy(ins, outs, 3*sizeof(struct sort_bucket *));
205   } while (sbuck_have(ins[1]));
206
207   sbuck_drop(ins[1]);
208   clist_insert_after(&ins[0]->n, list_pos);
209 }
210
211 void
212 sorter_run(struct sort_context *ctx)
213 {
214   ctx->pool = mp_new(4096);
215   clist_init(&ctx->bucket_list);
216
217   /* FIXME: There should be a way how to detect size of the input file */
218   /* FIXME: Remember to test sorting of empty files */
219
220   // Create bucket containing the source
221   struct sort_bucket *bin = sbuck_new(ctx);
222   bin->flags = SBF_SOURCE | SBF_OPEN_READ;
223   if (ctx->custom_presort)
224     bin->flags |= SBF_CUSTOM_PRESORT;
225   else
226     bin->fb = ctx->in_fb;
227   bin->ident = "in";
228   bin->size = ~(u64)0;
229   bin->hash_bits = ctx->hash_bits;
230   clist_add_tail(&ctx->bucket_list, &bin->n);
231
232   // Create bucket for the output
233   struct sort_bucket *bout = sbuck_new(ctx);
234   bout->flags = SBF_FINAL;
235   bout->fb = ctx->out_fb;
236   bout->ident = "out";
237   bout->runs = 1;
238   clist_add_head(&ctx->bucket_list, &bout->n);
239
240   struct sort_bucket *b;
241   while (b = clist_next(&ctx->bucket_list, &bout->n))
242     {
243       if (!sbuck_have(b))
244         sbuck_drop(b);
245       else if (b->runs == 1)
246         sorter_join(b);
247       else
248         sorter_twoway(ctx, b);
249     }
250
251   sorter_free_buf(ctx);
252   SORT_XTRACE("Final size: %jd", (uintmax_t) sbuck_size(bout));
253   ctx->out_fb = sbuck_read(bout);
254 }