#!/usr/bin/perl
# Batch EXecutor -- Parallel Execution Using Screen
# (c) 2011-2013 Martin Mares
#
# Starts one tmux window per machine that has queued jobs (at most
# $BEX::Config::max_parallel_jobs at a time), each running bex-run, and
# follows their progress through status lines read from a FIFO created in
# the queue directory.  Progress is shown either as plain text or on a
# curses screen.

use strict;
use warnings;
# Retained from the original preamble; the code below no longer uses
# given/when itself.
use feature 'switch';
use experimental 'smartmatch';

use Getopt::Long;
use POSIX;
use BEX;

my $queue_name;
my $text_mode;
my $debug = 0;
my $debug_children;

# Print the command-line help and exit successfully.
# NOTE(review): the <...> placeholders in the usage text were reconstructed;
# confirm the wording against the project's documentation.
sub usage {
    print <<'AMEN';
Usage: bex prun [<options>] [[!]<machine-or-class> ...]

Options:
--debug                 Log status changes to stderr
--debug-children        Log stdout and stderr of child processes to ./debug.log
-p, --parallel=<n>      Set limit on the number of jobs run in parallel
-q, --queue=<name>      Run jobs in the given queue
--text                  Use plain-text user interface instead of curses
AMEN
    exit 0;
}

# Quote a string so that the shell which interprets the tmux window command
# sees it as a single literal word.
sub shell_quote {
    my ($word) = @_;
    $word =~ s/'/'\\''/g;
    return "'$word'";
}

GetOptions(
    "q|queue=s"       => \$queue_name,
    "text!"           => \$text_mode,
    "debug+"          => \$debug,
    "debug-children!" => \$debug_children,
    "p|parallel=i"    => \$BEX::Config::max_parallel_jobs,
    "help"            => \&usage,
) or die "Try `bex prun --help' for more information.\n";

system 'tmux', 'has-session';
$? == 0 or die "You need to start tmux first.\n";

my $queue = BEX::Queue->new($queue_name);

# Children report their progress by writing lines to this FIFO.  It is
# opened read-write, so this process always holds a writer as well.
my $fifo_name = $queue->{'Path'} . '/status-fifo';
unlink $fifo_name;
mkfifo($fifo_name, 0700) or die "Cannot create $fifo_name: $!";
open(my $fifo, '+<', $fifo_name) or die "Cannot open $fifo_name: $!";

my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);

# Machines which have at least one queued job, in the order they will be started.
my @machines = ();
for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
    my @jobs = $queue->scan($mach);
    @jobs or next;
    push @machines, $mach;
    $ui->update($mach, $_, 'READY') for @jobs;
}

# Machine name => last status received, for machines whose window is active.
my %running = ();

# Read only after the UI has been created: the curses UI may lower the limit
# to the number of rows available on the screen.
my $max = $BEX::Config::max_parallel_jobs;
# With a limit below 1 nothing would ever be started and the loop below
# would wait on the FIFO forever.
$max = 1 if !defined $max || $max < 1;

my $fatal;

while (keys %running || @machines) {
    if (@machines && scalar(keys %running) < $max) {
        my $mach = shift @machines;
        $ui->update($mach, undef, 'START');

        my $P5LIB = $ENV{"PERL5LIB"} // "";
        my @cmd = (
            'BEX_HOME=' . shell_quote($BEX::Config::home),
            'BEX_LIB=' . shell_quote($BEX::Config::lib),
            'PERL5LIB=' . shell_quote($P5LIB),
            shell_quote("$BEX::Config::lib/bin/bex-run"),
            '--status-fifo=' . shell_quote($fifo_name),
            '--queue=' . shell_quote($queue->{'Name'}),
            shell_quote($mach),
        );
        push @cmd, ">>debug.log", "2>&1" if $debug_children;

        # tmux takes the whole window command as one shell string.
        system 'tmux', 'new-window', '-n', $mach, '-d', join(" ", @cmd);
        if ($? != 0) {
            # No window was created, so no DONE will ever arrive for this
            # machine; marking it as running would block the loop forever.
            $ui->update($mach, undef, 'INTERR');
            next;
        }
        $running{$mach} = 'START';
        next;
    }

    my $line = <$fifo>;
    if (!defined $line) {
        $fatal = "Cannot read from $fifo_name: $!\n";
        last;
    }
    chomp $line;
    print STDERR "<< $line\n" if $debug;

    # Status lines have the form "! <machine> <job-id or -> <status>".
    my ($mach, $jid, $stat) = $line =~ /^! (\S+) (\S+) (\S+)$/;
    if (!defined $stat) {
        $ui->err("Received invalid status message <$line>");
        next;
    }
    if (!defined $running{$mach}) {
        $ui->err("Received status message <$line> for a machine which does not run") unless $stat eq 'DONE';
        next;
    }

    $running{$mach} = $stat;
    $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
    delete $running{$mach} if $stat eq 'DONE';
}

