]> mj.ucw.cz Git - bex.git/blobdiff - brun
TODO
[bex.git] / brun
diff --git a/brun b/brun
index 48afd8c8b5ec77d817a263b83a511fd2b11019e6..040b6ba40d66d6abc9f83bd8b00401062a76e069 100755 (executable)
--- a/brun
+++ b/brun
@@ -9,86 +9,186 @@ use Getopt::Long;
 use lib 'lib';
 use BEX;
 
+my $given_job;
 my $queue_name;
+my $status_fifo;
 
 GetOptions(
+       "j|job=s" => \$given_job,
        "q|queue=s" => \$queue_name,
+       "s|status-fifo=s" => \$status_fifo,
 ) or die <<AMEN ;
 Usage: brun [<options>] [[!]<machine-or-class> ...]
 
 Options:
--q, --queue=<name>     Run jobs in the given queue
+-j, --job=<id>         Run only the specified job
+-q, --queue=<name>     Select job queue
+    --status-fifo=<f>  Send status updates to the given named pipe
 AMEN
 
+my $status_fd;
+if (defined $status_fifo) {
+       open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
+       autoflush $status_fd, 1;
+}
+
+sub update_status($$$$;$) {
+       my ($mach, $job, $status, $log_on_queue, $msg) = @_;
+       if ($status_fd) {
+               print $status_fd "! $mach $job $status\n";
+       }
+       if ($log_on_queue) {
+               $log_on_queue->update_job_status($mach, $job, $status, $msg);
+       }
+}
+
+my %pings;
+
 sub ping_machine($) {
        my ($mach) = @_;
-       `ping -c1 -n $mach >/dev/null 2>/dev/null`;
-       return !$?;
+       if (!defined $pings{$mach}) {
+               if ($BEX::Config::ping_hosts) {
+                       update_status($mach, '-', 'PING', undef);
+                       my $host = BEX::Config::host_name($mach);
+                       `ping -c1 -n $host >/dev/null 2>/dev/null`;
+                       $pings{$mach} = !$?;
+               } else {
+                       $pings{$mach} = 1;
+               }
+       }
+       if ($pings{$mach}) {
+               return ('OK', undef);
+       } else {
+               return ('NOPING', 'Does not ping');
+       }
 }
 
-sub run_job($$$) {
+sub exit_status($) {
+       my ($s) = @_;
+       if ($s >> 8) {
+               return "with exit code " . ($s >> 8);
+       } else {
+               return "on fatal signal " . ($s & 127);
+       }
+}
+
+sub run_job_prep($$$) {
+       my ($job, $queue, $mach) = @_;
+       my $prep = $job->attr('Prep');
+       defined($prep) && $prep !~ /^\s*$/ or return 'OK';
+
+       my $jid = $job->id;
+       update_status($mach, $jid, 'PREP', $queue);
+       my $lf = $queue->log_file($mach, $jid);
+       $ENV{'HOST'} = BEX::Config::host_name($mach);
+       system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
+       delete $ENV{'HOST'};
+       if ($?) {
+               return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
+       } else {
+               return 'OK';
+       }
+}
+
+sub run_job_body($$$) {
        my ($job, $queue, $mach) = @_;
-       # FIXME: rsyncing, rsync-only jobs
-       # FIXME: Locking
 
-       my $tmp = $queue->temp_file($mach, $job->{'ID'});
+       if ($job->attr('body') =~ /^\s*$/s) {
+               # Shortcut if the body is empty
+               return 'OK'
+       }
+
+       my $host = BEX::Config::host_name($mach);
+       my $jid = $job->id;
+
+       my $tmp = $queue->temp_file($mach, $jid);
        open T, '>', $tmp or die;
        if (defined $BEX::Config::job_prolog) {
-               open P, $BEX::Config::job_prolog or return "Cannot open prolog: $!";
+               open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
                while (<P>) { print T; }
                close P;
        } else {
                print T "#!/bin/sh\n";
        }
-       print T "# BEX job ", $job->{'ID'}, "\n";
-       print T $job->{'body'};
+       print T "# BEX job ", $jid, "\n";
+       print T $job->attr('body');
        if (defined $BEX::Config::job_epilog) {
-               open E, $BEX::Config::job_epilog or return "Cannot open epilog: $!";
+               open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
                while (<E>) { print T; }
                close E;
        }
        close T;
 
+       update_status($mach, $jid, 'SEND', undef);
        my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
-       my $rtmp = `ssh <$tmp $mach '$cmd'`;
-       !$? && defined($rtmp) && $rtmp ne '' or return "Transfer failed";
+       my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
+       !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
        chomp $rtmp;
 
-       system 'ssh', '-t', $mach, "$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e";
+       update_status($mach, $jid, 'RUN', $queue);
+       my $lf = $queue->log_file($mach, $jid);
+       system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
        if ($?) {
-               return 'Failed';
+               return ('FAILED', 'Job failed ' . exit_status($?));
        } else {
                return 'OK';
        }
 }
 
+sub run_job($$$) {
+       my ($job, $queue, $mach) = @_;
+       my ($stat, $msg);
+
+       ($stat, $msg) = ping_machine($mach);
+       $stat eq 'OK' or return ($stat, $msg);
+
+       ($stat, $msg) = run_job_prep($job, $queue, $mach);
+       $stat eq 'OK' or return ($stat, $msg);
+
+       return run_job_body($job, $queue, $mach);
+}
+
 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
 my $queue = BEX::Queue->new($queue_name);
+
+$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
+
 for my $mach (@machines) {
        my @q = $queue->scan($mach) or next;
-       my $ping;
-       for my $jid (@q) {
+       if (!$queue->lock($mach, undef)) {
+               print "### Machine $mach is locked by another brun, skipping...\n";
+               update_status($mach, '-', 'LOCKED', undef);
+               update_status($mach, '-', 'DONE', undef);
+               next;
+       }
+       update_status($mach, '-', 'INIT', undef);
+       while (my $jid = shift @q) {
+               if (defined $given_job) {
+                       $jid eq $given_job or next;
+               }
                my $job = BEX::Job->new_from_file($queue->job_file($jid));
-               my $stat = {
-                       'Time' => time,
-               };
-               print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n";
-               $ping //= ping_machine($mach);
-               my $s;
-               if (!$ping) {
-                       $s = 'No ping';
-               } else {
-                       $s = run_job($job, $queue, $mach);
+               update_status($mach, $jid, 'INIT', undef);
+               if (!$queue->lock($mach, $jid)) {
+                       print "### Skipping locked $jid on $mach ###\n";
+                       update_status($mach, $jid, 'LOCKED', undef);
+                       next;
                }
+               print "### Running ", $job->name, " on $mach ###\n";
+               my ($s, $msg) = run_job($job, $queue, $mach);
+               update_status($mach, $jid, $s, $queue, $msg);
 
-               BEX::log("$mach $jid $s");
                if ($s eq 'OK') {
                        print "+++ OK\n";
                        $queue->remove($mach, $jid);
                } else {
-                       print "--- $s\n";
-                       $stat->{'Status'} = $s;
-                       $queue->write_job_status($mach, $jid, $stat);
+                       print "--- $s: $msg\n";
+                       if ($BEX::Config::skip_on_fail) {
+                               print "### Skipping other jobs on the same host ###\n" if @q;
+                               last;
+                       }
                }
        }
+} continue {
+       update_status($mach, '-', 'DONE', undef);
 }
+$queue->unlock;