Options:
-j, --job=<id> Run only the specified job
--q, --queue=<name> Run jobs in the given queue
+-q, --queue=<name> Select job queue
--status-fifo=<f> Send status updates to the given named pipe
AMEN
autoflush $status_fd, 1;
}
-sub send_status($$$) {
- my ($mach, $job, $status) = @_;
+sub update_status($$$$;$) {
+ my ($mach, $job, $status, $log_on_queue, $msg) = @_;
if ($status_fd) {
print $status_fd "! $mach $job $status\n";
}
+ if ($log_on_queue) {
+ $log_on_queue->update_job_status($mach, $job, $status, $msg);
+ }
}
+my %pings;
+
sub ping_machine($) {
my ($mach) = @_;
- send_status($mach, '-', 'PING');
- `ping -c1 -n $mach >/dev/null 2>/dev/null`;
- return !$?;
+ if (!defined $pings{$mach}) {
+ if ($BEX::Config::ping_hosts) {
+ update_status($mach, '-', 'PING', undef);
+ my $host = BEX::Config::host_name($mach);
+ `ping -c1 -n $host >/dev/null 2>/dev/null`;
+ $pings{$mach} = !$?;
+ } else {
+ $pings{$mach} = 1;
+ }
+ }
+ if ($pings{$mach}) {
+ return ('OK', undef);
+ } else {
+ return ('NOPING', 'Does not ping');
+ }
}
-sub run_job($$$) {
+sub exit_status($) {
+ my ($s) = @_;
+ if ($s >> 8) {
+ return "with exit code " . ($s >> 8);
+ } else {
+ return "on fatal signal " . ($s & 127);
+ }
+}
+
+sub run_job_prep($$$) {
my ($job, $queue, $mach) = @_;
- my $jid = $job->{'ID'};
+ my $prep = $job->attr('Prep');
+ defined($prep) && $prep !~ /^\s*$/ or return 'OK';
+
+ my $jid = $job->id;
+ update_status($mach, $jid, 'PREP', $queue);
+ my $lf = $queue->log_file($mach, $jid);
+ $ENV{'HOST'} = BEX::Config::host_name($mach);
+ system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
+ delete $ENV{'HOST'};
+ if ($?) {
+ return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
+ } else {
+ return 'OK';
+ }
+}
+
+sub run_job_body($$$) {
+ my ($job, $queue, $mach) = @_;
+
+ if ($job->attr('body') =~ /^\s*$/s) {
+ # Shortcut if the body is empty
+ return 'OK'
+ }
- # FIXME: rsyncing, rsync-only jobs
- # FIXME: Locking
+ my $host = BEX::Config::host_name($mach);
+ my $jid = $job->id;
my $tmp = $queue->temp_file($mach, $jid);
open T, '>', $tmp or die;
if (defined $BEX::Config::job_prolog) {
- open P, $BEX::Config::job_prolog or return "Cannot open prolog: $!";
+ open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
while (<P>) { print T; }
close P;
} else {
print T "#!/bin/sh\n";
}
print T "# BEX job ", $jid, "\n";
- print T $job->{'body'};
+ print T $job->attr('body');
if (defined $BEX::Config::job_epilog) {
- open E, $BEX::Config::job_epilog or return "Cannot open epilog: $!";
+ open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
while (<E>) { print T; }
close E;
}
close T;
- send_status($mach, $jid, 'SEND');
+ update_status($mach, $jid, 'SEND', undef);
my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
- my $rtmp = `ssh <$tmp $mach '$cmd'`;
- !$? && defined($rtmp) && $rtmp ne '' or return "Transfer failed";
+ my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
+ !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
chomp $rtmp;
- send_status($mach, $jid, 'RUN');
- system 'ssh', '-t', $mach, "$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e";
+ update_status($mach, $jid, 'RUN', $queue);
+ my $lf = $queue->log_file($mach, $jid);
+ system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
if ($?) {
- return 'Failed';
+ return ('FAILED', 'Job failed ' . exit_status($?));
} else {
return 'OK';
}
}
+sub run_job($$$) {
+ my ($job, $queue, $mach) = @_;
+ my ($stat, $msg);
+
+ ($stat, $msg) = ping_machine($mach);
+ $stat eq 'OK' or return ($stat, $msg);
+
+ ($stat, $msg) = run_job_prep($job, $queue, $mach);
+ $stat eq 'OK' or return ($stat, $msg);
+
+ return run_job_body($job, $queue, $mach);
+}
+
my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
my $queue = BEX::Queue->new($queue_name);
+$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
+
for my $mach (@machines) {
my @q = $queue->scan($mach) or next;
- send_status($mach, '-', 'INIT');
- my $ping;
- for my $jid (@q) {
+ if (!$queue->lock($mach, undef)) {
+ print "### Machine $mach is locked by another brun, skipping...\n";
+ update_status($mach, '-', 'LOCKED', undef);
+ update_status($mach, '-', 'DONE', undef);
+ next;
+ }
+ update_status($mach, '-', 'INIT', undef);
+ while (my $jid = shift @q) {
if (defined $given_job) {
$jid eq $given_job or next;
}
my $job = BEX::Job->new_from_file($queue->job_file($jid));
- my $stat = {
- 'Time' => time,
- };
- print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n";
- $ping //= ping_machine($mach);
- my $s;
- if (!$ping) {
- $s = 'No ping';
- } else {
- $s = run_job($job, $queue, $mach);
+ update_status($mach, $jid, 'INIT', undef);
+ if (!$queue->lock($mach, $jid)) {
+ print "### Skipping locked $jid on $mach ###\n";
+ update_status($mach, $jid, 'LOCKED', undef);
+ next;
}
+ print "### Running ", $job->name, " on $mach ###\n";
+ my ($s, $msg) = run_job($job, $queue, $mach);
+ update_status($mach, $jid, $s, $queue, $msg);
- BEX::log("$mach $jid $s");
if ($s eq 'OK') {
print "+++ OK\n";
$queue->remove($mach, $jid);
- send_status($mach, $jid, 'OK');
} else {
- print "--- $s\n";
- $stat->{'Status'} = $s;
- $queue->write_job_status($mach, $jid, $stat);
- send_status($mach, $jid, 'ERR');
+ print "--- $s: $msg\n";
+ if ($BEX::Config::skip_on_fail) {
+ print "### Skipping other jobs on the same host ###\n" if @q;
+ last;
+ }
}
}
- send_status($mach, '-', 'DONE');
+} continue {
+ update_status($mach, '-', 'DONE', undef);
}
+$queue->unlock;