/<job-id>.log (Optional) transcript of output produced by the job (including
previous failed attempts)
-<queue>/jobs/<job-id>.job All jobs issued on this queue, including those which
+<queue>/jobs/ All jobs issued on this queue, including those which
are no longer queued for any machine
+ /<job-id>.job Description of the job (see below)
+ /<job-id>.attach/ A directory containing attachments (if any)
<queue>/history/<hostname>/ Successfully completed jobs (their .job, .stat and .log files)
are moved here if the keep_history config switch is set.
#!/usr/bin/perl
# Batch EXecutor 3.0 -- Insert to Queue
-# (c) 2011-2012 Martin Mares <mj@ucw.cz>
+# (c) 2011-2013 Martin Mares <mj@ucw.cz>
use strict;
use warnings;
use Getopt::Long;
use File::stat;
+use File::Spec;
use BEX;
my $given_body;
my $requeue_id;
my $given_subject;
my $given_template;
+my @attach = ();
sub usage() {
print <<AMEN ;
Usage: bex add [<options>] [!]<machine-or-class> ...
Options:
+-a, --attach=<path> Attach a file or directory to the job
-b, --body=<file> Load job body from the given file
-e, --execute=<command> Set job body to the given command
-g, --go Do not run editor, go enqueue the job immediately
-i, --id=<id> Set job ID of the new job
-q, --queue=<name> Insert new jobs to the given queue
-r, --requeue=<id> Re-queue an existing job instead of creating a new one
+ (and possibly add/replace its attachments)
-s, --subject=<subj> Set subject of the new job
-t, --template=<file> Load job template (headers and body) from the given file
AMEN
}
GetOptions(
+ "a|attach=s" => \@attach,
"b|body=s" => \$given_body,
"e|execute=s" => \$given_execute,
"g|go!" => \$given_go,
my @machines = BEX::Config::parse_machine_list(@ARGV);
@machines or die "No machines match\n";
+# Verify that all attachments exist
+for my $a (@attach) {
+ -f $a || -d $a or die "Attachment $a does not exist\n";
+}
+
my $queue = BEX::Queue->new($queue_name);
my $job;
my $tmp_fn;
# Put the job to the queue
print "New job ", $job->id, "\n";
$queue->save_job($job);
+
+# Create attachments
+if (@attach) {
+ my $adir = $queue->attachment_dir($job->id);
+ -d $adir || mkdir $adir or die "Cannot create $adir: $!\n";
+ for my $asrc (@attach) {
+ $asrc =~ s{/+$}{};
+ my ($vol, $dir, $base) = File::Spec->splitpath($asrc);
+ my $adest = File::Spec->catfile($adir, $base);
+ my $msg = "Attached";
+ if (-e $adest) {
+ system "/bin/rm", "-rf", $adest;
+ $? and die "Cannot delete old attachment $adest\n";
+ $msg = "Updated attachment";
+ }
+ system "/bin/cp", "-a", $asrc, $adest;
+ $? and die "Cannot copy attachment $asrc to $adest\n";
+ if (-d $asrc) {
+ print "$msg $base/...\n";
+ } else {
+ print "$msg $base\n";
+ }
+ }
+}
+
+# Enqueue the job on all machines
for my $m (@machines) {
if ($queue->enqueue($m, $job)) {
$queue->update_job_status($m, $job->id, 'NEW');
for my $j (@{$machs{$m}}) {
my $st = get_status($m, $j);
my $s = get_stat($m, $j);
+ my $adir = $queue->attachment_dir($j);
+ my $has_adir = -d $adir;
$cnt_by_job{$j}{$s}++;
$cnt_by_mach{$m}{$s}++;
$why{$m}{$j} = "";
if ($filenames) {
$why{$m}{$j} .= "\t\t== Job file: " . $queue->queue_file($m, $j) . "\n";
+ $why{$m}{$j} .= "\t\t== Attachments: $adir\n" if $has_adir;
}
if (defined($st->{'Time'}) && defined($st->{'Status'})) {
$stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' .
if ($mach_locked{$m} || $queue->is_locked($m, $j)) {
$stat{$m}{$j} .= ' [LOCKED]';
}
+ $stat{$m}{$j} .= ' +ATT' if $has_adir;
}
}
}
}
-sub run_job_body($$$) {
+sub make_job_body($$$) {
my ($job, $queue, $mach) = @_;
-
- if ($job->attr('body') =~ /^\s*$/s) {
- # Shortcut if the body is empty
- return 'OK'
- }
-
- my $host = BEX::Config::host_name($mach);
my $jid = $job->id;
-
my $tmp = $queue->temp_file($mach, $jid);
open T, '>', $tmp or die;
if (defined $BEX::Config::job_prolog) {
close E;
}
close T;
+ return ('OK', $tmp);
+}
+
+sub run_simple_job($$$$) {
+ my ($job, $queue, $mach, $body) = @_;
+
+ my $host = BEX::Config::host_name($mach);
+ my $jid = $job->id;
update_status($mach, $jid, 'SEND', undef);
+
my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
- my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
+ my $rtmp = `$BEX::Config::ssh_command <$body $host '$cmd'`;
!$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
chomp $rtmp;
}
}
+sub run_complex_job($$$$) {
+ my ($job, $queue, $mach, $body) = @_;
+
+ my $host = BEX::Config::host_name($mach);
+ my $jid = $job->id;
+
+ update_status($mach, $jid, 'SEND', undef);
+
+ my $cmd = 't=$(mktemp -d -t bex-XXXXXXXX) && cd $t && cat >job && chmod u+x job && mkdir attach && echo $t';
+ my $rtmp = `$BEX::Config::ssh_command <$body $host '$cmd'` // "";
+ !$? && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
+ chomp $rtmp;
+
+ # Send attachments. We created an extra level of directory hierarchy for attachments
+ # to avoid rsync relaxing permissions on the temporary directory.
+ my $adir = $queue->attachment_dir($jid);
+ `$BEX::Config::rsync_command $adir/ $host:$rtmp/attach/`;
+ !$? or return ('NOXREF', 'Attachment transfer failed');
+
+ update_status($mach, $jid, 'RUN', $queue);
+ my $lf = $queue->log_file($mach, $jid);
+ system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command -t $host 'cd $rtmp/attach && $rtmp/job ; e=\$? ; rm -rf $rtmp ; exit \$e' 2>&1 | tee -a $lf";
+ if ($?) {
+ return ('FAILED', 'Job failed ' . exit_status($?));
+ } else {
+ return 'OK';
+ }
+}
+
sub run_job($$$) {
my ($job, $queue, $mach) = @_;
my ($stat, $msg);
($stat, $msg) = run_job_prep($job, $queue, $mach);
$stat eq 'OK' or return ($stat, $msg);
- return run_job_body($job, $queue, $mach);
+ if ($job->attr('body') =~ /^\s*$/s) {
+ # Shortcut if the body is empty
+ return 'OK'
+ }
+
+ ($stat, $msg) = make_job_body($job, $queue, $mach);
+ $stat eq 'OK' or return ($stat, $msg);
+ my $body = $msg;
+
+ if (-d $queue->attachment_dir($job->id)) {
+ return run_complex_job($job, $queue, $mach, $body);
+ } else {
+ return run_simple_job($job, $queue, $mach, $body);
+ }
}
my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
# How we run ssh (including options)
our $ssh_command = "ssh";
+# How we run rsync to upload attachments (including options)
+our $rsync_command = "rsync -a";
+
# Various utility functions
sub parse_machine_list(@);
return $queue->{'Path'} . '/jobs/' . $jid. '.job';
}
+sub attachment_dir($$) {
+ my ($queue, $jid) = @_;
+ return $queue->{'Path'} . '/jobs/' . $jid. '.attach';
+}
+
sub save_job($$) {
my ($queue, $job) = @_;
# If the job already exists, it is shamelessly rewritten by new contents