]> mj.ucw.cz Git - bex.git/blob - lib/bin/run
The big move -- introduced subcommands
[bex.git] / lib / bin / run
1 #!/usr/bin/perl
2 # Batch EXecutor 3.0 -- Insert to Queue
3 # (c) 2011-2012 Martin Mares <mj@ucw.cz>
4
5 use strict;
6 use warnings;
7 use Getopt::Long;
8 use BEX;
9
10 my $given_job;
11 my $queue_name;
12 my $status_fifo;
13
14 GetOptions(
15         "j|job=s" => \$given_job,
16         "q|queue=s" => \$queue_name,
17         "s|status-fifo=s" => \$status_fifo,
18 ) or die <<AMEN ;
19 Usage: brun [<options>] [[!]<machine-or-class> ...]
20
21 Options:
22 -j, --job=<id>          Run only the specified job
23 -q, --queue=<name>      Select job queue
24     --status-fifo=<f>   Send status updates to the given named pipe
25 AMEN
26
27 my $status_fd;
28 if (defined $status_fifo) {
29         open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
30         autoflush $status_fd, 1;
31 }
32
33 sub update_status($$$$;$) {
34         my ($mach, $job, $status, $log_on_queue, $msg) = @_;
35         if ($status_fd) {
36                 print $status_fd "! $mach $job $status\n";
37         }
38         if ($log_on_queue) {
39                 $log_on_queue->update_job_status($mach, $job, $status, $msg);
40         }
41 }
42
43 my %pings;
44
45 sub ping_machine($) {
46         my ($mach) = @_;
47         if (!defined $pings{$mach}) {
48                 if ($BEX::Config::ping_hosts) {
49                         update_status($mach, '-', 'PING', undef);
50                         my $host = BEX::Config::host_name($mach);
51                         `ping -c1 -n $host >/dev/null 2>/dev/null`;
52                         $pings{$mach} = !$?;
53                 } else {
54                         $pings{$mach} = 1;
55                 }
56         }
57         if ($pings{$mach}) {
58                 return ('OK', undef);
59         } else {
60                 return ('NOPING', 'Does not ping');
61         }
62 }
63
64 sub exit_status($) {
65         my ($s) = @_;
66         if ($s >> 8) {
67                 return "with exit code " . ($s >> 8);
68         } else {
69                 return "on fatal signal " . ($s & 127);
70         }
71 }
72
73 sub run_job_prep($$$) {
74         my ($job, $queue, $mach) = @_;
75         my $prep = $job->attr('Prep');
76         defined($prep) && $prep !~ /^\s*$/ or return 'OK';
77
78         my $jid = $job->id;
79         update_status($mach, $jid, 'PREP', $queue);
80         my $lf = $queue->log_file($mach, $jid);
81         $ENV{'HOST'} = BEX::Config::host_name($mach);
82         system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf";
83         delete $ENV{'HOST'};
84         if ($?) {
85                 return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?));
86         } else {
87                 return 'OK';
88         }
89 }
90
91 sub run_job_body($$$) {
92         my ($job, $queue, $mach) = @_;
93
94         if ($job->attr('body') =~ /^\s*$/s) {
95                 # Shortcut if the body is empty
96                 return 'OK'
97         }
98
99         my $host = BEX::Config::host_name($mach);
100         my $jid = $job->id;
101
102         my $tmp = $queue->temp_file($mach, $jid);
103         open T, '>', $tmp or die;
104         if (defined $BEX::Config::job_prolog) {
105                 open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
106                 while (<P>) { print T; }
107                 close P;
108         } else {
109                 print T "#!/bin/sh\n";
110         }
111         print T "# BEX job ", $jid, "\n";
112         print T $job->attr('body');
113         if (defined $BEX::Config::job_epilog) {
114                 open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
115                 while (<E>) { print T; }
116                 close E;
117         }
118         close T;
119
120         update_status($mach, $jid, 'SEND', undef);
121         my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
122         my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`;
123         !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
124         chomp $rtmp;
125
126         update_status($mach, $jid, 'RUN', $queue);
127         my $lf = $queue->log_file($mach, $jid);
128         system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
129         if ($?) {
130                 return ('FAILED', 'Job failed ' . exit_status($?));
131         } else {
132                 return 'OK';
133         }
134 }
135
136 sub run_job($$$) {
137         my ($job, $queue, $mach) = @_;
138         my ($stat, $msg);
139
140         ($stat, $msg) = ping_machine($mach);
141         $stat eq 'OK' or return ($stat, $msg);
142
143         ($stat, $msg) = run_job_prep($job, $queue, $mach);
144         $stat eq 'OK' or return ($stat, $msg);
145
146         return run_job_body($job, $queue, $mach);
147 }
148
149 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
150 my $queue = BEX::Queue->new($queue_name);
151
152 $queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
153
154 for my $mach (@machines) {
155         my @q = $queue->scan($mach) or next;
156         if (!$queue->lock($mach, undef)) {
157                 print "### Machine $mach is locked by another brun, skipping...\n";
158                 update_status($mach, '-', 'LOCKED', undef);
159                 update_status($mach, '-', 'DONE', undef);
160                 next;
161         }
162         update_status($mach, '-', 'INIT', undef);
163         while (my $jid = shift @q) {
164                 if (defined $given_job) {
165                         $jid eq $given_job or next;
166                 }
167                 my $job = BEX::Job->new_from_file($queue->job_file($jid));
168                 update_status($mach, $jid, 'INIT', undef);
169                 if (!$queue->lock($mach, $jid)) {
170                         print "### Skipping locked $jid on $mach ###\n";
171                         update_status($mach, $jid, 'LOCKED', undef);
172                         next;
173                 }
174                 print "### Running ", $job->name, " on $mach ###\n";
175                 my ($s, $msg) = run_job($job, $queue, $mach);
176                 update_status($mach, $jid, $s, $queue, $msg);
177
178                 if ($s eq 'OK') {
179                         print "+++ OK\n";
180                         $queue->remove($mach, $jid);
181                 } else {
182                         print "--- $s: $msg\n";
183                         if ($BEX::Config::skip_on_fail) {
184                                 print "### Skipping other jobs on the same host ###\n" if @q;
185                                 last;
186                         }
187                 }
188         }
189 } continue {
190         update_status($mach, '-', 'DONE', undef);
191 }
192 $queue->unlock;