]> mj.ucw.cz Git - bex.git/commitdiff
brun: Status socket
authorMartin Mares <mj@ucw.cz>
Sun, 30 Oct 2011 21:18:07 +0000 (22:18 +0100)
committerMartin Mares <mj@ucw.cz>
Sun, 30 Oct 2011 21:18:07 +0000 (22:18 +0100)
brun

diff --git a/brun b/brun
index 48afd8c8b5ec77d817a263b83a511fd2b11019e6..2c364ee2e74c6ff79da117e53f744f1f3ff2a865 100755 (executable)
--- a/brun
+++ b/brun
@@ -5,33 +5,58 @@
 use strict;
 use warnings;
 use Getopt::Long;
+use IO::Socket::UNIX;
 
 use lib 'lib';
 use BEX;
 
+my $given_job;
 my $queue_name;
+my $status_socket;
 
 GetOptions(
+       "j|job=s" => \$given_job,
        "q|queue=s" => \$queue_name,
+       "s|status-socket=s" => \$status_socket,
 ) or die <<AMEN ;
 Usage: brun [<options>] [[!]<machine-or-class> ...]
 
 Options:
+-j, --job=<id>         Run only the specified job
 -q, --queue=<name>     Run jobs in the given queue
+    --status-socket=<s>        Send status updates to the given filesystem socket
 AMEN
 
+my $status_sk;
+if (defined $status_socket) {
+       $status_sk = IO::Socket::UNIX->new(Type => SOCK_STREAM, Peer => $status_socket) or die;
+}
+
+sub send_status($$$) {
+       my ($mach, $job, $status) = @_;
+       if ($status_sk) {
+               print $status_sk "$mach $job $status\n";
+       } else {
+               # FIXME: Debug
+               print ">>> $mach $job $status\n";
+       }
+}
+
 sub ping_machine($) {
        my ($mach) = @_;
+       send_status($mach, '-', 'PING');
        `ping -c1 -n $mach >/dev/null 2>/dev/null`;
        return !$?;
 }
 
 sub run_job($$$) {
        my ($job, $queue, $mach) = @_;
+       my $jid = $job->{'ID'};
+
        # FIXME: rsyncing, rsync-only jobs
        # FIXME: Locking
 
-       my $tmp = $queue->temp_file($mach, $job->{'ID'});
+       my $tmp = $queue->temp_file($mach, $jid);
        open T, '>', $tmp or die;
        if (defined $BEX::Config::job_prolog) {
                open P, $BEX::Config::job_prolog or return "Cannot open prolog: $!";
@@ -40,7 +65,7 @@ sub run_job($$$) {
        } else {
                print T "#!/bin/sh\n";
        }
-       print T "# BEX job ", $job->{'ID'}, "\n";
+       print T "# BEX job ", $jid, "\n";
        print T $job->{'body'};
        if (defined $BEX::Config::job_epilog) {
                open E, $BEX::Config::job_epilog or return "Cannot open epilog: $!";
@@ -49,11 +74,13 @@ sub run_job($$$) {
        }
        close T;
 
+       send_status($mach, $jid, 'SEND');
        my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
        my $rtmp = `ssh <$tmp $mach '$cmd'`;
        !$? && defined($rtmp) && $rtmp ne '' or return "Transfer failed";
        chomp $rtmp;
 
+       send_status($mach, $jid, 'RUN');
        system 'ssh', '-t', $mach, "$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e";
        if ($?) {
                return 'Failed';
@@ -64,10 +91,15 @@ sub run_job($$$) {
 
 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
 my $queue = BEX::Queue->new($queue_name);
+
 for my $mach (@machines) {
        my @q = $queue->scan($mach) or next;
+       send_status($mach, '-', 'INIT');
        my $ping;
        for my $jid (@q) {
+               if (defined $given_job) {
+                       $jid eq $given_job or next;
+               }
                my $job = BEX::Job->new_from_file($queue->job_file($jid));
                my $stat = {
                        'Time' => time,
@@ -85,10 +117,13 @@ for my $mach (@machines) {
                if ($s eq 'OK') {
                        print "+++ OK\n";
                        $queue->remove($mach, $jid);
+                       send_status($mach, $jid, 'OK');
                } else {
                        print "--- $s\n";
                        $stat->{'Status'} = $s;
                        $queue->write_job_status($mach, $jid, $stat);
+                       send_status($mach, $jid, 'ERR');
                }
        }
+       send_status($mach, '-', 'DONE');
 }