#!/usr/bin/perl
# Batch EXecutor 2.0 -- Parallel Execution Using Screen
# (c) 2011 Martin Mares
#
# Opens one screen window per machine, each running `./brun` for that
# machine, and follows their progress through a named pipe on which every
# brun instance writes status lines of the form
#     ! <machine> <job-id or "-"> <status>
# Progress is shown either as plain text or in a curses screen.

use strict;
use warnings;

use Getopt::Long;
use POSIX;

use lib 'lib';
use BEX;

my $queue_name;
my $screen_session = 'BEX';
my $text_mode;

GetOptions(
	"q|queue=s" => \$queue_name,
	"session=s" => \$screen_session,
	"text!" => \$text_mode,
) or die <<AMEN ;
Usage: bprun [<options>] [<machine-or-class>[!] ...]

Options:
-q, --queue=<name>	Run jobs in the given queue
    --session=<name>	Job windows should be opened within the given screen session (default: BEX)
    --text		Use textual user interface instead of curses
AMEN

# Probe the screen session; a non-zero exit status means we cannot use it.
system 'screen', '-S', $screen_session, '-X', 'select', '.';
!$? or die "Screen session $screen_session not found\n";

my $queue = BEX::Queue->new($queue_name);

# Machines with at least one queued job (in processing order) and the number
# of queued jobs for each of them. The curses UI reads both in its new().
my @machines = ();
my %job_counter = ();
for my $m (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
	my @jobs = $queue->scan($m);
	@jobs or next;
	push @machines, $m;
	$job_counter{$m} = @jobs;
}

# The FIFO is opened read-write so that we always hold a writer end
# ourselves; reads then block until the next message instead of hitting EOF
# whenever no brun instance has the pipe open.
my $fifo_name = $queue->{'Name'} . '/status-fifo';
unlink $fifo_name;
mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!";
open my $fifo, '+<', $fifo_name or die "Cannot open $fifo_name: $!";

# The UI has to exist before max_parallel_jobs is read: the curses UI lowers
# it so that every running machine gets its own row on the screen.
my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);

my %running = ();	# machine => last status received for it
my $max = $BEX::Config::max_parallel_jobs;

while (keys(%running) || @machines) {
	if (@machines && keys(%running) < $max) {
		my $mach = shift @machines;
		$ui->update($mach, undef, 'START');
		my @scr = ('screen', '-t', $mach);
		push @scr, '-S', $screen_session if defined $screen_session;
		push @scr, '-X', 'screen', './brun', "--status-fifo=$fifo_name", $mach;
		system @scr;
		if ($?) {
			# The window (and thus brun) presumably never started, so no
			# status and no DONE will arrive for this machine. Registering it
			# in %running would make the loop wait on the FIFO forever.
			$ui->update($mach, undef, 'INTERR');
			$ui->update($mach, undef, 'DONE');
			next;
		}
		$running{$mach} = 'START';
		next;
	}

	my $line = <$fifo>;
	if (!defined $line) {
		$ui->err("Cannot read from $fifo_name: $!");
		last;
	}
	chomp $line;
	my ($mach, $jid, $stat) = $line =~ /^! (\S+) (\S+) (\S+)$/;
	if (!defined $stat) {
		$ui->err("Received invalid status message <$line>");
		next;
	}
	if (!defined $running{$mach}) {
		$ui->err("Received status message <$line> for a machine which does not run");
		next;
	}
	$running{$mach} = $stat;
	# A job ID of "-" marks a message about the machine as a whole;
	# the UIs receive undef for it.
	$ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
	if ($stat eq 'DONE') {
		delete $running{$mach};
	}
}

close $fifo;
unlink $fifo_name;
$ui->done;

# NOTE: Everything below is executed only after the main program above has
# finished, so file-level initializers (`my $x = ...;`) in these packages
# never take effect while the UI is in use. All UI state is therefore
# declared without initializers and set up in new().

package BEX::bprun::text;

# Plain-text UI: one output line per status update, errors on STDERR.

sub new {
	my ($class) = @_;
	return bless {}, $class;
}

sub done {
}

# Prints "<machine>[:<job>] <status>".
sub update {
	my ($ui, $mach, $jid, $stat) = @_;
	print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
}

sub err {
	my ($ui, $msg) = @_;
	print STDERR "ERROR: $msg\n";
}

package BEX::bprun::curses;

use Curses;

# Full-screen UI. Row 0 holds the global counters, rows 1..$nrows hold one
# slot per machine, and the bottom row is used for errors and the final
# prompt. A slot is a hash with keys Host, Row, Job, Status and, once its
# machine has finished, Gone (a positive stamp; lower means finished earlier).

my $C;			# the Curses window object
my $nrows;		# number of rows available for machine slots
my @by_row;		# slot row index => slot
my %by_host;		# machine name => slot
my $gone_counter;	# last stamp handed out by delete_slot()