close $fifo;
unlink $fifo_name;
$ui->done;
die $fatal if defined $fatal;

# ---------------------------------------------------------------------------
# Plain-text user interface: every status change is printed as one line.
# ---------------------------------------------------------------------------

package BEX::bprun::text;

# Create the UI object; it carries no state.
sub new {
    my ($class) = @_;
    return bless {}, $class;
}

# Nothing to tear down.
sub done {
    return;
}

# Print "<machine>[:<job>] <status>" to stdout.
sub update {
    my ($ui, $mach, $jid, $stat) = @_;
    print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
    return;
}

# Report an error message on stderr.
sub err {
    my ($ui, $msg) = @_;
    print STDERR "ERROR: $msg\n";
    return;
}

# ---------------------------------------------------------------------------
# Curses user interface: a summary line at the top, one row per machine and
# a message line at the bottom.
# ---------------------------------------------------------------------------

package BEX::bprun::curses;

use Curses;

# These declarations are only reached at run time after the main program
# above has finished, so none of them may carry an initializer that the
# subs depend on; everything needed earlier is set up in the BEGIN block or
# in new().
my $C;                      # the Curses window
my $nrows;                  # number of machine rows available on the screen
my @by_row;                 # row index => slot displayed there
my %by_host;                # machine => slot
my %host_state;             # machine => one of @states
my %host_cnt;               # state => number of machines in that state
my %job_state;              # machine => job id => one of @states
my %job_cnt;                # machine (or '*' for the total) => state => number of jobs
my %host_last_fail_job;     # machine => job id of the last failure (may be undef)
my %host_last_fail_stat;    # machine => status of the last failure
my @states;
my %state_to_pri;
my %is_failure_status;
my %is_progress_status;

BEGIN {
    @by_row = ();
    %by_host = ();
    @states = qw(unknown ready running done failed);
    # A machine with a higher number may take over the screen row of a
    # machine with a lower or equal one (see place_slot).
    %state_to_pri = (
        'unknown' => 0,
        'ready'   => 1,
        'done'    => 2,
        'failed'  => 3,
        'running' => 4,
    );
    %is_failure_status  = map { $_ => 1 } qw(FAILED INTERR NOPING PREPFAIL NOXFER);
    %is_progress_status = map { $_ => 1 } qw(START PING PREP SEND RUN);
}

# Initialize the screen and all counters and return the UI object.
# Dies if the terminal does not offer at least 8 colors and 8 color pairs.
# Lowers $BEX::Config::max_parallel_jobs to the number of available rows.
sub new {
    my ($class) = @_;

    $C = Curses->new;
    start_color();
    if (!(has_colors() && COLORS() >= 8 && COLOR_PAIRS() >= 8)) {
        # Give the terminal back before complaining.
        endwin();
        die "Your terminal is too dumb for me\n";
    }
    cbreak();
    noecho();
    $C->intrflush(0);
    $C->keypad(1);
    $C->meta(1);
    $C->clear;

    init_pair(1, COLOR_YELLOW, COLOR_BLUE);     # summary and prompt lines
    init_pair(2, COLOR_YELLOW, COLOR_RED);      # error messages
    init_pair(3, COLOR_YELLOW, COLOR_BLACK);    # machine with running jobs
    init_pair(4, COLOR_RED, COLOR_BLACK);       # failures
    init_pair(5, COLOR_BLUE, COLOR_BLACK);      # NOPING

    # The first and the last screen line are reserved.
    $nrows = $C->getmaxy() - 2;
    if ($BEX::Config::max_parallel_jobs > $nrows) {
        $BEX::Config::max_parallel_jobs = $nrows;
    }

    %host_state = ();
    %host_cnt = ();
    %job_state = ();
    %job_cnt = ();
    for my $s (@states) {
        $host_cnt{$s} = 0;
        $job_cnt{'*'}{$s} = 0;
    }

    my $ui = bless {}, $class;
    $ui->refresh_status;
    return $ui;
}

