]> mj.ucw.cz Git - libucw.git/commitdiff
More bits of the sorter.
authorMartin Mares <mj@ucw.cz>
Fri, 2 Feb 2007 18:09:59 +0000 (19:09 +0100)
committerMartin Mares <mj@ucw.cz>
Fri, 2 Feb 2007 18:09:59 +0000 (19:09 +0100)
In-memory presorting (the general case, but without unification) seems to
work. However, I am not happy with the bucket mechanics yet, it's too
complicated.

lib/sorter/common.h
lib/sorter/config.c
lib/sorter/govern.c
lib/sorter/s-internal.h
lib/sorter/s-twoway.h
lib/sorter/sorter.h

index 9747a5e082fec181276af6c29e29e74fdef8b482..15083663f3cb03aead02787bdba87d2b7ae4982b 100644 (file)
 
 /* Configuration, some of the variables are used by the old sorter, too. */
 extern uns sorter_trace, sorter_presort_bufsize, sorter_stream_bufsize;
+extern u64 sorter_bufsize;
+
+#define SORT_TRACE(x...) do { if (sorter_trace) log(L_DEBUG, x); } while(0)
+#define SORT_XTRACE(x...) do { if (sorter_trace > 1) log(L_DEBUG, x); } while(0)
 
 struct sort_bucket {
   cnode n;
@@ -21,14 +25,15 @@ struct sort_bucket {
   struct fastbuf *fb;
   byte *name;
   u64 size;                            // Size in bytes
-  uns runs;                            // Number of runs, 0 if unknown
+  uns runs;                            // Number of runs, 0 if not sorted
   uns hash_bits;                       // Remaining bits of the hash function
   byte *ident;                         // Identifier used in debug messages
 };
 
 enum sort_bucket_flags {
-  SBF_FINAL = 1,                       // This bucket corresponds to the final output file
-  SBF_SOURCE = 2,                      // Contains the source file
+  SBF_FINAL = 1,                       // This bucket corresponds to the final output file (always 1 run)
+  SBF_SOURCE = 2,                      // Contains the source file (always 0 runs)
+  SBF_CUSTOM_PRESORT = 4,              // Contains source to read via custom presorter
 };
 
 struct sort_context {
@@ -38,24 +43,35 @@ struct sort_context {
 
   struct mempool *pool;
   clist bucket_list;
-  byte *big_buf, *big_buf_half;
-  uns big_buf_size, big_buf_half_size;
+  void *big_buf, *big_buf_half;
+  size_t big_buf_size, big_buf_half_size;
 
-  struct fastbuf *(*custom_presort)(void);
+  int (*custom_presort)(struct fastbuf *dest, byte *buf, size_t bufsize);
   // Take as much as possible from the source bucket, sort it in memory and dump to destination bucket.
   // Return 1 if there is more data available in the source bucket.
-  int (*internal_sort)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out);
+  int (*internal_sort)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only);
   // Two-way split/merge: merge up to 2 source buckets to up to 2 destination buckets.
   // Bucket arrays are NULL-terminated.
   void (*twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins, struct sort_bucket **outs);
+
+  // State variables of internal_sort
+  void *key_buf;
+  int more_keys;
 };
 
 void sorter_run(struct sort_context *ctx);
 
-struct sort_bucket *sorter_new_bucket(struct sort_context *ctx);
-struct fastbuf *sorter_open_read(struct sort_bucket *b);
-struct fastbuf *sorter_open_write(struct sort_bucket *b);
-void sorter_close_read(struct sort_bucket *b);
-void sorter_close_write(struct sort_bucket *b);
+void *sorter_alloc(struct sort_context *ctx, uns size);
+void sorter_alloc_buf(struct sort_context *ctx);
+void sorter_free_buf(struct sort_context *ctx);
+
+// Operations on buckets
+struct sort_bucket *sbuck_new(struct sort_context *ctx);
+void sbuck_drop(struct sort_bucket *b);
+int sbuck_can_read(struct sort_bucket *b);
+struct fastbuf *sbuck_open_read(struct sort_bucket *b);
+struct fastbuf *sbuck_open_write(struct sort_bucket *b);
+void sbuck_close_read(struct sort_bucket *b);
+void sbuck_close_write(struct sort_bucket *b);
 
 #endif
