]> mj.ucw.cz Git - libucw.git/commitdiff
Merge with git+ssh://git.ucw.cz/projects/sherlock/GIT/sherlock.git
authorPavel Charvat <pavel.charvat@netcentrum.cz>
Mon, 25 Jun 2007 08:38:49 +0000 (10:38 +0200)
committerPavel Charvat <pavel.charvat@netcentrum.cz>
Mon, 25 Jun 2007 08:38:49 +0000 (10:38 +0200)
19 files changed:
1  2 
debug/sorter/asio-test.c
debug/sorter/file-test.c
images/color.c
lib/Makefile
lib/asio.t
lib/fastbuf.h
lib/fastbuf.t
lib/fb-direct.c
lib/fb-file.c
lib/fb-mmap.c
lib/fb-param.c
lib/lib.h
lib/mainloop.h
lib/sorter/common.h
lib/sorter/old-test.c
lib/sorter/sort-test.c
lib/stkstring.c
lib/stkstring.h
lib/stkstring.t

index a7c61627e77c49482fc7cdc961fc66dad0e9f48a,0000000000000000000000000000000000000000..40df77b33125c175e9858b6a290f4edbde7b2149
mode 100644,000000..100644
--- /dev/null
@@@ -1,157 -1,0 +1,157 @@@
-   log(L_INFO, "Spent %.3f sec (%.2f MB/sec)", (double)cnt_ms/1000, (double)cnt / 1048576 * 1000 / cnt_ms); \
 +/*
 + *  An experiment with parallel reading and writing of files using ASIO.
 + */
 +
 +#include "lib/lib.h"
 +#include "lib/lfs.h"
 +#include "lib/asio.h"
 +
 +#include <stdio.h>
 +#include <stdlib.h>
 +#include <string.h>
 +#include <fcntl.h>
 +#include <unistd.h>
 +
 +#define COPY
 +#define DIRECT O_DIRECT
 +
 +static timestamp_t timer;
 +
 +#define P_INIT do { cnt = 0; cnt_rep = 0; cnt_ms = 1; } while(0)
 +#define P_UPDATE(cc) do { \
 +  cnt += cc; \
 +  if (cnt >= cnt_rep) { cnt_ms += get_timer(&timer); \
 +    printf("%d of %d MB (%.2f MB/sec)\r", (int)(cnt >> 20), (int)(total_size >> 20), (double)cnt / 1048576 * 1000 / cnt_ms); \
 +    fflush(stdout); cnt_rep += 1<<26; } } while(0)
 +#define P_FINAL do { \
 +  cnt_ms += get_timer(&timer); \
-   log(L_INFO, "Creating input file");
++  msg(L_INFO, "Spent %.3f sec (%.2f MB/sec)", (double)cnt_ms/1000, (double)cnt / 1048576 * 1000 / cnt_ms); \
 +} while(0)
 +
 +static struct asio_queue io_queue;
 +
 +int main(int argc, char **argv)
 +{
 +  ASSERT(argc == 4);
 +  uns files = atol(argv[1]);
 +  uns bufsize = atol(argv[2]) * 1024;                         // Kbytes
 +  u64 total_size = (u64)atol(argv[3]) * 1024*1024*1024;               // Gbytes
 +  u64 cnt, cnt_rep;
 +  uns cnt_ms;
 +  int fd[files];
 +  byte name[files][16];
 +  struct asio_request *req[files];
 +
 +  init_timer(&timer);
 +
 +  io_queue.buffer_size = bufsize;
 +  io_queue.max_writebacks = 2;
 +  asio_init_queue(&io_queue);
 +
 +#ifdef COPY
-   log(L_INFO, "Initializing output files");
++  msg(L_INFO, "Creating input file");
 +  int in_fd = sh_open("tmp/ft-in", O_RDWR | O_CREAT | O_TRUNC | DIRECT, 0666);
 +  ASSERT(in_fd >= 0);
 +  ASSERT(!(total_size % bufsize));
 +  P_INIT;
 +  for (uns i=0; i<total_size/bufsize; i++)
 +    {
 +      struct asio_request *r = asio_get(&io_queue);
 +      r->op = ASIO_WRITE_BACK;
 +      r->fd = in_fd;
 +      r->len = bufsize;
 +      byte *xbuf = r->buffer;
 +      for (uns j=0; j<bufsize; j++)
 +      xbuf[j] = i+j;
 +      asio_submit(r);
 +      P_UPDATE(bufsize);
 +    }
 +  asio_sync(&io_queue);
 +  lseek(in_fd, 0, SEEK_SET);
 +  sync();
 +  P_FINAL;
 +#endif
 +
-   log(L_INFO, "Writing %d MB to %d files in parallel with %d byte buffers", (int)(total_size >> 20), files, bufsize);
++  msg(L_INFO, "Initializing output files");
 +  for (uns i=0; i<files; i++)
 +    {
 +      sprintf(name[i], "tmp/ft-%d", i);
 +      fd[i] = sh_open(name[i], O_RDWR | O_CREAT | O_TRUNC | DIRECT, 0666);
 +      if (fd[i] < 0)
 +      die("Cannot create %s: %m", name[i]);
 +    }
 +  sync();
 +  get_timer(&timer);
 +
-   log(L_INFO, "Syncing");
++  msg(L_INFO, "Writing %d MB to %d files in parallel with %d byte buffers", (int)(total_size >> 20), files, bufsize);
 +  P_INIT;
 +  for (uns i=0; i<files; i++)
 +    req[i] = asio_get(&io_queue);
 +  for (uns round=0; round<total_size/bufsize/files; round++)
 +    {
 +      for (uns i=0; i<files; i++)
 +      {
 +        struct asio_request *r = req[i];
 +#ifdef COPY
 +        struct asio_request *rr, *rd = asio_get(&io_queue);
 +        rd->op = ASIO_READ;
 +        rd->fd = in_fd;
 +        rd->len = bufsize;
 +        asio_submit(rd);
 +        rr = asio_wait(&io_queue);
 +        ASSERT(rr == rd && rd->status == (int)rd->len);
 +        memcpy(r->buffer, rd->buffer, bufsize);
 +        asio_put(rr);
 +#else
 +        for (uns j=0; j<bufsize; j++)
 +          r->buffer[j] = round+i+j;
 +#endif
 +        r->op = ASIO_WRITE_BACK;
 +        r->fd = fd[i];
 +        r->len = bufsize;
 +        asio_submit(r);
 +        P_UPDATE(bufsize);
 +        req[i] = asio_get(&io_queue);
 +      }
 +    }
 +  for (uns i=0; i<files; i++)
 +    asio_put(req[i]);
 +  asio_sync(&io_queue);
 +#ifdef COPY
 +  close(in_fd);
 +#endif
-   log(L_INFO, "Reading the files sequentially");
++  msg(L_INFO, "Syncing");
 +  sync();
 +  P_FINAL;
 +
-   log(L_INFO, "Done");
++  msg(L_INFO, "Reading the files sequentially");
 +  P_INIT;
 +  for (uns i=0; i<files; i++)
 +    {
 +      lseek(fd[i], 0, SEEK_SET);
 +      for (uns round=0; round<total_size/bufsize/files; round++)
 +      {
 +        struct asio_request *rr, *r = asio_get(&io_queue);
 +        r->op = ASIO_READ;
 +        r->fd = fd[i];
 +        r->len = bufsize;
 +        asio_submit(r);
 +        rr = asio_wait(&io_queue);
 +        ASSERT(rr == r && r->status == (int)bufsize);
 +        asio_put(r);
 +        P_UPDATE(bufsize);
 +      }
 +      close(fd[i]);
 +    }
 +  P_FINAL;
 +
 +  for (uns i=0; i<files; i++)
 +    unlink(name[i]);
 +#ifdef COPY
 +  unlink("tmp/ft-in");
 +#endif
 +
 +  asio_cleanup_queue(&io_queue);
++  msg(L_INFO, "Done");
 +  return 0;
 +}
index b6915b3fce9a93c230468bc8c026c67fb0c0e332,0000000000000000000000000000000000000000..430cb30c7ec000d4538f74f5b808295d28cc0d8b
mode 100644,000000..100644
--- /dev/null
@@@ -1,122 -1,0 +1,122 @@@
-   log(L_INFO, "Spent %.3f sec (%.2f MB/sec)", (double)cnt_ms/1000, (double)cnt / 1048576 * 1000 / cnt_ms); \
 +/*
 + *  An experiment with parallel reading and writing of files.
 + */
 +
 +#include "lib/lib.h"
 +#include "lib/lfs.h"
 +
 +#include <stdio.h>
 +#include <stdlib.h>
 +#include <fcntl.h>
 +#include <unistd.h>
 +
 +#define COPY
 +#define DIRECT O_DIRECT
 +
 +static timestamp_t timer;
 +
 +#define P_INIT do { cnt = 0; cnt_rep = 0; cnt_ms = 1; } while(0)
 +#define P_UPDATE(cc) do { \
 +  cnt += cc; \
 +  if (cnt >= cnt_rep) { cnt_ms += get_timer(&timer); \
 +    printf("%d of %d MB (%.2f MB/sec)\r", (int)(cnt >> 20), (int)(total_size >> 20), (double)cnt / 1048576 * 1000 / cnt_ms); \
 +    fflush(stdout); cnt_rep += 1<<26; } } while(0)
 +#define P_FINAL do { \
 +  cnt_ms += get_timer(&timer); \
