]> mj.ucw.cz Git - bex.git/commitdiff
BEX module split
authorMartin Mares <mj@ucw.cz>
Mon, 31 Oct 2011 09:25:48 +0000 (10:25 +0100)
committerMartin Mares <mj@ucw.cz>
Mon, 31 Oct 2011 09:25:48 +0000 (10:25 +0100)
benq
brun
lib/BEX.pm
lib/BEX/Job.pm [new file with mode: 0644]
lib/BEX/Queue.pm [new file with mode: 0644]

diff --git a/benq b/benq
index a26660f06c5e4356de0d8c6c952c40b765efad4b..c401720c477c03223ae55db10793884e6fb5114c 100755 (executable)
--- a/benq
+++ b/benq
@@ -39,12 +39,11 @@ if ($new_stat->mtime <= $orig_stat->mtime && $new_stat->size == $orig_stat->size
 
 # Re-load the job and put it to the queue
 $job = BEX::Job->new_from_file($fn);
-print "New job ", $job->{'ID'}, "\n";
+print "New job ", $job->id, "\n";
 my $queue = BEX::Queue->new($queue_name);
 for my $m (@machines) {
        if ($queue->enqueue($m, $job)) {
                print "\t$m\n";
-               BEX::log("$m " . $job->{'ID'} . " queued");
        } else {
                print "\t$m (already queued)\n";
        }
diff --git a/brun b/brun
index 01bff140e7368ea823743936c7e0d80fadb37b20..760bf9ca172f1f7e8f0bcaf92df7c64109bb0386 100755 (executable)
--- a/brun
+++ b/brun
@@ -22,7 +22,7 @@ Usage: brun [<options>] [[!]<machine-or-class> ...]
 
 Options:
 -j, --job=<id>         Run only the specified job
--q, --queue=<name>     Run jobs in the given queue
+-q, --queue=<name>     Select job queue
     --status-fifo=<f>  Send status updates to the given named pipe
 AMEN
 
@@ -32,16 +32,19 @@ if (defined $status_fifo) {
        autoflush $status_fd, 1;
 }
 
-sub send_status($$$) {
-       my ($mach, $job, $status) = @_;
+sub update_status($$$$;$) {
+       my ($mach, $job, $status, $log_on_queue, $msg) = @_;
        if ($status_fd) {
                print $status_fd "! $mach $job $status\n";
        }
+       if ($log_on_queue) {
+               $log_on_queue->log($mach, $job, $status, $msg);
+       }
 }
 
 sub ping_machine($) {
        my ($mach) = @_;
-       send_status($mach, '-', 'PING');
+       update_status($mach, '-', 'PING', undef);
        `ping -c1 -n $mach >/dev/null 2>/dev/null`;
        return !$?;
 }
@@ -56,7 +59,7 @@ sub run_job($$$) {
        my $tmp = $queue->temp_file($mach, $jid);
        open T, '>', $tmp or die;
        if (defined $BEX::Config::job_prolog) {
-               open P, $BEX::Config::job_prolog or return "Cannot open prolog: $!";
+               open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
                while (<P>) { print T; }
                close P;
        } else {
@@ -65,24 +68,24 @@ sub run_job($$$) {
        print T "# BEX job ", $jid, "\n";
        print T $job->{'body'};
        if (defined $BEX::Config::job_epilog) {
-               open E, $BEX::Config::job_epilog or return "Cannot open epilog: $!";
+               open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
                while (<E>) { print T; }
                close E;
        }
        close T;
 
-       send_status($mach, $jid, 'SEND');
+       update_status($mach, $jid, 'SEND', undef);
        my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
        my $rtmp = `ssh <$tmp $mach '$cmd'`;
-       !$? && defined($rtmp) && $rtmp ne '' or return "Transfer failed";
+       !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
        chomp $rtmp;
 
-       send_status($mach, $jid, 'RUN');
+       update_status($mach, $jid, 'RUN', $queue);
        system 'ssh', '-t', $mach, "$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e";
        if ($?) {
-               return 'Failed';
+               return ('FAILED', 'Job failed');
        } else {
-               return 'OK';
+               return ('OK', undef);
        }
 }
 
@@ -91,7 +94,7 @@ my $queue = BEX::Queue->new($queue_name);
 
 for my $mach (@machines) {
        my @q = $queue->scan($mach) or next;
-       send_status($mach, '-', 'INIT');
+       update_status($mach, '-', 'INIT', undef);
        my $ping;
        for my $jid (@q) {
                if (defined $given_job) {
@@ -103,24 +106,23 @@ for my $mach (@machines) {
                };
                print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n";
                $ping //= ping_machine($mach);
-               my $s;
+               my $s, $msg;
                if (!$ping) {
-                       $s = 'No ping';
+                       ($s, $msg) = ('NOPING', 'Does not ping');
                } else {
-                       $s = run_job($job, $queue, $mach);
+                       ($s, $msg) = run_job($job, $queue, $mach);
                }
+               update_status($mach, $jid, $s, $queue, $msg);
 
-               BEX::log("$mach $jid $s");
                if ($s eq 'OK') {
                        print "+++ OK\n";
                        $queue->remove($mach, $jid);
-                       send_status($mach, $jid, 'OK');
                } else {
-                       print "--- $s\n";
+                       print "--- $s: $msg\n";
                        $stat->{'Status'} = $s;
+                       $stat->{'Message'} = $msg;
                        $queue->write_job_status($mach, $jid, $stat);
-                       send_status($mach, $jid, 'ERR');
                }
        }
-       send_status($mach, '-', 'DONE');
+       update_status($mach, '-', 'DONE', undef);
 }
index 7d7f99c5bb5d4da949d12466419990ca23f90ce0..579eff29454858a4ebbc636bca41a2529acb6551 100644 (file)
 use strict;
 use warnings;
 
-use BEX::Config;
-
 package BEX;
 
-use IO::File;
-
-use POSIX;
-
-my $log_file;
-
-sub log($) {
-       $log_file //= new IO::File '>>log' or die "Cannot open log: $!";
-       print $log_file POSIX::strftime("%Y-%m-%d %H:%M:%S ", localtime), $_[0], "\n";
-}
-
-package BEX::Job;
-
-use POSIX;
-
-our $job_cnt = 0;
-
-sub new($;$) {
-       my ($class, $id) = @_;
-       my $job = { };
-       bless $job;
-       if (defined $id) {
-               $job->{'ID'} = $id;
-       } else {
-               $job_cnt++;
-               $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime);
-       }
-       $job->{'Subject'} = '(no subject)';
-       return $job;
-}
-
-sub new_from_file($$;$) {
-       my ($class, $file, $header_only) = @_;
-       my $job = { };
-       open T, '<', $file or die "Cannot open $file: $!";
-       while (<T>) {
-               chomp;
-               /^$/ and last;
-               /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error";
-               !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined";
-               $job->{$1} = $2;
-       }
-       if (!$header_only) {
-               my @cmds = <T>;
-               $job->{'body'} = join("", @cmds);
-       }
-       close T;
-       $job->{'Subject'} //= '?';
-       $job->{'ID'} or die "Cannot load $file: Missing ID";
-       return bless $job;
-}
-
-sub attr($$;$) {
-       my ($job, $attr, $val) = @_;
-       $job->{$attr} = $val if defined $val;
-       return $job->{$attr};
-}
-
-sub dump($) {
-       my ($job) = @_;
-       for my $k (sort keys %$job) {
-               print "$k: ", $job->{$k}, "\n";
-       }
-}
-
-sub save($;$) {
-       my ($job, $fn) = @_;
-       -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!";
-       $fn //= 'tmp/' . $job->{'ID'};
-       open T, '>', $fn or die "Cannot create $fn: $!";
-       for my $k (sort grep { /^[A-Z]/ } keys %$job) {
-               print T "$k: ", $job->{$k}, "\n";
-       }
-       print T "\n";
-       print T $job->{'body'} if defined $job->{'body'};
-       close T;
-       return $fn;
-}
-
-package BEX::Queue;
-
-sub new($;$) {
-       my ($class, $name) = @_;
-       $name //= 'queue';
-       -d $name or die "Queue directory $name does not exist\n";
-       for my $d ("hosts", "jobs") {
-               -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
-       }
-       my $queue = {
-               'Name' => $name,
-               'MetaCache' => {},
-       };
-       return bless $queue;
-}
-
-sub host_dir($$) {
-       my ($queue, $machine) = @_;
-       return $queue->{'Name'} . '/hosts/' . $machine;
-}
-
-sub queue_file($$) {
-       my ($queue, $machine, $jid) = @_;
-       return $queue->host_dir($machine) . '/' . $jid . '.job';
-}
-
-sub status_file($$) {
-       my ($queue, $machine, $jid) = @_;
-       return $queue->host_dir($machine) . '/' . $jid . '.stat';
-}
-
-sub temp_file($$) {
-       my ($queue, $machine, $jid) = @_;
-       return $queue->host_dir($machine) . '/' . $jid . '.tmp';
-}
-
-sub job_file($$) {
-       my ($queue, $jid) = @_;
-       return $queue->{'Name'} . '/jobs/' . $jid. '.job';
-}
-
-sub enqueue($$$) {
-       my ($queue, $machine, $job) = @_;
-       my $qf = $queue->queue_file($machine, $job->{'ID'});
-       if (-f $qf) { return 0; }
-       my $fn = $queue->job_file($job->{'ID'});
-       -f $fn or $job->save($fn);
-       my $dir = $queue->host_dir($machine);
-       -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
-       symlink '../../jobs/' . $job->{'ID'} . '.job', $qf or die "Cannot create $qf: $!";
-       return 1;
-}
-
-sub scan($$) {
-       my ($queue, $machine) = @_;
-       my @list = ();
-       if (opendir D, $queue->host_dir($machine)) {
-               while ($_ = readdir D) {
-                       /^\./ and next;
-                       s{\.job}{} or next;
-                       push @list, $_;
-               }
-               closedir D;
-       }
-       return @list;
-}
-
-sub remove($$) {
-       my ($queue, $machine, $jid) = @_;
-       unlink $queue->queue_file($machine, $jid);
-       unlink $queue->status_file($machine, $jid);
-       unlink $queue->temp_file($machine, $jid);
-}
-
-sub job_metadata($$) {
-       my ($queue, $jid) = @_;
-       my $cache = $queue->{'MetaCache'};
-       if (!defined $cache->{$jid}) {
-               $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
-       }
-       return $cache->{$jid};
-}
-
-sub read_job_status($$$) {
-       my ($queue, $machine, $jid) = @_;
-       my %s = ();
-       my $sf = $queue->status_file($machine, $jid);
-       if (open S, '<', $sf) {
-               while (<S>) {
-                       chomp;
-                       /^(\w+):\s*(.*)/ or die "Parse error in $sf";
-                       $s{$1} = $2;
-               }
-               close S;
-       }
-       return \%s;
-}
-
-sub write_job_status($$$$) {
-       my ($queue, $machine, $jid, $stat) = @_;
-       my $sf = $queue->status_file($machine, $jid);
-       open S, '>', $sf or die "Cannot create $sf: $!";
-       for my $k (sort keys %$stat) {
-               print S "$k: ", $stat->{$k}, "\n";
-       }
-       close S;
-}
+use BEX::Config;
+use BEX::Job;
+use BEX::Queue;
 
 42;
diff --git a/lib/BEX/Job.pm b/lib/BEX/Job.pm
new file mode 100644 (file)
index 0000000..16e55f5
--- /dev/null
@@ -0,0 +1,80 @@
+# Batch EXecutor 2.0 -- Jobs
+# (c) 2011 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+
+package BEX::Job;
+
+use POSIX ();
+
+our $job_cnt = 0;
+
+sub new($;$) {
+       my ($class, $id) = @_;
+       my $job = { };
+       bless $job;
+       if (defined $id) {
+               $job->{'ID'} = $id;
+       } else {
+               $job_cnt++;
+               $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime);
+       }
+       $job->{'Subject'} = '(no subject)';
+       return $job;
+}
+
+sub new_from_file($$;$) {
+       my ($class, $file, $header_only) = @_;
+       my $job = { };
+       open T, '<', $file or die "Cannot open $file: $!";
+       while (<T>) {
+               chomp;
+               /^$/ and last;
+               /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error";
+               !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined";
+               $job->{$1} = $2;
+       }
+       if (!$header_only) {
+               my @cmds = <T>;
+               $job->{'body'} = join("", @cmds);
+       }
+       close T;
+       $job->{'Subject'} //= '?';
+       $job->{'ID'} or die "Cannot load $file: Missing ID";
+       $job->{'ID'} !~ /\.[a-z]+$/ or die "Cannot load $file: Invalid ID syntax";
+       return bless $job;
+}
+
+sub id($) {
+       return $_->{'ID'};
+}
+
+sub attr($$;$) {
+       my ($job, $attr, $val) = @_;
+       $job->{$attr} = $val if defined $val;
+       return $job->{$attr};
+}
+
+sub dump($) {
+       my ($job) = @_;
+       for my $k (sort keys %$job) {
+               print "$k: ", $job->{$k}, "\n";
+       }
+}
+
+sub save($;$) {
+       my ($job, $fn) = @_;
+       -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!";
+       $fn //= 'tmp/' . $job->id;
+       open T, '>', $fn or die "Cannot create $fn: $!";
+       for my $k (sort grep { /^[A-Z]/ } keys %$job) {
+               print T "$k: ", $job->{$k}, "\n";
+       }
+       print T "\n";
+       print T $job->{'body'} if defined $job->{'body'};
+       close T;
+       return $fn;
+}
+
+42;
diff --git a/lib/BEX/Queue.pm b/lib/BEX/Queue.pm
new file mode 100644 (file)
index 0000000..429c7a4
--- /dev/null
@@ -0,0 +1,131 @@
+# Batch EXecutor 2.0 -- Queues
+# (c) 2011 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+
+package BEX::Queue;
+
+use IO::File;
+use POSIX ();
+
+sub new($;$) {
+       my ($class, $name) = @_;
+       $name //= 'queue';
+       -d $name or die "Queue directory $name does not exist\n";
+       for my $d ("hosts", "jobs") {
+               -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
+       }
+       my $queue = {
+               'Name' => $name,
+               'MetaCache' => {},
+       };
+       return bless $queue;
+}
+
+# Most actions have to be logged by the caller
+sub log($$$$;$) {
+       my ($queue, $mach, $jid, $stat, $msg) = @_;
+       my $fh = $queue->{'LogFH'} //= new IO::File '>>', $queue->{'Name'} . '/log' or die "Cannot open log: $!";
+       my $m = join(" ", POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime), $mach, $jid, $stat);
+       $m .= " $msg" if defined $msg;
+       print $fh "$m\n";
+}
+
+sub host_dir($$) {
+       my ($queue, $machine) = @_;
+       return $queue->{'Name'} . '/hosts/' . $machine;
+}
+
+sub queue_file($$) {
+       my ($queue, $machine, $jid) = @_;
+       return $queue->host_dir($machine) . '/' . $jid . '.job';
+}
+
+sub status_file($$) {
+       my ($queue, $machine, $jid) = @_;
+       return $queue->host_dir($machine) . '/' . $jid . '.stat';
+}
+
+sub temp_file($$) {
+       my ($queue, $machine, $jid) = @_;
+       return $queue->host_dir($machine) . '/' . $jid . '.tmp';
+}
+
+sub job_file($$) {
+       my ($queue, $jid) = @_;
+       return $queue->{'Name'} . '/jobs/' . $jid. '.job';
+}
+
+sub enqueue($$$) {
+       my ($queue, $machine, $job) = @_;
+       my $qf = $queue->queue_file($machine, $job->id);
+       if (-f $qf) {
+               $queue->log($machine, $job->id, 'REQUEUE');
+               return 0;
+       }
+       my $fn = $queue->job_file($job->id);
+       -f $fn or $job->save($fn);
+       my $dir = $queue->host_dir($machine);
+       -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
+       symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!";
+       $queue->log($machine, $job->id, 'QUEUE');
+       return 1;
+}
+
+sub scan($$) {
+       my ($queue, $machine) = @_;
+       my @list = ();
+       if (opendir D, $queue->host_dir($machine)) {
+               while ($_ = readdir D) {
+                       /^\./ and next;
+                       s{\.job}{} or next;
+                       push @list, $_;
+               }
+               closedir D;
+       }
+       return @list;
+}
+
+sub remove($$) {
+       my ($queue, $machine, $jid) = @_;
+       unlink $queue->queue_file($machine, $jid);
+       unlink $queue->status_file($machine, $jid);
+       unlink $queue->temp_file($machine, $jid);
+}
+
+sub job_metadata($$) {
+       my ($queue, $jid) = @_;
+       my $cache = $queue->{'MetaCache'};
+       if (!defined $cache->{$jid}) {
+               $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
+       }
+       return $cache->{$jid};
+}
+
+sub read_job_status($$$) {
+       my ($queue, $machine, $jid) = @_;
+       my %s = ();
+       my $sf = $queue->status_file($machine, $jid);
+       if (open S, '<', $sf) {
+               while (<S>) {
+                       chomp;
+                       /^(\w+):\s*(.*)/ or die "Parse error in $sf";
+                       $s{$1} = $2;
+               }
+               close S;
+       }
+       return \%s;
+}
+
+sub write_job_status($$$$) {
+       my ($queue, $machine, $jid, $stat) = @_;
+       my $sf = $queue->status_file($machine, $jid);
+       open S, '>', $sf or die "Cannot create $sf: $!";
+       for my $k (sort keys %$stat) {
+               print S "$k: ", $stat->{$k}, "\n";
+       }
+       close S;
+}
+
+42;