# Wait for a key press and leave curses mode.
sub done {
    $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
    $C->addstr($C->getmaxy() - 1, 0, "Press any key to quit...");
    $C->clrtoeol;
    $C->getch;
    endwin();
    return;
}

# Show an error message on the bottom line of the screen.
sub err {
    my ($ui, $msg) = @_;
    $C->bkgdset(COLOR_PAIR(2) | A_BOLD);
    $C->addnstr($C->getmaxy() - 1, 0, "ERROR: $msg", $C->getmaxx());
    $C->clrtoeol;
    $C->refresh;
    return;
}

# Move a machine to state $stat and keep the per-state machine counters in
# sync.  The first call for a machine also zeroes its per-state job counters.
sub set_host_status {
    my ($ui, $mach, $stat) = @_;
    print STDERR "H: $mach $stat\n" if $debug;

    my $prev_stat = $host_state{$mach};
    if (defined $prev_stat) {
        $host_cnt{$prev_stat}--;
    } else {
        $job_cnt{$mach}{$_} = 0 for @states;
    }
    $host_state{$mach} = $stat;
    $host_cnt{$stat}++;
    return;
}

# Move a job to state $stat and keep the per-machine and total job counters
# in sync.  A job seen for the first time is taken out of 'unknown', so that
# counter goes negative; it is never displayed.
sub set_job_status {
    my ($ui, $mach, $jid, $stat) = @_;
    print STDERR "J: $mach $jid $stat\n" if $debug;

    my $prev_stat = $job_state{$mach}{$jid} // 'unknown';
    for my $scope ($mach, '*') {
        $job_cnt{$scope}{$prev_stat}--;
        $job_cnt{$scope}{$stat}++;
    }
    $job_state{$mach}{$jid} = $stat;
    return;
}

# Redraw the summary line: numbers of running (R), done (D), failed (E) and
# ready (W) machines and jobs.
sub refresh_status {
    $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
    my $summary = sprintf("BEX Hosts: %dR %dD %dE %dW Jobs: %dR %dD %dE %dW",
        @host_cnt{qw(running done failed ready)},
        @{ $job_cnt{'*'} }{qw(running done failed ready)},
    );
    $C->addnstr(0, 0, $summary, $C->getmaxx());
    $C->clrtoeol;
    $C->refresh;
    return;
}

# Return the slot record of a machine, creating it on first use.
sub get_slot {
    my ($mach) = @_;
    return $by_host{$mach} //= { 'Host' => $mach };
}

# Deliberately left without an initializer (see the note at the top of this
# package); the first post-increment of an undefined value yields 0.
my $place_counter;

# Make sure the slot has a screen row if one can be found for it.  A free
# row is taken first; otherwise the slot evicts the occupant with the lowest
# priority not exceeding its own, preferring the least recently updated one
# among equals.  Returns the slot.
sub place_slot {
    my ($s) = @_;
    $s->{'LastUpdate'} = $place_counter++;
    return $s if defined $s->{'Row'};

    my $pri = $state_to_pri{$host_state{$s->{'Host'}}};
    my ($best, $besti);
    my $bestpri = 99;
    for my $i (0 .. $nrows - 1) {
        my $r = $by_row[$i];
        if (!defined $r) {
            # A free row beats any eviction.
            $besti = $i;
            $best = undef;
            last;
        }
        my $rpri = $state_to_pri{$host_state{$r->{'Host'}}};
        print STDERR "I: ... considering ", $r->{'Host'}, " (pri $rpri, lu ", $r->{'LastUpdate'}, ")\n" if $debug > 1;
        next if $rpri > $pri;

        # Trick: $best must be defined in the second alternative, as
        # otherwise $bestpri == 99.
        if ($rpri < $bestpri
            || ($rpri == $bestpri && $r->{'LastUpdate'} < $best->{'LastUpdate'})) {
            $best = $r;
            $besti = $i;
            $bestpri = $rpri;
        }
    }

    if (!defined $besti) {
        print STDERR "I: No place for ", $s->{'Host'}, " (pri $pri)\n" if $debug;
        return $s;
    }

    if ($best) {
        print STDERR "I: Replacing ", $best->{'Host'}, " (pri $bestpri)\n" if $debug;
        delete $best->{'Row'};
    }
    print STDERR "I: Allocated ", $s->{'Host'}, " \@$besti (pri $pri)\n" if $debug;
    $s->{'Row'} = $besti;
    $by_row[$besti] = $s;
    return $s;
}