-   log(L_INFO, "Creating input file");
++  msg(L_INFO, "Spent %.3f sec (%.2f MB/sec)", (double)cnt_ms/1000, (double)cnt / 1048576 * 1000 / cnt_ms); \
 +} while(0)
 +
 +int main(int argc, char **argv)
 +{
 +  ASSERT(argc == 4);
 +  uns files = atol(argv[1]);
 +  uns bufsize = atol(argv[2]) * 1024;                         // Kbytes
 +  u64 total_size = (u64)atol(argv[3]) * 1024*1024*1024;               // Gbytes
 +  u64 cnt, cnt_rep;
 +  uns cnt_ms;
 +  int fd[files];
 +  byte *buf[files], name[files][16];
 +  uns xbufsize = bufsize;                                     // Used for single-file I/O
 +  byte *xbuf = big_alloc(xbufsize);
 +
 +  init_timer(&timer);
 +
 +#ifdef COPY
-   log(L_INFO, "Initializing output files");
++  msg(L_INFO, "Creating input file");
 +  int in_fd = sh_open("tmp/ft-in", O_RDWR | O_CREAT | O_TRUNC | DIRECT, 0666);
 +  ASSERT(in_fd >= 0);
 +  ASSERT(!(total_size % xbufsize));
 +  P_INIT;
 +  for (uns i=0; i<total_size/xbufsize; i++)
 +    {
 +      for (uns j=0; j<xbufsize; j++)
 +      xbuf[j] = i+j;
 +      uns c = write(in_fd, xbuf, xbufsize);
 +      ASSERT(c == xbufsize);
 +      P_UPDATE(c);
 +    }
 +  lseek(in_fd, 0, SEEK_SET);
 +  sync();
 +  P_FINAL;
 +#endif
 +
-   log(L_INFO, "Writing %d MB to %d files in parallel with %d byte buffers", (int)(total_size >> 20), files, bufsize);
++  msg(L_INFO, "Initializing output files");
 +  for (uns i=0; i<files; i++)
 +    {
 +      sprintf(name[i], "tmp/ft-%d", i);
 +      fd[i] = sh_open(name[i], O_RDWR | O_CREAT | O_TRUNC | DIRECT, 0666);
 +      if (fd[i] < 0)
 +      die("Cannot create %s: %m", name[i]);
 +      buf[i] = big_alloc(bufsize);
 +    }
 +  sync();
 +  get_timer(&timer);
 +
-   log(L_INFO, "Syncing");
++  msg(L_INFO, "Writing %d MB to %d files in parallel with %d byte buffers", (int)(total_size >> 20), files, bufsize);
 +  P_INIT;
 +  for (uns r=0; r<total_size/bufsize/files; r++)
 +    {
 +      for (uns i=0; i<files; i++)
 +      {
 +#ifdef COPY
 +        uns ci = read(in_fd, buf[i], bufsize);
 +        ASSERT(ci == bufsize);
 +#else
 +        for (uns j=0; j<bufsize; j++)
 +          buf[i][j] = r+i+j;
 +#endif
 +        uns c = write(fd[i], buf[i], bufsize);
 +        ASSERT(c == bufsize);
 +        P_UPDATE(c);
 +      }
 +    }
 +#ifdef COPY
 +  close(in_fd);
 +#endif
-   log(L_INFO, "Reading the files sequentially");
++  msg(L_INFO, "Syncing");
 +  sync();
 +  P_FINAL;
 +
-   log(L_INFO, "Done");
++  msg(L_INFO, "Reading the files sequentially");
 +  P_INIT;
 +  for (uns i=0; i<files; i++)
 +    {
 +      lseek(fd[i], 0, SEEK_SET);
 +      for (uns r=0; r<total_size/xbufsize/files; r++)
 +      {
 +        uns c = read(fd[i], xbuf, xbufsize);
 +        ASSERT(c == xbufsize);
 +        P_UPDATE(c);
 +      }
 +      close(fd[i]);
 +    }
 +  P_FINAL;
 +
 +  for (uns i=0; i<files; i++)
 +    unlink(name[i]);
 +#ifdef COPY
 +  unlink("tmp/ft-in");
 +#endif
++  msg(L_INFO, "Done");
 +  return 0;
 +}
diff --cc images/color.c
Simple merge
diff --cc lib/Makefile
Simple merge
diff --cc lib/asio.t
index 4382107f7fcf765901d38e64f7185dc9e793cee7,0000000000000000000000000000000000000000..b660657e8acf42a339fd5568d8ca5acd190a6bf6
mode 100644,000000..100644
--- /dev/null
@@@ -1,4 -1,0 +1,4 @@@
- Run:  echo y | obj/lib/asio-t
 +# Tests for asynchronous I/O
 +
++Run:  echo y | ../obj/lib/asio-t
 +Out:  ABCDEFGHIJ
diff --cc lib/fastbuf.h
index 7bedd76df08200623ce264c66266295740042323,7ec5ec6f2ca993fd91fe6d1951260950ddee3460..70e95336fb6d27b38ffd85d678e6baccecd760ea
@@@ -73,39 -73,10 +73,39 @@@ struct fastbuf 
    int can_overwrite_buffer;           /* Can the buffer be altered? (see discussion above) 0=never, 1=temporarily, 2=permanently */
  };
  
- struct fastbuf *bopen_file(byte *name, int mode, struct fb_params *params);           /* Use params==NULL for defaults */
- struct fastbuf *bopen_file_try(byte *name, int mode, struct fb_params *params);
 +/* FastIO on files with run-time parametrization */
 +
 +enum fb_type {                                /* Which back-end you want to use */
 +  FB_STD,                             /* Standard buffered I/O */
 +  FB_DIRECT,                          /* Direct I/O bypassing system caches (see fb-direct.c for description) */
 +  FB_MMAP                             /* Memory mapped files */
 +};
 +
 +struct fb_params {
 +  enum fb_type type;
 +  uns buffer_size;
 +  /* FB_STD only */
 +  uns keep_back_buf;
 +  /* FB_DIRECT only */
 +  uns read_ahead;                     
 +  uns write_back;
 +  struct asio_queue *asio;
 +};
 +
 +struct cf_section;
 +extern struct cf_section fbpar_cf;
 +extern struct fb_params fbpar_def;
 +
++struct fastbuf *bopen_file(const byte *name, int mode, struct fb_params *params);     /* Use params==NULL for defaults */
++struct fastbuf *bopen_file_try(const byte *name, int mode, struct fb_params *params);
 +struct fastbuf *bopen_tmp_file(struct fb_params *params);
 +struct fastbuf *bopen_fd(int fd, struct fb_params *params);
 +
  /* FastIO on standard files (specify buffer size 0 to enable mmaping) */
  
- struct fastbuf *bfdopen_internal(int fd, byte *name, uns buflen);
- struct fastbuf *bopen(byte *name, uns mode, uns buflen);
- struct fastbuf *bopen_try(byte *name, uns mode, uns buflen);
++struct fastbuf *bfdopen_internal(int fd, const byte *name, uns buflen);
+ struct fastbuf *bopen(const byte *name, uns mode, uns buflen);
+ struct fastbuf *bopen_try(const byte *name, uns mode, uns buflen);
  struct fastbuf *bopen_tmp(uns buflen);
  struct fastbuf *bfdopen(int fd, uns buflen);
  struct fastbuf *bfdopen_shared(int fd, uns buflen);
@@@ -114,14 -85,6 +114,14 @@@ void bfilesync(struct fastbuf *b)
  #define TEMP_FILE_NAME_LEN 256
  void temp_file_name(byte *name);
  
- struct fastbuf *bfmmopen_internal(int fd, byte *name, uns mode);
 +/* Internal functions of some file back-ends */
 +
- struct fastbuf *fbdir_open_fd_internal(int fd, byte *name, struct asio_queue *io_queue, uns buffer_size, uns read_ahead, uns write_back);
++struct fastbuf *bfmmopen_internal(int fd, const byte *name, uns mode);
 +
 +extern uns fbdir_cheat;
 +struct asio_queue;
++struct fastbuf *fbdir_open_fd_internal(int fd, const byte *name, struct asio_queue *io_queue, uns buffer_size, uns read_ahead, uns write_back);
 +
  /* FastIO on in-memory streams */
  
  struct fastbuf *fbmem_create(unsigned blocksize);     /* Create stream and return its writing fastbuf */
diff --cc lib/fastbuf.t
index 4771a35e41f79e95879e141f3db607645253cffc,0000000000000000000000000000000000000000..6f8681a0d29c270ad22a5a23791275fe0cc3389b
mode 100644,000000..100644
--- /dev/null
@@@ -1,15 -1,0 +1,15 @@@
- Run:  obj/lib/fb-file-t
 +# Tests for fastbufs
 +
- Run:  obj/lib/fb-grow-t
++Run:  ../obj/lib/fb-file-t
 +Out:  112
 +      <hello><hello><hello><hello><hello><hello><hello><hello><hello><hello><hello><hello><hello><hello><hello><hello>
 +      112 116
 +
- Run:  obj/lib/fb-pool-t
++Run:  ../obj/lib/fb-grow-t
 +Out:  <10><10><0>1234512345<10><9>5<10>
 +      <10><10><0>1234512345<10><9>5<10>
 +      <10><10><0>1234512345<10><9>5<10>
 +      <10><10><0>1234512345<10><9>5<10>
 +      <10><10><0>1234512345<10><9>5<10>
 +
