From: Martin Mares Date: Fri, 14 Jun 2013 15:05:19 +0000 (+0200) Subject: Implemented job attachments X-Git-Tag: v3.2~6 X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=d4674b5aa83e0d917910e67634b0371f3d41c16b;p=bex.git Implemented job attachments --- diff --git a/NOTES b/NOTES index 28adf19..499ad43 100644 --- a/NOTES +++ b/NOTES @@ -9,8 +9,10 @@ /.log (Optional) transcript of output produced by the job (including previous failed attempts) -/jobs/.job All jobs issued on this queue, including those which +/jobs/ All jobs issued on this queue, including those which are no longer queued for any machine + /.job Description of the job (see below) + /.attach/ A directory containing attachments (if any) /history// Successfully completed jobs (their .job, .stat and .log files) are moved here if the keep_history config switch is set. diff --git a/lib/bin/bex-add b/lib/bin/bex-add index 56f58fa..4f2c3b3 100755 --- a/lib/bin/bex-add +++ b/lib/bin/bex-add @@ -1,11 +1,12 @@ #!/usr/bin/perl # Batch EXecutor 3.0 -- Insert to Queue -# (c) 2011-2012 Martin Mares +# (c) 2011-2013 Martin Mares use strict; use warnings; use Getopt::Long; use File::stat; +use File::Spec; use BEX; my $given_body; @@ -16,18 +17,21 @@ my $queue_name; my $requeue_id; my $given_subject; my $given_template; +my @attach = (); sub usage() { print <] [!] ... Options: +-a, --attach= Attach a file or directory to the job -b, --body= Load job body from the given file -e, --execute= Set job body to the given command -g, --go Do not run editor, go enqueue the job immediately -i, --id= Set job ID of the new job -q, --queue= Insert new jobs to the given queue -r, --requeue= Re-queue an existing job instead of creating a new one + (and possibly add/replace its attachments) -s, --subject= Set subject of the new job -t, --template= Load job template (headers and body) from the given file AMEN @@ -35,6 +39,7 @@ AMEN } GetOptions( + "a|attach=s" => \@attach, "b|body=s" => \$given_body, "e|execute=s" => \$given_execute, "g|go!" => \$given_go, @@ -51,6 +56,11 @@ GetOptions( my @machines = BEX::Config::parse_machine_list(@ARGV); @machines or die "No machines match\n"; +# Verify that all attachments exist +for my $a (@attach) { + -f $a || -d $a or die "Attachment $a does not exist\n"; +} + my $queue = BEX::Queue->new($queue_name); my $job; my $tmp_fn; @@ -98,6 +108,32 @@ if (defined $requeue_id) { # Put the job to the queue print "New job ", $job->id, "\n"; $queue->save_job($job); + +# Create attachments +if (@attach) { + my $adir = $queue->attachment_dir($job->id); + -d $adir || mkdir $adir or die "Cannot create $adir: $!\n"; + for my $asrc (@attach) { + $asrc =~ s{/+$}{}; + my ($vol, $dir, $base) = File::Spec->splitpath($asrc); + my $adest = File::Spec->catfile($adir, $base); + my $msg = "Attached"; + if (-e $adest) { + system "/bin/rm", "-rf", $adest; + $? and die "Cannot delete old attachment $adest\n"; + $msg = "Updated attachment"; + } + system "/bin/cp", "-a", $asrc, $adest; + $? and die "Cannot copy attachment $asrc to $adest\n"; + if (-d $asrc) { + print "$msg $base/...\n"; + } else { + print "$msg $base\n"; + } + } +} + +# Enqueue the job on all machines for my $m (@machines) { if ($queue->enqueue($m, $job)) { $queue->update_job_status($m, $job->id, 'NEW'); diff --git a/lib/bin/bex-queue b/lib/bin/bex-queue index a109f70..18e7556 100755 --- a/lib/bin/bex-queue +++ b/lib/bin/bex-queue @@ -117,11 +117,14 @@ sub do_ls() for my $j (@{$machs{$m}}) { my $st = get_status($m, $j); my $s = get_stat($m, $j); + my $adir = $queue->attachment_dir($j); + my $has_adir = -d $adir; $cnt_by_job{$j}{$s}++; $cnt_by_mach{$m}{$s}++; $why{$m}{$j} = ""; if ($filenames) { $why{$m}{$j} .= "\t\t== Job file: " . $queue->queue_file($m, $j) . "\n"; + $why{$m}{$j} .= "\t\t== Attachments: $adir\n" if $has_adir; } if (defined($st->{'Time'}) && defined($st->{'Status'})) { $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' . @@ -139,6 +142,7 @@ sub do_ls() if ($mach_locked{$m} || $queue->is_locked($m, $j)) { $stat{$m}{$j} .= ' [LOCKED]'; } + $stat{$m}{$j} .= ' +ATT' if $has_adir; } } diff --git a/lib/bin/bex-run b/lib/bin/bex-run index 5ace224..e540053 100755 --- a/lib/bin/bex-run +++ b/lib/bin/bex-run @@ -97,17 +97,9 @@ sub run_job_prep($$$) { } } -sub run_job_body($$$) { +sub make_job_body($$$) { my ($job, $queue, $mach) = @_; - - if ($job->attr('body') =~ /^\s*$/s) { - # Shortcut if the body is empty - return 'OK' - } - - my $host = BEX::Config::host_name($mach); my $jid = $job->id; - my $tmp = $queue->temp_file($mach, $jid); open T, '>', $tmp or die; if (defined $BEX::Config::job_prolog) { @@ -125,10 +117,19 @@ sub run_job_body($$$) { close E; } close T; + return ('OK', $tmp); +} + +sub run_simple_job($$$$) { + my ($job, $queue, $mach, $body) = @_; + + my $host = BEX::Config::host_name($mach); + my $jid = $job->id; update_status($mach, $jid, 'SEND', undef); + my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t'; - my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`; + my $rtmp = `$BEX::Config::ssh_command <$body $host '$cmd'`; !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed'); chomp $rtmp; @@ -142,6 +143,35 @@ sub run_job_body($$$) { } } +sub run_complex_job($$$$) { + my ($job, $queue, $mach, $body) = @_; + + my $host = BEX::Config::host_name($mach); + my $jid = $job->id; + + update_status($mach, $jid, 'SEND', undef); + + my $cmd = 't=$(mktemp -d -t bex-XXXXXXXX) && cd $t && cat >job && chmod u+x job && mkdir attach && echo $t'; + my $rtmp = `$BEX::Config::ssh_command <$body $host '$cmd'` // ""; + !$? && $rtmp ne '' or return ('NOXFER', 'Transfer failed'); + chomp $rtmp; + + # Send attachments. We created an extra level of directory hierarchy for attachments + # to avoid rsync relaxing permissions on the temporary directory. + my $adir = $queue->attachment_dir($jid); + `$BEX::Config::rsync_command $adir/ $host:$rtmp/attach/`; + !$? or return ('NOXREF', 'Attachment transfer failed'); + + update_status($mach, $jid, 'RUN', $queue); + my $lf = $queue->log_file($mach, $jid); + system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command -t $host 'cd $rtmp/attach && $rtmp/job ; e=\$? ; rm -rf $rtmp ; exit \$e' 2>&1 | tee -a $lf"; + if ($?) { + return ('FAILED', 'Job failed ' . exit_status($?)); + } else { + return 'OK'; + } +} + sub run_job($$$) { my ($job, $queue, $mach) = @_; my ($stat, $msg); @@ -152,7 +182,20 @@ sub run_job($$$) { ($stat, $msg) = run_job_prep($job, $queue, $mach); $stat eq 'OK' or return ($stat, $msg); - return run_job_body($job, $queue, $mach); + if ($job->attr('body') =~ /^\s*$/s) { + # Shortcut if the body is empty + return 'OK' + } + + ($stat, $msg) = make_job_body($job, $queue, $mach); + $stat eq 'OK' or return ($stat, $msg); + my $body = $msg; + + if (-d $queue->attachment_dir($job->id)) { + return run_complex_job($job, $queue, $mach, $body); + } else { + return run_simple_job($job, $queue, $mach, $body); + } } my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); diff --git a/lib/perl/BEX/Config.pm b/lib/perl/BEX/Config.pm index 8a36d45..c5a422d 100644 --- a/lib/perl/BEX/Config.pm +++ b/lib/perl/BEX/Config.pm @@ -56,6 +56,9 @@ our $skip_on_fail = 0; # How we run ssh (including options) our $ssh_command = "ssh"; +# How we run rsync to upload attachments (including options) +our $rsync_command = "rsync -a"; + # Various utility functions sub parse_machine_list(@); diff --git a/lib/perl/BEX/Queue.pm b/lib/perl/BEX/Queue.pm index 298c25d..d3c42be 100644 --- a/lib/perl/BEX/Queue.pm +++ b/lib/perl/BEX/Queue.pm @@ -73,6 +73,11 @@ sub job_file($$) { return $queue->{'Path'} . '/jobs/' . $jid. '.job'; } +sub attachment_dir($$) { + my ($queue, $jid) = @_; + return $queue->{'Path'} . '/jobs/' . $jid. '.attach'; +} + sub save_job($$) { my ($queue, $job) = @_; # If the job already exists, it is shamelessly rewritten by new contents