my $total_hosts;
my $active_hosts;
my $done_hosts;
my $failed_hosts;
my $total_jobs;
my $active_jobs;
my $done_jobs;
my $failed_jobs;
my %host_active_jobs;
my %host_done_jobs;
my %host_failed_jobs;
my %host_last_fail_job;
my %host_last_fail_stat;
my %host_running;	# machines counted in $active_hosts (sent a host-level INIT)

# Initializes curses and all UI state. Dies (after restoring the terminal)
# if the terminal lacks 8 colors/pairs or has no room for a machine row.
# Caps $BEX::Config::max_parallel_jobs at the number of machine rows.
sub new {
	my ($class) = @_;

	$C = Curses->new;
	start_color;
	if (!(has_colors() && COLORS() >= 8 && COLOR_PAIRS() >= 8)) {
		endwin;
		die "Your terminal is too dumb for me\n";
	}
	cbreak;
	noecho;
	$C->intrflush(0);
	$C->keypad(1);
	$C->meta(1);
	$C->clear;

	init_pair(1, COLOR_YELLOW, COLOR_BLUE);		# status line and final prompt
	init_pair(2, COLOR_YELLOW, COLOR_RED);		# error line
	init_pair(3, COLOR_YELLOW, COLOR_BLACK);	# machine with active jobs
	init_pair(4, COLOR_RED, COLOR_BLACK);		# failures

	# The first and the last row are reserved for status and messages.
	$nrows = $C->getmaxy() - 2;
	if ($nrows < 1) {
		endwin;
		die "Your terminal is too small for me\n";
	}
	# Never run more machines at once than there are rows to show them in.
	if ($BEX::Config::max_parallel_jobs > $nrows) {
		$BEX::Config::max_parallel_jobs = $nrows;
	}

	@by_row = ();
	%by_host = ();
	$gone_counter = 0;
	$total_hosts = $active_hosts = $done_hosts = $failed_hosts = 0;
	$total_jobs = $active_jobs = $done_jobs = $failed_jobs = 0;
	%host_active_jobs = %host_done_jobs = %host_failed_jobs = %host_last_fail_job = %host_last_fail_stat = %host_running = ();
	for my $m (@machines) {
		$total_hosts++;
		$total_jobs += $job_counter{$m};
		$host_active_jobs{$m} = $host_done_jobs{$m} = $host_failed_jobs{$m} = 0;
	}

	my $ui = bless {}, $class;
	$ui->refresh_status;
	return $ui;
}

# Waits for a key press, then shuts curses down.
sub done {
	$C->bkgdset(COLOR_PAIR(1) | A_BOLD);
	$C->addstr($C->getmaxy() - 1, 0, "Press any key to quit...");
	$C->clrtoeol;
	$C->getch;
	endwin;
}

# Shows an error message on the bottom row.
sub err {
	my ($ui, $msg) = @_;
	$C->bkgdset(COLOR_PAIR(2) | A_BOLD);
	$C->addnstr($C->getmaxy() - 1, 0, "ERROR: $msg", $C->getmaxx);
	$C->clrtoeol;
	$C->refresh;
}

# Redraws the global counters on row 0 (R=running, D=done, E=failed, W=waiting).
sub refresh_status {
	$C->bkgdset(COLOR_PAIR(1) | A_BOLD);
	my $waiting_hosts = $total_hosts - $active_hosts - $done_hosts - $failed_hosts;
	my $waiting_jobs = $total_jobs - $active_jobs - $done_jobs - $failed_jobs;
	$C->addnstr(0, 0, "BEX Hosts: ${active_hosts}R ${done_hosts}D ${failed_hosts}E ${waiting_hosts}W Jobs: ${active_jobs}R ${done_jobs}D ${failed_jobs}E ${waiting_jobs}W", $C->getmaxx);
	$C->clrtoeol;
	$C->refresh;
}

# Returns the slot of a machine, allocating one if it has none. A new slot
# takes the first empty row, or else recycles the row of the machine that
# finished earliest. The main program never runs more machines than there
# are rows, so one of the two is expected to be available.
sub get_slot {
	my ($mach) = @_;
	my $s = $by_host{$mach};
	if (defined $s) {
		delete $s->{'Gone'};
		return $s;
	}

	my ($best, $besti);
	for my $i (0 .. $nrows-1) {
		my $r = $by_row[$i];
		if (!defined $r) {
			$besti = $i;
			$best = undef;
			last;
		} elsif ($r->{'Gone'} && (!$best || $best->{'Gone'} > $r->{'Gone'})) {
			$besti = $i;
			$best = $r;
		}
	}
	if ($best) {
		delete $by_host{$best->{'Host'}};
	}
	$s = { 'Host' => $mach, 'Row' => $besti };
	$by_host{$mach} = $s;
	$by_row[$besti] = $s;
	return $s;
}