++Run:  ../obj/lib/fb-pool-t
diff --cc lib/fb-direct.c
index b82faa9157a05aabcdca4c373ba658316df5d629,0000000000000000000000000000000000000000..806064a9c9a10026e47e81d0136e55712c3838ba
mode 100644,000000..100644
--- /dev/null
@@@ -1,348 -1,0 +1,348 @@@
-       log(L_ERROR, "unlink(%s): %m", f->name);
 +/*
 + *    UCW Library -- Fast Buffered I/O on O_DIRECT Files
 + *
 + *    (c) 2006 Martin Mares <mj@ucw.cz>
 + *
 + *    This software may be freely distributed and used according to the terms
 + *    of the GNU Lesser General Public License.
 + */
 +
 +/*
 + *    This is a fastbuf backend for fast streaming I/O using O_DIRECT and
 + *    the asynchronous I/O module. It's designed for use on large files
 + *    which don't fit in the disk cache.
 + *
 + *    CAVEATS:
 + *
 + *      - All operations with a single fbdirect handle must be done
 + *        within a single thread, unless you provide a custom I/O queue
 + *        and take care of locking.
 + *
 + *    FIXME: what if the OS doesn't support O_DIRECT?
 + *    FIXME: unaligned seeks and partial writes?
 + *    FIXME: merge with other file-oriented fastbufs
 + */
 +
 +#undef LOCAL_DEBUG
 +
 +#include "lib/lib.h"
 +#include "lib/fastbuf.h"
 +#include "lib/lfs.h"
 +#include "lib/asio.h"
 +#include "lib/conf.h"
 +#include "lib/threads.h"
 +
 +#include <string.h>
 +#include <fcntl.h>
 +#include <unistd.h>
 +#include <stdio.h>
 +
 +uns fbdir_cheat;
 +
 +static struct cf_section fbdir_cf = {
 +  CF_ITEMS {
 +    CF_UNS("Cheat", &fbdir_cheat),
 +    CF_END
 +  }
 +};
 +
 +#define FBDIR_ALIGN 512
 +
 +enum fbdir_mode {                             // Current operating mode
 +    M_NULL,
 +    M_READ,
 +    M_WRITE
 +};
 +
 +struct fb_direct {
 +  struct fastbuf fb;
 +  int fd;                                     // File descriptor
 +  int is_temp_file;                           // 0=normal file, 1=temporary file, delete on close, -1=shared FD
 +  struct asio_queue *io_queue;                        // I/O queue to use
 +  struct asio_queue *user_queue;              // If io_queue was supplied by the user
 +  struct asio_request *pending_read;
 +  struct asio_request *done_read;
 +  struct asio_request *active_buffer;
 +  enum fbdir_mode mode;
 +  byte name[0];
 +};
 +#define FB_DIRECT(f) ((struct fb_direct *)(f)->is_fastbuf)
 +
 +static void CONSTRUCTOR
 +fbdir_global_init(void)
 +{
 +  cf_declare_section("FBDirect", &fbdir_cf, 0);
 +}
 +
 +static void
 +fbdir_read_sync(struct fb_direct *F)
 +{
 +  while (F->pending_read)
 +    {
 +      struct asio_request *r = asio_wait(F->io_queue);
 +      ASSERT(r);
 +      struct fb_direct *G = r->user_data;
 +      ASSERT(G);
 +      ASSERT(G->pending_read == r && !G->done_read);
 +      G->pending_read = NULL;
 +      G->done_read = r;
 +    }
 +}
 +
 +static void
 +fbdir_change_mode(struct fb_direct *F, enum fbdir_mode mode)
 +{
 +  if (F->mode == mode)
 +    return;
 +  DBG("FB-DIRECT: Switching mode to %d", mode);
 +  switch (F->mode)
 +    {
 +    case M_NULL:
 +      break;
 +    case M_READ:
 +      fbdir_read_sync(F);                     // Wait for read-ahead requests to finish
 +      if (F->done_read)                               // Return read-ahead requests if any
 +      {
 +        asio_put(F->done_read);
 +        F->done_read = NULL;
 +      }
 +      break;
 +    case M_WRITE:
 +      asio_sync(F->io_queue);                 // Wait for pending writebacks
 +      break;
 +    }
 +  if (F->active_buffer)
 +    {
 +      asio_put(F->active_buffer);
 +      F->active_buffer = NULL;
 +    }
 +  F->mode = mode;
 +}
 +
 +static void
 +fbdir_submit_read(struct fb_direct *F)
 +{
 +  struct asio_request *r = asio_get(F->io_queue);
 +  r->fd = F->fd;
 +  r->op = ASIO_READ;
 +  r->len = F->io_queue->buffer_size;
 +  r->user_data = F;
 +  asio_submit(r);
 +  F->pending_read = r;
 +}
 +
 +static int
 +fbdir_refill(struct fastbuf *f)
 +{
 +  struct fb_direct *F = FB_DIRECT(f);
 +
 +  DBG("FB-DIRECT: Refill");
 +
 +  if (!F->done_read)
 +    {
 +      if (!F->pending_read)
 +      {
 +        fbdir_change_mode(F, M_READ);
 +        fbdir_submit_read(F);
 +      }
 +      fbdir_read_sync(F);
 +      ASSERT(F->done_read);
 +    }
 +
 +  struct asio_request *r = F->done_read;
 +  F->done_read = NULL;
 +  if (F->active_buffer)
 +    asio_put(F->active_buffer);
 +  F->active_buffer = r;
 +  if (!r->status)
 +    return 0;
 +  if (r->status < 0)
 +    die("Error reading %s: %s", f->name, strerror(r->returned_errno));
 +  f->bptr = f->buffer = r->buffer;
 +  f->bstop = f->bufend = f->buffer + r->status;
 +  f->pos += r->status;
 +
 +  fbdir_submit_read(F);                               // Read-ahead the next block
 +
 +  return r->status;
 +}
 +
 +static void
 +fbdir_spout(struct fastbuf *f)
 +{
 +  struct fb_direct *F = FB_DIRECT(f);
 +  struct asio_request *r;
 +
 +  DBG("FB-DIRECT: Spout");
 +
 +  fbdir_change_mode(F, M_WRITE);
 +  r = F->active_buffer;
 +  if (r && f->bptr > f->bstop)
 +    {
 +      r->op = ASIO_WRITE_BACK;
 +      r->fd = F->fd;
 +      r->len = f->bptr - f->bstop;
 +      ASSERT(!(f->pos % FBDIR_ALIGN) || fbdir_cheat);
 +      f->pos += r->len;
 +      if (!fbdir_cheat && r->len % FBDIR_ALIGN)                       // Have to simulate incomplete writes
 +      {
 +        r->len = ALIGN_TO(r->len, FBDIR_ALIGN);
 +        asio_submit(r);
 +        asio_sync(F->io_queue);
 +        DBG("FB-DIRECT: Truncating at %llu", (long long)f->pos);
 +        if (sh_ftruncate(F->fd, f->pos) < 0)
 +          die("Error truncating %s: %m", f->name);
 +      }
 +      else
 +      asio_submit(r);
 +      r = NULL;
 +    }
 +  if (!r)
 +    r = asio_get(F->io_queue);
 +  f->bstop = f->bptr = f->buffer = r->buffer;
 +  f->bufend = f->buffer + F->io_queue->buffer_size;
 +  F->active_buffer = r;
 +}
 +
 +static int
 +fbdir_seek(struct fastbuf *f, sh_off_t pos, int whence)
 +{
 +  DBG("FB-DIRECT: Seek %llu %d", (long long)pos, whence);
 +
 +  if (whence == SEEK_SET && pos == f->pos)
 +    return 1;
 +
 +  fbdir_change_mode(FB_DIRECT(f), M_NULL);                    // Wait for all async requests to finish
 +  sh_off_t l = sh_seek(FB_DIRECT(f)->fd, pos, whence);
 +  if (l < 0)
 +    return 0;
 +  f->pos = l;
 +  return 1;
 +}
 +
 +static struct asio_queue *
 +fbdir_get_io_queue(uns buffer_size, uns write_back)
 +{
 +  struct ucwlib_context *ctx = ucwlib_thread_context();
 +  struct asio_queue *q = ctx->io_queue;
 +  if (!q)
 +    {
 +      q = xmalloc_zero(sizeof(struct asio_queue));
 +      q->buffer_size = buffer_size;
 +      q->max_writebacks = write_back;
 +      asio_init_queue(q);
 +      ctx->io_queue = q;
 +    }
 +  q->use_count++;
 +  DBG("FB-DIRECT: Got I/O queue, uc=%d", q->use_count);
 +  return q;
 +}
 +
 +static void
 +fbdir_put_io_queue(void)
 +{
 +  struct ucwlib_context *ctx = ucwlib_thread_context();
 +  struct asio_queue *q = ctx->io_queue;
 +  ASSERT(q);
 +  DBG("FB-DIRECT: Put I/O queue, uc=%d", q->use_count);
 +  if (!--q->use_count)
 +    {
 +      asio_cleanup_queue(q);
 +      xfree(q);
 +      ctx->io_queue = NULL;
 +    }
 +}
 +
 +static void
 +fbdir_close(struct fastbuf *f)
 +{
 +  struct fb_direct *F = FB_DIRECT(f);
 +
 +  DBG("FB-DIRECT: Close");
 +
 +  fbdir_change_mode(F, M_NULL);
 +  if (!F->user_queue)
 +    fbdir_put_io_queue();
 +
 +  switch (F->is_temp_file)
 +    {
 +    case 1:
 +      if (unlink(f->name) < 0)
- fbdir_open_fd_internal(int fd, byte *name, struct asio_queue *q, uns buffer_size, uns read_ahead UNUSED, uns write_back)
++      msg(L_ERROR, "unlink(%s): %m", f->name);
 +    case 0:
 +      close(F->fd);
 +    }
 +
 +  xfree(f);
 +}
 +
 +static int
 +fbdir_config(struct fastbuf *f, uns item, int value)
 +{
 +  switch (item)
 +    {
 +    case BCONFIG_IS_TEMP_FILE:
 +      FB_DIRECT(f)->is_temp_file = value;
 +      return 0;
 +    default:
 +      return -1;
 +    }
 +}
 +
 +struct fastbuf *
++fbdir_open_fd_internal(int fd, const byte *name, struct asio_queue *q, uns buffer_size, uns read_ahead UNUSED, uns write_back)
 +{
 +  int namelen = strlen(name) + 1;
 +  struct fb_direct *F = xmalloc(sizeof(struct fb_direct) + namelen);
 +  struct fastbuf *f = &F->fb;
 +
 +  DBG("FB-DIRECT: Open");
 +  bzero(F, sizeof(*F));
 +  f->name = F->name;
 +  memcpy(f->name, name, namelen);
 +  F->fd = fd;
 +  if (q)
 +    F->io_queue = F->user_queue = q;
 +  else
 +    F->io_queue = fbdir_get_io_queue(buffer_size, write_back);
 +  f->refill = fbdir_refill;
 +  f->spout = fbdir_spout;
 +  f->seek = fbdir_seek;
 +  f->close = fbdir_close;
 +  f->config = fbdir_config;
 +  f->can_overwrite_buffer = 2;
 +  return f;
 +}
 +
 +#ifdef TEST
 +
 +#include "lib/getopt.h"
 +
 +int main(int argc, char **argv)
 +{
 +  struct fastbuf *f, *t;
 +
 +  log_init(NULL);
 +  if (cf_getopt(argc, argv, CF_SHORT_OPTS, CF_NO_LONG_OPTS, NULL) >= 0)
 +    die("Hey, whaddya want?");
 +  f = (optind < argc) ? fbdir_open(argv[optind++], O_RDONLY, NULL) : fbdir_open_fd(0, NULL);
 +  t = (optind < argc) ? fbdir_open(argv[optind++], O_RDWR | O_CREAT | O_TRUNC, NULL) : fbdir_open_fd(1, NULL);
 +
 +  bbcopy(f, t, ~0U);
 +  ASSERT(btell(f) == btell(t));
 +
 +#if 0         // This triggers unaligned write
 +  bflush(t);
 +  bputc(t, '\n');
 +#endif
 +
 +  brewind(t);
 +  bgetc(t);
 +  ASSERT(btell(t) == 1);
 +
 +  bclose(f);
 +  bclose(t);
 +  return 0;
 +}
 +
 +#endif
diff --cc lib/fb-file.c
index 87791c978a388886f12c3197632e1915fc6be823,dd8d14886a6872e4fa74bd0a8add456467d93492..ed66dfe284c0d91160e58cf9404e27265d62e5de
@@@ -205,10 -71,9 +205,10 @@@ bfd_close(struct fastbuf *f
      {
      case 1:
        if (unlink(f->name) < 0)
-       log(L_ERROR, "unlink(%s): %m", f->name);
+       msg(L_ERROR, "unlink(%s): %m", f->name);
      case 0:
 -      close(FB_FILE(f)->fd);
 +      if (close(FB_FILE(f)->fd))
 +      die("close(%s): %m", f->name);
      }
    xfree(f);
  }
@@@ -229,12 -91,11 +229,12 @@@ bfd_config(struct fastbuf *f, uns item
      }
  }
  
 -static struct fastbuf *
 -bfdopen_internal(int fd, uns buflen, const byte *name)
 +struct fastbuf *
- bfdopen_internal(int fd, byte *name, uns buflen)
++bfdopen_internal(int fd, const byte *name, uns buflen)
  {
 +  ASSERT(buflen);
    int namelen = strlen(name) + 1;
 -  struct fb_file *F = xmalloc(sizeof(struct fb_file) + buflen + namelen);
 +  struct fb_file *F = xmalloc_zero(sizeof(struct fb_file) + buflen + namelen);
    struct fastbuf *f = &F->fb;
  
    bzero(F, sizeof(*F));
  }
  
  struct fastbuf *
