# Re-load the job and put it to the queue
$job = BEX::Job->new_from_file($fn);
-print "New job ", $job->{'ID'}, "\n";
+print "New job ", $job->id, "\n";
my $queue = BEX::Queue->new($queue_name);
for my $m (@machines) {
if ($queue->enqueue($m, $job)) {
print "\t$m\n";
- BEX::log("$m " . $job->{'ID'} . " queued");
} else {
print "\t$m (already queued)\n";
}
Options:
-j, --job=<id> Run only the specified job
--q, --queue=<name> Run jobs in the given queue
+-q, --queue=<name> Select job queue
--status-fifo=<f> Send status updates to the given named pipe
AMEN
autoflush $status_fd, 1;
}
-sub send_status($$$) {
- my ($mach, $job, $status) = @_;
+sub update_status($$$$;$) {
+ my ($mach, $job, $status, $log_on_queue, $msg) = @_;
if ($status_fd) {
print $status_fd "! $mach $job $status\n";
}
+ if ($log_on_queue) {
+ $log_on_queue->log($mach, $job, $status, $msg);
+ }
}
sub ping_machine($) {
my ($mach) = @_;
- send_status($mach, '-', 'PING');
+ update_status($mach, '-', 'PING', undef);
`ping -c1 -n $mach >/dev/null 2>/dev/null`;
return !$?;
}
my $tmp = $queue->temp_file($mach, $jid);
open T, '>', $tmp or die;
if (defined $BEX::Config::job_prolog) {
- open P, $BEX::Config::job_prolog or return "Cannot open prolog: $!";
+ open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
while (<P>) { print T; }
close P;
} else {
print T "# BEX job ", $jid, "\n";
print T $job->{'body'};
if (defined $BEX::Config::job_epilog) {
- open E, $BEX::Config::job_epilog or return "Cannot open epilog: $!";
+ open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
while (<E>) { print T; }
close E;
}
close T;
- send_status($mach, $jid, 'SEND');
+ update_status($mach, $jid, 'SEND', undef);
my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
my $rtmp = `ssh <$tmp $mach '$cmd'`;
- !$? && defined($rtmp) && $rtmp ne '' or return "Transfer failed";
+ !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
chomp $rtmp;
- send_status($mach, $jid, 'RUN');
+ update_status($mach, $jid, 'RUN', $queue);
system 'ssh', '-t', $mach, "$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e";
if ($?) {
- return 'Failed';
+ return ('FAILED', 'Job failed');
} else {
- return 'OK';
+ return ('OK', undef);
}
}
for my $mach (@machines) {
my @q = $queue->scan($mach) or next;
- send_status($mach, '-', 'INIT');
+ update_status($mach, '-', 'INIT', undef);
my $ping;
for my $jid (@q) {
if (defined $given_job) {
};
print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n";
$ping //= ping_machine($mach);
- my $s;
+ my $s, $msg;
if (!$ping) {
- $s = 'No ping';
+ ($s, $msg) = ('NOPING', 'Does not ping');
} else {
- $s = run_job($job, $queue, $mach);
+ ($s, $msg) = run_job($job, $queue, $mach);
}
+ update_status($mach, $jid, $s, $queue, $msg);
- BEX::log("$mach $jid $s");
if ($s eq 'OK') {
print "+++ OK\n";
$queue->remove($mach, $jid);
- send_status($mach, $jid, 'OK');
} else {
- print "--- $s\n";
+ print "--- $s: $msg\n";
$stat->{'Status'} = $s;
+ $stat->{'Message'} = $msg;
$queue->write_job_status($mach, $jid, $stat);
- send_status($mach, $jid, 'ERR');
}
}
- send_status($mach, '-', 'DONE');
+ update_status($mach, '-', 'DONE', undef);
}
use strict;
use warnings;
-use BEX::Config;
-
package BEX;
-use IO::File;
-
-use POSIX;
-
-my $log_file;
-
-sub log($) {
- $log_file //= new IO::File '>>log' or die "Cannot open log: $!";
- print $log_file POSIX::strftime("%Y-%m-%d %H:%M:%S ", localtime), $_[0], "\n";
-}
-
-package BEX::Job;
-
-use POSIX;
-
-our $job_cnt = 0;
-
-sub new($;$) {
- my ($class, $id) = @_;
- my $job = { };
- bless $job;
- if (defined $id) {
- $job->{'ID'} = $id;
- } else {
- $job_cnt++;
- $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime);
- }
- $job->{'Subject'} = '(no subject)';
- return $job;
-}
-
-sub new_from_file($$;$) {
- my ($class, $file, $header_only) = @_;
- my $job = { };
- open T, '<', $file or die "Cannot open $file: $!";
- while (<T>) {
- chomp;
- /^$/ and last;
- /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error";
- !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined";
- $job->{$1} = $2;
- }
- if (!$header_only) {
- my @cmds = <T>;
- $job->{'body'} = join("", @cmds);
- }
- close T;
- $job->{'Subject'} //= '?';
- $job->{'ID'} or die "Cannot load $file: Missing ID";
- return bless $job;
-}
-
-sub attr($$;$) {
- my ($job, $attr, $val) = @_;
- $job->{$attr} = $val if defined $val;
- return $job->{$attr};
-}
-
-sub dump($) {
- my ($job) = @_;
- for my $k (sort keys %$job) {
- print "$k: ", $job->{$k}, "\n";
- }
-}
-
-sub save($;$) {
- my ($job, $fn) = @_;
- -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!";
- $fn //= 'tmp/' . $job->{'ID'};
- open T, '>', $fn or die "Cannot create $fn: $!";
- for my $k (sort grep { /^[A-Z]/ } keys %$job) {
- print T "$k: ", $job->{$k}, "\n";
- }
- print T "\n";
- print T $job->{'body'} if defined $job->{'body'};
- close T;
- return $fn;
-}
-
-package BEX::Queue;
-
-sub new($;$) {
- my ($class, $name) = @_;
- $name //= 'queue';
- -d $name or die "Queue directory $name does not exist\n";
- for my $d ("hosts", "jobs") {
- -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
- }
- my $queue = {
- 'Name' => $name,
- 'MetaCache' => {},
- };
- return bless $queue;
-}
-
-sub host_dir($$) {
- my ($queue, $machine) = @_;
- return $queue->{'Name'} . '/hosts/' . $machine;
-}
-
-sub queue_file($$) {
- my ($queue, $machine, $jid) = @_;
- return $queue->host_dir($machine) . '/' . $jid . '.job';
-}
-
-sub status_file($$) {
- my ($queue, $machine, $jid) = @_;
- return $queue->host_dir($machine) . '/' . $jid . '.stat';
-}
-
-sub temp_file($$) {
- my ($queue, $machine, $jid) = @_;
- return $queue->host_dir($machine) . '/' . $jid . '.tmp';
-}
-
-sub job_file($$) {
- my ($queue, $jid) = @_;
- return $queue->{'Name'} . '/jobs/' . $jid. '.job';
-}
-
-sub enqueue($$$) {
- my ($queue, $machine, $job) = @_;
- my $qf = $queue->queue_file($machine, $job->{'ID'});
- if (-f $qf) { return 0; }
- my $fn = $queue->job_file($job->{'ID'});
- -f $fn or $job->save($fn);
- my $dir = $queue->host_dir($machine);
- -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
- symlink '../../jobs/' . $job->{'ID'} . '.job', $qf or die "Cannot create $qf: $!";
- return 1;
-}
-
-sub scan($$) {
- my ($queue, $machine) = @_;
- my @list = ();
- if (opendir D, $queue->host_dir($machine)) {
- while ($_ = readdir D) {
- /^\./ and next;
- s{\.job}{} or next;
- push @list, $_;
- }
- closedir D;
- }
- return @list;
-}
-
-sub remove($$) {
- my ($queue, $machine, $jid) = @_;
- unlink $queue->queue_file($machine, $jid);
- unlink $queue->status_file($machine, $jid);
- unlink $queue->temp_file($machine, $jid);
-}
-
-sub job_metadata($$) {
- my ($queue, $jid) = @_;
- my $cache = $queue->{'MetaCache'};
- if (!defined $cache->{$jid}) {
- $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
- }
- return $cache->{$jid};
-}
-
-sub read_job_status($$$) {
- my ($queue, $machine, $jid) = @_;
- my %s = ();
- my $sf = $queue->status_file($machine, $jid);
- if (open S, '<', $sf) {
- while (<S>) {
- chomp;
- /^(\w+):\s*(.*)/ or die "Parse error in $sf";
- $s{$1} = $2;
- }
- close S;
- }
- return \%s;
-}
-
-sub write_job_status($$$$) {
- my ($queue, $machine, $jid, $stat) = @_;
- my $sf = $queue->status_file($machine, $jid);
- open S, '>', $sf or die "Cannot create $sf: $!";
- for my $k (sort keys %$stat) {
- print S "$k: ", $stat->{$k}, "\n";
- }
- close S;
-}
+use BEX::Config;
+use BEX::Job;
+use BEX::Queue;
42;
--- /dev/null
+# Batch EXecutor 2.0 -- Jobs
+# (c) 2011 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+
+package BEX::Job;
+
+use POSIX ();
+
+our $job_cnt = 0;
+
+sub new($;$) {
+ my ($class, $id) = @_;
+ my $job = { };
+ bless $job;
+ if (defined $id) {
+ $job->{'ID'} = $id;
+ } else {
+ $job_cnt++;
+ $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime);
+ }
+ $job->{'Subject'} = '(no subject)';
+ return $job;
+}
+
+sub new_from_file($$;$) {
+ my ($class, $file, $header_only) = @_;
+ my $job = { };
+ open T, '<', $file or die "Cannot open $file: $!";
+ while (<T>) {
+ chomp;
+ /^$/ and last;
+ /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error";
+ !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined";
+ $job->{$1} = $2;
+ }
+ if (!$header_only) {
+ my @cmds = <T>;
+ $job->{'body'} = join("", @cmds);
+ }
+ close T;
+ $job->{'Subject'} //= '?';
+ $job->{'ID'} or die "Cannot load $file: Missing ID";
+ $job->{'ID'} !~ /\.[a-z]+$/ or die "Cannot load $file: Invalid ID syntax";
+ return bless $job;
+}
+
+sub id($) {
+ return $_->{'ID'};
+}
+
+sub attr($$;$) {
+ my ($job, $attr, $val) = @_;
+ $job->{$attr} = $val if defined $val;
+ return $job->{$attr};
+}
+
+sub dump($) {
+ my ($job) = @_;
+ for my $k (sort keys %$job) {
+ print "$k: ", $job->{$k}, "\n";
+ }
+}
+
+sub save($;$) {
+ my ($job, $fn) = @_;
+ -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!";
+ $fn //= 'tmp/' . $job->id;
+ open T, '>', $fn or die "Cannot create $fn: $!";
+ for my $k (sort grep { /^[A-Z]/ } keys %$job) {
+ print T "$k: ", $job->{$k}, "\n";
+ }
+ print T "\n";
+ print T $job->{'body'} if defined $job->{'body'};
+ close T;
+ return $fn;
+}
+
+42;
--- /dev/null
+# Batch EXecutor 2.0 -- Queues
+# (c) 2011 Martin Mares <mj@ucw.cz>
+
+use strict;
+use warnings;
+
+package BEX::Queue;
+
+use IO::File;
+use POSIX ();
+
+sub new($;$) {
+ my ($class, $name) = @_;
+ $name //= 'queue';
+ -d $name or die "Queue directory $name does not exist\n";
+ for my $d ("hosts", "jobs") {
+ -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
+ }
+ my $queue = {
+ 'Name' => $name,
+ 'MetaCache' => {},
+ };
+ return bless $queue;
+}
+
+# Most actions have to be logged by the caller
+sub log($$$$;$) {
+ my ($queue, $mach, $jid, $stat, $msg) = @_;
+ my $fh = $queue->{'LogFH'} //= new IO::File '>>', $queue->{'Name'} . '/log' or die "Cannot open log: $!";
+ my $m = join(" ", POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime), $mach, $jid, $stat);
+ $m .= " $msg" if defined $msg;
+ print $fh "$m\n";
+}
+
+sub host_dir($$) {
+ my ($queue, $machine) = @_;
+ return $queue->{'Name'} . '/hosts/' . $machine;
+}
+
+sub queue_file($$) {
+ my ($queue, $machine, $jid) = @_;
+ return $queue->host_dir($machine) . '/' . $jid . '.job';
+}
+
+sub status_file($$) {
+ my ($queue, $machine, $jid) = @_;
+ return $queue->host_dir($machine) . '/' . $jid . '.stat';
+}
+
+sub temp_file($$) {
+ my ($queue, $machine, $jid) = @_;
+ return $queue->host_dir($machine) . '/' . $jid . '.tmp';
+}
+
+sub job_file($$) {
+ my ($queue, $jid) = @_;
+ return $queue->{'Name'} . '/jobs/' . $jid. '.job';
+}
+
+sub enqueue($$$) {
+ my ($queue, $machine, $job) = @_;
+ my $qf = $queue->queue_file($machine, $job->id);
+ if (-f $qf) {
+ $queue->log($machine, $job->id, 'REQUEUE');
+ return 0;
+ }
+ my $fn = $queue->job_file($job->id);
+ -f $fn or $job->save($fn);
+ my $dir = $queue->host_dir($machine);
+ -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
+ symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!";
+ $queue->log($machine, $job->id, 'QUEUE');
+ return 1;
+}
+
+sub scan($$) {
+ my ($queue, $machine) = @_;
+ my @list = ();
+ if (opendir D, $queue->host_dir($machine)) {
+ while ($_ = readdir D) {
+ /^\./ and next;
+ s{\.job}{} or next;
+ push @list, $_;
+ }
+ closedir D;
+ }
+ return @list;
+}
+
+sub remove($$) {
+ my ($queue, $machine, $jid) = @_;
+ unlink $queue->queue_file($machine, $jid);
+ unlink $queue->status_file($machine, $jid);
+ unlink $queue->temp_file($machine, $jid);
+}
+
+sub job_metadata($$) {
+ my ($queue, $jid) = @_;
+ my $cache = $queue->{'MetaCache'};
+ if (!defined $cache->{$jid}) {
+ $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
+ }
+ return $cache->{$jid};
+}
+
+sub read_job_status($$$) {
+ my ($queue, $machine, $jid) = @_;
+ my %s = ();
+ my $sf = $queue->status_file($machine, $jid);
+ if (open S, '<', $sf) {
+ while (<S>) {
+ chomp;
+ /^(\w+):\s*(.*)/ or die "Parse error in $sf";
+ $s{$1} = $2;
+ }
+ close S;
+ }
+ return \%s;
+}
+
+sub write_job_status($$$$) {
+ my ($queue, $machine, $jid, $stat) = @_;
+ my $sf = $queue->status_file($machine, $jid);
+ open S, '>', $sf or die "Cannot create $sf: $!";
+ for my $k (sort keys %$stat) {
+ print S "$k: ", $stat->{$k}, "\n";
+ }
+ close S;
+}
+
+42;