# Marks a slot as finished, making its row reusable. Stamps start at 1 so
# that they are always true.
sub delete_slot {
	my ($s) = @_;
	$s->{'Gone'} = ++$gone_counter;
}

# Draws the row of a slot: machine name, failed/done/waiting job counts, and
# either the current status and job name, or (once DONE) the last failure.
sub redraw_slot {
	my ($s) = @_;
	my $mach = $s->{'Host'};
	my $stat = $s->{'Status'} // "?";
	my $jid = $s->{'Job'} // "";
	my $jname = ($jid eq "" ? "" : $queue->job_name($jid));

	if ($host_active_jobs{$mach}) {
		$C->bkgdset($host_failed_jobs{$mach} ? (COLOR_PAIR(4) | A_BOLD) : (COLOR_PAIR(3) | A_BOLD));
	} else {
		$C->bkgdset($host_failed_jobs{$mach} ? COLOR_PAIR(4) : 0);
	}
	my $r = $s->{'Row'} + 1;
	$C->addstr($r, 0, sprintf("%-20.20s", $mach));

	if ($host_failed_jobs{$mach}) {
		$C->bkgdset(COLOR_PAIR(4));
		$C->addstr(sprintf("%3dE ", $host_failed_jobs{$mach}));
	} else {
		# Same width as "%3dE " above, keeping the columns aligned.
		$C->bkgdset(0);
		$C->addstr(' ' x 5);
	}
	$C->bkgdset(0);
	$C->addstr(sprintf("%3dD %3dW", $host_done_jobs{$mach}, $job_counter{$mach} - $host_done_jobs{$mach} - $host_failed_jobs{$mach}));

	if ($stat eq 'DONE') {
		if (defined $host_last_fail_stat{$mach}) {
			$C->bkgdset(COLOR_PAIR(4));
			$C->addstr(sprintf(" %-8s %s", $host_last_fail_stat{$mach}, $host_last_fail_job{$mach} // ''));
		}
	} else {
		$C->addstr(sprintf(" %-8s %s", $stat, $jname));
	}
	$C->clrtoeol;
	$C->refresh;
}

# Counts one job of $mach as failed with status $stat.
sub fail_job {
	my ($mach, $jid, $stat) = @_;
	$active_jobs--;
	$failed_jobs++;
	$host_active_jobs{$mach}--;
	$host_failed_jobs{$mach}++;
	$host_last_fail_job{$mach} = $jid;
	$host_last_fail_stat{$mach} = $stat;
}

# Counts every job of $mach that is neither finished nor active as failed.
sub fail_remaining_jobs {
	my ($mach) = @_;
	my $left = $job_counter{$mach} - $host_done_jobs{$mach} - $host_failed_jobs{$mach} - $host_active_jobs{$mach};
	$left = 0 if $left < 0;
	$failed_jobs += $left;
	$host_failed_jobs{$mach} += $left;
}

# Applies one status update. $jid is undef for machine-level messages.
# Updates the counters, redraws the machine's row and the status line,
# and frees the row once the machine reports DONE.
sub update {
	my ($ui, $mach, $jid, $stat) = @_;
	my $s = get_slot($mach);

	if ($stat eq 'OK') {
		$active_jobs--;
		$done_jobs++;
		$host_active_jobs{$mach}--;
		$host_done_jobs{$mach}++;
	} elsif ($stat =~ /^(?:FAILED|INTERR|NOPING|PREPFAIL)$/) {
		if (defined $jid) {
			fail_job($mach, $jid, $stat);
		} else {
			# The machine as a whole failed (e.g. its window could not be
			# opened), so none of its unfinished jobs will run.
			fail_remaining_jobs($mach);
			$host_last_fail_job{$mach} = undef;
			$host_last_fail_stat{$mach} = $stat;
		}
	} elsif ($stat eq 'DONE') {
		# Only machines that announced themselves with INIT are active.
		if (delete $host_running{$mach}) {
			$active_hosts--;
		}
		if ($host_failed_jobs{$mach}) {
			$failed_hosts++;
		} else {
			$done_hosts++;
		}
	} elsif ($stat eq 'INIT') {
		if (defined $jid) {
			$active_jobs++;
			$host_active_jobs{$mach}++;
		} elsif (!$host_running{$mach}) {
			$active_hosts++;
			$host_running{$mach} = 1;
		}
	} elsif ($stat eq 'LOCKED') {
		if (defined $jid) {
			fail_job($mach, $jid, $stat);
		} else {
			fail_remaining_jobs($mach);
		}
	} elsif ($stat !~ /^(?:START|PING|SEND|RUN)$/) {
		$ui->err("Received unknown job status $stat");
	}

	$s->{'Job'} = $jid;
	$s->{'Status'} = $stat;
	redraw_slot($s);
	if ($stat eq 'DONE') {
		delete_slot($s);
	}
	$ui->refresh_status;
}