From 194ce5b02e0dffa662bb2ce3a531d4f3a1246bfb Mon Sep 17 00:00:00 2001 From: Martin Mares Date: Mon, 31 Oct 2011 15:30:50 +0100 Subject: [PATCH] Implemented `bq --rm' and `bq --move-to' --- NOTES | 1 + TODO | 1 + bq | 140 +++++++++++++++++++++++++++++++++++------------ lib/BEX/Job.pm | 8 +++ lib/BEX/Queue.pm | 11 +++- 5 files changed, 123 insertions(+), 38 deletions(-) diff --git a/NOTES b/NOTES index 9f3dc17..ee84961 100644 --- a/NOTES +++ b/NOTES @@ -56,6 +56,7 @@ OK Job finished successfully (this is usually not seen in the queue, since FAILED Job failed to execute (i.e., it returned a non-zero exit code) INTERR Internal error of BEX (e.g., failed to read job prolog file) PREPFAIL Preparatory commands failed (i.e., those present in Prep header field) +REMOVED Job removed from the queue (behavior similar to OK) These are present only in log files and messages sent over status FIFO: diff --git a/TODO b/TODO index e6fca02..ea1dfba 100644 --- a/TODO +++ b/TODO @@ -3,3 +3,4 @@ - bprun --curses - Terminology: machine vs. host - Detector of orphans (unused queue dirs, jobs on non-existent machines, non-queued jobs) +- use job->name diff --git a/bq b/bq index 9be29ab..478b683 100755 --- a/bq +++ b/bq @@ -10,77 +10,147 @@ use POSIX; use lib 'lib'; use BEX; -my $by_host; +my $op_by_job; +my $op_by_host; +my $op_rm; +my $op_move_to; + my $queue_name; my $given_job; GetOptions( - "h|by-host!" => \$by_host, + "by-job!" => \$op_by_job, + "h|by-host!" => \$op_by_host, + "rm!" => \$op_rm, + "move-to=s" => \$op_move_to, "j|job=s" => \$given_job, "q|queue=s" => \$queue_name, ) or die <] [[!] ...] +Usage: bq [] [[!] ...] + +Actions: + --by-job Show jobs sorted by job ID (default) +-h, --by-host Show jobs sorted by host + --rm Remove jobs from the queue + --move-to= Move jobs to a different queue Options: --h, --by-host Show jobs sorted by host (default: by job) --j, --job= Show only instances of the specified job --q, --queue= Show jobs in the given queue +-j, --job= Act on the specified job (default: on all) +-q, --queue= Act on the given queue AMEN my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); my $queue = BEX::Queue->new($queue_name); +# Select jobs my %jobs = (); my %machs = (); -my %subj = (); -my %stat = (); -my %mach_locked = (); for my $m (@machines) { - $mach_locked{$m} = $queue->is_locked($m, undef); for my $j ($queue->scan($m)) { if (defined $given_job) { next if $j ne $given_job; } push @{$jobs{$j}}, $m; push @{$machs{$m}}, $j; - my $job = $queue->job_metadata($j); - $subj{$j} = ' (' . $job->{'Subject'} . ')'; - } } -# Read status of each job -for my $m (keys %machs) { - for my $j (@{$machs{$m}}) { - my $st = $queue->read_job_status($m, $j); - if (defined($st->{'Time'}) && defined($st->{'Status'})) { - $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' . - POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']'; - } else { - $stat{$m}{$j} = ''; - } - if ($mach_locked{$m} || $queue->is_locked($m, $j)) { - $stat{$m}{$j} .= ' [LOCKED]'; +sub do_ls(); +sub do_rm(); +sub do_move_to(); + +my $ops = 0 + defined($op_by_host) + defined($op_by_job) + defined($op_rm) + defined($op_move_to); +if ($ops > 1) { die "Multiple actions are not allowed\n"; } + +if ($op_rm) { do_rm(); } +elsif (defined $op_move_to) { do_move_to(); } +else { do_ls(); } +exit 0; + +sub do_ls() +{ + my %stat = (); + my %mach_locked = (); + for my $m (keys %machs) { + $mach_locked{$m} = $queue->is_locked($m, undef); + for my $j (@{$machs{$m}}) { + my $st = $queue->read_job_status($m, $j); + if (defined($st->{'Time'}) && defined($st->{'Status'})) { + $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' . + POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']'; + } else { + $stat{$m}{$j} = ''; + } + if ($mach_locked{$m} || $queue->is_locked($m, $j)) { + $stat{$m}{$j} .= ' [LOCKED]'; + } } } -} -if ($queue->is_locked(undef, undef)) { - print "### Queue lock present\n\n"; + if ($queue->is_locked(undef, undef)) { + print "### Queue lock present\n\n"; + } + + if ($op_by_host) { + for my $m (sort keys %machs) { + print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n"; + for my $j (@{$machs{$m}}) { + print "\t" . $queue->job_name($j) . $stat{$m}{$j}, "\n"; + } + } + } else { + for my $j (sort keys %jobs) { + print $queue->job_name($j), "\n"; + for my $m (sort @{$jobs{$j}}) { + print "\t$m", $stat{$m}{$j}, "\n"; + } + } + } } -if ($by_host) { +sub do_rm() +{ + my $err = 0; for my $m (sort keys %machs) { - print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n"; - for my $j (@{$machs{$m}}) { - print "\t$j", $subj{$j}, $stat{$m}{$j}, "\n"; + for my $j (sort @{$machs{$m}}) { + if (!$queue->lock($m, $j)) { + print STDERR "Cannot remove $m:", $queue->job_name($j), ", it is locked\n"; + $err = 1; + } else { + $queue->log($m, $j, 'REMOVED'); + $queue->write_job_status($m, $j, { 'Time' => time, 'Status' => 'REMOVED' }); + $queue->remove($m, $j); + print "Removed $m:", $queue->job_name($j), "\n"; + } } } -} else { + $queue->unlock; + exit $err; +} + +sub do_move_to() +{ + my $err = 0; + my $dest = BEX::Queue->new($op_move_to); + $dest->{'Name'} ne $queue->{'Name'} or die "Moving to the same queue is not permitted\n"; for my $j (sort keys %jobs) { - print $j, $subj{$j}, "\n"; + my $job = BEX::Job->new_from_file($queue->job_file($j)); for my $m (sort @{$jobs{$j}}) { - print "\t$m", $stat{$m}{$j}, "\n"; + if (!$queue->lock($m, $j)) { + print STDERR "Cannot move $m:", $queue->job_name($j), ", it is locked\n"; + $err = 1; + } else { + my $enq = $dest->enqueue($m, $job); + $dest->write_job_status($m, $job->id, { 'Time' => time, 'Status' => 'NEW', 'Message' => 'Moved to this queue' }); + $queue->log($m, $j, 'REMOVED', "Moved to another queue"); + $queue->write_job_status($m, $j, { 'Time' => time, 'Status' => 'REMOVED', 'Message' => 'Moved to another queue' }); + $queue->remove($m, $j); + print "Moved $m:", $queue->job_name($j); + print " (already queued)" if !$enq; + print "\n"; + } } } + $queue->unlock; + exit $err; } diff --git a/lib/BEX/Job.pm b/lib/BEX/Job.pm index b0add1b..d040f1a 100644 --- a/lib/BEX/Job.pm +++ b/lib/BEX/Job.pm @@ -56,6 +56,14 @@ sub id($) { return $_[0]->{'ID'}; } +sub name($) { + my ($job) = @_; + my $name = $job->{'ID'}; + my $subj = $job->{'Subject'} // ""; + $name .= " ($subj)" if $subj !~ /^\s*$/; + return $name; +} + sub attr($$;$) { my ($job, $attr, $val) = @_; $job->{$attr} = $val if defined $val; diff --git a/lib/BEX/Queue.pm b/lib/BEX/Queue.pm index 355f649..b8fb6f1 100644 --- a/lib/BEX/Queue.pm +++ b/lib/BEX/Queue.pm @@ -103,9 +103,9 @@ sub scan($$) { return sort @list; } -sub remove($$) { - my ($queue, $machine, $jid) = @_; - if ($BEX::Config::keep_history) { +sub remove($$;$) { + my ($queue, $machine, $jid, $force_remove) = @_; + if ($BEX::Config::keep_history && !$force_remove) { my $s = $queue->{'Name'} . '/hosts/' . $machine; my $d = $queue->{'Name'} . '/history/' . $machine; File::Path::mkpath($d); @@ -133,6 +133,11 @@ sub job_metadata($$) { return $cache->{$jid}; } +sub job_name($$) { + my ($queue, $jid) = @_; + return $queue->job_metadata($jid)->name; +} + sub read_job_status($$$) { my ($queue, $machine, $jid) = @_; my %s = (); -- 2.39.2