]> mj.ucw.cz Git - bex.git/blob - lib/bin/bex-run
run: make ssh allocate a PTY
[bex.git] / lib / bin / bex-run
1 #!/usr/bin/perl
2 # Batch EXecutor 3.0 -- Run Queued Jobs
3 # (c) 2011-2012 Martin Mares <mj@ucw.cz>
4
5 use strict;
6 use warnings;
7 use Getopt::Long;
8 use BEX;
9
10 sub usage() {
11         print <<AMEN ;
12 Usage: bex run [<options>] [[!]<machine-or-class> ...]
13
14 Options:
15 -j, --job=<id>          Run only the specified job
16 -q, --queue=<name>      Select job queue
17     --status-fifo=<f>   Send status updates to the given named pipe
18 AMEN
19         exit 0;
20 }
21
22 my $given_job;
23 my $queue_name;
24 my $status_fifo;
25
26 GetOptions(
27         "j|job=s" => \$given_job,
28         "q|queue=s" => \$queue_name,
29         "s|status-fifo=s" => \$status_fifo,
30         "help" => \&usage,
31 ) or die "Try `bex run --help' for more information.\n";
32
33 my $status_fd;
34 if (defined $status_fifo) {
35         open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
36         autoflush $status_fd, 1;
37 }
38
39 sub update_status($$$$;$) {
40         my ($mach, $job, $status, $log_on_queue, $msg) = @_;
41         if ($status_fd) {
42                 print $status_fd "! $mach $job $status\n";
43         }
44         if ($log_on_queue) {
45                 $log_on_queue->update_job_status($mach, $job, $status, $msg);
46         }
47 }
48
49 my %pings;
50
51 sub ping_machine($) {
52         my ($mach) = @_;
53         if (!defined $pings{$mach}) {
54                 if ($BEX::Config::ping_hosts) {
55                         update_status($mach, '-', 'PING', undef);
56                         my $host = BEX::Config::host_name($mach);
57                         `ping -c1 -n $host >/dev/null 2>/dev/null`;
58                         $pings{$mach} = !$?;
59                 } else {
60                         $pings{$mach} = 1;
61                 }
62         }
63         if ($pings{$mach}) {
64                 return ('OK', undef);
65         } else {
66                 return ('NOPING', 'Does not ping');
67         }
68 }
69
70 sub exit_status($) {
71         my ($s) = @_;
72         if ($s >> 8) {
73                 return "with exit code " . ($s >> 8);
74         } else {
75                 return "on fatal signal " . ($s & 127);
76         }
77 }
78
79 sub run_job_prep($$$) {
80         my ($job, $queue, $mach) = @_;
81         my $prep = $job->attr('Prep');
82         defined($prep) && $prep !~ /^\s*$/ or return 'OK';
83
84         my $jid = $job->id;
85         update_status($mach, $jid, 'PREP', $queue);
86         my $lf = $queue->log_file($mach, $jid);
87         $ENV{'HOST'} = BEX::Config::host_name($mach);
88         system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
89         delete $ENV{'HOST'};
90         if ($?) {
91                 return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
92         } else {
93                 return 'OK';
94         }
95 }
96
97 sub run_job_body($$$) {
98         my ($job, $queue, $mach) = @_;
99
100         if ($job->attr('body') =~ /^\s*$/s) {
101                 # Shortcut if the body is empty
102                 return 'OK'
103         }
104
105         my $host = BEX::Config::host_name($mach);
106         my $jid = $job->id;
107
108         my $tmp = $queue->temp_file($mach, $jid);
109         open T, '>', $tmp or die;
110         if (defined $BEX::Config::job_prolog) {
111                 open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
112                 while (<P>) { print T; }
113                 close P;
114         } else {
115                 print T "#!/bin/sh\n";
116         }
117         print T "# BEX job ", $jid, "\n";
118         print T $job->attr('body');
119         if (defined $BEX::Config::job_epilog) {
120                 open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
121                 while (<E>) { print T; }
122                 close E;
123         }
124         close T;
125
126         update_status($mach, $jid, 'SEND', undef);
127         my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
128         my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
129         !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
130         chomp $rtmp;
131
132         update_status($mach, $jid, 'RUN', $queue);
133         my $lf = $queue->log_file($mach, $jid);
134         system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command -t $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
135         if ($?) {
136                 return ('FAILED', 'Job failed ' . exit_status($?));
137         } else {
138                 return 'OK';
139         }
140 }
141
142 sub run_job($$$) {
143         my ($job, $queue, $mach) = @_;
144         my ($stat, $msg);
145
146         ($stat, $msg) = ping_machine($mach);
147         $stat eq 'OK' or return ($stat, $msg);
148
149         ($stat, $msg) = run_job_prep($job, $queue, $mach);
150         $stat eq 'OK' or return ($stat, $msg);
151
152         return run_job_body($job, $queue, $mach);
153 }
154
155 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
156 my $queue = BEX::Queue->new($queue_name);
157
158 $queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
159
160 for my $mach (@machines) {
161         my @q = $queue->scan($mach) or next;
162         if (!$queue->lock($mach, undef)) {
163                 print "### Machine $mach is locked by another brun, skipping...\n";
164                 update_status($mach, '-', 'LOCKED', undef);
165                 update_status($mach, '-', 'DONE', undef);
166                 next;
167         }
168         update_status($mach, '-', 'INIT', undef);
169         while (my $jid = shift @q) {
170                 if (defined $given_job) {
171                         $jid eq $given_job or next;
172                 }
173                 my $job = BEX::Job->new_from_file($queue->job_file($jid));
174                 update_status($mach, $jid, 'INIT', undef);
175                 if (!$queue->lock($mach, $jid)) {
176                         print "### Skipping locked $jid on $mach ###\n";
177                         update_status($mach, $jid, 'LOCKED', undef);
178                         next;
179                 }
180                 print "### Running ", $job->name, " on $mach ###\n";
181                 my ($s, $msg) = run_job($job, $queue, $mach);
182                 update_status($mach, $jid, $s, $queue, $msg);
183
184                 if ($s eq 'OK') {
185                         print "+++ OK\n";
186                         $queue->remove($mach, $jid);
187                 } else {
188                         print "--- $s: $msg\n";
189                         if ($BEX::Config::skip_on_fail) {
190                                 print "### Skipping other jobs on the same host ###\n" if @q;
191                                 last;
192                         }
193                 }
194         }
195 } continue {
196         update_status($mach, '-', 'DONE', undef);
197 }
198 $queue->unlock;