index 0026316b90bd59e55b453977b1d242db27886b35..b1504047d5c576e1136ef05267cbaa3368812934 100644 (file)
 uns sorter_trace;
 uns sorter_presort_bufsize = 65536;
 uns sorter_stream_bufsize = 65536;
+u64 sorter_bufsize = 65536;
 
 static struct cf_section sorter_config = {
   CF_ITEMS {
     CF_UNS("Trace", &sorter_trace),
     CF_UNS("PresortBuffer", &sorter_presort_bufsize),
     CF_UNS("StreamBuffer", &sorter_stream_bufsize),
+    CF_U64("SortBuffer", &sorter_bufsize),
     CF_END
   }
 };
index aece061c528b9093eadeb8dd057663195ffed07e..97942dee37bbc0d7604d3b2a6ed62f0e7fb631ca 100644 (file)
 #include "lib/mempool.h"
 #include "lib/sorter/common.h"
 
+void *
+sorter_alloc(struct sort_context *ctx, uns size)
+{
+  return mp_alloc_zero(ctx->pool, size);
+}
+
 struct sort_bucket *
-sorter_new_bucket(struct sort_context *ctx)
+sbuck_new(struct sort_context *ctx)
 {
-  return mp_alloc_zero(ctx->pool, sizeof(struct sort_bucket));
+  return sorter_alloc(ctx, sizeof(struct sort_bucket));
+}
+
+void
+sbuck_drop(struct sort_bucket *b)
+{
+  if (b)
+    {
+      if (b->n.prev)
+       clist_remove(&b->n);
+      bclose(b->fb);
+      bzero(b, sizeof(*b));
+    }
+}
+
+int
+sbuck_can_read(struct sort_bucket *b)
+{
+  return b && b->size;
 }
 
 struct fastbuf *
-sorter_open_read(struct sort_bucket *b)
+sbuck_open_read(struct sort_bucket *b)
 {
   /* FIXME: These functions should handle buckets with no fb and only name. */
   ASSERT(b->fb);
@@ -27,7 +51,7 @@ sorter_open_read(struct sort_bucket *b)
 }
 
 struct fastbuf *
-sorter_open_write(struct sort_bucket *b)
+sbuck_open_write(struct sort_bucket *b)
 {
   if (!b->fb)
     b->fb = bopen_tmp(sorter_stream_bufsize);
@@ -35,7 +59,7 @@ sorter_open_write(struct sort_bucket *b)
 }
 
 void
-sorter_close_read(struct sort_bucket *b)
+sbuck_close_read(struct sort_bucket *b)
 {
   if (!b)
     return;
@@ -45,46 +69,174 @@ sorter_close_read(struct sort_bucket *b)
 }
 
 void
-sorter_close_write(struct sort_bucket *b)
+sbuck_close_write(struct sort_bucket *b)
 {
   if (b->fb)
     {
       b->size = btell(b->fb);
       brewind(b->fb);
     }
-  /* FIXME: Remove empty buckets from the list automatically? */
 }
 
 void
-sorter_run(struct sort_context *ctx)
+sorter_alloc_buf(struct sort_context *ctx)
 {
-  ctx->pool = mp_new(4096);
-  ASSERT(!ctx->custom_presort);
-  ASSERT(!ctx->out_fb);
-  clist_init(&ctx->bucket_list);
+  if (ctx->big_buf)
+    return;
+  u64 bs = MAX(sorter_bufsize/2, 1);
+  bs = ALIGN_TO(bs, (u64)CPU_PAGE_SIZE);
+  ctx->big_buf = big_alloc(2*bs);
+  ctx->big_buf_size = 2*bs;
+  ctx->big_buf_half = ((byte*) ctx->big_buf) + bs;
+  ctx->big_buf_half_size = bs;
+  SORT_XTRACE("Allocated sorting buffer (%jd bytes)", (uintmax_t) bs);
+}
 
