#!/usr/bin/perl
-# Batch EXecutor 3.0 -- Operations on Queues
+# Batch EXecutor 3.0 -- Show Queued Jobs
# (c) 2011-2012 Martin Mares <mj@ucw.cz>
use strict;
use warnings;
use Getopt::Long;
+use POSIX;
use BEX;
-my $init;
+my $op_by_job;
+my $op_by_host;
+my $op_rm;
+my $op_move_to;
+
+my $queue_name;
+my $filenames;
+my $given_job;
+my $summary;
+my $why;
sub usage() {
print <<AMEN ;
-Usage: bex queue [<options>] <subcommand>
+Usage: bex queue [<options and actions>] [[!]<machine-or-class> ...]
-Subcommands:
-init <queue> Create a new queue
-ls List all queues
+Actions:
+ --by-job Show jobs sorted by job ID (default)
+-h, --by-host Show jobs sorted by host
+ --rm Remove jobs from the queue
+ --move-to=<queue> Move jobs to a different queue
Options:
-None defined so far.
+-f, --filenames Show filenames of jobs and log files
+-j, --job=<id> Act on the specified job (default: on all)
+-q, --queue=<name> Act on the given queue
+-s, --summary Show only a summary
+-w, --why[=<lines>] In case of failed jobs, display last few lines of output
AMEN
exit 0;
}
+Getopt::Long::Configure("bundling");
GetOptions(
- "init!" => \$init,
+ "by-job!" => \$op_by_job,
+ "h|by-host!" => \$op_by_host,
+ "rm!" => \$op_rm,
+ "move-to=s" => \$op_move_to,
+ "f|filenames!" => \$filenames,
+ "j|job=s" => \$given_job,
+ "q|queue=s" => \$queue_name,
+ "s|summary!" => \$summary,
+ "w|why:i" => \$why,
"help" => \&usage,
) or die "Try `bex queue --help' for more information.\n";
-my $op = shift @ARGV // 'ls';
-
-if ($op eq 'init') {
- my $queue_name = shift @ARGV or die "bex queue init requires a queue name\n";
- my $path = $BEX::Config::home . '/' . $queue_name;
- -d $path and die "Queue directory $path already exists\n";
- mkdir $path;
- mkdir "$path/hosts";
- mkdir "$path/jobs";
- print "Queue $queue_name initialized.\n";
-} elsif ($op eq 'ls' && @ARGV == 0) {
- opendir D, $BEX::Config::home or die "Cannot read BEX home directory\n";
- for my $q (sort readdir D) {
- next if $q =~ /^\./;
- my $d = $BEX::Config::home . '/' . $q;
- if (-d $d && -d "$d/hosts" && -d "$d/jobs") {
- print "$q\n";
+my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
+my $queue = BEX::Queue->new($queue_name);
+
+# Select jobs
+my %jobs = ();
+my %machs = ();
+for my $m (@machines) {
+ for my $j ($queue->scan($m)) {
+ if (defined $given_job) {
+ next if $j ne $given_job;
+ }
+ push @{$jobs{$j}}, $m;
+ push @{$machs{$m}}, $j;
+ }
+}
+
+sub do_ls();
+sub do_rm();
+sub do_move_to();
+
+my $ops = 0 + defined($op_by_host) + defined($op_by_job) + defined($op_rm) + defined($op_move_to);
+if ($ops > 1) { die "Multiple actions are not allowed\n"; }
+
+if ($op_rm) { do_rm(); }
+elsif (defined $op_move_to) { do_move_to(); }
+else { do_ls(); }
+exit 0;
+
+sub do_ls()
+{
+ my %stat = ();
+ my %mach_locked = ();
+ my %cnt_by_job = ();
+ my %cnt_by_mach = ();
+ my %why = ();
+ for my $m (keys %machs) {
+ $mach_locked{$m} = $queue->is_locked($m, undef);
+ for my $j (@{$machs{$m}}) {
+ my $st = $queue->read_job_status($m, $j);
+ my $s = $st->{"Status"} // "UNKNOWN";
+ $cnt_by_job{$j}{$s}++;
+ $cnt_by_mach{$m}{$s}++;
+ $why{$m}{$j} = "";
+ if ($filenames) {
+ $why{$m}{$j} .= "\t\t== Job file: " . $queue->queue_file($m, $j) . "\n";
+ }
+ if (defined($st->{'Time'}) && defined($st->{'Status'})) {
+ $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' .
+ POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']';
+ if (defined($why) && $st->{'Status'} eq 'FAILED') {
+ my $lines = $why ? $why : 3;
+ my $log = $queue->log_file($m, $j);
+ if (-f $log) {
+ $why{$m}{$j} .= join("", map { "\t\t>> $_" } `tail -n$lines $log`);
+ }
+ }
+ } else {
+ $stat{$m}{$j} = '';
+ }
+ if ($mach_locked{$m} || $queue->is_locked($m, $j)) {
+ $stat{$m}{$j} .= ' [LOCKED]';
+ }
+ }
+ }
+
+ if ($queue->is_locked(undef, undef)) {
+ print "### Queue lock present\n\n";
+ }
+
+ if ($summary) {
+ if ($op_by_host) {
+ for my $m (sort keys %cnt_by_mach) {
+ print "$m: ", join(" ", map { "$_:" . $cnt_by_mach{$m}{$_} } sort keys %{$cnt_by_mach{$m}}), "\n";
+ }
+ } else {
+ for my $j (sort keys %cnt_by_job) {
+ print $queue->job_name($j), ": ", join(" ", map { "$_:" . $cnt_by_job{$j}{$_} } sort keys %{$cnt_by_job{$j}}), "\n";
+ }
+ }
+ } else {
+ if ($op_by_host) {
+ for my $m (sort keys %machs) {
+ print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n";
+ for my $j (@{$machs{$m}}) {
+ print "\t" . $queue->job_name($j) . $stat{$m}{$j}, "\n";
+ print $why{$m}{$j};
+ }
+ }
+ } else {
+ for my $j (sort keys %jobs) {
+ print $queue->job_name($j), "\n";
+ for my $m (sort @{$jobs{$j}}) {
+ print "\t$m", $stat{$m}{$j}, "\n";
+ print $why{$m}{$j};
+ }
+ }
+ }
+ }
+}
+
+sub do_rm()
+{
+ my $err = 0;
+ for my $m (sort keys %machs) {
+ for my $j (sort @{$machs{$m}}) {
+ if (!$queue->lock($m, $j)) {
+ print STDERR "Cannot remove $m:", $queue->job_name($j), ", it is locked\n";
+ $err = 1;
+ } else {
+ $queue->update_job_status($m, $j, 'REMOVED');
+ $queue->remove($m, $j);
+ print "Removed $m:", $queue->job_name($j), "\n";
+ }
+ }
+ }
+ $queue->unlock;
+ exit $err;
+}
+
+sub do_move_to()
+{
+ my $err = 0;
+ my $dest = BEX::Queue->new($op_move_to);
+ $dest->{'Name'} ne $queue->{'Name'} or die "Moving to the same queue is not permitted\n";
+ for my $j (sort keys %jobs) {
+ my $job = BEX::Job->new_from_file($queue->job_file($j));
+ for my $m (sort @{$jobs{$j}}) {
+ if (!$queue->lock($m, $j)) {
+ print STDERR "Cannot move $m:", $queue->job_name($j), ", it is locked\n";
+ $err = 1;
+ } else {
+ my $enq = $dest->enqueue($m, $job);
+ if ($enq) {
+ $dest->update_job_status($m, $job->id, 'NEW', 'Moved to this queue');
+ } else {
+ $dest->log($m, $job->id, 'REQUEUE', 'Moved to this queue');
+ }
+ $queue->update_job_status($m, $job->id, 'REMOVED', 'Moved from this queue');
+ $queue->remove($m, $j);
+ print "Moved $m:", $dest->job_name($j);
+ print " (already queued)" if !$enq;
+ print "\n";
+ }
}
}
- closedir D;
-} else {
- die "Invalid subcommand\n";
+ $queue->unlock;
+ exit $err;
}