# Draw the screen row of a slot: machine name, job counters and either the
# current status with the job name, or (once the machine is DONE) its last
# failure.  Does nothing for a slot which has no row.
sub redraw_slot {
    my ($s) = @_;
    my $row = $s->{'Row'};
    return if !defined $row;
    $row++;     # screen line 0 holds the summary

    my $mach = $s->{'Host'};
    my $stat = $s->{'Status'} // "?";
    my $jid = $s->{'Job'} // "";
    my $jname = ($jid eq "" ? "" : $queue->job_name($jid));
    my $jcnt = $job_cnt{$mach};

    # Machine name: bold while jobs are running, red once anything failed.
    my $name_attr = $jcnt->{'failed'} ? COLOR_PAIR(4) : ($jcnt->{'running'} ? COLOR_PAIR(3) : 0);
    $name_attr |= A_BOLD if $jcnt->{'running'};
    $C->bkgdset($name_attr);
    $C->addstr($row, 0, sprintf("%-20.20s", $mach));

    if ($jcnt->{'failed'}) {
        $C->bkgdset(COLOR_PAIR(4));
        $C->addstr(sprintf("%3dE ", $jcnt->{'failed'}));
    } else {
        $C->bkgdset(0);
        # Same width as the "%3dE " field above, so the columns stay aligned.
        $C->addstr(' ' x 5);
    }

    $C->bkgdset(0);
    $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'}));

    if ($stat eq 'DONE') {
        my $lfs = $host_last_fail_stat{$mach};
        my $lfj = $host_last_fail_job{$mach};
        if (defined $lfs) {
            $C->bkgdset(($lfs eq 'NOPING') ? COLOR_PAIR(5) : COLOR_PAIR(4));
            $C->addstr(sprintf(" %-8s %s", $lfs, $lfj ? $queue->job_name($lfj) : ""));
        }
    } else {
        $C->addstr(sprintf(" %-8s %s", $stat, $jname));
    }
    $C->clrtoeol;
    $C->refresh;
    return;
}

# Process one status change of job $jid (undef when the status concerns the
# whole machine) on machine $mach and redraw the affected parts of the screen.
sub update {
    my ($ui, $mach, $jid, $stat) = @_;

    if ($stat eq 'READY') {
        # Pseudo-state generated internally
        $ui->set_host_status($mach, 'ready');
        $ui->set_job_status($mach, $jid, 'ready');
    } elsif ($stat eq 'OK') {
        $ui->set_job_status($mach, $jid, 'done');
    } elsif ($is_failure_status{$stat}) {
        if (defined $jid) {
            $ui->set_job_status($mach, $jid, 'failed');
        } else {
            # A failure of the machine as a whole; there is no job to
            # blame, so no job counter is touched.
            $ui->set_host_status($mach, 'failed');
        }
        $host_last_fail_job{$mach} = $jid;
        $host_last_fail_stat{$mach} = $stat;
    } elsif ($stat eq 'DONE') {
        # A recorded failure counts even if it was not tied to a job.
        my $failed = $job_cnt{$mach}{'failed'} || defined $host_last_fail_stat{$mach};
        $ui->set_host_status($mach, $failed ? 'failed' : 'done');
    } elsif ($stat eq 'INIT') {
        $ui->set_host_status($mach, 'running');
        $ui->set_job_status($mach, $jid, 'running') if defined $jid;
    } elsif ($stat eq 'LOCKED') {
        if (defined $jid) {
            $ui->set_job_status($mach, $jid, 'failed');
        } else {
            # Without a job id, every known job of the machine is failed.
            for my $j (keys %{$job_state{$mach}}) {
                $ui->set_job_status($mach, $j, 'failed');
            }
            $ui->set_host_status($mach, 'failed');
            $host_last_fail_job{$mach} = $jid;
            $host_last_fail_stat{$mach} = $stat;
        }
    } elsif ($is_progress_status{$stat}) {
        # Progress reports change no counters; they are only displayed.
    } else {
        $ui->err("Received unknown job status $stat");
    }

    my $s = get_slot($mach);
    $s->{'Job'} = $jid;
    $s->{'Status'} = $stat;
    place_slot($s);
    redraw_slot($s);
    $ui->refresh_status;
    return;
}