-  /* FIXME: There should be a way how to detect size of the input file */
+void
+sorter_free_buf(struct sort_context *ctx)
+{
+  if (!ctx->big_buf)
+    return;
+  big_free(ctx->big_buf, ctx->big_buf_size);
+  ctx->big_buf = NULL;
+  SORT_XTRACE("Freed sorting buffer");
+}
 
-  /* Trivial 2-way merge with no presorting (just a testing hack) */
-  struct sort_bucket *bin = sorter_new_bucket(ctx);
-  bin->flags = SBF_SOURCE;
-  bin->fb = ctx->in_fb;
-  bin->ident = "src";
+static int sorter_presort(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only)
+{
+  /* FIXME: Mode with no presorting (mostly for debugging) */
+  sorter_alloc_buf(ctx);
+  if (in->flags & SBF_CUSTOM_PRESORT)
+    {
+      struct fastbuf *f = sbuck_open_write(out);
+      return ctx->custom_presort(f, ctx->big_buf, ctx->big_buf_size);  // FIXME: out_only optimization?
+    }
+  return ctx->internal_sort(ctx, in, out, out_only);
+}
+
+static inline struct sort_bucket *
+sbuck_join_to(struct sort_bucket *b)
+{
+  struct sort_bucket *out = (struct sort_bucket *) b->n.prev;  // Such bucket is guaranteed to exist
+  return (out->flags & SBF_FINAL) ? out : NULL;
+}
+
+static void
+sorter_join(struct sort_bucket *b)
+{
+  struct sort_bucket *join = sbuck_join_to(b);
+  ASSERT(join);
+
+  // FIXME: What if the final bucket doesn't contain any file yet?
+
+  SORT_TRACE("Copying %jd bytes to output file", (uintmax_t) b->size);
+  struct fastbuf *src = sbuck_open_read(b);
+  struct fastbuf *dest = sbuck_open_write(join);
+  bbcopy(src, dest, ~0U);
+  sbuck_drop(b);
+}
+
+static void
+sorter_twoway(struct sort_context *ctx, struct sort_bucket *b)
+{
   struct sort_bucket *ins[3], *outs[3];
-  ins[0] = bin;
-  ins[1] = NULL;
+  struct sort_bucket *join = sbuck_join_to(b);
+
+  SORT_TRACE("Presorting");
+  ins[0] = sbuck_new(ctx);
+  sbuck_open_read(b);
+  if (!sorter_presort(ctx, b, ins[0], join ? : ins[0]))
+    {
+      if (join)
+       sbuck_drop(ins[0]);
+      else
+       clist_insert_after(&ins[0]->n, &b->n);
+      sbuck_drop(b);
+      return;
+    }
 
+  ins[1] = sbuck_new(ctx);
+  ins[2] = NULL;
+  int i = 1;
+  while (sorter_presort(ctx, b, ins[i], ins[i]))
+    i = 1-i;
+  sbuck_close_read(b);
+  sbuck_close_write(ins[0]);
+  sbuck_close_write(ins[1]);
+
+  SORT_TRACE("Main sorting");
   do {
-    outs[0] = sorter_new_bucket(ctx);
-    outs[1] = sorter_new_bucket(ctx);
+    if (ins[0]->runs == 1 && ins[1]->runs == 1 && join)        // FIXME: Debug switch for disabling joining optimizations
+      {
+       // This is guaranteed to produce a single run, so join if possible
+       outs[0] = join;
+       outs[1] = NULL;
+       ctx->twoway_merge(ctx, ins, outs);
+       ASSERT(outs[0]->runs == 2);
+       outs[0]->runs--;
+       SORT_TRACE("Pass done (joined final run)");
+       sbuck_drop(b);
+       return;
+      }
+    outs[0] = sbuck_new(ctx);
+    outs[1] = sbuck_new(ctx);
     outs[2] = NULL;
-    log(L_DEBUG, "Pass...");
     ctx->twoway_merge(ctx, ins, outs);
-    log(L_DEBUG, "Done (%d+%d runs)", outs[0]->runs, outs[1]->runs);
-    sorter_close_write(outs[0]);
-    sorter_close_write(outs[1]);
+    sbuck_close_write(outs[0]);
+    sbuck_close_write(outs[1]);
+    SORT_TRACE("Pass done (%d+%d runs, %jd+%jd bytes)", outs[0]->runs, outs[1]->runs, (uintmax_t) outs[0]->size, (uintmax_t) outs[1]->size);
+    sbuck_drop(ins[0]);
+    sbuck_drop(ins[1]);
     memcpy(ins, outs, 3*sizeof(struct sort_bucket *));
-  } while (ins[1]->fb);
+  } while (ins[1]->size);
+
+  sbuck_drop(ins[1]);
+  clist_insert_after(&ins[0]->n, &b->n);
+  sbuck_drop(b);
+}
+
+void
+sorter_run(struct sort_context *ctx)
+{
+  ctx->pool = mp_new(4096);
+  clist_init(&ctx->bucket_list);
+
+  /* FIXME: There should be a way how to detect size of the input file */
+  /* FIXME: Remember to test sorting of empty files */
+
+  // Create bucket containing the source
+  struct sort_bucket *bin = sbuck_new(ctx);
+  bin->flags = SBF_SOURCE;
+  if (ctx->custom_presort)
+    bin->flags |= SBF_CUSTOM_PRESORT;
+  else
+    bin->fb = ctx->in_fb;
+  bin->ident = "in";
+  bin->size = ~(u64)0;
+  bin->hash_bits = ctx->hash_bits;
+  clist_add_tail(&ctx->bucket_list, &bin->n);
+
+  // Create bucket for the output
+  struct sort_bucket *bout = sbuck_new(ctx);
+  bout->flags = SBF_FINAL;
+  bout->fb = ctx->out_fb;
+  bout->ident = "out";
+  bout->runs = 1;
+  clist_add_head(&ctx->bucket_list, &bout->n);
+
+  struct sort_bucket *b;
+  while (b = clist_next(&ctx->bucket_list, &bout->n))
+    {
+      if (!b->size)
+       sbuck_drop(b);
+      else if (b->runs == 1)
+       sorter_join(b);
+      else
+       sorter_twoway(ctx, b);
+    }
 
-  ctx->out_fb = sorter_open_read(ins[0]);
+  sorter_free_buf(ctx);
+  sbuck_close_write(bout);
+  SORT_XTRACE("Final size: %jd", (uintmax_t) bout->size);
+  ctx->out_fb = sbuck_open_read(bout);
 }
