]> mj.ucw.cz Git - bex.git/blob - lib/bin/bex-prun
prun: Added a switch for setting the number of parallel jobs
[bex.git] / lib / bin / bex-prun
1 #!/usr/bin/perl
2 # Batch EXecutor 3.0 -- Parallel Execution Using Screen
3 # (c) 2011-2012 Martin Mares <mj@ucw.cz>
4
5 use strict;
6 use warnings;
7 use feature 'switch';
8 use Getopt::Long;
9 use POSIX;
10 use BEX;
11
12 my $queue_name;
13 my $text_mode;
14 my $debug_children;
15
16 sub usage() {
17         print <<AMEN ;
18 Usage: bex prun [<options>] [[!]<machine-or-class> ...]
19
20 Options:
21     --debug-children    Log stdout and stderr to ./debug.log
22 -p, --parallel=<n>      Set limit on the number of jobs run in parallel
23 -q, --queue=<name>      Run jobs in the given queue
24     --text              Use plain-text user interface instead of curses
25 AMEN
26         exit 0;
27 }
28
29 GetOptions(
30         "q|queue=s" => \$queue_name,
31         "text!" => \$text_mode,
32         "debug-children!" => \$debug_children,
33         "j|parallel=i" => \$BEX::Config::max_parallel_jobs,
34         "help" => \&usage,
35 ) or die "Try `bex prun --help' for more information.\n";
36
37 system 'tmux', 'has-session';
38 !$? or die "You need to start tmux first.\n";
39
40 my $queue = BEX::Queue->new($queue_name);
41 my $fifo_name = $queue->{'Path'} . '/status-fifo';
42 unlink $fifo_name;
43 mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!";
44 open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!";
45
46 my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);
47
48 my @machines = ();
49 for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
50         my @jobs = $queue->scan($mach);
51         @jobs or next;
52         push @machines, $mach;
53         for (@jobs) { $ui->update($mach, $_, 'READY'); }
54 }
55
56 my %running = ();
57 my $max = $BEX::Config::max_parallel_jobs;
58
59 while (keys %running || @machines) {
60         if (@machines && keys %running < $max) {
61                 my $mach = shift @machines;
62                 $ui->update($mach, undef, 'START');
63                 my @tm = ('tmux', 'new-window', '-n', $mach, '-d');
64                 my $P5LIB = $ENV{"PERL5LIB"} // "";
65                 my @cmd = (
66                         "BEX_HOME='$BEX::Config::home'",
67                         "BEX_LIB='$BEX::Config::lib'",
68                         "PERL5LIB='$P5LIB'",
69                         "$BEX::Config::lib/bin/bex-run",
70                         "--status-fifo=$fifo_name",
71                         "--queue=" . $queue->{'Name'},
72                         $mach,
73                         );
74                 push @cmd, ">debug.log", "2>&1" if $debug_children;
75                 push @tm, join(" ", @cmd);
76                 system @tm;
77                 !$? or $ui->update($mach, undef, 'INTERR');
78                 $running{$mach} = 'START';
79                 next;
80         }
81         $_ = <FIFO>;
82         chomp;
83         my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/;
84         if (!defined $stat) {
85                 $ui->err("Received invalid status message <$_>");
86                 next;
87         }
88         if (!defined $running{$mach}) {
89                 $ui->err("Received status message <$_> for a machine which does not run");
90                 next;
91         }
92         $running{$mach} = $stat;
93         $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
94         if ($stat eq 'DONE') {
95                 delete $running{$mach};
96         }
97 }
98
99 close FIFO;
100 unlink $fifo_name;
101 $ui->done;
102
103 package BEX::bprun::text;
104
105 sub new($) {
106         return bless {};
107 }
108
109 sub done($) {
110 }
111
112 sub update($$$$) {
113         my ($ui, $mach, $jid, $stat) = @_;
114         print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
115 }
116
117 sub err($$) {
118         my ($ui, $msg) = @_;
119         print STDERR "ERROR: $msg\n";
120 }
121
122 package BEX::bprun::curses;
123
124 use Curses;
125
126 my $C;
127
128 my $nrows;
129 my @by_row = ();
130 my %by_host = ();
131
132 my %host_state;
133 my %host_cnt;
134
135 my %job_state;
136 my %job_cnt;
137
138 my %host_last_fail_job;
139 my %host_last_fail_stat;
140
141 sub new($) {
142         $C = new Curses;
143         start_color;
144         has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n";
145         cbreak; noecho;
146         $C->intrflush(0);
147         $C->keypad(1);
148         $C->meta(1);
149         $C->clear;
150         init_pair(1, COLOR_YELLOW, COLOR_BLUE);
151         init_pair(2, COLOR_YELLOW, COLOR_RED);
152         init_pair(3, COLOR_YELLOW, COLOR_BLACK);
153         init_pair(4, COLOR_RED, COLOR_BLACK);
154
155         $nrows = $C->getmaxy - 2;
156         if ($BEX::Config::max_parallel_jobs > $nrows) {
157                 $BEX::Config::max_parallel_jobs = $nrows;
158         }
159
160         %host_state = %host_cnt = ();
161         %job_state = %job_cnt = ();
162         for my $s ('unknown', 'ready', 'running', 'done', 'failed') {
163                 $host_cnt{$s} = 0;
164                 $job_cnt{'*'}{$s} = 0;
165         }
166
167         my $ui = bless {};
168         $ui->refresh_status;
169         return $ui;
170 }
171
172 sub done($)
173 {
174         $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
175         $C->addstr($C->getmaxy-1, 0, "Press any key to quit...");
176         $C->clrtoeol;
177         $C->getch;
178         endwin;
179 }
180
181 sub err($$) {
182         my ($ui, $msg) = @_;
183         $C->bkgdset(COLOR_PAIR(2) | A_BOLD);
184         $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx);
185         $C->clrtoeol;
186         $C->refresh;
187 }
188
189 sub set_host_status($$$) {
190         my ($ui, $mach, $stat) = @_;
191         my $prev_stat = $host_state{$mach};
192         if (defined $prev_stat) {
193                 $host_cnt{$prev_stat}--;
194         } else {
195                 for my $s ('unknown', 'ready', 'running', 'done', 'failed') { $job_cnt{$mach}{$s} = 0; }
196         }
197         $host_state{$mach} = $stat;
198         $host_cnt{$stat}++;
199 }
200
201 sub set_job_status($$$$) {
202         my ($ui, $mach, $jid, $stat) = @_;
203         my $prev_stat = $job_state{$mach}{$jid} // 'unknown';
204         $job_cnt{$mach}{$prev_stat}--;
205         $job_cnt{'*'}{$prev_stat}--;
206         $job_state{$mach}{$jid} = $stat;
207         $job_cnt{$mach}{$stat}++;
208         $job_cnt{'*'}{$stat}++;
209 }
210
211 sub refresh_status($) {
212         $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
213         $C->addnstr(0, 0,
214                 sprintf("BEX  Hosts: %dR %dD %dE %dW  Jobs: %dR %dD %dE %dW",
215                         $host_cnt{'running'},
216                         $host_cnt{'done'},
217                         $host_cnt{'failed'},
218                         $host_cnt{'ready'},
219                         $job_cnt{'*'}{'running'},
220                         $job_cnt{'*'}{'done'},
221                         $job_cnt{'*'}{'failed'},
222                         $job_cnt{'*'}{'ready'},
223                 ), $C->getmaxx);
224         $C->clrtoeol;
225         $C->refresh;
226 }
227
228 sub get_slot($) {
229         my ($mach) = @_;
230         my $s;
231         if (defined ($s = $by_host{$mach})) {
232                 delete $s->{'Gone'};
233         } else {
234                 my ($best, $besti);
235                 for my $i (0..$nrows-1) {
236                         my $r = $by_row[$i];
237                         if (!defined $r) {
238                                 $besti = $i;
239                                 $best = undef;
240                                 last;
241                         } elsif ($r->{'Gone'} && (!$best || $best->{'Gone'} > $r->{'Gone'})) {
242                                 $besti = $i;
243                                 $best = $r;
244                         }
245                 }
246                 if ($best) {
247                         delete $by_host{$best->{'Host'}};
248                 }
249                 $s->{'Host'} = $mach;
250                 $s->{'Row'} = $besti;
251                 $by_host{$mach} = $s;
252                 $by_row[$besti] = $s;
253         }
254         return $s;
255 }
256
257 my $gone_counter = 1;
258 sub delete_slot($) {
259         my ($s) = @_;
260         $s->{'Gone'} = $gone_counter++;
261 }
262
263 sub redraw_slot($) {
264         my ($s) = @_;
265         my $mach = $s->{'Host'};
266         my $stat = $s->{'Status'} // "?";
267         my $jid = $s->{'Job'} // "";
268         my $jname = ($jid eq "" ? "" : $queue->job_name($jid));
269         my $jcnt = $job_cnt{$mach};
270         if ($jcnt->{'running'}) {
271                 if ($jcnt->{'failed'}) {
272                         $C->bkgdset(COLOR_PAIR(4) | A_BOLD);
273                 } else {
274                         $C->bkgdset(COLOR_PAIR(3) | A_BOLD);
275                 }
276         } else {
277                 if ($jcnt->{'failed'}) {
278                         $C->bkgdset(COLOR_PAIR(4));
279                 } else {
280                         $C->bkgdset(0);
281                 }
282         }
283         my $r = $s->{'Row'} + 1;
284         $C->addstr($r, 0, sprintf("%-20.20s", $mach));
285         if ($jcnt->{'failed'}) {
286                 $C->bkgdset(COLOR_PAIR(4));
287                 $C->addstr(sprintf("%3dE ", $jcnt->{'failed'}));
288         } else {
289                 $C->bkgdset(0);
290                 $C->addstr("     ");
291         }
292         $C->bkgdset(0);
293         $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'}));
294         if ($stat eq 'DONE') {
295                 if (defined $host_last_fail_stat{$mach}) {
296                         $C->bkgdset(COLOR_PAIR(4));
297                         $C->addstr(sprintf("  %-8s %s", $host_last_fail_stat{$mach}, $queue->job_name($host_last_fail_job{$mach})));
298                 }
299         } else {
300                 my $text = sprintf("  %-8s %s", $stat, $jname);
301                 $C->addstr($text);
302         }
303         $C->clrtoeol;
304         $C->refresh;
305 }
306
307 sub update($$$$) {
308         my ($ui, $mach, $jid, $stat) = @_;
309         my $s = get_slot($mach);
310         given ($stat) {
311                 when ('READY') {
312                         # Pseudo-state generated internally
313                         $ui->set_host_status($mach, 'ready');
314                         $ui->set_job_status($mach, $jid, 'ready');
315                 }
316                 when ('OK') {
317                         $ui->set_job_status($mach, $jid, 'done');
318                 }
319                 when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL']) {
320                         $ui->set_job_status($mach, $jid, 'failed');
321                         $host_last_fail_job{$mach} = $jid;
322                         $host_last_fail_stat{$mach} = $stat;
323                 }
324                 when ('DONE') {
325                         if ($job_cnt{$mach}{'failed'}) {
326                                 $ui->set_host_status($mach, 'failed');
327                         } else {
328                                 $ui->set_host_status($mach, 'done');
329                         }
330                 }
331                 when ('INIT') {
332                         $ui->set_host_status($mach, 'running');
333                         $ui->set_job_status($mach, $jid, 'running') if defined $jid;
334                 }
335                 when ('LOCKED') {
336                         if (defined $jid) {
337                                 $ui->set_job_status($mach, $jid, 'failed');
338                         } else {
339                                 for my $j (keys %{$job_state{$mach}}) {
340                                         $ui->set_job_status($mach, $jid, 'failed');
341                                 }
342                                 $ui->set_host_status($mach, 'failed');
343                                 $host_last_fail_job{$mach} = $jid;
344                                 $host_last_fail_stat{$mach} = $stat;
345                         }
346                 }
347                 when (['START', 'PING', 'SEND', 'RUN']) {
348                 }
349                 default {
350                         $ui->err("Received unknown job status $stat");
351                 }
352         }
353         $s->{'Job'} = $jid;
354         $s->{'Status'} = $stat;
355         redraw_slot($s);
356         if ($stat eq 'DONE') { delete_slot($s); }
357         $ui->refresh_status;
358 }