]> mj.ucw.cz Git - misc.git/blob - ucw/qhash.c
qhash: two-level parallel SHA-1
[misc.git] / ucw / qhash.c
1 /* Two-level parallel SHA1 */
2 /* find -type f -print0 | qhash */
3
4 #undef LOCAL_DEBUG
5
6 #include <ucw/lib.h>
7 #include <ucw/fastbuf.h>
8 #include <ucw/sha1.h>
9 #include <ucw/string.h>
10 #include <ucw/workqueue.h>
11 #include <fcntl.h>
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <unistd.h>
15
16 #define BLOCK_SIZE 4096
17
18 #define THREADS 4
19 #define PIECES (2*THREADS)
20 #define BLOCKS_PER_PIECE 2
21
22 #ifdef THREADS
23
24 static int threads_inited;
25 static struct worker_pool thread_pool = {
26   .num_threads = THREADS,
27 };
28 static struct work_queue work_queue;
29
30 struct hash_work {
31   struct work w;
32   byte *buffer;
33   byte out[BLOCKS_PER_PIECE][SHA1_SIZE];
34   uint len;
35   uint out_len;
36   uint running;
37 };
38 static struct hash_work works[PIECES];
39
40 static void do_work(struct worker_thread *t UNUSED, struct work *w)
41 {
42   struct hash_work *hw = (struct hash_work *) w;
43
44   int i = 0;
45   byte *in = hw->buffer;
46   uint remains = hw->len;
47   while (remains)
48     {
49       uint len = MIN(remains, BLOCK_SIZE);
50       sha1_hash_buffer(hw->out[i], in, len);
51       in += len;
52       remains -= len;
53       i++;
54     }
55   hw->out_len = i;
56 }
57
58 static void init_threads(void)
59 {
60   if (threads_inited)
61     return;
62
63   worker_pool_init(&thread_pool);
64   work_queue_init(&thread_pool, &work_queue);
65
66   for (int i=0; i<PIECES; i++)
67     {
68       works[i].w.go = do_work;
69       works[i].buffer = big_alloc(BLOCK_SIZE * BLOCKS_PER_PIECE);
70     }
71   threads_inited = 1;
72 }
73
74 static void hash_file(char *name, char *buf)
75 {
76   int fd = open(name, O_RDONLY);
77   if (fd < 0)
78     {
79       fprintf(stderr, "Cannot open %s: %m\n", name);
80       strcpy(buf, "???");
81       return;
82     }
83
84   sha1_context main_ctx;
85   sha1_init(&main_ctx);
86   init_threads();
87
88   uint current_piece = 0;
89   uint running_pieces = 0;
90   uint eof = 0;
91   uint seen_err = 0;
92   size_t total = 0;
93
94   while (!eof || running_pieces)
95     {
96       if (!eof && running_pieces < PIECES)
97         {
98           struct hash_work *hw = &works[(current_piece + running_pieces) % PIECES];
99           int r = read(fd, hw->buffer, BLOCK_SIZE * BLOCKS_PER_PIECE);
100           if (r < 0)
101             {
102               fprintf(stderr, "Error reading %s: %m\n", name);
103               eof = seen_err = 1;
104             }
105           else if (!r)
106             eof = 1;
107           else
108             {
109               total += r;
110               hw->len = r;
111               hw->running = 1;
112               work_submit(&work_queue, &hw->w);
113               running_pieces++;
114             }
115         }
116       else
117         {
118           struct hash_work *hw = (struct hash_work *) work_wait(&work_queue);
119           hw->running = 0;
120           while (running_pieces && !works[current_piece].running)
121             {
122               hw = &works[current_piece];
123               for (uint i=0; i < hw->out_len; i++)
124                 {
125 #ifdef LOCAL_DEBUG
126                   char xx[SHA1_HEX_SIZE];
127                   mem_to_hex(xx, hw->out[i], SHA1_SIZE, 0);
128                   DBG("\t\t%s", xx);
129 #endif
130                   sha1_update(&main_ctx, hw->out[i], SHA1_SIZE);
131                 }
132               current_piece = (current_piece + 1) % PIECES;
133               running_pieces--;
134             }
135         }
136     }
137
138   close(fd);
139
140   if (seen_err)
141     {
142       strcpy(buf, "???");
143       return;
144     }
145
146   byte *h = sha1_final(&main_ctx);
147   buf += sprintf(buf, "%jd\t", total);
148   mem_to_hex(buf, h, SHA1_SIZE, 0);
149 }
150
151 #else
152
153 static void hash_file(char *name, char *buf)
154 {
155   int fd = open(name, O_RDONLY);
156   if (fd < 0)
157     {
158       fprintf(stderr, "Cannot open %s: %m\n", name);
159       strcpy(buf, "???");
160       return;
161     }
162
163   static byte *hash_buffer;
164   if (!hash_buffer)
165     hash_buffer = big_alloc(BLOCK_SIZE);
166
167   sha1_context block_ctx, main_ctx;
168   sha1_init(&main_ctx);
169
170   int len;
171   size_t total = 0;
172   while ((len = read(fd, hash_buffer, BLOCK_SIZE)) > 0)
173     {
174       sha1_init(&block_ctx);
175       sha1_update(&block_ctx, hash_buffer, len);
176       byte *h = sha1_final(&block_ctx);
177       sha1_update(&main_ctx, h, SHA1_SIZE);
178       total += len;
179     }
180   if (len < 0)
181     {
182       fprintf(stderr, "Error reading %s: %m\n", name);
183       close(fd);
184       strcpy(buf, "???");
185       return;
186     }
187
188   close(fd);
189
190   byte *h = sha1_final(&main_ctx);
191   buf += sprintf(buf, "%jd\t", total);
192   mem_to_hex(buf, h, SHA1_SIZE, 0);
193 }
194
195 #endif
196
197 int main(void)
198 {
199   struct fastbuf *in = bopen_fd(0, NULL);
200   struct fastbuf *out = bopen_fd(1, NULL);
201
202   char line[4096];
203   while (bgets0(in, line, sizeof(line)))
204     {
205       char hash[256];
206       hash_file(line, hash);
207       bprintf(out, "%s\t%s\n", line, hash);
208     }
209
210   bclose(out);
211   bclose(in);
212   return 0;
213 }