- bopen_try(byte *name, uns mode, uns buflen)
+ bopen_try(const byte *name, uns mode, uns buflen)
  {
 -  int fd = sh_open(name, mode, 0666);
 -  if (fd < 0)
 -    return NULL;
 -  struct fastbuf *b = bfdopen_internal(fd, buflen, name);
 -  if (mode & O_APPEND)
 -    bfd_seek(b, 0, SEEK_END);
 -  return b;
 +  return bopen_file_try(name, mode, &(struct fb_params){ .type = FB_STD, .buffer_size = buflen });
  }
  
  struct fastbuf *
- bopen(byte *name, uns mode, uns buflen)
+ bopen(const byte *name, uns mode, uns buflen)
  {
 -  if (!buflen)
 -    return bopen_mm(name, mode);
 -  struct fastbuf *b = bopen_try(name, mode, buflen);
 -  if (!b)
 -    die("Unable to %s file %s: %m",
 -      (mode & O_CREAT) ? "create" : "open", name);
 -  return b;
 +  return bopen_file(name, mode, &(struct fb_params){ .type = FB_STD, .buffer_size = buflen });
  }
  
  struct fastbuf *
diff --cc lib/fb-mmap.c
index 15c542a07082bbc0f1a0f4d23d9d334596b9d461,28e90df47ca10b9afdfe70cce0f87f6b610deed0..d16458ef83b8e52de73a5afaab6b474d2cca9699
@@@ -146,10 -145,9 +146,10 @@@ bfmm_close(struct fastbuf *f
      {
      case 1:
        if (unlink(f->name) < 0)
-       log(L_ERROR, "unlink(%s): %m", f->name);
+       msg(L_ERROR, "unlink(%s): %m", f->name);
      case 0:
 -      close(F->fd);
 +      if (close(F->fd))
 +      die("close(%s): %m", f->name);
      }
    xfree(f);
  }
@@@ -167,8 -165,8 +167,8 @@@ bfmm_config(struct fastbuf *f, uns item
      }
  }
  
 -static struct fastbuf *
 +struct fastbuf *
- bfmmopen_internal(int fd, byte *name, uns mode)
+ bfmmopen_internal(int fd, const byte *name, uns mode)
  {
    int namelen = strlen(name) + 1;
    struct fb_mmap *F = xmalloc(sizeof(struct fb_mmap) + namelen);
diff --cc lib/fb-param.c
index e487ced4068bb9e01fb26792ad820a84c28abf5b,0000000000000000000000000000000000000000..d9e7a22129394d9734337afe6f801e7ffb95c1c4
mode 100644,000000..100644
--- /dev/null
@@@ -1,129 -1,0 +1,132 @@@
- bopen_fd_internal(int fd, struct fb_params *params, uns mode, byte *name)
 +/*
 + *    UCW Library -- FastIO on files with run-time parametrization
 + *
 + *    (c) 2007 Pavel Charvat <pchar@ucw.cz>
 + *
 + *    This software may be freely distributed and used according to the terms
 + *    of the GNU Lesser General Public License.
 + */
 +
 +#include "lib/lib.h"
 +#include "lib/conf.h"
 +#include "lib/lfs.h"
 +#include "lib/fastbuf.h"
 +
 +#include <fcntl.h>
 +#include <stdio.h>
 +
 +struct fb_params fbpar_def = {
 +  .buffer_size = 65536,
 +  .read_ahead = 1,
 +  .write_back = 1,
 +}; 
 +
 +struct cf_section fbpar_cf = {
 +# define F(x) PTR_TO(struct fb_params, x)
 +  CF_TYPE(struct fb_params),
 +  CF_ITEMS {
 +    CF_LOOKUP("Type", (int *)F(type), ((byte *[]){"std", "direct", "mmap", NULL})),
 +    CF_UNS("BufSize", F(buffer_size)),
 +    CF_UNS("KeepBackBuf", F(keep_back_buf)),
 +    CF_UNS("ReadAhead", F(read_ahead)),
 +    CF_UNS("WriteBack", F(write_back)),
 +    CF_END
 +  }
 +# undef F
 +};
 +
 +static struct cf_section fbpar_global_cf = {
 +  CF_ITEMS {
 +    CF_SECTION("Defaults", &fbpar_def, &fbpar_cf),
 +    CF_END
 +  }
 +};
 +
 +static void CONSTRUCTOR
 +fbpar_global_init(void)
 +{
 +  cf_declare_section("FBParam", &fbpar_global_cf, 0);
 +}
 +
 +static struct fastbuf *
-     sprintf(name = buf, "fd%d", fd);
++bopen_fd_internal(int fd, struct fb_params *params, uns mode, const byte *name)
 +{
 +  byte buf[32];
 +  if (!name)
-           log(L_WARN, "Cannot set O_DIRECT on fd %d: %m", fd);
++    {
++      sprintf(buf, "fd%d", fd);
++      name = buf;
++    }
 +  struct fastbuf *fb;
 +  switch (params->type)
 +    {
 +      case FB_STD:
 +      fb = bfdopen_internal(fd, name,
 +          params->buffer_size ? : fbpar_def.buffer_size);
 +      if (params->keep_back_buf)
 +        bconfig(fb, BCONFIG_KEEP_BACK_BUF, 1);
 +      return fb;
 +      case FB_DIRECT:
 +      fb = fbdir_open_fd_internal(fd, name, params->asio,
 +          params->buffer_size ? : fbpar_def.buffer_size,
 +          params->read_ahead ? : fbpar_def.read_ahead,
 +          params->write_back ? : fbpar_def.write_back);
 +      if (!~mode && !fbdir_cheat && ((int)(mode = fcntl(fd, F_GETFL)) < 0 || fcntl(fd, F_SETFL, mode | O_DIRECT)) < 0)
- bopen_file_internal(byte *name, int mode, struct fb_params *params, int try)
++          msg(L_WARN, "Cannot set O_DIRECT on fd %d: %m", fd);
 +      return fb;
 +      case FB_MMAP:
 +      if (!~mode && (int)(mode = fcntl(fd, F_GETFL)) < 0)
 +          die("Cannot get flags of fd %d: %m", fd);
 +      return bfmmopen_internal(fd, name, mode);
 +      default:
 +      ASSERT(0);
 +    }
 +}
 +
 +static struct fastbuf *
- bopen_file(byte *name, int mode, struct fb_params *params)
++bopen_file_internal(const byte *name, int mode, struct fb_params *params, int try)
 +{
 +  if (params->type == FB_DIRECT && !fbdir_cheat)
 +    mode |= O_DIRECT;
 +  if (params->type == FB_MMAP && (mode & O_ACCMODE) == O_WRONLY)
 +    mode = (mode & ~O_ACCMODE) | O_RDWR;
 +  int fd = sh_open(name, mode, 0666);
 +  if (fd < 0)
 +    if (try)
 +      return NULL;
 +    else
 +      die("Unable to %s file %s: %m", (mode & O_CREAT) ? "create" : "open", name);
 +  struct fastbuf *fb = bopen_fd_internal(fd, params, mode, name);
 +  ASSERT(fb);
 +  if (mode & O_APPEND)
 +    bseek(fb, 0, SEEK_END);
 +  return fb;
 +}
 +
 +struct fastbuf *
- bopen_file_try(byte *name, int mode, struct fb_params *params)
++bopen_file(const byte *name, int mode, struct fb_params *params)
 +{
 +  return bopen_file_internal(name, mode, params ? : &fbpar_def, 0);
 +}
 +
 +struct fastbuf *
++bopen_file_try(const byte *name, int mode, struct fb_params *params)
 +{
 +  return bopen_file_internal(name, mode, params ? : &fbpar_def, 1);
 +}
 +
 +struct fastbuf *
 +bopen_fd(int fd, struct fb_params *params)
 +{
 +  return bopen_fd_internal(fd, params ? : &fbpar_def, ~0U, NULL);
 +}
 +
 +struct fastbuf *
 +bopen_tmp_file(struct fb_params *params)
 +{
 +  byte buf[TEMP_FILE_NAME_LEN];
 +  temp_file_name(buf);
 +  struct fastbuf *fb = bopen_file_internal(buf, O_RDWR | O_CREAT | O_TRUNC, params, 0);
 +  bconfig(fb, BCONFIG_IS_TEMP_FILE, 1);
 +  return fb;
 +}
diff --cc lib/lib.h
Simple merge
diff --cc lib/mainloop.h
Simple merge
index 60c6db98ceb8c63185df0922b25faacbedede4fd,0000000000000000000000000000000000000000..db414dd08c6c033e99124ce00b373b68997a3881
mode 100644,000000..100644
--- /dev/null
@@@ -1,113 -1,0 +1,113 @@@
- #define SORT_TRACE(x...) do { if (sorter_trace) log(L_DEBUG, x); } while(0)
- #define SORT_XTRACE(level, x...) do { if (sorter_trace >= level) log(L_DEBUG, x); } while(0)
 +/*
 + *    UCW Library -- Universal Sorter: Common Declarations
 + *
 + *    (c) 2007 Martin Mares <mj@ucw.cz>
 + *
 + *    This software may be freely distributed and used according to the terms
 + *    of the GNU Lesser General Public License.
 + */
 +
 +#ifndef _UCW_SORTER_COMMON_H
 +#define _UCW_SORTER_COMMON_H
 +
 +#include "lib/clists.h"
 +
 +/* Configuration, some of the variables are used by the old sorter, too. */
 +extern uns sorter_trace, sorter_presort_bufsize, sorter_stream_bufsize;
 +extern uns sorter_debug, sorter_min_radix_bits, sorter_max_radix_bits;
 +extern u64 sorter_bufsize;
 +extern struct fb_params sorter_fb_params;
 +
++#define SORT_TRACE(x...) do { if (sorter_trace) msg(L_DEBUG, x); } while(0)
++#define SORT_XTRACE(level, x...) do { if (sorter_trace >= level) msg(L_DEBUG, x); } while(0)
 +
 +enum sort_debug {
 +  SORT_DEBUG_NO_PRESORT = 1,
 +  SORT_DEBUG_NO_JOIN = 2,
 +  SORT_DEBUG_KEEP_BUCKETS = 4,
 +  SORT_DEBUG_NO_RADIX = 8,
 +};
 +
 +struct sort_bucket;
 +
 +struct sort_context {
 +  struct fastbuf *in_fb;
 +  struct fastbuf *out_fb;
 +  uns hash_bits;
 +  u64 in_size;
 +
 +  struct mempool *pool;
 +  clist bucket_list;
 +  void *big_buf;
 +  size_t big_buf_size;
 +
 +  int (*custom_presort)(struct fastbuf *dest, void *buf, size_t bufsize);
 +
 +  // Take as much as possible from the source bucket, sort it in memory and dump to destination bucket.
 +  // Return 1 if there is more data available in the source bucket.
 +  int (*internal_sort)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket *out, struct sort_bucket *out_only);
 +
 +  // Estimate how much input data from `b' will fit in the internal sorting buffer.
 +  u64 (*internal_estimate)(struct sort_context *ctx, struct sort_bucket *b);
 +
 +  // Two-way split/merge: merge up to 2 source buckets to up to 2 destination buckets.
 +  // Bucket arrays are NULL-terminated.
 +  void (*twoway_merge)(struct sort_context *ctx, struct sort_bucket **ins, struct sort_bucket **outs);
 +
 +  // Radix split according to hash function
 +  void (*radix_split)(struct sort_context *ctx, struct sort_bucket *in, struct sort_bucket **outs, uns bitpos, uns numbits);
 +
 +  // State variables of internal_sort
 +  void *key_buf;
 +  int more_keys;
 +
 +  // Timing
 +  timestamp_t start_time;
 +  uns last_pass_time;
 +  uns total_int_time, total_pre_time, total_ext_time;
 +};
 +
 +void sorter_run(struct sort_context *ctx);
 +
 +/* Buffers */
 +
 +void *sorter_alloc(struct sort_context *ctx, uns size);
 +void sorter_prepare_buf(struct sort_context *ctx);
 +void sorter_alloc_buf(struct sort_context *ctx);
 +void sorter_free_buf(struct sort_context *ctx);
 +
 +/* Buckets */
 +
 +struct sort_bucket {
 +  cnode n;
 +  struct sort_context *ctx;
 +  uns flags;
 +  struct fastbuf *fb;
 +  byte *filename;
 +  u64 size;                           // Size in bytes (not valid when writing)
 +  uns runs;                           // Number of runs, 0 if not sorted
 +  uns hash_bits;                      // Remaining bits of the hash function
 +  byte *ident;                                // Identifier used in debug messages
 +};
 +
 +enum sort_bucket_flags {
 +  SBF_FINAL = 1,                      // This bucket corresponds to the final output file (always 1 run)
 +  SBF_SOURCE = 2,                     // Contains the source file (always 0 runs)
 +  SBF_CUSTOM_PRESORT = 4,             // Contains source to read via custom presorter
 +  SBF_OPEN_WRITE = 256,                       // We are currently writing to the fastbuf
 +  SBF_OPEN_READ = 512,                        // We are reading from the fastbuf
 +  SBF_DESTROYED = 1024,                       // Already done with, no further references allowed
 +  SBF_SWAPPED_OUT = 2048,             // Swapped out to a named file
 +};
 +
 +struct sort_bucket *sbuck_new(struct sort_context *ctx);
 +void sbuck_drop(struct sort_bucket *b);
 +int sbuck_have(struct sort_bucket *b);
 +int sbuck_has_file(struct sort_bucket *b);
 +sh_off_t sbuck_size(struct sort_bucket *b);
 +struct fastbuf *sbuck_read(struct sort_bucket *b);
 +struct fastbuf *sbuck_write(struct sort_bucket *b);
 +void sbuck_swap_out(struct sort_bucket *b);
 +void format_size(byte *buf, u64 x);
 +
 +#endif