index 911b74e2403986e33586b79bbe54535e5ea8f59d..7f779401f147ade11efdd383dde8aba4d202ba3b 100644 (file)
@@ -7,8 +7,100 @@
  *     of the GNU Lesser General Public License.
  */
 
-static int P(internal)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out)
+typedef struct {
+  P(key) *key;
+  // FIXME: Add the hash here to save cache misses
+} P(internal_item_t);
+
+#define ASORT_PREFIX(x) SORT_PREFIX(array_##x)
+#define ASORT_KEY_TYPE P(internal_item_t)
+#define ASORT_ELT(i) ary[i]
+#define ASORT_LT(x,y) (P(compare)((x).key, (y).key) < 0)
+#define ASORT_EXTRA_ARGS , P(internal_item_t) *ary
+#include "lib/arraysort.h"
+
+static int P(internal)(struct sort_context *ctx, struct sort_bucket *bin, struct sort_bucket *bout, struct sort_bucket *bout_only)
 {
-  /* FIXME :) */
-  return 0;
+  sorter_alloc_buf(ctx);
+  ASSERT(bin->fb);                     // Expects the input bucket to be already open for reading
+  struct fastbuf *in = bin->fb;
+
+  P(key) key, *keybuf = ctx->key_buf;
+  if (!keybuf)
+    keybuf = ctx->key_buf = sorter_alloc(ctx, sizeof(key));
+  if (ctx->more_keys)
+    {
+      key = *keybuf;
+      ctx->more_keys = 0;
+    }
+  else if (!P(read_key)(in, &key))
+    return 0;
+
+#ifdef SORT_VAR_DATA
+  if (sizeof(key) + 1024 + SORT_DATA_SIZE(key) > ctx->big_buf_half_size)
+    {
+      SORT_XTRACE("s-internal: Generating a giant run");
+      struct fastbuf *out = sorter_open_write(bout); /* FIXME: Using a non-direct buffer would be nice here */
+      P(copy_data)(&key, in, out);
+      bout->runs++;
+      return 1;                                // We don't know, but 1 is always safe
+    }
+#endif
+
+  size_t bufsize = ctx->big_buf_half_size;     /* FIXME: In some cases, we can use the whole buffer */
+  bufsize = MIN((u64)bufsize, (u64)~0U * sizeof(P(internal_item_t)));  // The number of records must fit in uns
+
+  SORT_XTRACE("s-internal: Reading (bufsize=%zd)", bufsize);
+  P(internal_item_t) *item_array = ctx->big_buf, *item = item_array, *last_item;
+  byte *end = (byte *) ctx->big_buf + bufsize;
+  do
+    {
+      uns ksize = SORT_KEY_SIZE(key);
+#ifdef SORT_UNIFY
+      uns ksize_aligned = ALIGN_TO(ksize, CPU_STRUCT_ALIGN);
+#else
+      uns ksize_aligned = ksize;
+#endif
+      uns dsize = SORT_DATA_SIZE(key);
+      uns recsize = ALIGN_TO(ksize_aligned + dsize, CPU_STRUCT_ALIGN);
+      if (unlikely(sizeof(P(internal_item_t)) + recsize > (size_t)(end - (byte *) item)))
+       {
+         ctx->more_keys = 1;
+         *keybuf = key;
+         break;
+       }
+      end -= recsize;
+      memcpy(end, &key, ksize);
+#ifdef SORT_VAR_DATA
+      breadb(in, end + ksize_aligned, dsize);
+#endif
+      item->key = (P(key)*) end;
+      item++;
+    }
+  while (P(read_key)(in, &key));
+  last_item = item;
+
+  uns count = last_item - item_array;
+  SORT_XTRACE("s-internal: Sorting %d items", count);
+  P(array_sort)(count, item_array);
+
+  SORT_XTRACE("s-internal: Writing");
+  if (!ctx->more_keys)
+    bout = bout_only;
+  struct fastbuf *out = sbuck_open_write(bout);
+  bout->runs++;
+  /* FIXME: No unification done yet */
+  for (item = item_array; item < last_item; item++)
+    {
+      P(write_key)(out, item->key);
+#ifdef SORT_VAR_DATA
+      uns ksize = SORT_KEY_SIZE(*item->key);
+#ifdef SORT_UNIFY
+      ksize = ALIGN_TO(ksize, CPU_STRUCT_ALIGN);
+#endif
+      bwrite(out, (byte *) item->key + ksize, SORT_DATA_SIZE(*item->key));
+#endif
+    }
+
+  return ctx->more_keys;
 }
