2 # Batch EXecutor 2.0 -- Run Queued Jobs
3 # (c) 2011 Martin Mares <mj@ucw.cz>
17 "j|job=s" => \$given_job,
18 "q|queue=s" => \$queue_name,
19 "s|status-fifo=s" => \$status_fifo,
21 Usage: brun [<options>] [[!]<machine-or-class> ...]
24 -j, --job=<id> Run only the specified job
25 -q, --queue=<name> Select job queue
26 --status-fifo=<f> Send status updates to the given named pipe
# When a status FIFO path was supplied, open it in append mode and abort
# with the OS error message if the open fails.
30 if (defined $status_fifo) {
31 open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!";
# Enable autoflush on the status handle so status lines are not held back
# in Perl's output buffer.
32 autoflush $status_fd, 1;
# update_status($mach, $job, $status, $log_on_queue [, $msg])
#
# Reports a status change. The visible statements write a line of the form
# "! <machine> <job> <status>" to the status handle and call the log()
# method of $log_on_queue with the machine, job, status and message.
# The prototype requires four scalar arguments; the fifth ($msg) is optional.
#
# NOTE(review): callers in this file pass undef for $log_on_queue and the
# status handle is only opened when a FIFO was requested, so both statements
# are presumably guarded by conditions on lines not visible in this
# excerpt -- confirm.
35 sub update_status($$$$;$) {
36 my ($mach, $job, $status, $log_on_queue, $msg) = @_;
38 print $status_fd "! $mach $job $status\n";
41 $log_on_queue->log($mach, $job, $status, $msg);
# NOTE(review): these lines appear to be the body of ping_machine(), which
# is called from the main loop; the sub header and the unpacking of $mach
# are not visible in this excerpt -- confirm.
#
# Report success immediately when pinging is disabled in the configuration.
47 return 1 unless $BEX::Config::ping_hosts;
# Announce the PING phase; '-' is passed in the job position and no queue
# is passed for logging.
48 update_status($mach, '-', 'PING', undef);
49 my $host = BEX::Config::host_name($mach);
# Send a single ping (-c1) with numeric output (-n); both stdout and stderr
# are discarded, so only the exit status can carry the result.
# NOTE(review): $host is interpolated unquoted into a shell command line --
# confirm host names come from trusted configuration, or switch to a
# list-form system() call.
50 `ping -c1 -n $host >/dev/null 2>/dev/null`;
# NOTE(review): these lines appear to be the body of run_job(), which the
# main loop calls as run_job($job, $queue, $mach) and whose result it
# unpacks as a (status, message) pair; the sub header and several
# intermediate lines are not visible in this excerpt -- confirm.
55 my ($job, $queue, $mach) = @_;
56 my $jid = $job->{'ID'};
57 my $host = BEX::Config::host_name($mach);
# Build the script to be sent in a temporary file supplied by the queue.
59 my $tmp = $queue->temp_file($mach, $jid);
# NOTE(review): the bare die carries no message and no $!, so a failure
# here is hard to diagnose.
60 open T, '>', $tmp or die;
# If a prolog file is configured, copy it line by line to the start of the
# script; failing to open it is reported as an INTERR result.
# NOTE(review): two-argument open on a bareword handle, and the early
# return leaves T open -- consider three-argument open and closing T.
61 if (defined $BEX::Config::job_prolog) {
62 open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!");
63 while (<P>) { print T; }
# NOTE(review): presumably the branch taken when no prolog is configured
# (the line separating it from the block above is not visible) -- confirm.
66 print T "#!/bin/sh\n";
# A comment line identifying the job, followed by the job body itself.
68 print T "# BEX job ", $jid, "\n";
69 print T $job->{'body'};
# If an epilog file is configured, append it the same way as the prolog;
# the same review notes apply.
70 if (defined $BEX::Config::job_epilog) {
71 open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!");
72 while (<E>) { print T; }
# Transfer phase: announce SEND without logging to the queue.
77 update_status($mach, $jid, 'SEND', undef);
# Remote command: create a temporary file with mktemp, store stdin in it,
# make it executable for its owner and print its name. The string is
# single-quoted in Perl, so $t is left for the remote shell to expand.
78 my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t';
# Run ssh with the local script on stdin and capture what the remote
# command prints (the remote file name).
# NOTE(review): $tmp and $host are interpolated unquoted into a shell
# command line -- confirm they cannot contain shell metacharacters.
79 my $rtmp = `ssh <$tmp $host '$cmd'`;
# Give up with NOXFER if ssh exited with a non-zero status or printed
# nothing.
# NOTE(review): backticks keep the trailing newline of the output;
# presumably $rtmp is chomped on a line not visible here -- confirm.
80 !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed');
# Execution phase: announce RUN, this time passing the queue so the status
# change is also handed to its log() method.
83 update_status($mach, $jid, 'RUN', $queue);
84 my $lf = $queue->log_file($mach, $jid);
# Run the remote script over ssh with a terminal allocated (-t), remember
# its exit code in e, delete the remote file and exit with that code.
# Output (stderr merged into stdout) is shown and appended to the log file
# via tee -a; pipefail makes a failure of ssh visible in the status of the
# whole pipeline rather than only tee's status.
85 system 'bash', '-o', 'pipefail', '-c', "ssh -t $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf";
# NOTE(review): presumably returned only when the pipeline status is
# non-zero; the test and the success return are not visible -- confirm.
87 return ('FAILED', 'Job failed');
# Main program: for every selected machine, run its queued jobs and report
# progress through update_status(). Several lines of the loop (including
# the inner per-job loop header) are not visible in this excerpt.
#
# Machines come from the command line; '*' is used when none are given.
93 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
94 my $queue = BEX::Queue->new($queue_name);
# Take the queue lock with neither a machine nor a job specified; abort if
# it cannot be obtained.
96 $queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
98 for my $mach (@machines) {
# Skip machines for which the scan returns an empty list.
99 my @q = $queue->scan($mach) or next;
# Per-machine lock; when it cannot be taken, report LOCKED followed by DONE
# for this machine.
100 if (!$queue->lock($mach, undef)) {
101 print "### Machine $mach is locked by another brun, skipping...\n";
102 update_status($mach, '-', 'LOCKED', undef);
103 update_status($mach, '-', 'DONE', undef);
106 update_status($mach, '-', 'INIT', undef);
# When a single job was requested with -j, ignore every other job ID.
# NOTE(review): the loop that sets $jid is not visible here -- presumably
# it iterates over @q; confirm.
109 if (defined $given_job) {
110 $jid eq $given_job or next;
112 my $job = BEX::Job->new_from_file($queue->job_file($jid));
# Per-job lock; a job that cannot be locked is reported as LOCKED.
113 if (!$queue->lock($mach, $jid)) {
114 print "### Skipping locked $jid on $mach ###\n";
115 update_status($mach, $jid, 'LOCKED', undef);
121 print "### Running $jid (", $job->attr('Subject'), ") on $mach ###\n";
# Ping only if $ping is still undefined, so a previously stored result is
# reused. NOTE(review): where $ping is declared (and thus how long the
# result is cached) is not visible -- confirm.
122 $ping //= ping_machine($mach);
# Resulting status and message: NOPING, or whatever run_job() returns.
# NOTE(review): presumably selected by an if/else on $ping that is not
# visible -- confirm.
125 ($s, $msg) = ('NOPING', 'Does not ping');
127 ($s, $msg) = run_job($job, $queue, $mach);
# Record the outcome in the job's status record and write it to the queue.
130 $stat->{'Status'} = $s;
131 $stat->{'Message'} = $msg;
132 $queue->write_job_status($mach, $jid, $stat);
134 # Called after writing the status file, so that the front-end watching
135 # our status FIFO can see the new status file.
136 update_status($mach, $jid, $s, $queue, $msg);
# NOTE(review): presumably executed only for successfully finished jobs;
# the guarding condition is not visible -- confirm.
140 $queue->remove($mach, $jid);
142 print "--- $s: $msg\n";
# Mark the end of processing for this machine.
145 update_status($mach, '-', 'DONE', undef);