index 8f2aacaa0500e413beae7d1b02cd6f1ded9ff37a,0000000000000000000000000000000000000000..bc108f58e84e14398592d0adf5ebff70cd9c6036
mode 100644,000000..100644
--- /dev/null
@@@ -1,413 -1,0 +1,413 @@@
-   log(L_INFO, "Test took %.3fs", get_timer(&timer) / 1000.);
 +/*
 + *    UCW Library -- Testing the Old Sorter
 + *
 + *    (c) 2007 Martin Mares <mj@ucw.cz>
 + *
 + *    This software may be freely distributed and used according to the terms
 + *    of the GNU Lesser General Public License.
 + */
 +
 +#include "lib/lib.h"
 +#include "lib/getopt.h"
 +#include "lib/conf.h"
 +#include "lib/fastbuf.h"
 +#include "lib/ff-binary.h"
 +#include "lib/hashfunc.h"
 +#include "lib/md5.h"
 +
 +#include <stdlib.h>
 +#include <stdio.h>
 +#include <string.h>
 +#include <fcntl.h>
 +#include <unistd.h>
 +
 +/*** Time measurement ***/
 +
 +static timestamp_t timer;
 +
 +static void
 +start(void)
 +{
 +  sync();
 +  init_timer(&timer);
 +}
 +
 +static void
 +stop(void)
 +{
 +  sync();
-   log(L_INFO, ">>> Integers (%s, N=%u)", ((char *[]) { "increasing", "decreasing", "random" })[mode], N);
++  msg(L_INFO, "Test took %.3fs", get_timer(&timer) / 1000.);
 +}
 +
 +/*** Simple 4-byte integer keys ***/
 +
 +struct key1 {
 +  u32 x;
 +};
 +
 +static inline int s1_compare(struct key1 *x, struct key1 *y)
 +{
 +  COMPARE(x->x, y->x);
 +  return 0;
 +}
 +
 +#define SORT_KEY struct key1
 +#define SORT_PREFIX(x) s1_##x
 +#define SORT_INPUT_FB
 +#define SORT_OUTPUT_FB
 +#define SORT_UNIQUE
 +#define SORT_REGULAR
 +#define SORT_PRESORT
 +
 +#include "lib/sorter.h"
 +
 +static void
 +test_int(int mode, u64 size)
 +{
 +  uns N = size ? nextprime(MIN(size/4, 0xffff0000)) : 0;
 +  uns K = N/4*3;
-   log(L_INFO, ">>> Hashes (%s, N=%u)", ((char *[]) { "increasing", "decreasing", "random" })[mode], N);
++  msg(L_INFO, ">>> Integers (%s, N=%u)", ((char *[]) { "increasing", "decreasing", "random" })[mode], N);
 +
 +  struct fastbuf *f = bopen_tmp(65536);
 +  for (uns i=0; i<N; i++)
 +    bputl(f, (mode==0) ? i : (mode==1) ? N-1-i : ((u64)i * K + 17) % N);
 +  brewind(f);
 +
 +  start();
 +  f = s1_sort(f);
 +  stop();
 +
 +  SORT_XTRACE(2, "Verifying");
 +  for (uns i=0; i<N; i++)
 +    {
 +      uns j = bgetl(f);
 +      if (i != j)
 +      die("Discrepancy: %u instead of %u", j, i);
 +    }
 +  bclose(f);
 +}
 +
 +/*** Longer records with hashes (similar to Shepherd's index records) ***/
 +
 +struct key3 {
 +  u32 hash[4];
 +  u32 i;
 +  u32 payload[3];
 +};
 +
 +static inline int s3_compare(struct key3 *x, struct key3 *y)
 +{
 +  /* FIXME: Maybe unroll manually? */
 +  for (uns i=0; i<4; i++)
 +    COMPARE(x->hash[i], y->hash[i]);
 +  return 0;
 +}
 +
 +#define SORT_KEY struct key3
 +#define SORT_PREFIX(x) s3_##x
 +#define SORT_INPUT_FB
 +#define SORT_OUTPUT_FB
 +#define SORT_REGULAR
 +#define SORT_PRESORT
 +
 +#include "lib/sorter.h"
 +
 +static void
 +gen_hash_key(int mode, struct key3 *k, uns i)
 +{
 +  k->i = i;
 +  k->payload[0] = 7*i + 13;
 +  k->payload[1] = 13*i + 19;
 +  k->payload[2] = 19*i + 7;
 +  switch (mode)
 +    {
 +    case 0:
 +      k->hash[0] = i;
 +      k->hash[1] = k->payload[0];
 +      k->hash[2] = k->payload[1];
 +      k->hash[3] = k->payload[2];
 +      break;
 +    case 1:
 +      k->hash[0] = ~i;
 +      k->hash[1] = k->payload[0];
 +      k->hash[2] = k->payload[1];
 +      k->hash[3] = k->payload[2];
 +      break;
 +    default: ;
 +      struct MD5Context ctx;
 +      MD5Init(&ctx);
 +      MD5Update(&ctx, (byte*) &k->i, 4);
 +      MD5Final((byte*) &k->hash, &ctx);
 +      break;
 +    }
 +}
 +
 +static void
 +test_hashes(int mode, u64 size)
 +{
 +  uns N = MIN(size / sizeof(struct key3), 0xffffffff);
-   log(L_INFO, ">>> Strings %s(N=%u)", (mode ? "with data " : ""), N);
++  msg(L_INFO, ">>> Hashes (%s, N=%u)", ((char *[]) { "increasing", "decreasing", "random" })[mode], N);
 +  struct key3 k, lastk;
 +
 +  struct fastbuf *f = bopen_tmp(65536);
 +  uns hash_sum = 0;
 +  for (uns i=0; i<N; i++)
 +    {
 +      gen_hash_key(mode, &k, i);
 +      hash_sum += k.hash[3];
 +      bwrite(f, &k, sizeof(k));
 +    }
 +  brewind(f);
 +
 +  start();
 +  f = s3_sort(f);
 +  stop();
 +
 +  SORT_XTRACE(2, "Verifying");
 +  for (uns i=0; i<N; i++)
 +    {
 +      int ok = breadb(f, &k, sizeof(k));
 +      ASSERT(ok);
 +      if (i && s3_compare(&k, &lastk) <= 0)
 +      ASSERT(0);
 +      gen_hash_key(mode, &lastk, k.i);
 +      if (memcmp(&k, &lastk, sizeof(k)))
 +      ASSERT(0);
 +      hash_sum -= k.hash[3];
 +    }
 +  ASSERT(!hash_sum);
 +  bclose(f);
 +}
 +
 +/*** Variable-length records (strings) with and without var-length data ***/
 +
 +#define KEY4_MAX 256
 +
 +struct key4 {
 +  uns len;
 +  byte s[KEY4_MAX];
 +};
 +
 +static inline int s4_fetch_key(struct fastbuf *f, struct key4 *x)
 +{
 +  int len = bgetl(f);
 +  if (len < 0)
 +    return 0;
 +  x->len = len;
 +  breadb(f, x->s, len);
 +  return 1;
 +}
 +
 +static inline void s4_copy_data(struct fastbuf *i UNUSED, struct fastbuf *f, struct key4 *x)
 +{
 +  bputl(f, x->len);
 +  bwrite(f, x->s, x->len);
 +}
 +
 +static inline int s4_compare(struct key4 *x, struct key4 *y)
 +{
 +  uns l = MIN(x->len, y->len);
 +  int c = memcmp(x->s, y->s, l);
 +  if (c)
 +    return c;
 +  COMPARE(x->len, y->len);
 +  return 0;
 +}
 +
 +static inline byte *s4_fetch_item(struct fastbuf *f UNUSED, struct key4 *x, byte *limit UNUSED)
 +{
 +  return &x->s[x->len];
 +}
 +
 +static inline void s4_store_item(struct fastbuf *f, struct key4 *x)
 +{
 +  s4_copy_data(NULL, f, x);
 +}
 +
 +#define SORT_KEY struct key4
 +#define SORT_PREFIX(x) s4_##x
 +#define SORT_INPUT_FB
 +#define SORT_OUTPUT_FB
 +#define SORT_PRESORT
 +
 +#include "lib/sorter.h"
 +
 +#define s4b_compare s4_compare
 +#define s4b_fetch_key s4_fetch_key
 +
 +static inline uns s4_data_size(struct key4 *x)
 +{
 +  return x->len ? (x->s[0] ^ 0xad) : 0;
 +}
 +
 +static inline void s4b_copy_data(struct fastbuf *i, struct fastbuf *f, struct key4 *x)
 +{
 +  bputl(f, x->len);
 +  bwrite(f, x->s, x->len);
 +  bbcopy(i, f, s4_data_size(x));
 +}
 +
 +static inline byte *s4b_fetch_item(struct fastbuf *f, struct key4 *x, byte *limit)
 +{
 +  byte *d = &x->s[x->len];
 +  if (d + s4_data_size(x) > limit)
 +    return NULL;
 +  breadb(f, d, s4_data_size(x));
 +  return d + s4_data_size(x);
 +}
 +
 +static inline void s4b_store_item(struct fastbuf *f, struct key4 *x)
 +{
 +  bputl(f, x->len);
 +  bwrite(f, x->s, x->len + s4_data_size(x));
 +}
 +
 +#define SORT_KEY struct key4
 +#define SORT_PREFIX(x) s4b_##x
 +#define SORT_INPUT_FB
 +#define SORT_OUTPUT_FB
 +#define SORT_PRESORT
 +
 +#include "lib/sorter.h"
 +
 +static void
 +gen_key4(struct key4 *k)
 +{
 +  k->len = random_max(KEY4_MAX);
 +  for (uns i=0; i<k->len; i++)
 +    k->s[i] = random();
 +}
 +
 +static void
 +gen_data4(byte *buf, uns len, uns h)
 +{
 +  while (len--)
 +    {
 +      *buf++ = h >> 24;
 +      h = h*259309 + 17;
 +    }
 +}
 +
 +static void
 +test_strings(uns mode, u64 size)
 +{
 +  uns avg_item_size = KEY4_MAX/2 + 4 + (mode ? 128 : 0);
 +  uns N = MIN(size / avg_item_size, 0xffffffff);
++  msg(L_INFO, ">>> Strings %s(N=%u)", (mode ? "with data " : ""), N);
 +  srand(1);
 +
 +  struct key4 k, lastk;
 +  byte buf[256], buf2[256];
 +  uns sum = 0;
 +
 +  struct fastbuf *f = bopen_tmp(65536);
 +  for (uns i=0; i<N; i++)
 +    {
 +      gen_key4(&k);
 +      s4_copy_data(NULL, f, &k);
 +      uns h = hash_block(k.s, k.len);
 +      sum += h;
 +      if (mode)
 +      {
 +        gen_data4(buf, s4_data_size(&k), h);
 +        bwrite(f, buf, s4_data_size(&k));
 +      }
 +    }
 +  brewind(f);
 +
 +  start();
 +  f = (mode ? s4b_sort : s4_sort)(f);
 +  stop();
 +
 +  SORT_XTRACE(2, "Verifying");
 +  for (uns i=0; i<N; i++)
 +    {
 +      int ok = s4_fetch_key(f, &k);
 +      ASSERT(ok);
 +      uns h = hash_block(k.s, k.len);
 +      if (mode && s4_data_size(&k))
 +      {
 +        ok = breadb(f, buf, s4_data_size(&k));
 +        ASSERT(ok);
 +        gen_data4(buf2, s4_data_size(&k), h);
 +        ASSERT(!memcmp(buf, buf2, s4_data_size(&k)));
 +      }
 +      if (i && s4_compare(&k, &lastk) < 0)
 +      ASSERT(0);
 +      sum -= h;
 +      lastk = k;
 +    }
 +  ASSERT(!sum);
 +  bclose(f);
 +}
 +
 +/*** Main ***/
 +
 +static void
 +run_test(uns i, u64 size)
 +{
 +  switch (i)
 +    {
 +    case 0:
 +      test_int(0, size); break;
 +    case 1:
 +      test_int(1, size); break;
 +    case 2:
 +      test_int(2, size); break;
 +    case 3:
 +    case 4:
 +    case 5:
 +      break;
 +    case 6:
 +      test_hashes(0, size); break;
 +    case 7:
 +      test_hashes(1, size); break;
 +    case 8:
 +      test_hashes(2, size); break;
 +    case 9:
 +      test_strings(0, size); break;
 +    case 10:
 +      test_strings(1, size); break;
 +#define TMAX 11
 +    }
 +}
 +
 +int
 +main(int argc, char **argv)
 +{
 +  log_init(NULL);
 +  int c;
 +  u64 size = 10000000;
 +  uns t = ~0;
 +
 +  while ((c = cf_getopt(argc, argv, CF_SHORT_OPTS "s:t:v", CF_NO_LONG_OPTS, NULL)) >= 0)
 +    switch (c)
 +      {
 +      case 's':
 +      if (cf_parse_u64(optarg, &size))
 +        goto usage;
 +      break;
 +      case 't':
 +      t = atol(optarg);
 +      if (t >= TMAX)
 +        goto usage;
 +      break;
 +      case 'v':
 +      sorter_trace++;
 +      break;
 +      default:
 +      usage:
 +      fputs("Usage: sort-test [-v] [-s <size>] [-t <test>]\n", stderr);
 +      exit(1);
 +      }
 +  if (optind != argc)
 +    goto usage;
 +
 +  if (t != ~0U)
 +    run_test(t, size);
 +  else
 +    for (uns i=0; i<TMAX; i++)
 +      run_test(i, size);
 +
 +  return 0;
 +}