index 6bbdf28d1e9690dc06e6d4994fb7fee2377c3d65..16a6720b10d75526f272c5193d2a43402003b0d8 100644 (file)
@@ -10,7 +10,7 @@
 /* FIXME: There is a plenty of room for further optimization */
 /* FIXME: Swap outputs if there already are some runs? */
 
-static void P(twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins, struct sort_bucket **outs)
+static void P(twoway_merge)(struct sort_context *ctx UNUSED, struct sort_bucket **ins, struct sort_bucket **outs)
 {
   struct fastbuf *fin1, *fin2, *fout1, *fout2, *ftmp;
   P(key) kbuf1, kbuf2, kbuf3, kbuf4;
@@ -20,11 +20,11 @@ static void P(twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins,
   int comp;
   uns run_count = 0;
 
-  fin1 = sorter_open_read(ins[0]);
+  fin1 = sbuck_open_read(ins[0]);
   next1 = P(read_key)(fin1, kin1);
-  if (ins[1])
+  if (sbuck_can_read(ins[1]))
     {
-      fin2 = sorter_open_read(ins[1]);
+      fin2 = sbuck_open_read(ins[1]);
       next2 = P(read_key)(fin2, kin2);
     }
   else
@@ -50,9 +50,9 @@ static void P(twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins,
          if (unlikely(!fout1))
            {
              if (!fout2)
-               fout1 = sorter_open_write(outs[0]);
+               fout1 = sbuck_open_write(outs[0]);
              else if (outs[1])
-               fout1 = sorter_open_write(outs[1]);
+               fout1 = sbuck_open_write(outs[1]);
              else
                fout1 = fout2;
            }
@@ -100,8 +100,8 @@ static void P(twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins,
        }
     }
 
-  sorter_close_read(ins[0]);
-  sorter_close_read(ins[1]);
+  sbuck_close_read(ins[0]);
+  sbuck_close_read(ins[1]);
   if (fout2 && fout2 != fout1)
     outs[1]->runs += run_count / 2;
   if (fout1)
index fe6cefc11f28b67df6c4d92e9f95704441fccce6..a380bec786bea3d50660e8624b0905d751a377c0 100644 (file)
@@ -56,7 +56,7 @@
  *
  *  Unification:
  *
- *  SORT_MERGE         merge items with identical keys, needs the following functions:
+ *  SORT_UNIFY         merge items with identical keys, needs the following functions:
  *  void PREFIX_write_merged(struct fastbuf *f, SORT_KEY **keys, uns n, byte *buf)
  *                     takes n records in memory with keys which compare equal and writes
  *                     a single record to the given fastbuf. Data for each key can
  *
  *  SORT_INPUT_FILE    file of a given name
  *  SORT_INPUT_FB      fastbuf stream
- *  SORT_INPUT_PRESORT custom presorter: call function PREFIX_presorter (see below)
- *                     to get successive batches of pre-sorted data as temporary
- *                     fastbuf streams or NULL if no more data is available.
+ *  SORT_INPUT_PRESORT custom presorter. Calls function
+ *  int PREFIX_presort(struct fastbuf *dest, byte *buf, size_t bufsize);
+ *                     to get successive batches of pre-sorted data.
  *                     The function is passed a page-aligned presorting buffer.
+ *                     It returns 1 on success or 0 on EOF.
  *
  *  Output (chose one of these):
  *
@@ -143,7 +144,7 @@ static inline int P(hash) (P(key) *x)
 #endif
 #endif
 
-#ifdef SORT_MERGE
+#ifdef SORT_UNIFY
 #define LESS <
 #else
 #define LESS <=
@@ -154,10 +155,22 @@ static inline int P(hash) (P(key) *x)
 #define SORT_ASSERT_UNIQUE
 #endif
 
+#ifdef SORT_KEY_SIZE
+#define SORT_VAR_KEY
+#else
+#define SORT_KEY_SIZE(key) sizeof(key)
+#endif
+
+#ifdef SORT_DATA_SIZE
+#define SORT_VAR_DATA
+#else
+#define SORT_DATA_SIZE(key) 0
+#endif
+
 static inline void P(copy_data)(P(key) *key, struct fastbuf *in, struct fastbuf *out)
 {
   bwrite(out, key, sizeof(P(key)));
-#ifdef SORT_DATA_SIZE
+#ifdef SORT_VAR_DATA
   bbcopy(in, out, SORT_DATA_SIZE(*key));
 #else
   (void) in;
@@ -192,7 +205,7 @@ static struct fastbuf *P(sort)(
   ctx.in_fb = in;
 #elif defined(SORT_INPUT_PRESORT)
   ASSERT(!in);
-  ctx.custom_presort = P(presorter);
+  ctx.custom_presort = P(presort);
 #else
 #error No input given.
 #endif
@@ -234,9 +247,11 @@ static struct fastbuf *P(sort)(
 #undef SORT_KEY_REGULAR
 #undef SORT_KEY_SIZE
 #undef SORT_DATA_SIZE
+#undef SORT_VAR_KEY
+#undef SORT_VAR_DATA
 #undef SORT_INT
 #undef SORT_HASH_BITS
-#undef SORT_MERGE
+#undef SORT_UNIFY
 #undef SORT_INPUT_FILE
 #undef SORT_INPUT_FB
 #undef SORT_INPUT_PRESORT