]> mj.ucw.cz Git - bex.git/blob - lib/bin/bex-prun
use experimental 'smartmatch' for new Perls
[bex.git] / lib / bin / bex-prun
1 #!/usr/bin/perl
2 # Batch EXecutor -- Parallel Execution Using Screen
3 # (c) 2011-2013 Martin Mares <mj@ucw.cz>
4
5 use strict;
6 use warnings;
7 use feature 'switch';
8 use experimental 'smartmatch';
9 use Getopt::Long;
10 use POSIX;
11 use BEX;
12
13 my $queue_name;
14 my $text_mode;
15 my $debug = 0;
16 my $debug_children;
17
18 sub usage() {
19         print <<AMEN ;
20 Usage: bex prun [<options>] [[!]<machine-or-class> ...]
21
22 Options:
23     --debug             Log status changes to stderr
24     --debug-children    Log stdout and stderr of child processes to ./debug.log
25 -p, --parallel=<n>      Set limit on the number of jobs run in parallel
26 -q, --queue=<name>      Run jobs in the given queue
27     --text              Use plain-text user interface instead of curses
28 AMEN
29         exit 0;
30 }
31
32 GetOptions(
33         "q|queue=s" => \$queue_name,
34         "text!" => \$text_mode,
35         "debug+" => \$debug,
36         "debug-children!" => \$debug_children,
37         "p|parallel=i" => \$BEX::Config::max_parallel_jobs,
38         "help" => \&usage,
39 ) or die "Try `bex prun --help' for more information.\n";
40
41 system 'tmux', 'has-session';
42 !$? or die "You need to start tmux first.\n";
43
44 my $queue = BEX::Queue->new($queue_name);
45 my $fifo_name = $queue->{'Path'} . '/status-fifo';
46 unlink $fifo_name;
47 mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!";
48 open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!";
49
50 my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new);
51
52 my @machines = ();
53 for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) {
54         my @jobs = $queue->scan($mach);
55         @jobs or next;
56         push @machines, $mach;
57         for (@jobs) { $ui->update($mach, $_, 'READY'); }
58 }
59
60 my %running = ();
61 my $max = $BEX::Config::max_parallel_jobs;
62
63 while (keys %running || @machines) {
64         if (@machines && keys %running < $max) {
65                 my $mach = shift @machines;
66                 $ui->update($mach, undef, 'START');
67                 my @tm = ('tmux', 'new-window', '-n', $mach, '-d');
68                 my $P5LIB = $ENV{"PERL5LIB"} // "";
69                 my @cmd = (
70                         "BEX_HOME='$BEX::Config::home'",
71                         "BEX_LIB='$BEX::Config::lib'",
72                         "PERL5LIB='$P5LIB'",
73                         "$BEX::Config::lib/bin/bex-run",
74                         "--status-fifo=$fifo_name",
75                         "--queue=" . $queue->{'Name'},
76                         $mach,
77                         );
78                 push @cmd, ">>debug.log", "2>&1" if $debug_children;
79                 push @tm, join(" ", @cmd);
80                 system @tm;
81                 !$? or $ui->update($mach, undef, 'INTERR');
82                 $running{$mach} = 'START';
83                 next;
84         }
85         $_ = <FIFO>;
86         chomp;
87         print STDERR "<< $_\n" if $debug;
88         my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/;
89         if (!defined $stat) {
90                 $ui->err("Received invalid status message <$_>");
91                 next;
92         }
93         if (!defined $running{$mach}) {
94                 $ui->err("Received status message <$_> for a machine which does not run")
95                         unless $stat eq 'DONE';
96                 next;
97         }
98         $running{$mach} = $stat;
99         $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat);
100         if ($stat eq 'DONE') {
101                 delete $running{$mach};
102         }
103 }
104
105 close FIFO;
106 unlink $fifo_name;
107 $ui->done;
108
109 package BEX::bprun::text;
110
111 sub new($) {
112         return bless {};
113 }
114
115 sub done($) {
116 }
117
118 sub update($$$$) {
119         my ($ui, $mach, $jid, $stat) = @_;
120         print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n";
121 }
122
123 sub err($$) {
124         my ($ui, $msg) = @_;
125         print STDERR "ERROR: $msg\n";
126 }
127
128 package BEX::bprun::curses;
129
130 use Curses;
131
132 my $C;
133
134 my $nrows;
135 my @by_row;
136 my %by_host;
137
138 my %host_state;
139 my %host_cnt;
140
141 my %job_state;
142 my %job_cnt;
143
144 my %host_last_fail_job;
145 my %host_last_fail_stat;
146
147 my @states;
148 my %state_to_pri;
149
150 BEGIN {
151         @by_row = ();
152         %by_host = ();
153         @states = qw(unknown ready running done failed);
154         %state_to_pri = (
155                 'unknown' => 0,
156                 'ready' => 1,
157                 'done' => 2,
158                 'failed' => 3,
159                 'running' => 4,
160         );
161 }
162
163 sub new($) {
164         $C = new Curses;
165         start_color;
166         has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n";
167         cbreak; noecho;
168         $C->intrflush(0);
169         $C->keypad(1);
170         $C->meta(1);
171         $C->clear;
172         init_pair(1, COLOR_YELLOW, COLOR_BLUE);
173         init_pair(2, COLOR_YELLOW, COLOR_RED);
174         init_pair(3, COLOR_YELLOW, COLOR_BLACK);
175         init_pair(4, COLOR_RED, COLOR_BLACK);
176         init_pair(5, COLOR_BLUE, COLOR_BLACK);
177
178         $nrows = $C->getmaxy - 2;
179         if ($BEX::Config::max_parallel_jobs > $nrows) {
180                 $BEX::Config::max_parallel_jobs = $nrows;
181         }
182
183         %host_state = %host_cnt = ();
184         %job_state = %job_cnt = ();
185         for my $s (@states) {
186                 $host_cnt{$s} = 0;
187                 $job_cnt{'*'}{$s} = 0;
188         }
189
190         my $ui = bless {};
191         $ui->refresh_status;
192         return $ui;
193 }
194
195 sub done($)
196 {
197         $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
198         $C->addstr($C->getmaxy-1, 0, "Press any key to quit...");
199         $C->clrtoeol;
200         $C->getch;
201         endwin;
202 }
203
204 sub err($$) {
205         my ($ui, $msg) = @_;
206         $C->bkgdset(COLOR_PAIR(2) | A_BOLD);
207         $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx);
208         $C->clrtoeol;
209         $C->refresh;
210 }
211
212 sub set_host_status($$$) {
213         my ($ui, $mach, $stat) = @_;
214         print STDERR "H: $mach $stat\n" if $debug;
215         my $prev_stat = $host_state{$mach};
216         if (defined $prev_stat) {
217                 $host_cnt{$prev_stat}--;
218         } else {
219                 for my $s (@states) { $job_cnt{$mach}{$s} = 0; }
220         }
221         $host_state{$mach} = $stat;
222         $host_cnt{$stat}++;
223 }
224
225 sub set_job_status($$$$) {
226         my ($ui, $mach, $jid, $stat) = @_;
227         print STDERR "J: $mach $jid $stat\n" if $debug;
228         my $prev_stat = $job_state{$mach}{$jid} // 'unknown';
229         $job_cnt{$mach}{$prev_stat}--;
230         $job_cnt{'*'}{$prev_stat}--;
231         $job_state{$mach}{$jid} = $stat;
232         $job_cnt{$mach}{$stat}++;
233         $job_cnt{'*'}{$stat}++;
234 }
235
236 sub refresh_status($) {
237         $C->bkgdset(COLOR_PAIR(1) | A_BOLD);
238         $C->addnstr(0, 0,
239                 sprintf("BEX  Hosts: %dR %dD %dE %dW  Jobs: %dR %dD %dE %dW",
240                         $host_cnt{'running'},
241                         $host_cnt{'done'},
242                         $host_cnt{'failed'},
243                         $host_cnt{'ready'},
244                         $job_cnt{'*'}{'running'},
245                         $job_cnt{'*'}{'done'},
246                         $job_cnt{'*'}{'failed'},
247                         $job_cnt{'*'}{'ready'},
248                 ), $C->getmaxx);
249         $C->clrtoeol;
250         $C->refresh;
251 }
252
253 sub get_slot($) {
254         my ($mach) = @_;
255         my $s = $by_host{$mach};
256         if (!defined $s) {
257                 $s = $by_host{$mach} = { 'Host' => $mach };
258         }
259         return $s;
260 }
261
262 my $place_counter;
263
264 sub place_slot($) {
265         my ($s) = @_;
266         $s->{'LastUpdate'} = $place_counter++;
267         return $s if defined $s->{'Row'};
268
269         my $pri = $state_to_pri{$host_state{$s->{'Host'}}};
270         my ($best, $besti);
271         my $bestpri = 99;
272         for my $i (0..$nrows-1) {
273                 my $r = $by_row[$i];
274                 if (!defined $r) {
275                         $besti = $i;
276                         $best = undef;
277                         last;
278                 }
279                 my $rpri = $state_to_pri{$host_state{$r->{'Host'}}};
280                 print STDERR "I: ... considering ", $r->{'Host'}, " (pri $rpri, lu ", $r->{'LastUpdate'}, ")\n" if $debug > 1;
281                 next if $rpri > $pri;
282
283                 if ($rpri < $bestpri ||
284                     $rpri == $bestpri && $r->{'LastUpdate'} < $best->{'LastUpdate'}) {
285                         # Trick: $best must be defined, as otherwise $bestpri == 99
286                         $best = $r;
287                         $besti = $i;
288                         $bestpri = $rpri;
289                 }
290         }
291
292         if (defined $besti) {
293                 if ($best) {
294                         print STDERR "I: Replacing ", $best->{'Host'}, " (pri $bestpri)\n" if $debug;
295                         delete $best->{'Row'};
296                 }
297                 print STDERR "I: Allocated ", $s->{'Host'}, " \@$besti (pri $pri)\n" if $debug;
298                 $s->{'Row'} = $besti;
299                 $by_row[$besti] = $s;
300         } else {
301                 print STDERR "I: No place for ", $s->{'Host'}, " (pri $pri)\n" if $debug;
302         }
303 }
304
305 sub redraw_slot($) {
306         my ($s) = @_;
307         my $r = $s->{'Row'} // return;
308         $r++;
309         my $mach = $s->{'Host'};
310         my $stat = $s->{'Status'} // "?";
311         my $jid = $s->{'Job'} // "";
312         my $jname = ($jid eq "" ? "" : $queue->job_name($jid));
313         my $jcnt = $job_cnt{$mach};
314         if ($jcnt->{'running'}) {
315                 if ($jcnt->{'failed'}) {
316                         $C->bkgdset(COLOR_PAIR(4) | A_BOLD);
317                 } else {
318                         $C->bkgdset(COLOR_PAIR(3) | A_BOLD);
319                 }
320         } else {
321                 if ($jcnt->{'failed'}) {
322                         $C->bkgdset(COLOR_PAIR(4));
323                 } else {
324                         $C->bkgdset(0);
325                 }
326         }
327         $C->addstr($r, 0, sprintf("%-20.20s", $mach));
328         if ($jcnt->{'failed'}) {
329                 $C->bkgdset(COLOR_PAIR(4));
330                 $C->addstr(sprintf("%3dE ", $jcnt->{'failed'}));
331         } else {
332                 $C->bkgdset(0);
333                 $C->addstr("     ");
334         }
335         $C->bkgdset(0);
336         $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'}));
337         if ($stat eq 'DONE') {
338                 my $lfs = $host_last_fail_stat{$mach};
339                 my $lfj = $host_last_fail_job{$mach};
340                 if (defined $lfs) {
341                         $C->bkgdset(($lfs eq 'NOPING') ? COLOR_PAIR(5) : COLOR_PAIR(4));
342                         $C->addstr(sprintf("  %-8s %s", $lfs, $lfj ? $queue->job_name($lfj) : ""));
343                 }
344         } else {
345                 my $text = sprintf("  %-8s %s", $stat, $jname);
346                 $C->addstr($text);
347         }
348         $C->clrtoeol;
349         $C->refresh;
350 }
351
352 sub update($$$$) {
353         my ($ui, $mach, $jid, $stat) = @_;
354         given ($stat) {
355                 when ('READY') {
356                         # Pseudo-state generated internally
357                         $ui->set_host_status($mach, 'ready');
358                         $ui->set_job_status($mach, $jid, 'ready');
359                 }
360                 when ('OK') {
361                         $ui->set_job_status($mach, $jid, 'done');
362                 }
363                 when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL', 'NOXFER']) {
364                         $ui->set_job_status($mach, $jid, 'failed');
365                         $host_last_fail_job{$mach} = $jid;
366                         $host_last_fail_stat{$mach} = $stat;
367                 }
368                 when ('DONE') {
369                         if ($job_cnt{$mach}{'failed'}) {
370                                 $ui->set_host_status($mach, 'failed');
371                         } else {
372                                 $ui->set_host_status($mach, 'done');
373                         }
374                 }
375                 when ('INIT') {
376                         $ui->set_host_status($mach, 'running');
377                         $ui->set_job_status($mach, $jid, 'running') if defined $jid;
378                 }
379                 when ('LOCKED') {
380                         if (defined $jid) {
381                                 $ui->set_job_status($mach, $jid, 'failed');
382                         } else {
383                                 for my $j (keys %{$job_state{$mach}}) {
384                                         $ui->set_job_status($mach, $j, 'failed');
385                                 }
386                                 $ui->set_host_status($mach, 'failed');
387                                 $host_last_fail_job{$mach} = $jid;
388                                 $host_last_fail_stat{$mach} = $stat;
389                         }
390                 }
391                 when (['START', 'PING', 'PREP', 'SEND', 'RUN']) {
392                 }
393                 default {
394                         $ui->err("Received unknown job status $stat");
395                 }
396         }
397         my $s = get_slot($mach);
398         $s->{'Job'} = $jid;
399         $s->{'Status'} = $stat;
400         place_slot($s);
401         redraw_slot($s);
402         $ui->refresh_status;
403 }