]> mj.ucw.cz Git - libucw.git/blob - lib/sorter/govern.c
d8006e423ea7118f4d46a70d358517133a56a3c1
[libucw.git] / lib / sorter / govern.c
1 /*
2  *      UCW Library -- Universal Sorter: Governing Routines
3  *
4  *      (c) 2007 Martin Mares <mj@ucw.cz>
5  *
6  *      This software may be freely distributed and used according to the terms
7  *      of the GNU Lesser General Public License.
8  */
9
10 #include "lib/lib.h"
11 #include "lib/fastbuf.h"
12 #include "lib/mempool.h"
13 #include "lib/sorter/common.h"
14
15 #include <string.h>
16
17 static int
18 sorter_presort(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only)
19 {
20   sorter_alloc_buf(ctx);
21   if (in->flags & SBF_CUSTOM_PRESORT)
22     {
23       struct fastbuf *f = sbuck_write(out);
24       out->runs++;
25       return ctx->custom_presort(f, ctx->big_buf, ctx->big_buf_size);   // FIXME: out_only optimization?
26     }
27   return ctx->internal_sort(ctx, in, out, out_only);
28 }
29
30 static inline struct sort_bucket *
31 sbuck_join_to(struct sort_bucket *b)
32 {
33   if (sorter_debug & SORT_DEBUG_NO_JOIN)
34     return NULL;
35
36   struct sort_bucket *out = (struct sort_bucket *) b->n.prev;   // Such bucket is guaranteed to exist
37   return (out->flags & SBF_FINAL) ? out : NULL;
38 }
39
40 static void
41 sorter_join(struct sort_bucket *b)
42 {
43   struct sort_bucket *join = (struct sort_bucket *) b->n.prev;
44   ASSERT(join->flags & SBF_FINAL);
45   ASSERT(b->runs == 1);
46
47   if (!sbuck_has_file(join))
48     {
49       // The final bucket doesn't have any file associated yet, so replace
50       // it with the new bucket.
51       SORT_XTRACE(2, "Replaced final bucket");
52       b->flags |= SBF_FINAL;
53       sbuck_drop(join);
54     }
55   else
56     {
57       SORT_TRACE("Copying to output file: %s", F_BSIZE(b));
58       struct fastbuf *src = sbuck_read(b);
59       struct fastbuf *dest = sbuck_write(join);
60       bbcopy(src, dest, ~0U);
61       sbuck_drop(b);
62     }
63 }
64
65 static void
66 sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
67 {
68   struct sort_bucket *ins[3] = { NULL }, *outs[3] = { NULL };
69   cnode *list_pos = b->n.prev;
70   struct sort_bucket *join = sbuck_join_to(b);
71
72   if (!(sorter_debug & SORT_DEBUG_NO_PRESORT) || (b->flags & SBF_CUSTOM_PRESORT))
73     {
74       SORT_XTRACE(2, "%s", ((b->flags & SBF_CUSTOM_PRESORT) ? "Custom presorting" : "Presorting"));
75       ins[0] = sbuck_new(ctx);
76       if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
77         {
78           SORT_TRACE("Sorted in memory");
79           if (join)
80             sbuck_drop(ins[0]);
81           else
82             clist_insert_after(&ins[0]->n, list_pos);
83           sbuck_drop(b);
84           return;
85         }
86
87       ins[1] = sbuck_new(ctx);
88       int i = 1;
89       while (sorter_presort(ctx, b, ins[i], ins[i]))
90         i = 1-i;
91       sbuck_drop(b);
92       SORT_TRACE("Presorting pass (%d+%d runs, %s+%s)", ins[0]->runs, ins[1]->runs, F_BSIZE(ins[0]), F_BSIZE(ins[1]));
93     }
94   else
95     {
96       SORT_XTRACE(2, "Presorting disabled");
97       ins[0] = b;
98     }
99
100   SORT_XTRACE(2, "Main sorting");
101   uns pass = 0;
102   do {
103     ++pass;
104     if (ins[0]->runs == 1 && ins[1]->runs == 1 && join)
105       {
106         // This is guaranteed to produce a single run, so join if possible
107         outs[0] = join;
108         outs[1] = NULL;
109         ctx->twoway_merge(ctx, ins, outs);
110         ASSERT(outs[0]->runs == 2);
111         outs[0]->runs--;
112         SORT_TRACE("Mergesort pass %d (final run, %s)", pass, F_BSIZE(outs[0]));
113         sbuck_drop(ins[0]);
114         sbuck_drop(ins[1]);
115         return;
116       }
117     outs[0] = sbuck_new(ctx);
118     outs[1] = sbuck_new(ctx);
119     outs[2] = NULL;
120     ctx->twoway_merge(ctx, ins, outs);
121     SORT_TRACE("Mergesort pass %d (%d+%d runs, %s+%s)", pass, outs[0]->runs, outs[1]->runs, F_BSIZE(outs[0]), F_BSIZE(outs[1]));
122     sbuck_drop(ins[0]);
123     sbuck_drop(ins[1]);
124     memcpy(ins, outs, 3*sizeof(struct sort_bucket *));
125   } while (sbuck_have(ins[1]));
126
127   sbuck_drop(ins[1]);
128   clist_insert_after(&ins[0]->n, list_pos);
129 }
130
131 void
132 sorter_run(struct sort_context *ctx)
133 {
134   ctx->pool = mp_new(4096);
135   clist_init(&ctx->bucket_list);
136
137   /* FIXME: There should be a way how to detect size of the input file */
138   /* FIXME: Remember to test sorting of empty files */
139
140   // Create bucket containing the source
141   struct sort_bucket *bin = sbuck_new(ctx);
142   bin->flags = SBF_SOURCE | SBF_OPEN_READ;
143   if (ctx->custom_presort)
144     bin->flags |= SBF_CUSTOM_PRESORT;
145   else
146     bin->fb = ctx->in_fb;
147   bin->ident = "in";
148   bin->size = ~(u64)0;
149   bin->hash_bits = ctx->hash_bits;
150   clist_add_tail(&ctx->bucket_list, &bin->n);
151
152   // Create bucket for the output
153   struct sort_bucket *bout = sbuck_new(ctx);
154   bout->flags = SBF_FINAL;
155   if (bout->fb = ctx->out_fb)
156     bout->flags |= SBF_OPEN_WRITE;
157   bout->ident = "out";
158   bout->runs = 1;
159   clist_add_head(&ctx->bucket_list, &bout->n);
160
161   struct sort_bucket *b;
162   while (bout = clist_head(&ctx->bucket_list), b = clist_next(&ctx->bucket_list, &bout->n))
163     {
164       if (!sbuck_have(b))
165         sbuck_drop(b);
166       else if (b->runs == 1)
167         sorter_join(b);
168       else
169         sorter_twoway(ctx, b);
170     }
171
172   sorter_free_buf(ctx);
173   sbuck_write(bout);            // Force empty bucket to a file
174   SORT_XTRACE(2, "Final size: %s", F_BSIZE(bout));
175   ctx->out_fb = sbuck_read(bout);
176 }