- bprun: option for setting max # of running jobs
- bprun --job
- bprun --curses
-- Locking
- rsync, rsync-only
- Terminology: machine vs. host
- ssh options
- Detector of orphans (unused queue dirs, jobs on non-existent machines, non-queued jobs)
- job failed => give a more explanatory message
+- write_job_status should be atomic
+- change ((no subject))
my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
my $queue = BEX::Queue->new($queue_name);
+$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
+
for my $mach (@machines) {
my @q = $queue->scan($mach) or next;
+ if (!$queue->lock($mach, undef)) {
+ print "### Machine $mach is locked by another brun, skipping...\n";
+ update_status($mach, '-', 'LOCKED', undef);
+ update_status($mach, '-', 'DONE', undef);
+ next;
+ }
update_status($mach, '-', 'INIT', undef);
my $ping;
for my $jid (@q) {
$jid eq $given_job or next;
}
my $job = BEX::Job->new_from_file($queue->job_file($jid));
+ if (!$queue->lock($mach, $jid)) {
+ print "### Skipping locked $jid on $mach ###\n";
+ update_status($mach, $jid, 'LOCKED', undef);
+ next;
+ }
my $stat = {
'Time' => time,
};
} else {
($s, $msg) = run_job($job, $queue, $mach);
}
+
+ $stat->{'Status'} = $s;
+ $stat->{'Message'} = $msg;
+ $queue->write_job_status($mach, $jid, $stat);
+
+ # Called after writing the status file, so that the front-end watching
+ # our status FIFO can see the new status file.
update_status($mach, $jid, $s, $queue, $msg);
if ($s eq 'OK') {
$queue->remove($mach, $jid);
} else {
print "--- $s: $msg\n";
- $stat->{'Status'} = $s;
- $stat->{'Message'} = $msg;
- $queue->write_job_status($mach, $jid, $stat);
}
}
update_status($mach, '-', 'DONE', undef);
}
+$queue->unlock;
# Before we try to connect to a host, ping it to check if it's alive
our $ping_hosts = 1;
+# Whenever we want to run a job on a machine, we must obtain a lock.
+# Available locking schemes are:
+# none - no locking takes place (dangerous!)
+# job - obtain exclusive access to the job, other jobs on the same
+# host can run in parallel
+# host - obtain exclusive access to the host, other hosts can run
+# queue - obtain exclusive access to the whole queue
+our $locking_scheme = 'host';
+
# Various utility functions
sub parse_machine_list(@);
use strict;
use warnings;
+use feature 'switch';
package BEX::Queue;
use IO::File;
use File::Path;
+use Fcntl qw(:flock);
use POSIX ();
sub new($;$) {
my $sf = $queue->status_file($machine, $jid);
open S, '>', $sf or die "Cannot create $sf: $!";
for my $k (sort keys %$stat) {
- print S "$k: ", $stat->{$k}, "\n";
+ print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k};
}
close S;
}
+# Whenever we want to run a job on a machine, we must obtain a lock;
+# at most one lock can be held at a time by a single BEX::Queue object.
+# See the description of locking schemes in BEX::Config.
+sub lock($$$) {
+ my ($queue, $machine, $jid) = @_;
+ my $lock = $queue->{'Name'};
+ given ($BEX::Config::locking_scheme) {
+ when ('queue') { $lock .= '/lock'; }
+ when ('host') {
+ defined($machine) or return 1;
+ $lock .= "/hosts/$machine/lock";
+ }
+ when ('job') {
+ defined($machine) && defined($jid) or return 1;
+ $lock .= "/hosts/$machine/$jid.lock";
+ }
+ when ('none') { return 1; }
+ default { die "Invalid BEX::Config::locking_scheme"; }
+ }
+ if (defined($queue->{'LockName'})) {
+ return 1 if ($queue->{'LockName'} eq $lock);
+ $queue->unlock;
+ }
+ open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!";
+ if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) {
+ close $queue->{'LockHandle'};
+ delete $queue->{'LockHandle'};
+ return 0;
+ }
+ $queue->{'LockName'} = $lock;
+ return 1;
+}
+
+sub unlock($) {
+ my ($queue) = @_;
+ defined $queue->{'LockName'} or return;
+ unlink $queue->{'LockName'};
+ flock $queue->{'LockHandle'}, LOCK_UN;
+ close $queue->{'LockHandle'};
+ delete $queue->{'LockHandle'};
+ delete $queue->{'LockName'};
+}
+
42;