index f103710070e9115e4b1544323b5b24ec2e725969,0000000000000000000000000000000000000000..be6fd511681ec5b12b0bc74fdcb1b0b43934ff52
mode 100644,000000..100644
--- /dev/null
@@@ -1,688 -1,0 +1,688 @@@
-   log(L_INFO, "Test took %.3fs", get_timer(&timer) / 1000.);
 +/*
 + *    UCW Library -- Testing the Sorter
 + *
 + *    (c) 2007 Martin Mares <mj@ucw.cz>
 + *
 + *    This software may be freely distributed and used according to the terms
 + *    of the GNU Lesser General Public License.
 + */
 +
 +#include "lib/lib.h"
 +#include "lib/getopt.h"
 +#include "lib/conf.h"
 +#include "lib/fastbuf.h"
 +#include "lib/ff-binary.h"
 +#include "lib/hashfunc.h"
 +#include "lib/md5.h"
 +
 +#include <stdlib.h>
 +#include <stdio.h>
 +#include <string.h>
 +#include <fcntl.h>
 +#include <unistd.h>
 +
 +/*** Time measurement ***/
 +
 +static timestamp_t timer;
 +
 +static void
 +start(void)
 +{
 +  sync();
 +  init_timer(&timer);
 +}
 +
 +static void
 +stop(void)
 +{
 +  sync();
-   log(L_INFO, ">>> Integers (%s, N=%u)", ((char *[]) { "increasing", "decreasing", "random" })[mode], N);
++  msg(L_INFO, "Test took %.3fs", get_timer(&timer) / 1000.);
 +}
 +
 +/*** Simple 4-byte integer keys ***/
 +
 +struct key1 {
 +  u32 x;
 +};
 +
 +#define SORT_KEY_REGULAR struct key1
 +#define SORT_PREFIX(x) s1_##x
 +#define SORT_INPUT_FB
 +#define SORT_OUTPUT_FB
 +#define SORT_UNIQUE
 +#define SORT_INT(k) (k).x
 +#define SORT_DELETE_INPUT 0
 +
 +#include "lib/sorter/sorter.h"
 +
 +static void
 +test_int(int mode, u64 size)
 +{
 +  uns N = size ? nextprime(MIN(size/4, 0xffff0000)) : 0;
 +  uns K = N/4*3;
-   log(L_INFO, ">>> Counted integers (%s, N=%u, mult=%u)", ((char *[]) { "increasing", "decreasing", "random" })[mode], N, mult);
++  msg(L_INFO, ">>> Integers (%s, N=%u)", ((char *[]) { "increasing", "decreasing", "random" })[mode], N);
 +
 +  struct fastbuf *f = bopen_tmp(65536);
 +  for (uns i=0; i<N; i++)
 +    bputl(f, (mode==0) ? i : (mode==1) ? N-1-i : ((u64)i * K + 17) % N);
 +  brewind(f);
 +
 +  start();
 +  f = s1_sort(f, NULL, N-1);
 +  stop();
 +
 +  SORT_XTRACE(2, "Verifying");
 +  for (uns i=0; i<N; i++)
 +    {
 +      uns j = bgetl(f);
 +      if (i != j)
 +      die("Discrepancy: %u instead of %u", j, i);
 +    }
 +  bclose(f);
 +}
 +
 +/*** Integers with merging, but no data ***/
 +
 +struct key2 {
 +  u32 x;
 +  u32 cnt;
 +};
 +
 +static inline void s2_write_merged(struct fastbuf *f, struct key2 **k, void **d UNUSED, uns n, void *buf UNUSED)
 +{
 +  for (uns i=1; i<n; i++)
 +    k[0]->cnt += k[i]->cnt;
 +  bwrite(f, k[0], sizeof(struct key2));
 +}
 +
 +#define SORT_KEY_REGULAR struct key2
 +#define SORT_PREFIX(x) s2_##x
 +#define SORT_INPUT_FB
 +#define SORT_OUTPUT_FB
 +#define SORT_UNIFY
 +#define SORT_INT(k) (k).x
 +
 +#include "lib/sorter/sorter.h"
 +
 +static void
 +test_counted(int mode, u64 size)
 +{
 +  u64 items = size / sizeof(struct key2);
 +  uns mult = 2;
 +  while (items/(2*mult) > 0xffff0000)
 +    mult++;
 +  uns N = items ? nextprime(items/(2*mult)) : 0;
 +  uns K = N/4*3;
-   log(L_INFO, ">>> Hashes (%s, N=%u)", ((char *[]) { "increasing", "decreasing", "random" })[mode], N);
++  msg(L_INFO, ">>> Counted integers (%s, N=%u, mult=%u)", ((char *[]) { "increasing", "decreasing", "random" })[mode], N, mult);
 +
 +  struct fastbuf *f = bopen_tmp(65536);
 +  for (uns m=0; m<mult; m++)
 +    for (uns i=0; i<N; i++)
 +      for (uns j=0; j<2; j++)
 +      {
 +        bputl(f, (mode==0) ? (i%N) : (mode==1) ? N-1-(i%N) : ((u64)i * K + 17) % N);
 +        bputl(f, 1);
 +      }
 +  brewind(f);
 +
 +  start();
 +  f = s2_sort(f, NULL, N-1);
 +  stop();
 +
 +  SORT_XTRACE(2, "Verifying");
 +  for (uns i=0; i<N; i++)
 +    {
 +      uns j = bgetl(f);
 +      if (i != j)
 +      die("Discrepancy: %u instead of %u", j, i);
 +      uns k = bgetl(f);
 +      if (k != 2*mult)
 +      die("Discrepancy: %u has count %u instead of %u", j, k, mult);
 +    }
 +  bclose(f);
 +}
 +
 +/*** Longer records with hashes (similar to Shepherd's index records) ***/
 +
 +struct key3 {
 +  u32 hash[4];
 +  u32 i;
 +  u32 payload[3];
 +};
 +
 +static inline int s3_compare(struct key3 *x, struct key3 *y)
 +{
 +  /* FIXME: Maybe unroll manually? */
 +  for (uns i=0; i<4; i++)
 +    COMPARE(x->hash[i], y->hash[i]);
 +  return 0;
 +}
 +
 +static inline uns s3_hash(struct key3 *x)
 +{
 +  return x->hash[0];
 +}
 +
 +#define SORT_KEY_REGULAR struct key3
 +#define SORT_PREFIX(x) s3_##x
 +#define SORT_INPUT_FB
 +#define SORT_OUTPUT_FB
 +#define SORT_HASH_BITS 32
 +
 +#include "lib/sorter/sorter.h"
 +
 +static void
 +gen_hash_key(int mode, struct key3 *k, uns i)
 +{
 +  k->i = i;
 +  k->payload[0] = 7*i + 13;
 +  k->payload[1] = 13*i + 19;
 +  k->payload[2] = 19*i + 7;
 +  switch (mode)
 +    {
 +    case 0:
 +      k->hash[0] = i;
 +      k->hash[1] = k->payload[0];
 +      k->hash[2] = k->payload[1];
 +      k->hash[3] = k->payload[2];
 +      break;
 +    case 1:
 +      k->hash[0] = ~i;
 +      k->hash[1] = k->payload[0];
 +      k->hash[2] = k->payload[1];
 +      k->hash[3] = k->payload[2];
 +      break;
 +    default: ;
 +      struct MD5Context ctx;
 +      MD5Init(&ctx);
 +      MD5Update(&ctx, (byte*) &k->i, 4);
 +      MD5Final((byte*) &k->hash, &ctx);
 +      break;
 +    }
 +}
 +
 +static void
 +test_hashes(int mode, u64 size)
 +{
 +  uns N = MIN(size / sizeof(struct key3), 0xffffffff);
-   log(L_INFO, ">>> Strings %s(N=%u)", (mode ? "with data " : ""), N);
++  msg(L_INFO, ">>> Hashes (%s, N=%u)", ((char *[]) { "increasing", "decreasing", "random" })[mode], N);
 +  struct key3 k, lastk;
 +
 +  struct fastbuf *f = bopen_tmp(65536);
 +  uns hash_sum = 0;
 +  for (uns i=0; i<N; i++)
 +    {
 +      gen_hash_key(mode, &k, i);
 +      hash_sum += k.hash[3];
 +      bwrite(f, &k, sizeof(k));
 +    }
 +  brewind(f);
 +
 +  start();
 +  f = s3_sort(f, NULL);
 +  stop();
 +
 +  SORT_XTRACE(2, "Verifying");
 +  for (uns i=0; i<N; i++)
 +    {
 +      int ok = breadb(f, &k, sizeof(k));
 +      ASSERT(ok);
 +      if (i && s3_compare(&k, &lastk) <= 0)
 +      ASSERT(0);
 +      gen_hash_key(mode, &lastk, k.i);
 +      if (memcmp(&k, &lastk, sizeof(k)))
 +      ASSERT(0);
 +      hash_sum -= k.hash[3];
 +    }
 +  ASSERT(!hash_sum);
 +  bclose(f);
 +}
 +
 +/*** Variable-length records (strings) with and without var-length data ***/
 +
 +#define KEY4_MAX 256
 +
 +struct key4 {
 +  uns len;
 +  byte s[KEY4_MAX];
 +};
 +
 +static inline int s4_compare(struct key4 *x, struct key4 *y)
 +{
 +  uns l = MIN(x->len, y->len);
 +  int c = memcmp(x->s, y->s, l);
 +  if (c)
 +    return c;
 +  COMPARE(x->len, y->len);
 +  return 0;
 +}
 +
 +static inline int s4_read_key(struct fastbuf *f, struct key4 *x)
 +{
 +  x->len = bgetl(f);
 +  if (x->len == 0xffffffff)
 +    return 0;
 +  ASSERT(x->len < KEY4_MAX);
 +  breadb(f, x->s, x->len);
 +  return 1;
 +}
 +
 +static inline void s4_write_key(struct fastbuf *f, struct key4 *x)
 +{
 +  ASSERT(x->len < KEY4_MAX);
 +  bputl(f, x->len);
 +  bwrite(f, x->s, x->len);
 +}
 +
 +#define SORT_KEY struct key4
 +#define SORT_PREFIX(x) s4_##x
 +#define SORT_KEY_SIZE(x) (sizeof(struct key4) - KEY4_MAX + (x).len)
 +#define SORT_INPUT_FB
 +#define SORT_OUTPUT_FB
 +
 +#include "lib/sorter/sorter.h"
 +
 +#define s4b_compare s4_compare
 +#define s4b_read_key s4_read_key
 +#define s4b_write_key s4_write_key
 +
 +static inline uns s4_data_size(struct key4 *x)
 +{
 +  return x->len ? (x->s[0] ^ 0xad) : 0;
 +}
 +
 +#define SORT_KEY struct key4
 +#define SORT_PREFIX(x) s4b_##x
 +#define SORT_KEY_SIZE(x) (sizeof(struct key4) - KEY4_MAX + (x).len)
 +#define SORT_DATA_SIZE(x) s4_data_size(&(x))
 +#define SORT_INPUT_FB
 +#define SORT_OUTPUT_FB
 +
 +#include "lib/sorter/sorter.h"
 +
 +static void
 +gen_key4(struct key4 *k)
 +{
 +  k->len = random_max(KEY4_MAX);
 +  for (uns i=0; i<k->len; i++)
 +    k->s[i] = random();
 +}
 +
 +static void
 +gen_data4(byte *buf, uns len, uns h)
 +{
 +  while (len--)
 +    {
 +      *buf++ = h >> 24;
 +      h = h*259309 + 17;
 +    }
 +}
 +
 +static void
 +test_strings(uns mode, u64 size)
 +{
 +  uns avg_item_size = KEY4_MAX/2 + 4 + (mode ? 128 : 0);
 +  uns N = MIN(size / avg_item_size, 0xffffffff);
-   log(L_INFO, ">>> Graph%s (N=%u)", (mode ? "" : " with custom presorting"), N);
++  msg(L_INFO, ">>> Strings %s(N=%u)", (mode ? "with data " : ""), N);
 +  srand(1);
 +
 +  struct key4 k, lastk;
 +  byte buf[256], buf2[256];
 +  uns sum = 0;
 +
 +  struct fastbuf *f = bopen_tmp(65536);
 +  for (uns i=0; i<N; i++)
 +    {
 +      gen_key4(&k);
 +      s4_write_key(f, &k);
 +      uns h = hash_block(k.s, k.len);
 +      sum += h;
 +      if (mode)
 +      {
 +        gen_data4(buf, s4_data_size(&k), h);
 +        bwrite(f, buf, s4_data_size(&k));
 +      }
 +    }
 +  brewind(f);
 +
 +  start();
 +  f = (mode ? s4b_sort : s4_sort)(f, NULL);
 +  stop();
 +
 +  SORT_XTRACE(2, "Verifying");
 +  for (uns i=0; i<N; i++)
 +    {
 +      int ok = s4_read_key(f, &k);
 +      ASSERT(ok);
 +      uns h = hash_block(k.s, k.len);
 +      if (mode && s4_data_size(&k))
 +      {
 +        ok = breadb(f, buf, s4_data_size(&k));
 +        ASSERT(ok);
 +        gen_data4(buf2, s4_data_size(&k), h);
 +        ASSERT(!memcmp(buf, buf2, s4_data_size(&k)));
 +      }
 +      if (i && s4_compare(&k, &lastk) < 0)
 +      ASSERT(0);
 +      sum -= h;
 +      lastk = k;
 +    }
 +  ASSERT(!sum);
 +  bclose(f);
 +}
 +
 +/*** Graph-like structure with custom presorting ***/
 +
 +struct key5 {
 +  u32 x;
 +  u32 cnt;
 +};
 +
 +static uns s5_N, s5_K, s5_L, s5_i, s5_j;
 +
 +struct s5_pair {
 +  uns x, y;
 +};
 +
 +static int s5_gen(struct s5_pair *p)
 +{
 +  if (s5_j >= s5_N)
 +    {
 +      if (s5_i >= s5_N-1)
 +      return 0;
 +      s5_j = 0;
 +      s5_i++;
 +    }
 +  p->x = ((u64)s5_j * s5_K) % s5_N;
 +  p->y = ((u64)(s5_i + s5_j) * s5_L) % s5_N;
 +  s5_j++;
 +  return 1;
 +}
 +
 +#define ASORT_PREFIX(x) s5m_##x
 +#define ASORT_KEY_TYPE u32
 +#define ASORT_ELT(i) ary[i]
 +#define ASORT_EXTRA_ARGS , u32 *ary
 +#include "lib/arraysort.h"
 +
 +static void s5_write_merged(struct fastbuf *f, struct key5 **keys, void **data, uns n, void *buf)
 +{
 +  u32 *a = buf;
 +  uns m = 0;
 +  for (uns i=0; i<n; i++)
 +    {
 +      memcpy(&a[m], data[i], 4*keys[i]->cnt);
 +      m += keys[i]->cnt;
 +    }
 +  s5m_sort(m, a);
 +  keys[0]->cnt = m;
 +  bwrite(f, keys[0], sizeof(struct key5));
 +  bwrite(f, a, 4*m);                  /* FIXME: Might overflow here */
 +}
 +
 +static void s5_copy_merged(struct key5 **keys, struct fastbuf **data, uns n, struct fastbuf *dest)
 +{
 +  u32 k[n];
 +  uns m = 0;
 +  for (uns i=0; i<n; i++)
 +    {
 +      k[i] = bgetl(data[i]);
 +      m += keys[i]->cnt;
 +    }
 +  struct key5 key = { .x = keys[0]->x, .cnt = m };
 +  bwrite(dest, &key, sizeof(key));
 +  while (key.cnt--)
 +    {
 +      uns b = 0;
 +      for (uns i=1; i<n; i++)
 +      if (k[i] < k[b])
 +        b = i;
 +      bputl(dest, k[b]);
 +      if (--keys[b]->cnt)
 +      k[b] = bgetl(data[b]);
 +      else
 +      k[b] = ~0U;
 +    }
 +}
 +
 +static inline int s5p_lt(struct s5_pair x, struct s5_pair y)
 +{
 +  COMPARE_LT(x.x, y.x);
 +  COMPARE_LT(x.y, y.y);
 +  return 0;
 +}
 +
 +/* FIXME: Use smarter internal sorter when it's available */
 +#define ASORT_PREFIX(x) s5p_##x
 +#define ASORT_KEY_TYPE struct s5_pair
 +#define ASORT_ELT(i) ary[i]
 +#define ASORT_LT(x,y) s5p_lt(x,y)
 +#define ASORT_EXTRA_ARGS , struct s5_pair *ary
 +#include "lib/arraysort.h"
 +
 +static int s5_presort(struct fastbuf *dest, void *buf, size_t bufsize)
 +{
 +  uns max = MIN(bufsize/sizeof(struct s5_pair), 0xffffffff);
 +  struct s5_pair *a = buf;
 +  uns n = 0;
 +  while (n<max && s5_gen(&a[n]))
 +    n++;
 +  if (!n)
 +    return 0;
 +  s5p_sort(n, a);
 +  uns i = 0;
 +  while (i < n)
 +    {
 +      uns j = i;
 +      while (i < n && a[i].x == a[j].x)
 +      i++;
 +      struct key5 k = { .x = a[j].x, .cnt = i-j };
 +      bwrite(dest, &k, sizeof(k));
 +      while (j < i)
 +      bputl(dest, a[j++].y);
 +    }
 +  return 1;
 +}
 +
 +#define SORT_KEY_REGULAR struct key5
 +#define SORT_PREFIX(x) s5_##x
 +#define SORT_DATA_SIZE(k) (4*(k).cnt)
 +#define SORT_UNIFY
 +#define SORT_UNIFY_WORKSPACE(k) SORT_DATA_SIZE(k)
 +#define SORT_INPUT_PRESORT
 +#define SORT_OUTPUT_THIS_FB
 +#define SORT_INT(k) (k).x
 +
 +#include "lib/sorter/sorter.h"
 +
 +#define SORT_KEY_REGULAR struct key5
 +#define SORT_PREFIX(x) s5b_##x
 +#define SORT_DATA_SIZE(k) (4*(k).cnt)
 +#define SORT_UNIFY
 +#define SORT_UNIFY_WORKSPACE(k) SORT_DATA_SIZE(k)
 +#define SORT_INPUT_FB
 +#define SORT_OUTPUT_THIS_FB
 +#define SORT_INT(k) (k).x
 +#define s5b_write_merged s5_write_merged
 +#define s5b_copy_merged s5_copy_merged
 +
 +#include "lib/sorter/sorter.h"
 +
 +static void
 +test_graph(uns mode, u64 size)
 +{
 +  uns N = 3;
 +  while ((u64)N*(N+2)*4 < size)
 +    N = nextprime(N);
-   log(L_INFO, ">>> 64-bit integers (%s, N=%llu)", ((char *[]) { "increasing", "decreasing", "random" })[mode], (long long)N);
++  msg(L_INFO, ">>> Graph%s (N=%u)", (mode ? "" : " with custom presorting"), N);
 +  s5_N = N;
 +  s5_K = N/4*3;
 +  s5_L = N/3*2;
 +  s5_i = s5_j = 0;
 +
 +  struct fastbuf *in = NULL;
 +  if (mode)
 +    {
 +      struct s5_pair p;
 +      in = bopen_tmp(65536);
 +      while (s5_gen(&p))
 +      {
 +        struct key5 k = { .x = p.x, .cnt = 1 };
 +        bwrite(in, &k, sizeof(k));
 +        bputl(in, p.y);
 +      }
 +      brewind(in);
 +    }
 +
 +  start();
 +  struct fastbuf *f = bopen_tmp(65536);
 +  bputl(f, 0xfeedcafe);
 +  struct fastbuf *g = (mode ? s5b_sort(in, f, s5_N-1) : s5_sort(NULL, f, s5_N-1));
 +  ASSERT(f == g);
 +  stop();
 +
 +  SORT_XTRACE(2, "Verifying");
 +  uns c = bgetl(f);
 +  ASSERT(c == 0xfeedcafe);
 +  for (uns i=0; i<N; i++)
 +    {
 +      struct key5 k;
 +      int ok = breadb(f, &k, sizeof(k));
 +      ASSERT(ok);
 +      ASSERT(k.x == i);
 +      ASSERT(k.cnt == N);
 +      for (uns j=0; j<N; j++)
 +      {
 +        uns y = bgetl(f);
 +        ASSERT(y == j);
 +      }
 +    }
 +  bclose(f);
 +}
 +
 +/*** Simple 8-byte integer keys ***/
 +
 +struct key6 {
 +  u64 x;
 +};
 +
 +#define SORT_KEY_REGULAR struct key6
 +#define SORT_PREFIX(x) s6_##x
 +#define SORT_INPUT_FB
 +#define SORT_OUTPUT_FB
 +#define SORT_UNIQUE
 +#define SORT_INT64(k) (k).x
 +
 +#include "lib/sorter/sorter.h"
 +
 +static void
 +test_int64(int mode, u64 size)
 +{
 +  u64 N = size ? nextprime(MIN(size/8, 0xffff0000)) : 0;
 +  u64 K = N/4*3;
++  msg(L_INFO, ">>> 64-bit integers (%s, N=%llu)", ((char *[]) { "increasing", "decreasing", "random" })[mode], (long long)N);
 +
 +  struct fastbuf *f = bopen_tmp(65536);
 +  for (u64 i=0; i<N; i++)
 +    bputq(f, 777777*((mode==0) ? i : (mode==1) ? N-1-i : ((u64)i * K + 17) % N));
 +  brewind(f);
 +
 +  start();
 +  f = s6_sort(f, NULL, 777777*(N-1));
 +  stop();
 +
 +  SORT_XTRACE(2, "Verifying");
 +  for (u64 i=0; i<N; i++)
 +    {
 +      u64 j = bgetq(f);
 +      if (777777*i != j)
 +      die("Discrepancy: %llu instead of %llu", (long long)j, 777777*(long long)i);
 +    }
 +  bclose(f);
 +}
 +
 +/*** Main ***/
 +
 +static void
 +run_test(uns i, u64 size)
 +{
 +  switch (i)
 +    {
 +    case 0:
 +      test_int(0, size); break;
 +    case 1:
 +      test_int(1, size); break;
 +    case 2:
 +      test_int(2, size); break;
 +    case 3:
 +      test_counted(0, size); break;
 +    case 4:
 +      test_counted(1, size); break;
 +    case 5:
 +      test_counted(2, size); break;
 +    case 6:
 +      test_hashes(0, size); break;
 +    case 7:
 +      test_hashes(1, size); break;
 +    case 8:
 +      test_hashes(2, size); break;
 +    case 9:
 +      test_strings(0, size); break;
 +    case 10:
 +      test_strings(1, size); break;
 +    case 11:
 +      test_graph(0, size); break;
 +    case 12:
 +      test_graph(1, size); break;
 +    case 13:
 +      test_int64(0, size); break;
 +    case 14:
 +      test_int64(1, size); break;
 +    case 15:
 +      test_int64(2, size); break;
 +#define TMAX 16
 +    }
 +}
 +
 +int
 +main(int argc, char **argv)
 +{
 +  log_init(NULL);
 +  int c;
 +  u64 size = 10000000;
 +  uns t = ~0;
 +
 +  while ((c = cf_getopt(argc, argv, CF_SHORT_OPTS "d:s:t:v", CF_NO_LONG_OPTS, NULL)) >= 0)
 +    switch (c)
 +      {
 +      case 'd':
 +      sorter_debug = atol(optarg);
 +      break;
 +      case 's':
 +      if (cf_parse_u64(optarg, &size))
 +        goto usage;
 +      break;
 +      case 't':
 +      t = atol(optarg);
 +      if (t >= TMAX)
 +        goto usage;
 +      break;
 +      case 'v':
 +      sorter_trace++;
 +      break;
 +      default:
 +      usage:
 +      fputs("Usage: sort-test [-v] [-d <debug>] [-s <size>] [-t <test>]\n", stderr);
 +      exit(1);
 +      }
 +  if (optind != argc)
 +    goto usage;
 +
 +  if (t != ~0U)
 +    run_test(t, size);
 +  else
 +    for (uns i=0; i<TMAX; i++)
 +      run_test(i, size);
 +
 +  return 0;
 +}
