2 # Batch EXecutor -- Parallel Execution Using Screen
3 # (c) 2011-2013 Martin Mares <mj@ucw.cz>
8 use experimental 'smartmatch';
20 Usage: bex prun [<options>] [[!]<machine-or-group> ...]
23 --debug Log status changes to stderr
24 --debug-children Log stdout and stderr of child processes to ./debug.log
25 -p, --parallel=<n> Set limit on the number of jobs run in parallel
26 -q, --queue=<name> Run jobs in the given queue
27 --text Use plain-text user interface instead of curses
33 "q|queue=s" => \$queue_name,
34 "text!" => \$text_mode,
36 "debug-children!" => \$debug_children,
37 "p|parallel=i" => \$BEX::Config::max_parallel_jobs,
39 ) or die "Try `bex prun --help' for more information.\n";
41 system 'tmux', 'has-session';
42 !$? or die "You need to start tmux first.\n";
44 my $queue = BEX::Queue->new($queue_name);
45 my $fifo_name = $queue->{'Path'} . '/status-fifo';
47 mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!";
48 open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!";
50 my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);
53 for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
54 my @jobs = $queue->scan($mach);
56 push @machines, $mach;
57 for (@jobs) { $ui->update($mach, $_, 'READY'); }
61 my $max = $BEX::Config::max_parallel_jobs;
63 while (keys %running || @machines) {
64 if (@machines && keys %running < $max) {
65 my $mach = shift @machines;
66 $ui->update($mach, undef, 'START');
67 my @tm = ('tmux', 'new-window', '-n', $mach, '-d');
68 my $P5LIB = $ENV{"PERL5LIB"} // "";
70 "BEX_HOME='$BEX::Config::home'",
71 "BEX_LIB='$BEX::Config::lib'",
73 "$BEX::Config::lib/bin/bex-run",
74 "--status-fifo=$fifo_name",
75 "--queue=" . $queue->{'Name'},
78 push @cmd, ">>debug.log", "2>&1" if $debug_children;
79 push @tm, join(" ", @cmd);
81 !$? or $ui->update($mach, undef, 'INTERR');
82 $running{$mach} = 'START';
87 print STDERR "<< $_\n" if $debug;
88 my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/;
90 $ui->err("Received invalid status message <$_>");
93 if (!defined $running{$mach}) {
94 $ui->err("Received status message <$_> for a machine which does not run")
95 unless $stat eq 'DONE';
98 $running{$mach} = $stat;
99 $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
100 if ($stat eq 'DONE') {
101 delete $running{$mach};
109 package BEX::bprun::text;
119 my ($ui, $mach, $jid, $stat) = @_;
120 print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
125 print STDERR "ERROR: $msg\n";
128 package BEX::bprun::curses;
144 my %host_last_fail_job;
145 my %host_last_fail_stat;
153 @states = qw(unknown ready running done failed);
166 has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n";
172 init_pair(1, COLOR_YELLOW, COLOR_BLUE);
173 init_pair(2, COLOR_YELLOW, COLOR_RED);
174 init_pair(3, COLOR_YELLOW, COLOR_BLACK);
175 init_pair(4, COLOR_RED, COLOR_BLACK);
176 init_pair(5, COLOR_BLUE, COLOR_BLACK);
178 $nrows = $C->getmaxy - 2;
179 if ($BEX::Config::max_parallel_jobs > $nrows) {
180 $BEX::Config::max_parallel_jobs = $nrows;
183 %host_state = %host_cnt = ();
184 %job_state = %job_cnt = ();
185 for my $s (@states) {
187 $job_cnt{'*'}{$s} = 0;
197 $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
198 $C->addstr($C->getmaxy-1, 0, "Press any key to quit...");
206 $C->bkgdset(COLOR_PAIR(2) | A_BOLD);
207 $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx);
212 sub set_host_status($$$) {
213 my ($ui, $mach, $stat) = @_;
214 print STDERR "H: $mach $stat\n" if $debug;
215 my $prev_stat = $host_state{$mach};
216 if (defined $prev_stat) {
217 $host_cnt{$prev_stat}--;
219 for my $s (@states) { $job_cnt{$mach}{$s} = 0; }
221 $host_state{$mach} = $stat;
225 sub set_job_status($$$$) {
226 my ($ui, $mach, $jid, $stat) = @_;
227 print STDERR "J: $mach $jid $stat\n" if $debug;
228 my $prev_stat = $job_state{$mach}{$jid} // 'unknown';
229 $job_cnt{$mach}{$prev_stat}--;
230 $job_cnt{'*'}{$prev_stat}--;
231 $job_state{$mach}{$jid} = $stat;
232 $job_cnt{$mach}{$stat}++;
233 $job_cnt{'*'}{$stat}++;
236 sub refresh_status($) {
237 $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
239 sprintf("BEX Hosts: %dR %dD %dE %dW Jobs: %dR %dD %dE %dW",
240 $host_cnt{'running'},
244 $job_cnt{'*'}{'running'},
245 $job_cnt{'*'}{'done'},
246 $job_cnt{'*'}{'failed'},
247 $job_cnt{'*'}{'ready'},
255 my $s = $by_host{$mach};
257 $s = $by_host{$mach} = { 'Host' => $mach };
266 $s->{'LastUpdate'} = $place_counter++;
267 return $s if defined $s->{'Row'};
269 my $pri = $state_to_pri{$host_state{$s->{'Host'}}};
272 for my $i (0..$nrows-1) {
279 my $rpri = $state_to_pri{$host_state{$r->{'Host'}}};
280 print STDERR "I: ... considering ", $r->{'Host'}, " (pri $rpri, lu ", $r->{'LastUpdate'}, ")\n" if $debug > 1;
281 next if $rpri > $pri;
283 if ($rpri < $bestpri ||
284 $rpri == $bestpri && $r->{'LastUpdate'} < $best->{'LastUpdate'}) {
285 # Trick: $best must be defined, as otherwise $bestpri == 99
292 if (defined $besti) {
294 print STDERR "I: Replacing ", $best->{'Host'}, " (pri $bestpri)\n" if $debug;
295 delete $best->{'Row'};
297 print STDERR "I: Allocated ", $s->{'Host'}, " \@$besti (pri $pri)\n" if $debug;
298 $s->{'Row'} = $besti;
299 $by_row[$besti] = $s;
301 print STDERR "I: No place for ", $s->{'Host'}, " (pri $pri)\n" if $debug;
307 my $r = $s->{'Row'} // return;
309 my $mach = $s->{'Host'};
310 my $stat = $s->{'Status'} // "?";
311 my $jid = $s->{'Job'} // "";
312 my $jname = ($jid eq "" ? "" : $queue->job_name($jid));
313 my $jcnt = $job_cnt{$mach};
314 if ($jcnt->{'running'}) {
315 if ($jcnt->{'failed'}) {
316 $C->bkgdset(COLOR_PAIR(4) | A_BOLD);
318 $C->bkgdset(COLOR_PAIR(3) | A_BOLD);
321 if ($jcnt->{'failed'}) {
322 $C->bkgdset(COLOR_PAIR(4));
327 $C->addstr($r, 0, sprintf("%-20.20s", $mach));
328 if ($jcnt->{'failed'}) {
329 $C->bkgdset(COLOR_PAIR(4));
330 $C->addstr(sprintf("%3dE ", $jcnt->{'failed'}));
336 $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'}));
337 if ($stat eq 'DONE') {
338 my $lfs = $host_last_fail_stat{$mach};
339 my $lfj = $host_last_fail_job{$mach};
341 $C->bkgdset(($lfs eq 'NOPING') ? COLOR_PAIR(5) : COLOR_PAIR(4));
342 $C->addstr(sprintf(" %-8s %s", $lfs, $lfj ? $queue->job_name($lfj) : ""));
345 my $text = sprintf(" %-8s %s", $stat, $jname);
353 my ($ui, $mach, $jid, $stat) = @_;
356 # Pseudo-state generated internally
357 $ui->set_host_status($mach, 'ready');
358 $ui->set_job_status($mach, $jid, 'ready');
361 $ui->set_job_status($mach, $jid, 'done');
363 when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL', 'NOXFER']) {
364 $ui->set_job_status($mach, $jid, 'failed');
365 $host_last_fail_job{$mach} = $jid;
366 $host_last_fail_stat{$mach} = $stat;
369 if ($job_cnt{$mach}{'failed'}) {
370 $ui->set_host_status($mach, 'failed');
372 $ui->set_host_status($mach, 'done');
376 $ui->set_host_status($mach, 'running');
377 $ui->set_job_status($mach, $jid, 'running') if defined $jid;
381 $ui->set_job_status($mach, $jid, 'failed');
383 for my $j (keys %{$job_state{$mach}}) {
384 $ui->set_job_status($mach, $j, 'failed');
386 $ui->set_host_status($mach, 'failed');
387 $host_last_fail_job{$mach} = $jid;
388 $host_last_fail_stat{$mach} = $stat;
391 when (['START', 'PING', 'PREP', 'SEND', 'RUN']) {
394 $ui->err("Received unknown job status $stat");
397 my $s = get_slot($mach);
399 $s->{'Status'} = $stat;