2 # Batch EXecutor 3.0 -- Run Queued Jobs
3 # (c) 2011-2012 Martin Mares <mj@ucw.cz>
15 "j|job=s" => \$given_job,
16 "q|queue=s" => \$queue_name,
17 "s|status-fifo=s" => \$status_fifo,
19 Usage: bex run [<options>] [[!]<machine-or-class> ...]
22 -j, --job=<id> Run only the specified job
23 -q, --queue=<name> Select job queue
24 --status-fifo=<f> Send status updates to the given named pipe
# Optional status reporting: when a status FIFO path was given on the
# command line (--status-fifo), open it and keep the handle in $status_fd.
28 if (defined $status_fifo) {
# Opened in append mode; failure to open aborts the run with the OS error.
29 open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
# Flush after every write so that each status line is delivered immediately.
30 autoflush $status_fd, 1;
# update_status($mach, $job, $status, $log_on_queue [, $msg])
#
# Report a status change of a job on a machine.
#   $mach         - machine name
#   $job          - job ID; callers pass - for events concerning the whole machine
#   $status       - status keyword (callers use e.g. INIT, PING, SEND, RUN, DONE)
#   $log_on_queue - queue object on which the status is recorded, or undef
#   $msg          - optional message handed over to the queue together with the status
33 sub update_status($$$$;$) {
34 my ($mach, $job, $status, $log_on_queue, $msg) = @_;
# Status FIFO line format: an exclamation mark followed by machine, job and status.
# NOTE(review): $status_fd is set only when a status FIFO was requested, so this
# print is presumably guarded by a check of the handle -- confirm.
36 print $status_fd "! $mach $job $status\n";
# Record the status and the message in the queue.
# NOTE(review): several callers pass undef for $log_on_queue, so this call is
# presumably reached only when a queue object was given -- confirm.
39 $log_on_queue->update_job_status($mach, $job, $status, $msg);
# Ping results are looked up in %pings by machine name; the probe below runs
# only when no entry is defined for $mach yet.
47 if (!defined $pings{$mach}) {
# Pinging is performed only when the ping_hosts configuration setting is true.
48 if ($BEX::Config::ping_hosts) {
# Machine-level status (job ID is -), not recorded on any queue.
49 update_status($mach, '-', 'PING', undef);
50 my $host = BEX::Config::host_name($mach);
# Send a single probe (-c1) with numeric output (-n) and discard all output.
# NOTE(review): presumably the exit status in $? is examined afterwards -- confirm.
# NOTE(review): $host is interpolated into a shell command without quoting; this
# assumes configured host names contain no shell metacharacters.
51 `ping -c1 -n $host >/dev/null 2>/dev/null`;
# Failure result: a status keyword followed by a human-readable message.
60 return ('NOPING', 'Does not ping');
# NOTE(review): $s is presumably a wait status as found in $? (callers pass $?) -- confirm.
# Exit code of the process: the wait status shifted right by 8 bits.
67 return "with exit code " . ($s >> 8);
# Signal number: the low 7 bits of the wait status.
69 return "on fatal signal " . ($s & 127);
# run_job_prep($job, $queue, $mach)
#
# Run the optional preparatory command of a job (its Prep attribute) through
# a local bash, appending the output of the command to the log file of the job.
#   $job   - job object
#   $queue - queue object (provides the log file name and records the status)
#   $mach  - machine name
# Returns OK when there is no preparatory command to run, and the pair
# (PREPFAIL, message) when the command fails.
73 sub run_job_prep($$$) {
74 my ($job, $queue, $mach) = @_;
75 my $prep = $job->attr('Prep');
# Nothing to do if the attribute is missing or contains only whitespace.
76 defined($prep) && $prep !~ /^\s*$/ or return 'OK';
79 update_status($mach, $jid, 'PREP', $queue);
80 my $lf = $queue->log_file($mach, $jid);
# The host name of the target machine is put into the environment variable HOST,
# which is inherited by the command started below.
81 $ENV{'HOST'} = BEX::Config::host_name($mach);
# The command runs in a subshell with stderr merged into stdout; tee -a appends
# everything to the log file. With pipefail set, the exit status of the pipeline
# reflects a failure of the command and not just the status of tee.
# NOTE(review): both $prep and $lf are interpolated into the shell command line;
# $lf is not quoted, which assumes log file paths without shell metacharacters.
82 system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
# The message is completed by exit_status() applied to the wait status in $?.
85 return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
# run_job_body($job, $queue, $mach)
#
# Execute the body of a job on a remote machine: assemble a script in a local
# temporary file, transfer it over the configured ssh command, run it remotely
# and append its output to the log file of the job.
#   $job   - job object
#   $queue - queue object (provides temporary and log file names, records status)
#   $mach  - machine name
# Failure results visible here are pairs of a status keyword and a message:
# INTERR (prolog or epilog cannot be opened), NOXFER (transfer failed) and
# FAILED (the job itself failed).
91 sub run_job_body($$$) {
92 my ($job, $queue, $mach) = @_;
# A body consisting only of whitespace (or nothing at all) needs no remote execution.
94 if ($job->attr('body') =~ /^\s*$/s) {
95 # Shortcut if the body is empty
99 my $host = BEX::Config::host_name($mach);
# Assemble the script locally: optional prolog, job body, optional epilog.
102 my $tmp = $queue->temp_file($mach, $jid);
# NOTE(review): bareword file handle and a die without any message; a failure
# here gives no hint which file could not be created.
103 open T, '>', $tmp or die;
104 if (defined $BEX::Config::job_prolog) {
# NOTE(review): two-argument open on a configured path; also, the early return
# leaves the handle T open.
105 open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
# Copy the prolog to the script line by line.
106 while (<P>) { print T; }
# NOTE(review): presumably the interpreter line is written only when no prolog
# is configured -- confirm.
109 print T "#!/bin/sh\n";
# Identify the job inside the generated script.
111 print T "# BEX job ", $jid, "\n";
112 print T $job->attr('body');
113 if (defined $BEX::Config::job_epilog) {
# NOTE(review): same remarks as for the prolog (two-argument open, T stays open
# on the early return).
114 open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
# Copy the epilog to the script line by line.
115 while (<E>) { print T; }
# Transfer phase; this status is not recorded on the queue.
120 update_status($mach, $jid, 'SEND', undef);
# Remote side: create a temporary file with mktemp, fill it from standard input,
# make it executable for its owner and print its name. The single-quoted Perl
# string keeps $t and $( ) for the remote shell.
121 my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
# Local side: feed the assembled script to the ssh command on standard input and
# capture the printed remote file name.
122 my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
# The transfer counts as failed on a non-zero wait status or an empty reply.
123 !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
# Execution phase; this status is recorded on the queue.
126 update_status($mach, $jid, 'RUN', $queue);
127 my $lf = $queue->log_file($mach, $jid);
# Run the remote script, remember its exit code in e, remove the remote file and
# exit with the remembered code. Output (stderr merged into stdout) is appended
# to the log file by tee -a; pipefail makes a failure of ssh visible in $?.
# NOTE(review): $host, $rtmp and $lf are interpolated into the shell command
# line without quoting.
128 system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
# The message is completed by exit_status() applied to the wait status in $?.
130 return ('FAILED', 'Job failed ' . exit_status($?));
# Run a single job on a single machine in three stages: ping check, preparatory
# command and job body. The first stage whose status is not OK ends the run and
# its (status, message) pair is returned to the caller.
137 my ($job, $queue, $mach) = @_;
# Stage 1: check that the machine responds to ping.
140 ($stat, $msg) = ping_machine($mach);
141 $stat eq 'OK' or return ($stat, $msg);
# Stage 2: run the preparatory command of the job, if it has any.
143 ($stat, $msg) = run_job_prep($job, $queue, $mach);
144 $stat eq 'OK' or return ($stat, $msg);
# Stage 3: the result of the job body is the result of the whole job.
146 return run_job_body($job, $queue, $mach);
# Main part: walk over the selected machines and run the jobs queued for each.
# Without any machine arguments, the machine list is parsed from the pattern *.
149 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
150 my $queue = BEX::Queue->new($queue_name);
# Lock requested with neither a machine nor a job; judging by the error message,
# it covers the queue as a whole.
152 $queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
154 for my $mach (@machines) {
# Job IDs queued for this machine; machines with nothing queued are skipped.
155 my @q = $queue->scan($mach) or next;
# Lock for this machine; if it cannot be obtained, report LOCKED followed by
# DONE for the machine.
156 if (!$queue->lock($mach, undef)) {
157 print "### Machine $mach is locked by another brun, skipping...\n";
158 update_status($mach, '-', 'LOCKED', undef);
159 update_status($mach, '-', 'DONE', undef);
# Machine-level status (job ID is -) announcing that work on the machine starts.
162 update_status($mach, '-', 'INIT', undef);
# Process the queued jobs in the order returned by scan().
# NOTE(review): the loop condition tests the truth of the job ID, so it would
# stop at a false ID such as 0 -- presumably job IDs are never false; confirm.
163 while (my $jid = shift @q) {
# With --job, every job other than the requested one is skipped.
164 if (defined $given_job) {
165 $jid eq $given_job or next;
167 my $job = BEX::Job->new_from_file($queue->job_file($jid));
168 update_status($mach, $jid, 'INIT', undef);
# Lock for this job on this machine; a job locked elsewhere is reported as LOCKED.
169 if (!$queue->lock($mach, $jid)) {
170 print "### Skipping locked $jid on $mach ###\n";
171 update_status($mach, $jid, 'LOCKED', undef);
174 print "### Running ", $job->name, " on $mach ###\n";
175 my ($s, $msg) = run_job($job, $queue, $mach);
# The final status is the only one in this loop that is passed with the queue
# object, i.e. recorded on the queue together with the message.
176 update_status($mach, $jid, $s, $queue, $msg);
# NOTE(review): presumably the job is removed from the queue only after
# a successful run -- confirm.
180 $queue->remove($mach, $jid);
# Report the resulting status and its message.
182 print "--- $s: $msg\n";
# With the skip_on_fail setting, the remaining jobs of this machine are skipped;
# the notice is printed only if any jobs remain.
183 if ($BEX::Config::skip_on_fail) {
184 print "### Skipping other jobs on the same host ###\n" if @q;
# Machine-level status announcing that work on the machine is finished.
190 update_status($mach, '-', 'DONE', undef);