#!/usr/bin/perl # Batch EXecutor -- Show Queued Jobs # (c) 2011-2015 Martin Mares use strict; use warnings; use Getopt::Long; use POSIX; use BEX; my $op_by_job; my $op_by_host; my $op_rm; my $op_move_to; my $queue_name; my $attention; my $filenames; my $given_job; my $summary; my $state_regex; my $why; sub usage() { print <] [[!] ...] Actions: --by-job Show jobs sorted by job ID (default) -h, --by-host Show jobs sorted by host --rm Remove jobs from the queue --move-to= Move jobs to a different queue Options: -a, --attn Show only jobs needing attention (e.g., failures) -f, --filenames Show filenames of jobs and log files -j, --job= Act on the specified job (default: on all) -q, --queue= Act on the given queue -s, --summary Show only a summary -S, --state= Act only on jobs whose state matches the given regex -w, --why[=] In case of failed jobs, display last few lines of output AMEN exit 0; } Getopt::Long::Configure("bundling"); GetOptions( "by-job!" => \$op_by_job, "h|by-host!" => \$op_by_host, "rm!" => \$op_rm, "move-to=s" => \$op_move_to, "a|attn!" => \$attention, "f|filenames!" => \$filenames, "j|job=s" => \$given_job, "q|queue=s" => \$queue_name, "s|summary!" => \$summary, "S|state=s" => \$state_regex, "w|why:i" => \$why, "help" => \&usage, ) or die "Try `bex queue --help' for more information.\n"; my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); my $queue = BEX::Queue->new($queue_name); $given_job = $queue->resolve_job_id($given_job) if defined $given_job; # Status cache my %status_cache = (); sub get_status($$) { my ($m, $j) = @_; $status_cache{$m}{$j} or $status_cache{$m}{$j} = $queue->read_job_status($m, $j); return $status_cache{$m}{$j}; } sub get_stat($$) { my ($m, $j) = @_; return get_status($m, $j)->{'Status'} // 'UNKNOWN'; } # Select jobs my %jobs = (); my %machs = (); for my $m (@machines) { for my $j ($queue->scan($m)) { if (defined $given_job) { next if $j ne $given_job; } if (defined $attention) { next if get_stat($m, $j) =~ m{^(NEW|NOPING)$}; } if (defined $state_regex) { next unless get_stat($m, $j) =~ m{^($state_regex)$}; } push @{$jobs{$j}}, $m; push @{$machs{$m}}, $j; } } sub do_ls(); sub do_rm(); sub do_move_to(); my $ops = 0 + defined($op_by_host) + defined($op_by_job) + defined($op_rm) + defined($op_move_to); if ($ops > 1) { die "Multiple actions are not allowed\n"; } if ($op_rm) { do_rm(); } elsif (defined $op_move_to) { do_move_to(); } else { do_ls(); } exit 0; sub do_ls() { my %stat = (); my %mach_locked = (); my %cnt_by_job = (); my %cnt_by_mach = (); my %why = (); for my $m (keys %machs) { $mach_locked{$m} = $queue->is_locked($m, undef); for my $j (@{$machs{$m}}) { my $st = get_status($m, $j); my $s = get_stat($m, $j); my $adir = $queue->attachment_dir($j); my $has_adir = -d $adir; $cnt_by_job{$j}{$s}++; $cnt_by_mach{$m}{$s}++; $why{$m}{$j} = ""; if ($filenames) { $why{$m}{$j} .= "\t\t== Job file: " . $queue->queue_file($m, $j) . "\n"; $why{$m}{$j} .= "\t\t== Attachments: $adir\n" if $has_adir; } if (defined($st->{'Time'}) && defined($st->{'Status'})) { $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' . POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']'; if (defined($why) && $st->{'Status'} eq 'FAILED') { my $lines = $why ? $why : 3; my $log = $queue->log_file($m, $j); if (-f $log) { $why{$m}{$j} .= join("", map { "\t\t>> $_" } `tail -n$lines $log`); } } } else { $stat{$m}{$j} = ''; } if ($mach_locked{$m} || $queue->is_locked($m, $j)) { $stat{$m}{$j} .= ' [LOCKED]'; } $stat{$m}{$j} .= ' +ATT' if $has_adir; } } if ($queue->is_locked(undef, undef)) { print "### Queue lock present\n\n"; } if ($summary) { if ($op_by_host) { for my $m (sort keys %cnt_by_mach) { print "$m: ", join(" ", map { "$_:" . $cnt_by_mach{$m}{$_} } sort keys %{$cnt_by_mach{$m}}), "\n"; } } else { for my $j (sort keys %cnt_by_job) { print $queue->job_name($j), ": ", join(" ", map { "$_:" . $cnt_by_job{$j}{$_} } sort keys %{$cnt_by_job{$j}}), "\n"; } } } else { if ($op_by_host) { for my $m (sort keys %machs) { print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n"; for my $j (@{$machs{$m}}) { print "\t" . $queue->job_name($j) . $stat{$m}{$j}, "\n"; print $why{$m}{$j}; } } } else { for my $j (sort keys %jobs) { print $queue->job_name($j), "\n"; for my $m (sort @{$jobs{$j}}) { print "\t$m", $stat{$m}{$j}, "\n"; print $why{$m}{$j}; } } } } } sub do_rm() { my $err = 0; for my $m (sort keys %machs) { for my $j (sort @{$machs{$m}}) { if (!$queue->lock($m, $j)) { print STDERR "Cannot remove $m:", $queue->job_name($j), ", it is locked\n"; $err = 1; } else { $queue->update_job_status($m, $j, 'REMOVED'); $queue->remove($m, $j); print "Removed $m:", $queue->job_name($j), "\n"; } } } $queue->unlock; exit $err; } sub do_move_to() { my $err = 0; my $dest = BEX::Queue->new($op_move_to); $dest->{'Name'} ne $queue->{'Name'} or die "Moving to the same queue is not permitted\n"; for my $j (sort keys %jobs) { my $job = BEX::Job->new_from_file($queue->job_file($j)); for my $m (sort @{$jobs{$j}}) { if (!$queue->lock($m, $j)) { print STDERR "Cannot move $m:", $queue->job_name($j), ", it is locked\n"; $err = 1; } else { my $enq = $dest->enqueue($m, $job); if ($enq) { $dest->update_job_status($m, $job->id, 'NEW', 'Moved to this queue'); } else { $dest->log($m, $job->id, 'REQUEUE', 'Moved to this queue'); } $queue->update_job_status($m, $job->id, 'REMOVED', 'Moved from this queue'); $queue->remove($m, $j); print "Moved $m:", $dest->job_name($j); print " (already queued)" if !$enq; print "\n"; } } } $queue->unlock; exit $err; }