]> mj.ucw.cz Git - bex.git/blob - lib/bin/bex-prun
592bcbc5d7d7298d18d339ecdf0291d3604c6f21
[bex.git] / lib / bin / bex-prun
1 #!/usr/bin/perl
2 # Batch EXecutor -- Parallel Execution Using Screen
3 # (c) 2011-2013 Martin Mares <mj@ucw.cz>
4
5 use strict;
6 use warnings;
7 use feature 'switch';
8 use Getopt::Long;
9 use POSIX;
10 use BEX;
11
12 my $queue_name;
13 my $text_mode;
14 my $debug = 0;
15 my $debug_children;
16
17 sub usage() {
18         print <<AMEN ;
19 Usage: bex prun [<options>] [[!]<machine-or-class> ...]
20
21 Options:
22     --debug             Log status changes to stderr
23     --debug-children    Log stdout and stderr of child processes to ./debug.log
24 -p, --parallel=<n>      Set limit on the number of jobs run in parallel
25 -q, --queue=<name>      Run jobs in the given queue
26     --text              Use plain-text user interface instead of curses
27 AMEN
28         exit 0;
29 }
30
31 GetOptions(
32         "q|queue=s" => \$queue_name,
33         "text!" => \$text_mode,
34         "debug+" => \$debug,
35         "debug-children!" => \$debug_children,
36         "p|parallel=i" => \$BEX::Config::max_parallel_jobs,
37         "help" => \&usage,
38 ) or die "Try `bex prun --help' for more information.\n";
39
40 system 'tmux', 'has-session';
41 !$? or die "You need to start tmux first.\n";
42
43 my $queue = BEX::Queue->new($queue_name);
44 my $fifo_name = $queue->{'Path'} . '/status-fifo';
45 unlink $fifo_name;
46 mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!";
47 open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!";
48
49 my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);
50
51 my @machines = ();
52 for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
53         my @jobs = $queue->scan($mach);
54         @jobs or next;
55         push @machines, $mach;
56         for (@jobs) { $ui->update($mach, $_, 'READY'); }
57 }
58
59 my %running = ();
60 my $max = $BEX::Config::max_parallel_jobs;
61
62 while (keys %running || @machines) {
63         if (@machines && keys %running < $max) {
64                 my $mach = shift @machines;
65                 $ui->update($mach, undef, 'START');
66                 my @tm = ('tmux', 'new-window', '-n', $mach, '-d');
67                 my $P5LIB = $ENV{"PERL5LIB"} // "";
68                 my @cmd = (
69                         "BEX_HOME='$BEX::Config::home'",
70                         "BEX_LIB='$BEX::Config::lib'",
71                         "PERL5LIB='$P5LIB'",
72                         "$BEX::Config::lib/bin/bex-run",
73                         "--status-fifo=$fifo_name",
74                         "--queue=" . $queue->{'Name'},
75                         $mach,
76                         );
77                 push @cmd, ">>debug.log", "2>&1" if $debug_children;
78                 push @tm, join(" ", @cmd);
79                 system @tm;
80                 !$? or $ui->update($mach, undef, 'INTERR');
81                 $running{$mach} = 'START';
82                 next;
83         }
84         $_ = <FIFO>;
85         chomp;
86         print STDERR "<< $_\n" if $debug;
87         my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/;
88         if (!defined $stat) {
89                 $ui->err("Received invalid status message <$_>");
90                 next;
91         }
92         if (!defined $running{$mach}) {
93                 $ui->err("Received status message <$_> for a machine which does not run")
94                         unless $stat eq 'DONE';
95                 next;
96         }
97         $running{$mach} = $stat;
98         $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
99         if ($stat eq 'DONE') {
100                 delete $running{$mach};
101         }
102 }
103
104 close FIFO;
105 unlink $fifo_name;
106 $ui->done;
107
108 package BEX::bprun::text;
109
110 sub new($) {
111         return bless {};
112 }
113
114 sub done($) {
115 }
116
117 sub update($$$$) {
118         my ($ui, $mach, $jid, $stat) = @_;
119         print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
120 }
121
122 sub err($$) {
123         my ($ui, $msg) = @_;
124         print STDERR "ERROR: $msg\n";
125 }
126
127 package BEX::bprun::curses;
128
129 use Curses;
130
131 my $C;
132
133 my $nrows;
134 my @by_row;
135 my %by_host;
136
137 my %host_state;
138 my %host_cnt;
139
140 my %job_state;
141 my %job_cnt;
142
143 my %host_last_fail_job;
144 my %host_last_fail_stat;
145
146 my @states;
147 my %state_to_pri;
148
149 BEGIN {
150         @by_row = ();
151         %by_host = ();
152         @states = qw(unknown ready running done failed);
153         %state_to_pri = (
154                 'unknown' => 0,
155                 'ready' => 1,
156                 'done' => 2,
157                 'failed' => 3,
158                 'running' => 4,
159         );
160 }
161
162 sub new($) {
163         $C = new Curses;
164         start_color;
165         has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n";
166         cbreak; noecho;
167         $C->intrflush(0);
168         $C->keypad(1);
169         $C->meta(1);
170         $C->clear;
171         init_pair(1, COLOR_YELLOW, COLOR_BLUE);
172         init_pair(2, COLOR_YELLOW, COLOR_RED);
173         init_pair(3, COLOR_YELLOW, COLOR_BLACK);
174         init_pair(4, COLOR_RED, COLOR_BLACK);
175         init_pair(5, COLOR_BLUE, COLOR_BLACK);
176
177         $nrows = $C->getmaxy - 2;
178         if ($BEX::Config::max_parallel_jobs > $nrows) {
179                 $BEX::Config::max_parallel_jobs = $nrows;
180         }
181
182         %host_state = %host_cnt = ();
183         %job_state = %job_cnt = ();
184         for my $s (@states) {
185                 $host_cnt{$s} = 0;
186                 $job_cnt{'*'}{$s} = 0;
187         }
188
189         my $ui = bless {};
190         $ui->refresh_status;
191         return $ui;
192 }
193
194 sub done($)
195 {
196         $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
197         $C->addstr($C->getmaxy-1, 0, "Press any key to quit...");
198         $C->clrtoeol;
199         $C->getch;
200         endwin;
201 }
202
203 sub err($$) {
204         my ($ui, $msg) = @_;
205         $C->bkgdset(COLOR_PAIR(2) | A_BOLD);
206         $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx);
207         $C->clrtoeol;
208         $C->refresh;
209 }
210
211 sub set_host_status($$$) {
212         my ($ui, $mach, $stat) = @_;
213         print STDERR "H: $mach $stat\n" if $debug;
214         my $prev_stat = $host_state{$mach};
215         if (defined $prev_stat) {
216                 $host_cnt{$prev_stat}--;
217         } else {
218                 for my $s (@states) { $job_cnt{$mach}{$s} = 0; }
219         }
220         $host_state{$mach} = $stat;
221         $host_cnt{$stat}++;
222 }
223
224 sub set_job_status($$$$) {
225         my ($ui, $mach, $jid, $stat) = @_;
226         print STDERR "J: $mach $jid $stat\n" if $debug;
227         my $prev_stat = $job_state{$mach}{$jid} // 'unknown';
228         $job_cnt{$mach}{$prev_stat}--;
229         $job_cnt{'*'}{$prev_stat}--;
230         $job_state{$mach}{$jid} = $stat;
231         $job_cnt{$mach}{$stat}++;
232         $job_cnt{'*'}{$stat}++;
233 }
234
235 sub refresh_status($) {
236         $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
237         $C->addnstr(0, 0,
238                 sprintf("BEX  Hosts: %dR %dD %dE %dW  Jobs: %dR %dD %dE %dW",
239                         $host_cnt{'running'},
240                         $host_cnt{'done'},
241                         $host_cnt{'failed'},
242                         $host_cnt{'ready'},
243                         $job_cnt{'*'}{'running'},
244                         $job_cnt{'*'}{'done'},
245                         $job_cnt{'*'}{'failed'},
246                         $job_cnt{'*'}{'ready'},
247                 ), $C->getmaxx);
248         $C->clrtoeol;
249         $C->refresh;
250 }
251
252 sub get_slot($) {
253         my ($mach) = @_;
254         my $s = $by_host{$mach};
255         if (!defined $s) {
256                 $s = $by_host{$mach} = { 'Host' => $mach };
257         }
258         return $s;
259 }
260
261 my $place_counter;
262
263 sub place_slot($) {
264         my ($s) = @_;
265         $s->{'LastUpdate'} = $place_counter++;
266         return $s if defined $s->{'Row'};
267
268         my $pri = $state_to_pri{$host_state{$s->{'Host'}}};
269         my ($best, $besti);
270         my $bestpri = 99;
271         for my $i (0..$nrows-1) {
272                 my $r = $by_row[$i];
273                 if (!defined $r) {
274                         $besti = $i;
275                         $best = undef;
276                         last;
277                 }
278                 my $rpri = $state_to_pri{$host_state{$r->{'Host'}}};
279                 print STDERR "I: ... considering ", $r->{'Host'}, " (pri $rpri, lu ", $r->{'LastUpdate'}, ")\n" if $debug > 1;
280                 next if $rpri > $pri;
281
282                 if ($rpri < $bestpri ||
283                     $rpri == $bestpri && $r->{'LastUpdate'} < $best->{'LastUpdate'}) {
284                         # Trick: $best must be defined, as otherwise $bestpri == 99
285                         $best = $r;
286                         $besti = $i;
287                         $bestpri = $rpri;
288                 }
289         }
290
291         if (defined $besti) {
292                 if ($best) {
293                         print STDERR "I: Replacing ", $best->{'Host'}, " (pri $bestpri)\n" if $debug;
294                         delete $best->{'Row'};
295                 }
296                 print STDERR "I: Allocated ", $s->{'Host'}, " \@$besti (pri $pri)\n" if $debug;
297                 $s->{'Row'} = $besti;
298                 $by_row[$besti] = $s;
299         } else {
300                 print STDERR "I: No place for ", $s->{'Host'}, " (pri $pri)\n" if $debug;
301         }
302 }
303
304 sub redraw_slot($) {
305         my ($s) = @_;
306         my $r = $s->{'Row'} // return;
307         $r++;
308         my $mach = $s->{'Host'};
309         my $stat = $s->{'Status'} // "?";
310         my $jid = $s->{'Job'} // "";
311         my $jname = ($jid eq "" ? "" : $queue->job_name($jid));
312         my $jcnt = $job_cnt{$mach};
313         if ($jcnt->{'running'}) {
314                 if ($jcnt->{'failed'}) {
315                         $C->bkgdset(COLOR_PAIR(4) | A_BOLD);
316                 } else {
317                         $C->bkgdset(COLOR_PAIR(3) | A_BOLD);
318                 }
319         } else {
320                 if ($jcnt->{'failed'}) {
321                         $C->bkgdset(COLOR_PAIR(4));
322                 } else {
323                         $C->bkgdset(0);
324                 }
325         }
326         $C->addstr($r, 0, sprintf("%-20.20s", $mach));
327         if ($jcnt->{'failed'}) {
328                 $C->bkgdset(COLOR_PAIR(4));
329                 $C->addstr(sprintf("%3dE ", $jcnt->{'failed'}));
330         } else {
331                 $C->bkgdset(0);
332                 $C->addstr("     ");
333         }
334         $C->bkgdset(0);
335         $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'}));
336         if ($stat eq 'DONE') {
337                 my $lfs = $host_last_fail_stat{$mach};
338                 my $lfj = $host_last_fail_job{$mach};
339                 if (defined $lfs) {
340                         $C->bkgdset(($lfs eq 'NOPING') ? COLOR_PAIR(5) : COLOR_PAIR(4));
341                         $C->addstr(sprintf("  %-8s %s", $lfs, $lfj ? $queue->job_name($lfj) : ""));
342                 }
343         } else {
344                 my $text = sprintf("  %-8s %s", $stat, $jname);
345                 $C->addstr($text);
346         }
347         $C->clrtoeol;
348         $C->refresh;
349 }
350
351 sub update($$$$) {
352         my ($ui, $mach, $jid, $stat) = @_;
353         given ($stat) {
354                 when ('READY') {
355                         # Pseudo-state generated internally
356                         $ui->set_host_status($mach, 'ready');
357                         $ui->set_job_status($mach, $jid, 'ready');
358                 }
359                 when ('OK') {
360                         $ui->set_job_status($mach, $jid, 'done');
361                 }
362                 when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL', 'NOXFER']) {
363                         $ui->set_job_status($mach, $jid, 'failed');
364                         $host_last_fail_job{$mach} = $jid;
365                         $host_last_fail_stat{$mach} = $stat;
366                 }
367                 when ('DONE') {
368                         if ($job_cnt{$mach}{'failed'}) {
369                                 $ui->set_host_status($mach, 'failed');
370                         } else {
371                                 $ui->set_host_status($mach, 'done');
372                         }
373                 }
374                 when ('INIT') {
375                         $ui->set_host_status($mach, 'running');
376                         $ui->set_job_status($mach, $jid, 'running') if defined $jid;
377                 }
378                 when ('LOCKED') {
379                         if (defined $jid) {
380                                 $ui->set_job_status($mach, $jid, 'failed');
381                         } else {
382                                 for my $j (keys %{$job_state{$mach}}) {
383                                         $ui->set_job_status($mach, $j, 'failed');
384                                 }
385                                 $ui->set_host_status($mach, 'failed');
386                                 $host_last_fail_job{$mach} = $jid;
387                                 $host_last_fail_stat{$mach} = $stat;
388                         }
389                 }
390                 when (['START', 'PING', 'PREP', 'SEND', 'RUN']) {
391                 }
392                 default {
393                         $ui->err("Received unknown job status $stat");
394                 }
395         }
396         my $s = get_slot($mach);
397         $s->{'Job'} = $jid;
398         $s->{'Status'} = $stat;
399         place_slot($s);
400         redraw_slot($s);
401         $ui->refresh_status;
402 }