diff --cc lib/stkstring.c
Simple merge
diff --cc lib/stkstring.h
index a874344c1f7b1fd1d3569457207bc4854be2410f,49431c181a9809fc3954c10ec8ac046b3536f0c8..2a310cad2ff6370559efb12d3d002432a9d8b229
  #define stk_strjoin(s,n,sep) ({ char **_s=(s); int _n=(n); char *_x=alloca(stk_array_len(_s,_n)+_n-1); stk_array_join(_x, _s, _n, (sep)); _x; })
  #define stk_printf(f...) ({ uns _l=stk_printf_internal(f); char *_x=alloca(_l); sprintf(_x, f); _x; })
  #define stk_vprintf(f, args) ({ uns _l=stk_vprintf_internal(f, args); char *_x=alloca(_l); vsprintf(_x, f, args); _x; })
- #define stk_hexdump(s,n) ({ uns _n=(n); char *_x=alloca(3*_n+1); stk_hexdump_internal(_x,(byte*)(s),_n); _x; })
- #define stk_str_unesc(s) ({ byte *_s=(s); byte *_d=alloca(strlen(_s)+1); str_unesc(_d, _s); _d; })
+ #define stk_hexdump(s,n) ({ uns _n=(n); char *_x=alloca(3*_n+1); stk_hexdump_internal(_x,(char*)(s),_n); _x; })
+ #define stk_str_unesc(s) ({ const char *_s=(s); char *_d=alloca(strlen(_s)+1); str_unesc(_d, _s); _d; })
 +#define stk_fsize(n) ({ char *_s=alloca(16); stk_fsize_internal(_s, n); _s; })
  
  uns stk_array_len(char **s, uns cnt);
  void stk_array_join(char *x, char **s, uns cnt, uns sep);
  uns stk_printf_internal(const char *x, ...) FORMAT_CHECK(printf,1,2);
  uns stk_vprintf_internal(const char *x, va_list args);
- void stk_hexdump_internal(char *dst, byte *src, uns n);
+ void stk_hexdump_internal(char *dst, const byte *src, uns n);
 +void stk_fsize_internal(char *dst, u64 size);
 +
 +#endif
diff --cc lib/stkstring.t
Simple merge