From: Martin Mares Date: Wed, 15 Feb 2012 20:22:16 +0000 (+0100) Subject: The big move -- introduced subcommands X-Git-Tag: v3.0~23 X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=d8250bd26578871e624d5e513ff7919f37357fe2;p=bex.git The big move -- introduced subcommands --- diff --git a/BEX.cf b/BEX.cf deleted file mode 100644 index 09c1b1b..0000000 --- a/BEX.cf +++ /dev/null @@ -1,12 +0,0 @@ -# Configuration of the Batch EXecutor -# This is a Perl script, which can set anything in the BEX::Config package - -package BEX::Config; - -%machines = ( - 'albireo' => { 'Host' => 'albireo.burrow.ucw.cz' }, - 'localhost' => {}, - 'home' => ['albireo', 'localhost'], -); - -42; diff --git a/BEX/config b/BEX/config new file mode 100644 index 0000000..09c1b1b --- /dev/null +++ b/BEX/config @@ -0,0 +1,12 @@ +# Configuration of the Batch EXecutor +# This is a Perl script, which can set anything in the BEX::Config package + +package BEX::Config; + +%machines = ( + 'albireo' => { 'Host' => 'albireo.burrow.ucw.cz' }, + 'localhost' => {}, + 'home' => ['albireo', 'localhost'], +); + +42; diff --git a/BEX/epilog b/BEX/epilog new file mode 100644 index 0000000..bc2da7a --- /dev/null +++ b/BEX/epilog @@ -0,0 +1 @@ +# BEX epilog diff --git a/BEX/prolog b/BEX/prolog new file mode 100644 index 0000000..3350a91 --- /dev/null +++ b/BEX/prolog @@ -0,0 +1,3 @@ +#!/bin/sh +# BEX prolog +set -e diff --git a/benq b/benq deleted file mode 100755 index 1f92e94..0000000 --- a/benq +++ /dev/null @@ -1,102 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 2.0 -- Insert to Queue -# (c) 2011 Martin Mares - -use strict; -use warnings; -use Getopt::Long; -use File::stat; - -use lib 'lib'; -use BEX; - -my $given_body; -my $given_go; -my $given_id; -my $queue_name; -my $requeue_id; -my $given_subject; -my $given_template; - -GetOptions( - "b|body=s" => \$given_body, - "g|go!" => \$given_go, - "i|id=s" => \$given_id, - "q|queue=s" => \$queue_name, - "r|requeue=s" => \$requeue_id, - "s|subject=s" => \$given_subject, - "t|template=s" => \$given_template, -) or die <] [!] ... - -Options: --b, --body= Load job body from the given file --g, --go Do not run editor, go enqueue the job immediately --i, --id= Set job ID of the new job --q, --queue= Insert new jobs to the given queue --r, --requeue= Re-queue an existing job instead of creating a new one --s, --subject= Set subject of the new job --t, --template= Load job template (headers and body) from the given file -AMEN - -# Prepare machine set -@ARGV or die "No machines specified\n"; -my @machines = BEX::Config::parse_machine_list(@ARGV); -@machines or die "No machines match\n"; - -my $queue = BEX::Queue->new($queue_name); -my $job; -my $tmp_fn; - -if (defined $requeue_id) { - # When requeueing, just fetch the existing job - if (defined($given_body) || defined($given_id) || defined($given_subject) || defined($given_template)) { - die "Parameters of a requeued job cannot be changed\n"; - } - my $fn = $queue->job_file($requeue_id); - -f $fn or die "Job $requeue_id not known\n"; - $job = BEX::Job->new_from_file($fn); -} else { - # Create job template - if (defined $given_template) { - $job = BEX::Job->new_from_file($given_template); - } else { - $job = BEX::Job->new; - } - $job->attr('ID', $given_id) if defined $given_id; - $job->attr('Subject', $given_subject) if defined $given_subject; - if (defined $given_body) { - open B, '<', $given_body or die "Cannot open $given_body: $!\n"; - local $/; - $job->attr('body', ); - close B; - } - - # Let the user edit the template - if (!$given_go) { - $tmp_fn = $job->save; - my $orig_stat = stat($tmp_fn) or die; - system "editor", $tmp_fn and die "Editor exited with an error, file kept as $tmp_fn\n"; - my $new_stat = stat($tmp_fn) or die "File $tmp_fn disappeared under my hands: $!\n"; - if ($new_stat->mtime <= $orig_stat->mtime && $new_stat->size == $orig_stat->size) { - unlink $tmp_fn; - die "Cancelled\n"; - } - $job = BEX::Job->new_from_file($tmp_fn); - } -} - -# Put the job to the queue -print "New job ", $job->id, "\n"; -for my $m (@machines) { - if ($queue->enqueue($m, $job)) { - $queue->update_job_status($m, $job->id, 'NEW'); - print "\t$m\n"; - } else { - $queue->log($m, $job->id, 'REQUEUE'); - print "\t$m (already queued)\n"; - } -} - -# Remove the temporary file if there's any -unlink $tmp_fn if defined $tmp_fn; diff --git a/bex b/bex new file mode 100755 index 0000000..ac89512 --- /dev/null +++ b/bex @@ -0,0 +1,51 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Master Program +# (c) 2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; + +my $bex_home = $ENV{"BEX_HOME"} // "."; +my $bex_lib = $ENV{"BEX_LIB"} // "lib"; + +Getopt::Long::Configure('require_order'); +GetOptions( + "home=s" => \$bex_home, + "lib=s" => \$bex_lib, + "help" => sub { + print "Usage: brum\n"; + exit 0; + }, + "version" => sub { + print "BEX 3.0 (c) 2011-2012 Martin Mares \n"; + }, +) or die "Try `bex --help' for more information.\n"; +Getopt::Long::Configure('default'); + +if (!-d $bex_home) { + die "BEX home directory $bex_home does not exist.\n"; +} +if (!-d "$bex_home/BEX") { + die "BEX home directory $bex_home does not contain the BEX subdirectory.\n"; +} + +@ARGV or die "Missing subcommand.\n"; +my $sub = shift @ARGV; +$sub =~ /^[0-9a-zA-Z]+$/ or die "Invalid subcommand $sub\n"; + +my %aliases = ( + 'a' => 'add', + 'q' => 'queue', + 'r' => 'run', +); +if (defined $aliases{$sub}) { $sub = $aliases{$sub}; } + +my $sub_path = "$bex_lib/bin/$sub"; +-x $sub_path or die "Unknown subcommand $sub\n"; + +$ENV{"BEX_HOME"} = $bex_home; +$ENV{"BEX_LIB"} = $bex_lib; +$ENV{"PERL5LIB"} = join(":", $bex_lib . "/perl", $ENV{"PERL5LIB"} // ()); +exec $sub_path, @ARGV; +die "Cannot execute $sub_path: $!\n"; diff --git a/bjob b/bjob deleted file mode 100755 index 0d6f468..0000000 --- a/bjob +++ /dev/null @@ -1,34 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 2.0 -- Operations on a Job -# (c) 2011 Martin Mares - -use strict; -use warnings; -use Getopt::Long; - -use lib 'lib'; -use BEX; - -my $edit; -my $queue_name; - -GetOptions( - "e|edit!" => \$edit, - "q|queue=s" => \$queue_name, -) && @ARGV == 1 or die <] - -Options: --e, --edit Run editor on the given job (no locking) --q, --queue= Act on the given queue -AMEN - -my $queue = BEX::Queue->new($queue_name); -my $fn = $queue->job_file($ARGV[0]); --f $fn or die "No such job " . $ARGV[0] . "\n"; - -if ($edit) { - system "editor", $fn; -} else { - system "cat", $fn; -} diff --git a/bls b/bls deleted file mode 100755 index be558d3..0000000 --- a/bls +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 2.0 -- List Machines and Groups -# (c) 2011 Martin Mares - -use strict; -use warnings; -use Getopt::Long; - -use lib 'lib'; -use BEX; - -my $edit; -my $queue_name; - -GetOptions( -) && @ARGV == 0 or die <] - -Options: -None defined so far. -AMEN - -my $machines = \%BEX::Config::machines; - -print "# Hosts:\n"; -for my $h (sort keys %$machines) { - my $m = $machines->{$h}; - ref $m eq 'HASH' or next; - print "$h\n"; -} - -print "\n# Groups:\n"; -for my $h (sort keys %$machines) { - my $m = $machines->{$h}; - ref $m eq 'ARRAY' or next; - print "$h = ", join(" ", - map { - my $x = $machines->{$_}; - !defined($x) ? "$_?" : - ref $x eq 'HASH' ? $_ : - ref $x eq 'ARRAY' ? "\@$_" : - "$_???" - } @$m), "\n"; -} diff --git a/bprun b/bprun deleted file mode 100755 index 7a66f0b..0000000 --- a/bprun +++ /dev/null @@ -1,344 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 2.0 -- Parallel Execution Using Screen -# (c) 2011 Martin Mares - -use strict; -use warnings; -use feature 'switch'; - -use Getopt::Long; -use POSIX; - -use lib 'lib'; -use BEX; - -my $queue_name; -my $screen_session = 'BEX'; -my $text_mode; - -GetOptions( - "q|queue=s" => \$queue_name, - "session=s" => \$screen_session, - "text!" => \$text_mode, -) or die <] [[!] ...] - -Options: --q, --queue= Run jobs in the given queue - --session= Job windows should be opened within the given screen - session (default: BEX) - --text Use textual user interface instead of curses -AMEN - -system 'screen', '-S', $screen_session, '-X', 'select', '.'; -!$? or die "Screen session $screen_session not found\n"; - -my $queue = BEX::Queue->new($queue_name); -my $fifo_name = $queue->{'Name'} . '/status-fifo'; -unlink $fifo_name; -mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!"; -open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!"; - -my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new); - -my @machines = (); -for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) { - my @jobs = $queue->scan($mach); - @jobs or next; - push @machines, $mach; - for (@jobs) { $ui->update($mach, $_, 'READY'); } -} - -my %running = (); -my $max = $BEX::Config::max_parallel_jobs; - -while (keys %running || @machines) { - if (@machines && keys %running < $max) { - my $mach = shift @machines; - $ui->update($mach, undef, 'START'); - my @scr = ('screen', '-t', $mach); - push @scr, '-S', $screen_session if defined $screen_session; - push @scr, '-X', 'screen', './brun', "--status-fifo=$fifo_name", $mach; - system @scr; - !$? or $ui->update($mach, undef, 'INTERR'); - $running{$mach} = 'START'; - next; - } - $_ = ; - chomp; - my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/; - if (!defined $stat) { - $ui->err("Received invalid status message <$_>"); - next; - } - if (!defined $running{$mach}) { - $ui->err("Received status message <$_> for a machine which does not run"); - next; - } - $running{$mach} = $stat; - $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat); - if ($stat eq 'DONE') { - delete $running{$mach}; - } -} - -close FIFO; -unlink $fifo_name; -$ui->done; - -package BEX::bprun::text; - -sub new($) { - return bless {}; -} - -sub done($) { -} - -sub update($$$$) { - my ($ui, $mach, $jid, $stat) = @_; - print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n"; -} - -sub err($$) { - my ($ui, $msg) = @_; - print STDERR "ERROR: $msg\n"; -} - -package BEX::bprun::curses; - -use Curses; - -my $C; - -my $nrows; -my @by_row = (); -my %by_host = (); - -my %host_state; -my %host_cnt; - -my %job_state; -my %job_cnt; - -my %host_last_fail_job; -my %host_last_fail_stat; - -sub new($) { - $C = new Curses; - start_color; - has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n"; - cbreak; noecho; - $C->intrflush(0); - $C->keypad(1); - $C->meta(1); - $C->clear; - init_pair(1, COLOR_YELLOW, COLOR_BLUE); - init_pair(2, COLOR_YELLOW, COLOR_RED); - init_pair(3, COLOR_YELLOW, COLOR_BLACK); - init_pair(4, COLOR_RED, COLOR_BLACK); - - $nrows = $C->getmaxy - 2; - if ($BEX::Config::max_parallel_jobs > $nrows) { - $BEX::Config::max_parallel_jobs = $nrows; - } - - %host_state = %host_cnt = (); - %job_state = %job_cnt = (); - for my $s ('unknown', 'ready', 'running', 'done', 'failed') { - $host_cnt{$s} = 0; - $job_cnt{'*'}{$s} = 0; - } - - my $ui = bless {}; - $ui->refresh_status; - return $ui; -} - -sub done($) -{ - $C->bkgdset(COLOR_PAIR(1) | A_BOLD); - $C->addstr($C->getmaxy-1, 0, "Press any key to quit..."); - $C->clrtoeol; - $C->getch; - endwin; -} - -sub err($$) { - my ($ui, $msg) = @_; - $C->bkgdset(COLOR_PAIR(2) | A_BOLD); - $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx); - $C->clrtoeol; - $C->refresh; -} - -sub set_host_status($$$) { - my ($ui, $mach, $stat) = @_; - my $prev_stat = $host_state{$mach}; - if (defined $prev_stat) { - $host_cnt{$prev_stat}--; - } else { - for my $s ('unknown', 'ready', 'running', 'done', 'failed') { $job_cnt{$mach}{$s} = 0; } - } - $host_state{$mach} = $stat; - $host_cnt{$stat}++; -} - -sub set_job_status($$$$) { - my ($ui, $mach, $jid, $stat) = @_; - my $prev_stat = $job_state{$mach}{$jid} // 'unknown'; - $job_cnt{$mach}{$prev_stat}--; - $job_cnt{'*'}{$prev_stat}--; - $job_state{$mach}{$jid} = $stat; - $job_cnt{$mach}{$stat}++; - $job_cnt{'*'}{$stat}++; -} - -sub refresh_status($) { - $C->bkgdset(COLOR_PAIR(1) | A_BOLD); - $C->addnstr(0, 0, - sprintf("BEX Hosts: %dR %dD %dE %dW Jobs: %dR %dD %dE %dW", - $host_cnt{'running'}, - $host_cnt{'done'}, - $host_cnt{'failed'}, - $host_cnt{'ready'}, - $job_cnt{'*'}{'running'}, - $job_cnt{'*'}{'done'}, - $job_cnt{'*'}{'failed'}, - $job_cnt{'*'}{'ready'}, - ), $C->getmaxx); - $C->clrtoeol; - $C->refresh; -} - -sub get_slot($) { - my ($mach) = @_; - my $s; - if (defined ($s = $by_host{$mach})) { - delete $s->{'Gone'}; - } else { - my ($best, $besti); - for my $i (0..$nrows-1) { - my $r = $by_row[$i]; - if (!defined $r) { - $besti = $i; - $best = undef; - last; - } elsif ($r->{'Gone'} && (!$best || $best->{'Gone'} > $r->{'Gone'})) { - $besti = $i; - $best = $r; - } - } - if ($best) { - delete $by_host{$best->{'Host'}}; - } - $s->{'Host'} = $mach; - $s->{'Row'} = $besti; - $by_host{$mach} = $s; - $by_row[$besti] = $s; - } - return $s; -} - -my $gone_counter = 1; -sub delete_slot($) { - my ($s) = @_; - $s->{'Gone'} = $gone_counter++; -} - -sub redraw_slot($) { - my ($s) = @_; - my $mach = $s->{'Host'}; - my $stat = $s->{'Status'} // "?"; - my $jid = $s->{'Job'} // ""; - my $jname = ($jid eq "" ? "" : $queue->job_name($jid)); - my $jcnt = $job_cnt{$mach}; - if ($jcnt->{'running'}) { - if ($jcnt->{'failed'}) { - $C->bkgdset(COLOR_PAIR(4) | A_BOLD); - } else { - $C->bkgdset(COLOR_PAIR(3) | A_BOLD); - } - } else { - if ($jcnt->{'failed'}) { - $C->bkgdset(COLOR_PAIR(4)); - } else { - $C->bkgdset(0); - } - } - my $r = $s->{'Row'} + 1; - $C->addstr($r, 0, sprintf("%-20.20s", $mach)); - if ($jcnt->{'failed'}) { - $C->bkgdset(COLOR_PAIR(4)); - $C->addstr(sprintf("%3dE ", $jcnt->{'failed'})); - } else { - $C->bkgdset(0); - $C->addstr(" "); - } - $C->bkgdset(0); - $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'})); - if ($stat eq 'DONE') { - if (defined $host_last_fail_stat{$mach}) { - $C->bkgdset(COLOR_PAIR(4)); - $C->addstr(sprintf(" %-8s %s", $host_last_fail_stat{$mach}, $queue->job_name($host_last_fail_job{$mach}))); - } - } else { - my $text = sprintf(" %-8s %s", $stat, $jname); - $C->addstr($text); - } - $C->clrtoeol; - $C->refresh; -} - -sub update($$$$) { - my ($ui, $mach, $jid, $stat) = @_; - my $s = get_slot($mach); - given ($stat) { - when ('READY') { - # Pseudo-state generated internally - $ui->set_host_status($mach, 'ready'); - $ui->set_job_status($mach, $jid, 'ready'); - } - when ('OK') { - $ui->set_job_status($mach, $jid, 'done'); - } - when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL']) { - $ui->set_job_status($mach, $jid, 'failed'); - $host_last_fail_job{$mach} = $jid; - $host_last_fail_stat{$mach} = $stat; - } - when ('DONE') { - if ($job_cnt{$mach}{'failed'}) { - $ui->set_host_status($mach, 'failed'); - } else { - $ui->set_host_status($mach, 'done'); - } - } - when ('INIT') { - $ui->set_host_status($mach, 'running'); - $ui->set_job_status($mach, $jid, 'running') if defined $jid; - } - when ('LOCKED') { - if (defined $jid) { - $ui->set_job_status($mach, $jid, 'failed'); - } else { - for my $j (keys %{$job_state{$mach}}) { - $ui->set_job_status($mach, $jid, 'failed'); - } - $ui->set_host_status($mach, 'failed'); - $host_last_fail_job{$mach} = $jid; - $host_last_fail_stat{$mach} = $stat; - } - } - when (['START', 'PING', 'SEND', 'RUN']) { - } - default { - $ui->err("Received unknown job status $stat"); - } - } - $s->{'Job'} = $jid; - $s->{'Status'} = $stat; - redraw_slot($s); - if ($stat eq 'DONE') { delete_slot($s); } - $ui->refresh_status; -} diff --git a/bq b/bq deleted file mode 100755 index 6fb7a17..0000000 --- a/bq +++ /dev/null @@ -1,158 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 2.0 -- Show Queued Jobs -# (c) 2011 Martin Mares - -use strict; -use warnings; -use Getopt::Long; -use POSIX; - -use lib 'lib'; -use BEX; - -my $op_by_job; -my $op_by_host; -my $op_rm; -my $op_move_to; - -my $queue_name; -my $given_job; - -GetOptions( - "by-job!" => \$op_by_job, - "h|by-host!" => \$op_by_host, - "rm!" => \$op_rm, - "move-to=s" => \$op_move_to, - "j|job=s" => \$given_job, - "q|queue=s" => \$queue_name, -) or die <] [[!] ...] - -Actions: - --by-job Show jobs sorted by job ID (default) --h, --by-host Show jobs sorted by host - --rm Remove jobs from the queue - --move-to= Move jobs to a different queue - -Options: --j, --job= Act on the specified job (default: on all) --q, --queue= Act on the given queue -AMEN - -my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); -my $queue = BEX::Queue->new($queue_name); - -# Select jobs -my %jobs = (); -my %machs = (); -for my $m (@machines) { - for my $j ($queue->scan($m)) { - if (defined $given_job) { - next if $j ne $given_job; - } - push @{$jobs{$j}}, $m; - push @{$machs{$m}}, $j; - } -} - -sub do_ls(); -sub do_rm(); -sub do_move_to(); - -my $ops = 0 + defined($op_by_host) + defined($op_by_job) + defined($op_rm) + defined($op_move_to); -if ($ops > 1) { die "Multiple actions are not allowed\n"; } - -if ($op_rm) { do_rm(); } -elsif (defined $op_move_to) { do_move_to(); } -else { do_ls(); } -exit 0; - -sub do_ls() -{ - my %stat = (); - my %mach_locked = (); - for my $m (keys %machs) { - $mach_locked{$m} = $queue->is_locked($m, undef); - for my $j (@{$machs{$m}}) { - my $st = $queue->read_job_status($m, $j); - if (defined($st->{'Time'}) && defined($st->{'Status'})) { - $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' . - POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']'; - } else { - $stat{$m}{$j} = ''; - } - if ($mach_locked{$m} || $queue->is_locked($m, $j)) { - $stat{$m}{$j} .= ' [LOCKED]'; - } - } - } - - if ($queue->is_locked(undef, undef)) { - print "### Queue lock present\n\n"; - } - - if ($op_by_host) { - for my $m (sort keys %machs) { - print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n"; - for my $j (@{$machs{$m}}) { - print "\t" . $queue->job_name($j) . $stat{$m}{$j}, "\n"; - } - } - } else { - for my $j (sort keys %jobs) { - print $queue->job_name($j), "\n"; - for my $m (sort @{$jobs{$j}}) { - print "\t$m", $stat{$m}{$j}, "\n"; - } - } - } -} - -sub do_rm() -{ - my $err = 0; - for my $m (sort keys %machs) { - for my $j (sort @{$machs{$m}}) { - if (!$queue->lock($m, $j)) { - print STDERR "Cannot remove $m:", $queue->job_name($j), ", it is locked\n"; - $err = 1; - } else { - $queue->update_job_status($m, $j, 'REMOVED'); - $queue->remove($m, $j); - print "Removed $m:", $queue->job_name($j), "\n"; - } - } - } - $queue->unlock; - exit $err; -} - -sub do_move_to() -{ - my $err = 0; - my $dest = BEX::Queue->new($op_move_to); - $dest->{'Name'} ne $queue->{'Name'} or die "Moving to the same queue is not permitted\n"; - for my $j (sort keys %jobs) { - my $job = BEX::Job->new_from_file($queue->job_file($j)); - for my $m (sort @{$jobs{$j}}) { - if (!$queue->lock($m, $j)) { - print STDERR "Cannot move $m:", $queue->job_name($j), ", it is locked\n"; - $err = 1; - } else { - my $enq = $dest->enqueue($m, $job); - if ($enq) { - $dest->update_job_status($m, $job->id, 'NEW', 'Moved to this queue'); - } else { - $dest->log($m, $job->id, 'REQUEUE', 'Moved to this queue'); - } - $queue->update_job_status($m, $job->id, 'REMOVED', 'Moved from this queue'); - $queue->remove($m, $j); - print "Moved $m:", $dest->job_name($j); - print " (already queued)" if !$enq; - print "\n"; - } - } - } - $queue->unlock; - exit $err; -} diff --git a/brun b/brun deleted file mode 100755 index 040b6ba..0000000 --- a/brun +++ /dev/null @@ -1,194 +0,0 @@ -#!/usr/bin/perl -# Batch EXecutor 2.0 -- Run Queued Jobs -# (c) 2011 Martin Mares - -use strict; -use warnings; -use Getopt::Long; - -use lib 'lib'; -use BEX; - -my $given_job; -my $queue_name; -my $status_fifo; - -GetOptions( - "j|job=s" => \$given_job, - "q|queue=s" => \$queue_name, - "s|status-fifo=s" => \$status_fifo, -) or die <] [[!] ...] - -Options: --j, --job= Run only the specified job --q, --queue= Select job queue - --status-fifo= Send status updates to the given named pipe -AMEN - -my $status_fd; -if (defined $status_fifo) { - open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!"; - autoflush $status_fd, 1; -} - -sub update_status($$$$;$) { - my ($mach, $job, $status, $log_on_queue, $msg) = @_; - if ($status_fd) { - print $status_fd "! $mach $job $status\n"; - } - if ($log_on_queue) { - $log_on_queue->update_job_status($mach, $job, $status, $msg); - } -} - -my %pings; - -sub ping_machine($) { - my ($mach) = @_; - if (!defined $pings{$mach}) { - if ($BEX::Config::ping_hosts) { - update_status($mach, '-', 'PING', undef); - my $host = BEX::Config::host_name($mach); - `ping -c1 -n $host >/dev/null 2>/dev/null`; - $pings{$mach} = !$?; - } else { - $pings{$mach} = 1; - } - } - if ($pings{$mach}) { - return ('OK', undef); - } else { - return ('NOPING', 'Does not ping'); - } -} - -sub exit_status($) { - my ($s) = @_; - if ($s >> 8) { - return "with exit code " . ($s >> 8); - } else { - return "on fatal signal " . ($s & 127); - } -} - -sub run_job_prep($$$) { - my ($job, $queue, $mach) = @_; - my $prep = $job->attr('Prep'); - defined($prep) && $prep !~ /^\s*$/ or return 'OK'; - - my $jid = $job->id; - update_status($mach, $jid, 'PREP', $queue); - my $lf = $queue->log_file($mach, $jid); - $ENV{'HOST'} = BEX::Config::host_name($mach); - system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf"; - delete $ENV{'HOST'}; - if ($?) { - return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?)); - } else { - return 'OK'; - } -} - -sub run_job_body($$$) { - my ($job, $queue, $mach) = @_; - - if ($job->attr('body') =~ /^\s*$/s) { - # Shortcut if the body is empty - return 'OK' - } - - my $host = BEX::Config::host_name($mach); - my $jid = $job->id; - - my $tmp = $queue->temp_file($mach, $jid); - open T, '>', $tmp or die; - if (defined $BEX::Config::job_prolog) { - open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!"); - while (

) { print T; } - close P; - } else { - print T "#!/bin/sh\n"; - } - print T "# BEX job ", $jid, "\n"; - print T $job->attr('body'); - if (defined $BEX::Config::job_epilog) { - open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!"); - while () { print T; } - close E; - } - close T; - - update_status($mach, $jid, 'SEND', undef); - my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t'; - my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`; - !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed'); - chomp $rtmp; - - update_status($mach, $jid, 'RUN', $queue); - my $lf = $queue->log_file($mach, $jid); - system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf"; - if ($?) { - return ('FAILED', 'Job failed ' . exit_status($?)); - } else { - return 'OK'; - } -} - -sub run_job($$$) { - my ($job, $queue, $mach) = @_; - my ($stat, $msg); - - ($stat, $msg) = ping_machine($mach); - $stat eq 'OK' or return ($stat, $msg); - - ($stat, $msg) = run_job_prep($job, $queue, $mach); - $stat eq 'OK' or return ($stat, $msg); - - return run_job_body($job, $queue, $mach); -} - -my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); -my $queue = BEX::Queue->new($queue_name); - -$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n"; - -for my $mach (@machines) { - my @q = $queue->scan($mach) or next; - if (!$queue->lock($mach, undef)) { - print "### Machine $mach is locked by another brun, skipping...\n"; - update_status($mach, '-', 'LOCKED', undef); - update_status($mach, '-', 'DONE', undef); - next; - } - update_status($mach, '-', 'INIT', undef); - while (my $jid = shift @q) { - if (defined $given_job) { - $jid eq $given_job or next; - } - my $job = BEX::Job->new_from_file($queue->job_file($jid)); - update_status($mach, $jid, 'INIT', undef); - if (!$queue->lock($mach, $jid)) { - print "### Skipping locked $jid on $mach ###\n"; - update_status($mach, $jid, 'LOCKED', undef); - next; - } - print "### Running ", $job->name, " on $mach ###\n"; - my ($s, $msg) = run_job($job, $queue, $mach); - update_status($mach, $jid, $s, $queue, $msg); - - if ($s eq 'OK') { - print "+++ OK\n"; - $queue->remove($mach, $jid); - } else { - print "--- $s: $msg\n"; - if ($BEX::Config::skip_on_fail) { - print "### Skipping other jobs on the same host ###\n" if @q; - last; - } - } - } -} continue { - update_status($mach, '-', 'DONE', undef); -} -$queue->unlock; diff --git a/epilog b/epilog deleted file mode 100644 index bc2da7a..0000000 --- a/epilog +++ /dev/null @@ -1 +0,0 @@ -# BEX epilog diff --git a/lib/BEX.pm b/lib/BEX.pm deleted file mode 100644 index 579eff2..0000000 --- a/lib/BEX.pm +++ /dev/null @@ -1,13 +0,0 @@ -# Batch EXecutor 2.0 -# (c) 2011 Martin Mares - -use strict; -use warnings; - -package BEX; - -use BEX::Config; -use BEX::Job; -use BEX::Queue; - -42; diff --git a/lib/BEX/Config.pm b/lib/BEX/Config.pm deleted file mode 100644 index a5ee67e..0000000 --- a/lib/BEX/Config.pm +++ /dev/null @@ -1,89 +0,0 @@ -# Batch EXecutor 2.0 -- Configuration -# (c) 2011 Martin Mares - -use strict; -use warnings; - -package BEX::Config; - -# This file specifies default values, which can be overridden in BEX.cf - -# A hash of all known machines and groups -# 'name' => { Option => ... } for a machine; options: -# Host => 'name' Host name to use in ssh, ping, ... -# 'name' => ['a','b','c'] for a group containing specified machines/subgroups -our %machines = ( -); - - -# A file whose contents should be prepended before the job. Should start with the "#!" line. -our $job_prolog = 'prolog'; - -# A file whose contents should be appended to the job -our $job_epilog = 'epilog'; - -# Keep history of successfully completed jobs -our $keep_history = 1; - -# Before we try to connect to a host, ping it to check if it's alive -our $ping_hosts = 1; - -# Whenever we want to run a job on a machine, we must obtain a lock. -# Available locking schemes are: -# none - no locking takes place (dangerous!) -# job - obtain exclusive access to the job, other jobs on the same -# host can run in parallel -# host - obtain exclusive access to the host, other hosts can run -# queue - obtain exclusive access to the whole queue -our $locking_scheme = 'host'; - -# Maximum number of simultaneously running jobs in `bprun' -our $max_parallel_jobs = 5; - -# When a job fails, skip all other jobs on the same host -# (however, when locking_scheme is set to `job', another instance of `brun' -# still could run such jobs in parallel) -our $skip_on_fail = 0; - -# How we run ssh (including options) -our $ssh_command = "ssh"; - -# Various utility functions - -sub parse_machine_list(@); - -sub parse_machine_list(@) { - my %set = (); - for my $m (@_) { - if ($m eq '*') { - for my $mm (keys %machines) { - if (ref($machines{$mm}) eq 'HASH') { - $set{$mm} = 1; - } - } - next; - } - my $op = 1; - if ($m =~ s{^!}{}) { $op = 0; } - my $v = $machines{$m}; - if (!defined $v) { - die "Unknown machine or class: $m\n"; - } elsif (ref($v) eq 'HASH') { - $set{$m} = $op; - } elsif (ref($v) eq 'ARRAY') { - for my $mm (parse_machine_list(@$v)) { - $set{$mm} = $op; - } - } - } - return sort grep { $set{$_} } keys %set; -} - -sub host_name($) { - my ($mach) = @_; - return $machines{$mach}->{'Host'} // $mach; -} - -require 'BEX.cf'; - -42; diff --git a/lib/BEX/Job.pm b/lib/BEX/Job.pm deleted file mode 100644 index d040f1a..0000000 --- a/lib/BEX/Job.pm +++ /dev/null @@ -1,94 +0,0 @@ -# Batch EXecutor 2.0 -- Jobs -# (c) 2011 Martin Mares - -use strict; -use warnings; - -package BEX::Job; - -use POSIX (); - -our $job_cnt = 0; - -sub check_id($) { - my ($id) = @_; - return $id =~ /^([0-9A-Za-z-]+)$/; -} - -sub new($;$) { - my ($class, $id) = @_; - my $job = { }; - bless $job; - if (defined $id) { - check_id($id) or die "Invalid job ID"; - $job->{'ID'} = $id; - } else { - $job_cnt++; - $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime); - } - $job->{'Subject'} = ''; - return $job; -} - -sub new_from_file($$;$) { - my ($class, $file, $header_only) = @_; - my $job = { }; - open T, '<', $file or die "Cannot open $file: $!"; - while () { - chomp; - /^$/ and last; - /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error"; - !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined"; - $job->{$1} = $2; - } - if (!$header_only) { - my @cmds = ; - $job->{'body'} = join("", @cmds); - } - close T; - $job->{'Subject'} //= ''; - $job->{'ID'} or die "Cannot load $file: Missing ID"; - check_id($job->{'ID'}) or die "Cannot load $file: Invalid ID syntax"; - return bless $job; -} - -sub id($) { - return $_[0]->{'ID'}; -} - -sub name($) { - my ($job) = @_; - my $name = $job->{'ID'}; - my $subj = $job->{'Subject'} // ""; - $name .= " ($subj)" if $subj !~ /^\s*$/; - return $name; -} - -sub attr($$;$) { - my ($job, $attr, $val) = @_; - $job->{$attr} = $val if defined $val; - return $job->{$attr}; -} - -sub dump($) { - my ($job) = @_; - for my $k (sort keys %$job) { - print "$k: ", $job->{$k}, "\n"; - } -} - -sub save($;$) { - my ($job, $fn) = @_; - -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!"; - $fn //= 'tmp/' . $job->id; - open T, '>', $fn or die "Cannot create $fn: $!"; - for my $k (sort grep { /^[A-Z]/ } keys %$job) { - print T "$k: ", $job->{$k}, "\n"; - } - print T "\n"; - print T $job->{'body'} if defined $job->{'body'}; - close T; - return $fn; -} - -42; diff --git a/lib/BEX/Queue.pm b/lib/BEX/Queue.pm deleted file mode 100644 index 3fb94f3..0000000 --- a/lib/BEX/Queue.pm +++ /dev/null @@ -1,249 +0,0 @@ -# Batch EXecutor 2.0 -- Queues -# (c) 2011 Martin Mares - -use strict; -use warnings; -use feature 'switch'; - -package BEX::Queue; - -use IO::File; -use File::Path; -use Fcntl qw(:flock); -use POSIX (); - -sub new($;$) { - my ($class, $name) = @_; - $name //= 'queue'; - -d $name or die "Queue directory $name does not exist\n"; - for my $d ("hosts", "jobs") { - -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!"; - } - my $queue = { - 'Name' => $name, - 'MetaCache' => {}, - }; - return bless $queue; -} - -sub log_file($$) { - my ($queue, $machine, $jid) = @_; - return $queue->host_dir($machine) . '/' . $jid . '.log'; -} - -# Most actions have to be logged by the caller -sub log($$$$;$) { - my ($queue, $mach, $jid, $stat, $msg) = @_; - my $t = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime); - my $m = join(" ", $t, $mach, $jid, $stat); - $m .= " $msg" if defined $msg; - - my $fh = $queue->{'LogFH'} //= new IO::File $queue->{'Name'} . '/log', '>>' or die "Cannot open log: $!"; - print $fh "$m\n"; - - # Append to the per-job log file - if (open L, '>>', $queue->log_file($mach, $jid)) { - print L "### $m\n"; - close L; - } -} - -sub host_dir($$) { - my ($queue, $machine) = @_; - return $queue->{'Name'} . '/hosts/' . $machine; -} - -sub queue_file($$) { - my ($queue, $machine, $jid) = @_; - return $queue->host_dir($machine) . '/' . $jid . '.job'; -} - -sub status_file($$) { - my ($queue, $machine, $jid) = @_; - return $queue->host_dir($machine) . '/' . $jid . '.stat'; -} - -sub temp_file($$) { - my ($queue, $machine, $jid) = @_; - return $queue->host_dir($machine) . '/' . $jid . '.tmp'; -} - -sub job_file($$) { - my ($queue, $jid) = @_; - return $queue->{'Name'} . '/jobs/' . $jid. '.job'; -} - -sub enqueue($$$) { - my ($queue, $machine, $job) = @_; - my $qf = $queue->queue_file($machine, $job->id); - if (-f $qf) { - return 0; - } - my $fn = $queue->job_file($job->id); - -f $fn or $job->save($fn); - my $dir = $queue->host_dir($machine); - -d $dir or mkdir $dir or die "Cannot create directory $dir: $!"; - symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!"; - return 1; -} - -sub scan($$) { - my ($queue, $machine) = @_; - my @list = (); - if (opendir D, $queue->host_dir($machine)) { - while ($_ = readdir D) { - /^\./ and next; - s{\.job}{} or next; - push @list, $_; - } - closedir D; - } - return sort @list; -} - -sub remove($$;$) { - my ($queue, $machine, $jid, $force_remove) = @_; - if ($BEX::Config::keep_history && !$force_remove) { - my $s = $queue->{'Name'} . '/hosts/' . $machine; - my $d = $queue->{'Name'} . '/history/' . $machine; - File::Path::mkpath($d); - for my $suff ('job', 'stat', 'log') { - my $src = "$s/$jid.$suff"; - my $dst = "$d/$jid.$suff"; - if (-f $src) { - rename $src, $dst or die "Cannot rename $src to $dst: $!"; - } else { - # Might be present from the previous incarnation of the same job - unlink $dst; - } - } - } else { - unlink $queue->queue_file($machine, $jid); - unlink $queue->status_file($machine, $jid); - unlink $queue->log_file($machine, $jid); - } - unlink $queue->temp_file($machine, $jid); -} - -sub job_metadata($$) { - my ($queue, $jid) = @_; - my $cache = $queue->{'MetaCache'}; - if (!defined $cache->{$jid}) { - $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1); - } - return $cache->{$jid}; -} - -sub job_name($$) { - my ($queue, $jid) = @_; - return $queue->job_metadata($jid)->name; -} - -sub read_job_status($$$) { - my ($queue, $machine, $jid) = @_; - my %s = (); - my $sf = $queue->status_file($machine, $jid); - if (open S, '<', $sf) { - while () { - chomp; - /^(\w+):\s*(.*)/ or die "Parse error in $sf"; - $s{$1} = $2; - } - close S; - } - return \%s; -} - -sub write_job_status($$$$) { - my ($queue, $machine, $jid, $stat) = @_; - my $sf = $queue->status_file($machine, $jid); - open S, '>', "$sf.$$" or die "Cannot create $sf.$$: $!"; - for my $k (sort keys %$stat) { - print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k}; - } - close S; - rename "$sf.$$", $sf or die "Cannot rename $sf.$$ to $sf: $!"; -} - -sub update_job_status($$$$;$) { - my ($queue, $machine, $jid, $stat, $msg) = @_; - my $s = { - 'Time' => time, - 'Status' => $stat, - 'Message' => $msg, - }; - $queue->write_job_status($machine, $jid, $s); - $queue->log($machine, $jid, $stat, $msg); -} - -sub lock_name($$$) { - my ($queue, $machine, $jid) = @_; - my $lock = $queue->{'Name'}; - if (defined $jid) { - $lock .= "/hosts/$machine/$jid.lock"; - } elsif (defined $machine) { - $lock .= "/hosts/$machine/lock"; - } else { - $lock .= '/lock'; - } -} - -# Whenever we want to run a job on a machine, we must obtain a lock; -# at most one lock can be held at a time by a single BEX::Queue object. -# See the description of locking schemes in BEX::Config. -sub lock($$$) { - my ($queue, $machine, $jid) = @_; - my $lock; - given ($BEX::Config::locking_scheme) { - when ('queue') { - $lock = lock_name($queue, undef, undef); - } - when ('host') { - defined($machine) or return 1; - $lock = lock_name($queue, $machine, undef); - } - when ('job') { - defined($machine) && defined($jid) or return 1; - $lock = lock_name($queue, $machine, $jid); - } - when ('none') { return 1; } - default { die "Invalid BEX::Config::locking_scheme"; } - } - if (defined($queue->{'LockName'})) { - return 1 if ($queue->{'LockName'} eq $lock); - $queue->unlock; - } - open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!"; - if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) { - close $queue->{'LockHandle'}; - delete $queue->{'LockHandle'}; - return 0; - } - $queue->{'LockName'} = $lock; - return 1; -} - -sub unlock($) { - my ($queue) = @_; - defined $queue->{'LockName'} or return; - unlink $queue->{'LockName'}; - flock $queue->{'LockHandle'}, LOCK_UN; - close $queue->{'LockHandle'}; - delete $queue->{'LockHandle'}; - delete $queue->{'LockName'}; -} - -# Unsafe (does not check fcntl, only existence of a lock file), but should be enough for bq -sub is_locked($$$) { - my ($queue, $machine, $jid) = @_; - given ($BEX::Config::locking_scheme) { - # Shortcuts - when ('host') { return unless defined $machine; } - when ('jid') { return unless defined $jid; } - when ('none') { return; } - } - my $lock = lock_name($queue, $machine, $jid); - return -f $lock; -} - -42; diff --git a/lib/bin/add b/lib/bin/add new file mode 100755 index 0000000..95fbbb8 --- /dev/null +++ b/lib/bin/add @@ -0,0 +1,100 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Insert to Queue +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use File::stat; +use BEX; + +my $given_body; +my $given_go; +my $given_id; +my $queue_name; +my $requeue_id; +my $given_subject; +my $given_template; + +GetOptions( + "b|body=s" => \$given_body, + "g|go!" => \$given_go, + "i|id=s" => \$given_id, + "q|queue=s" => \$queue_name, + "r|requeue=s" => \$requeue_id, + "s|subject=s" => \$given_subject, + "t|template=s" => \$given_template, +) or die <] [!] ... + +Options: +-b, --body= Load job body from the given file +-g, --go Do not run editor, go enqueue the job immediately +-i, --id= Set job ID of the new job +-q, --queue= Insert new jobs to the given queue +-r, --requeue= Re-queue an existing job instead of creating a new one +-s, --subject= Set subject of the new job +-t, --template= Load job template (headers and body) from the given file +AMEN + +# Prepare machine set +@ARGV or die "No machines specified\n"; +my @machines = BEX::Config::parse_machine_list(@ARGV); +@machines or die "No machines match\n"; + +my $queue = BEX::Queue->new($queue_name); +my $job; +my $tmp_fn; + +if (defined $requeue_id) { + # When requeueing, just fetch the existing job + if (defined($given_body) || defined($given_id) || defined($given_subject) || defined($given_template)) { + die "Parameters of a requeued job cannot be changed\n"; + } + my $fn = $queue->job_file($requeue_id); + -f $fn or die "Job $requeue_id not known\n"; + $job = BEX::Job->new_from_file($fn); +} else { + # Create job template + if (defined $given_template) { + $job = BEX::Job->new_from_file($given_template); + } else { + $job = BEX::Job->new; + } + $job->attr('ID', $given_id) if defined $given_id; + $job->attr('Subject', $given_subject) if defined $given_subject; + if (defined $given_body) { + open B, '<', $given_body or die "Cannot open $given_body: $!\n"; + local $/; + $job->attr('body', ); + close B; + } + + # Let the user edit the template + if (!$given_go) { + $tmp_fn = $job->save; + my $orig_stat = stat($tmp_fn) or die; + system "editor", $tmp_fn and die "Editor exited with an error, file kept as $tmp_fn\n"; + my $new_stat = stat($tmp_fn) or die "File $tmp_fn disappeared under my hands: $!\n"; + if ($new_stat->mtime <= $orig_stat->mtime && $new_stat->size == $orig_stat->size) { + unlink $tmp_fn; + die "Cancelled\n"; + } + $job = BEX::Job->new_from_file($tmp_fn); + } +} + +# Put the job to the queue +print "New job ", $job->id, "\n"; +for my $m (@machines) { + if ($queue->enqueue($m, $job)) { + $queue->update_job_status($m, $job->id, 'NEW'); + print "\t$m\n"; + } else { + $queue->log($m, $job->id, 'REQUEUE'); + print "\t$m (already queued)\n"; + } +} + +# Remove the temporary file if there's any +unlink $tmp_fn if defined $tmp_fn; diff --git a/lib/bin/job b/lib/bin/job new file mode 100755 index 0000000..bfd2f31 --- /dev/null +++ b/lib/bin/job @@ -0,0 +1,32 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Insert to Queue +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use BEX; + +my $edit; +my $queue_name; + +GetOptions( + "e|edit!" => \$edit, + "q|queue=s" => \$queue_name, +) && @ARGV == 1 or die <] + +Options: +-e, --edit Run editor on the given job (no locking) +-q, --queue= Act on the given queue +AMEN + +my $queue = BEX::Queue->new($queue_name); +my $fn = $queue->job_file($ARGV[0]); +-f $fn or die "No such job " . $ARGV[0] . "\n"; + +if ($edit) { + system "editor", $fn; +} else { + system "cat", $fn; +} diff --git a/lib/bin/ls b/lib/bin/ls new file mode 100755 index 0000000..5da501e --- /dev/null +++ b/lib/bin/ls @@ -0,0 +1,42 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Insert to Queue +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use BEX; + +my $edit; +my $queue_name; + +GetOptions( +) && @ARGV == 0 or die <] + +Options: +None defined so far. +AMEN + +my $machines = \%BEX::Config::machines; + +print "# Hosts:\n"; +for my $h (sort keys %$machines) { + my $m = $machines->{$h}; + ref $m eq 'HASH' or next; + print "$h\n"; +} + +print "\n# Groups:\n"; +for my $h (sort keys %$machines) { + my $m = $machines->{$h}; + ref $m eq 'ARRAY' or next; + print "$h = ", join(" ", + map { + my $x = $machines->{$_}; + !defined($x) ? "$_?" : + ref $x eq 'HASH' ? $_ : + ref $x eq 'ARRAY' ? "\@$_" : + "$_???" + } @$m), "\n"; +} diff --git a/lib/bin/prun b/lib/bin/prun new file mode 100755 index 0000000..d8b9872 --- /dev/null +++ b/lib/bin/prun @@ -0,0 +1,341 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Insert to Queue +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use feature 'switch'; +use Getopt::Long; +use POSIX; +use BEX; + +my $queue_name; +my $screen_session = 'BEX'; +my $text_mode; + +GetOptions( + "q|queue=s" => \$queue_name, + "session=s" => \$screen_session, + "text!" => \$text_mode, +) or die <] [[!] ...] + +Options: +-q, --queue= Run jobs in the given queue + --session= Job windows should be opened within the given screen + session (default: BEX) + --text Use textual user interface instead of curses +AMEN + +system 'screen', '-S', $screen_session, '-X', 'select', '.'; +!$? or die "Screen session $screen_session not found\n"; + +my $queue = BEX::Queue->new($queue_name); +my $fifo_name = $queue->{'Name'} . '/status-fifo'; +unlink $fifo_name; +mkfifo $fifo_name, 0700 or die "Cannot create $fifo_name: $!"; +open FIFO, '+<', $fifo_name or die "Cannot open $fifo_name: $!"; + +my $ui = ($text_mode ? BEX::bprun::text->new : BEX::bprun::curses->new); + +my @machines = (); +for my $mach (BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*')) { + my @jobs = $queue->scan($mach); + @jobs or next; + push @machines, $mach; + for (@jobs) { $ui->update($mach, $_, 'READY'); } +} + +my %running = (); +my $max = $BEX::Config::max_parallel_jobs; + +while (keys %running || @machines) { + if (@machines && keys %running < $max) { + my $mach = shift @machines; + $ui->update($mach, undef, 'START'); + my @scr = ('screen', '-t', $mach); + push @scr, '-S', $screen_session if defined $screen_session; + push @scr, '-X', 'screen', './brun', "--status-fifo=$fifo_name", $mach; + system @scr; + !$? or $ui->update($mach, undef, 'INTERR'); + $running{$mach} = 'START'; + next; + } + $_ = ; + chomp; + my ($mach, $jid, $stat) = /^! (\S+) (\S+) (\S+)$/; + if (!defined $stat) { + $ui->err("Received invalid status message <$_>"); + next; + } + if (!defined $running{$mach}) { + $ui->err("Received status message <$_> for a machine which does not run"); + next; + } + $running{$mach} = $stat; + $ui->update($mach, ($jid eq '-' ? undef : $jid), $stat); + if ($stat eq 'DONE') { + delete $running{$mach}; + } +} + +close FIFO; +unlink $fifo_name; +$ui->done; + +package BEX::bprun::text; + +sub new($) { + return bless {}; +} + +sub done($) { +} + +sub update($$$$) { + my ($ui, $mach, $jid, $stat) = @_; + print +($mach // '-'), (defined($jid) ? ":$jid" : ""), " $stat\n"; +} + +sub err($$) { + my ($ui, $msg) = @_; + print STDERR "ERROR: $msg\n"; +} + +package BEX::bprun::curses; + +use Curses; + +my $C; + +my $nrows; +my @by_row = (); +my %by_host = (); + +my %host_state; +my %host_cnt; + +my %job_state; +my %job_cnt; + +my %host_last_fail_job; +my %host_last_fail_stat; + +sub new($) { + $C = new Curses; + start_color; + has_colors && COLORS >= 8 && COLOR_PAIRS >= 8 or die "Your terminal is too dumb for me\n"; + cbreak; noecho; + $C->intrflush(0); + $C->keypad(1); + $C->meta(1); + $C->clear; + init_pair(1, COLOR_YELLOW, COLOR_BLUE); + init_pair(2, COLOR_YELLOW, COLOR_RED); + init_pair(3, COLOR_YELLOW, COLOR_BLACK); + init_pair(4, COLOR_RED, COLOR_BLACK); + + $nrows = $C->getmaxy - 2; + if ($BEX::Config::max_parallel_jobs > $nrows) { + $BEX::Config::max_parallel_jobs = $nrows; + } + + %host_state = %host_cnt = (); + %job_state = %job_cnt = (); + for my $s ('unknown', 'ready', 'running', 'done', 'failed') { + $host_cnt{$s} = 0; + $job_cnt{'*'}{$s} = 0; + } + + my $ui = bless {}; + $ui->refresh_status; + return $ui; +} + +sub done($) +{ + $C->bkgdset(COLOR_PAIR(1) | A_BOLD); + $C->addstr($C->getmaxy-1, 0, "Press any key to quit..."); + $C->clrtoeol; + $C->getch; + endwin; +} + +sub err($$) { + my ($ui, $msg) = @_; + $C->bkgdset(COLOR_PAIR(2) | A_BOLD); + $C->addnstr($C->getmaxy-1, 0, "ERROR: $msg", $C->getmaxx); + $C->clrtoeol; + $C->refresh; +} + +sub set_host_status($$$) { + my ($ui, $mach, $stat) = @_; + my $prev_stat = $host_state{$mach}; + if (defined $prev_stat) { + $host_cnt{$prev_stat}--; + } else { + for my $s ('unknown', 'ready', 'running', 'done', 'failed') { $job_cnt{$mach}{$s} = 0; } + } + $host_state{$mach} = $stat; + $host_cnt{$stat}++; +} + +sub set_job_status($$$$) { + my ($ui, $mach, $jid, $stat) = @_; + my $prev_stat = $job_state{$mach}{$jid} // 'unknown'; + $job_cnt{$mach}{$prev_stat}--; + $job_cnt{'*'}{$prev_stat}--; + $job_state{$mach}{$jid} = $stat; + $job_cnt{$mach}{$stat}++; + $job_cnt{'*'}{$stat}++; +} + +sub refresh_status($) { + $C->bkgdset(COLOR_PAIR(1) | A_BOLD); + $C->addnstr(0, 0, + sprintf("BEX Hosts: %dR %dD %dE %dW Jobs: %dR %dD %dE %dW", + $host_cnt{'running'}, + $host_cnt{'done'}, + $host_cnt{'failed'}, + $host_cnt{'ready'}, + $job_cnt{'*'}{'running'}, + $job_cnt{'*'}{'done'}, + $job_cnt{'*'}{'failed'}, + $job_cnt{'*'}{'ready'}, + ), $C->getmaxx); + $C->clrtoeol; + $C->refresh; +} + +sub get_slot($) { + my ($mach) = @_; + my $s; + if (defined ($s = $by_host{$mach})) { + delete $s->{'Gone'}; + } else { + my ($best, $besti); + for my $i (0..$nrows-1) { + my $r = $by_row[$i]; + if (!defined $r) { + $besti = $i; + $best = undef; + last; + } elsif ($r->{'Gone'} && (!$best || $best->{'Gone'} > $r->{'Gone'})) { + $besti = $i; + $best = $r; + } + } + if ($best) { + delete $by_host{$best->{'Host'}}; + } + $s->{'Host'} = $mach; + $s->{'Row'} = $besti; + $by_host{$mach} = $s; + $by_row[$besti] = $s; + } + return $s; +} + +my $gone_counter = 1; +sub delete_slot($) { + my ($s) = @_; + $s->{'Gone'} = $gone_counter++; +} + +sub redraw_slot($) { + my ($s) = @_; + my $mach = $s->{'Host'}; + my $stat = $s->{'Status'} // "?"; + my $jid = $s->{'Job'} // ""; + my $jname = ($jid eq "" ? "" : $queue->job_name($jid)); + my $jcnt = $job_cnt{$mach}; + if ($jcnt->{'running'}) { + if ($jcnt->{'failed'}) { + $C->bkgdset(COLOR_PAIR(4) | A_BOLD); + } else { + $C->bkgdset(COLOR_PAIR(3) | A_BOLD); + } + } else { + if ($jcnt->{'failed'}) { + $C->bkgdset(COLOR_PAIR(4)); + } else { + $C->bkgdset(0); + } + } + my $r = $s->{'Row'} + 1; + $C->addstr($r, 0, sprintf("%-20.20s", $mach)); + if ($jcnt->{'failed'}) { + $C->bkgdset(COLOR_PAIR(4)); + $C->addstr(sprintf("%3dE ", $jcnt->{'failed'})); + } else { + $C->bkgdset(0); + $C->addstr(" "); + } + $C->bkgdset(0); + $C->addstr(sprintf("%3dD %3dW", $jcnt->{'done'}, $jcnt->{'ready'})); + if ($stat eq 'DONE') { + if (defined $host_last_fail_stat{$mach}) { + $C->bkgdset(COLOR_PAIR(4)); + $C->addstr(sprintf(" %-8s %s", $host_last_fail_stat{$mach}, $queue->job_name($host_last_fail_job{$mach}))); + } + } else { + my $text = sprintf(" %-8s %s", $stat, $jname); + $C->addstr($text); + } + $C->clrtoeol; + $C->refresh; +} + +sub update($$$$) { + my ($ui, $mach, $jid, $stat) = @_; + my $s = get_slot($mach); + given ($stat) { + when ('READY') { + # Pseudo-state generated internally + $ui->set_host_status($mach, 'ready'); + $ui->set_job_status($mach, $jid, 'ready'); + } + when ('OK') { + $ui->set_job_status($mach, $jid, 'done'); + } + when (['FAILED', 'INTERR', 'NOPING', 'PREPFAIL']) { + $ui->set_job_status($mach, $jid, 'failed'); + $host_last_fail_job{$mach} = $jid; + $host_last_fail_stat{$mach} = $stat; + } + when ('DONE') { + if ($job_cnt{$mach}{'failed'}) { + $ui->set_host_status($mach, 'failed'); + } else { + $ui->set_host_status($mach, 'done'); + } + } + when ('INIT') { + $ui->set_host_status($mach, 'running'); + $ui->set_job_status($mach, $jid, 'running') if defined $jid; + } + when ('LOCKED') { + if (defined $jid) { + $ui->set_job_status($mach, $jid, 'failed'); + } else { + for my $j (keys %{$job_state{$mach}}) { + $ui->set_job_status($mach, $jid, 'failed'); + } + $ui->set_host_status($mach, 'failed'); + $host_last_fail_job{$mach} = $jid; + $host_last_fail_stat{$mach} = $stat; + } + } + when (['START', 'PING', 'SEND', 'RUN']) { + } + default { + $ui->err("Received unknown job status $stat"); + } + } + $s->{'Job'} = $jid; + $s->{'Status'} = $stat; + redraw_slot($s); + if ($stat eq 'DONE') { delete_slot($s); } + $ui->refresh_status; +} diff --git a/lib/bin/queue b/lib/bin/queue new file mode 100755 index 0000000..483e794 --- /dev/null +++ b/lib/bin/queue @@ -0,0 +1,156 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Insert to Queue +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use POSIX; +use BEX; + +my $op_by_job; +my $op_by_host; +my $op_rm; +my $op_move_to; + +my $queue_name; +my $given_job; + +GetOptions( + "by-job!" => \$op_by_job, + "h|by-host!" => \$op_by_host, + "rm!" => \$op_rm, + "move-to=s" => \$op_move_to, + "j|job=s" => \$given_job, + "q|queue=s" => \$queue_name, +) or die <] [[!] ...] + +Actions: + --by-job Show jobs sorted by job ID (default) +-h, --by-host Show jobs sorted by host + --rm Remove jobs from the queue + --move-to= Move jobs to a different queue + +Options: +-j, --job= Act on the specified job (default: on all) +-q, --queue= Act on the given queue +AMEN + +my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); +my $queue = BEX::Queue->new($queue_name); + +# Select jobs +my %jobs = (); +my %machs = (); +for my $m (@machines) { + for my $j ($queue->scan($m)) { + if (defined $given_job) { + next if $j ne $given_job; + } + push @{$jobs{$j}}, $m; + push @{$machs{$m}}, $j; + } +} + +sub do_ls(); +sub do_rm(); +sub do_move_to(); + +my $ops = 0 + defined($op_by_host) + defined($op_by_job) + defined($op_rm) + defined($op_move_to); +if ($ops > 1) { die "Multiple actions are not allowed\n"; } + +if ($op_rm) { do_rm(); } +elsif (defined $op_move_to) { do_move_to(); } +else { do_ls(); } +exit 0; + +sub do_ls() +{ + my %stat = (); + my %mach_locked = (); + for my $m (keys %machs) { + $mach_locked{$m} = $queue->is_locked($m, undef); + for my $j (@{$machs{$m}}) { + my $st = $queue->read_job_status($m, $j); + if (defined($st->{'Time'}) && defined($st->{'Status'})) { + $stat{$m}{$j} = ' [' . $st->{'Status'} . ' on ' . + POSIX::strftime('%Y-%m-%d', localtime $st->{'Time'}) . ']'; + } else { + $stat{$m}{$j} = ''; + } + if ($mach_locked{$m} || $queue->is_locked($m, $j)) { + $stat{$m}{$j} .= ' [LOCKED]'; + } + } + } + + if ($queue->is_locked(undef, undef)) { + print "### Queue lock present\n\n"; + } + + if ($op_by_host) { + for my $m (sort keys %machs) { + print "$m", ($mach_locked{$m} ? ' [LOCKED]' : ''), "\n"; + for my $j (@{$machs{$m}}) { + print "\t" . $queue->job_name($j) . $stat{$m}{$j}, "\n"; + } + } + } else { + for my $j (sort keys %jobs) { + print $queue->job_name($j), "\n"; + for my $m (sort @{$jobs{$j}}) { + print "\t$m", $stat{$m}{$j}, "\n"; + } + } + } +} + +sub do_rm() +{ + my $err = 0; + for my $m (sort keys %machs) { + for my $j (sort @{$machs{$m}}) { + if (!$queue->lock($m, $j)) { + print STDERR "Cannot remove $m:", $queue->job_name($j), ", it is locked\n"; + $err = 1; + } else { + $queue->update_job_status($m, $j, 'REMOVED'); + $queue->remove($m, $j); + print "Removed $m:", $queue->job_name($j), "\n"; + } + } + } + $queue->unlock; + exit $err; +} + +sub do_move_to() +{ + my $err = 0; + my $dest = BEX::Queue->new($op_move_to); + $dest->{'Name'} ne $queue->{'Name'} or die "Moving to the same queue is not permitted\n"; + for my $j (sort keys %jobs) { + my $job = BEX::Job->new_from_file($queue->job_file($j)); + for my $m (sort @{$jobs{$j}}) { + if (!$queue->lock($m, $j)) { + print STDERR "Cannot move $m:", $queue->job_name($j), ", it is locked\n"; + $err = 1; + } else { + my $enq = $dest->enqueue($m, $job); + if ($enq) { + $dest->update_job_status($m, $job->id, 'NEW', 'Moved to this queue'); + } else { + $dest->log($m, $job->id, 'REQUEUE', 'Moved to this queue'); + } + $queue->update_job_status($m, $job->id, 'REMOVED', 'Moved from this queue'); + $queue->remove($m, $j); + print "Moved $m:", $dest->job_name($j); + print " (already queued)" if !$enq; + print "\n"; + } + } + } + $queue->unlock; + exit $err; +} diff --git a/lib/bin/run b/lib/bin/run new file mode 100755 index 0000000..e311729 --- /dev/null +++ b/lib/bin/run @@ -0,0 +1,192 @@ +#!/usr/bin/perl +# Batch EXecutor 3.0 -- Insert to Queue +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use Getopt::Long; +use BEX; + +my $given_job; +my $queue_name; +my $status_fifo; + +GetOptions( + "j|job=s" => \$given_job, + "q|queue=s" => \$queue_name, + "s|status-fifo=s" => \$status_fifo, +) or die <] [[!] ...] + +Options: +-j, --job= Run only the specified job +-q, --queue= Select job queue + --status-fifo= Send status updates to the given named pipe +AMEN + +my $status_fd; +if (defined $status_fifo) { + open $status_fd, '>>', $status_fifo or die "Cannot open status FIFO: $!"; + autoflush $status_fd, 1; +} + +sub update_status($$$$;$) { + my ($mach, $job, $status, $log_on_queue, $msg) = @_; + if ($status_fd) { + print $status_fd "! $mach $job $status\n"; + } + if ($log_on_queue) { + $log_on_queue->update_job_status($mach, $job, $status, $msg); + } +} + +my %pings; + +sub ping_machine($) { + my ($mach) = @_; + if (!defined $pings{$mach}) { + if ($BEX::Config::ping_hosts) { + update_status($mach, '-', 'PING', undef); + my $host = BEX::Config::host_name($mach); + `ping -c1 -n $host >/dev/null 2>/dev/null`; + $pings{$mach} = !$?; + } else { + $pings{$mach} = 1; + } + } + if ($pings{$mach}) { + return ('OK', undef); + } else { + return ('NOPING', 'Does not ping'); + } +} + +sub exit_status($) { + my ($s) = @_; + if ($s >> 8) { + return "with exit code " . ($s >> 8); + } else { + return "on fatal signal " . ($s & 127); + } +} + +sub run_job_prep($$$) { + my ($job, $queue, $mach) = @_; + my $prep = $job->attr('Prep'); + defined($prep) && $prep !~ /^\s*$/ or return 'OK'; + + my $jid = $job->id; + update_status($mach, $jid, 'PREP', $queue); + my $lf = $queue->log_file($mach, $jid); + $ENV{'HOST'} = BEX::Config::host_name($mach); + system 'bash', '-o', 'pipefail', '-c', "( $prep ) 2>&1 | tee -a $lf"; + delete $ENV{'HOST'}; + if ($?) { + return ('PREPFAIL', 'Preparatory command failed ' . exit_status($?)); + } else { + return 'OK'; + } +} + +sub run_job_body($$$) { + my ($job, $queue, $mach) = @_; + + if ($job->attr('body') =~ /^\s*$/s) { + # Shortcut if the body is empty + return 'OK' + } + + my $host = BEX::Config::host_name($mach); + my $jid = $job->id; + + my $tmp = $queue->temp_file($mach, $jid); + open T, '>', $tmp or die; + if (defined $BEX::Config::job_prolog) { + open P, $BEX::Config::job_prolog or return ('INTERR', "Cannot open prolog: $!"); + while (

) { print T; } + close P; + } else { + print T "#!/bin/sh\n"; + } + print T "# BEX job ", $jid, "\n"; + print T $job->attr('body'); + if (defined $BEX::Config::job_epilog) { + open E, $BEX::Config::job_epilog or return ('INTERR', "Cannot open epilog: $!"); + while () { print T; } + close E; + } + close T; + + update_status($mach, $jid, 'SEND', undef); + my $cmd = 't=$(mktemp -t bex-XXXXXXXX) && cat >$t && chmod u+x $t && echo $t'; + my $rtmp = `$BEX::Config::ssh_command <$tmp $host '$cmd'`; + !$? && defined($rtmp) && $rtmp ne '' or return ('NOXFER', 'Transfer failed'); + chomp $rtmp; + + update_status($mach, $jid, 'RUN', $queue); + my $lf = $queue->log_file($mach, $jid); + system 'bash', '-o', 'pipefail', '-c', "$BEX::Config::ssh_command $host '$rtmp ; e=\$? ; rm -f $rtmp ; exit \$e' 2>&1 | tee -a $lf"; + if ($?) { + return ('FAILED', 'Job failed ' . exit_status($?)); + } else { + return 'OK'; + } +} + +sub run_job($$$) { + my ($job, $queue, $mach) = @_; + my ($stat, $msg); + + ($stat, $msg) = ping_machine($mach); + $stat eq 'OK' or return ($stat, $msg); + + ($stat, $msg) = run_job_prep($job, $queue, $mach); + $stat eq 'OK' or return ($stat, $msg); + + return run_job_body($job, $queue, $mach); +} + +my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); +my $queue = BEX::Queue->new($queue_name); + +$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n"; + +for my $mach (@machines) { + my @q = $queue->scan($mach) or next; + if (!$queue->lock($mach, undef)) { + print "### Machine $mach is locked by another brun, skipping...\n"; + update_status($mach, '-', 'LOCKED', undef); + update_status($mach, '-', 'DONE', undef); + next; + } + update_status($mach, '-', 'INIT', undef); + while (my $jid = shift @q) { + if (defined $given_job) { + $jid eq $given_job or next; + } + my $job = BEX::Job->new_from_file($queue->job_file($jid)); + update_status($mach, $jid, 'INIT', undef); + if (!$queue->lock($mach, $jid)) { + print "### Skipping locked $jid on $mach ###\n"; + update_status($mach, $jid, 'LOCKED', undef); + next; + } + print "### Running ", $job->name, " on $mach ###\n"; + my ($s, $msg) = run_job($job, $queue, $mach); + update_status($mach, $jid, $s, $queue, $msg); + + if ($s eq 'OK') { + print "+++ OK\n"; + $queue->remove($mach, $jid); + } else { + print "--- $s: $msg\n"; + if ($BEX::Config::skip_on_fail) { + print "### Skipping other jobs on the same host ###\n" if @q; + last; + } + } + } +} continue { + update_status($mach, '-', 'DONE', undef); +} +$queue->unlock; diff --git a/lib/perl/BEX.pm b/lib/perl/BEX.pm new file mode 100644 index 0000000..2c257de --- /dev/null +++ b/lib/perl/BEX.pm @@ -0,0 +1,13 @@ +# Batch EXecutor 3.0 +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; + +package BEX; + +use BEX::Config; +use BEX::Job; +use BEX::Queue; + +42; diff --git a/lib/perl/BEX/Config.pm b/lib/perl/BEX/Config.pm new file mode 100644 index 0000000..6a2efdd --- /dev/null +++ b/lib/perl/BEX/Config.pm @@ -0,0 +1,94 @@ +# Batch EXecutor 3.0 -- Configuration +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; + +package BEX::Config; + +# This file specifies default values, which can be overridden in BEX.cf + +# A hash of all known machines and groups +# 'name' => { Option => ... } for a machine; options: +# Host => 'name' Host name to use in ssh, ping, ... +# 'name' => ['a','b','c'] for a group containing specified machines/subgroups +our %machines = ( +); + +# Home directory in which everything resides +our $bex_home = $ENV{"BEX_HOME"} // "."; + +# Configuration directory +our $bex_cf_dir = $bex_home . "/BEX"; + +# A file whose contents should be prepended before the job. Should start with the "#!" line. +our $job_prolog = $bex_cf_dir . '/prolog'; + +# A file whose contents should be appended to the job +our $job_epilog = $bex_cf_dir . '/epilog'; + +# Keep history of successfully completed jobs +our $keep_history = 1; + +# Before we try to connect to a host, ping it to check if it's alive +our $ping_hosts = 1; + +# Whenever we want to run a job on a machine, we must obtain a lock. +# Available locking schemes are: +# none - no locking takes place (dangerous!) +# job - obtain exclusive access to the job, other jobs on the same +# host can run in parallel +# host - obtain exclusive access to the host, other hosts can run +# queue - obtain exclusive access to the whole queue +our $locking_scheme = 'host'; + +# Maximum number of simultaneously running jobs in `bprun' +our $max_parallel_jobs = 5; + +# When a job fails, skip all other jobs on the same host +# (however, when locking_scheme is set to `job', another instance of `brun' +# still could run such jobs in parallel) +our $skip_on_fail = 0; + +# How we run ssh (including options) +our $ssh_command = "ssh"; + +# Various utility functions + +sub parse_machine_list(@); + +sub parse_machine_list(@) { + my %set = (); + for my $m (@_) { + if ($m eq '*') { + for my $mm (keys %machines) { + if (ref($machines{$mm}) eq 'HASH') { + $set{$mm} = 1; + } + } + next; + } + my $op = 1; + if ($m =~ s{^!}{}) { $op = 0; } + my $v = $machines{$m}; + if (!defined $v) { + die "Unknown machine or class: $m\n"; + } elsif (ref($v) eq 'HASH') { + $set{$m} = $op; + } elsif (ref($v) eq 'ARRAY') { + for my $mm (parse_machine_list(@$v)) { + $set{$mm} = $op; + } + } + } + return sort grep { $set{$_} } keys %set; +} + +sub host_name($) { + my ($mach) = @_; + return $machines{$mach}->{'Host'} // $mach; +} + +require $bex_cf_dir . '/config'; + +42; diff --git a/lib/perl/BEX/Job.pm b/lib/perl/BEX/Job.pm new file mode 100644 index 0000000..a459cc3 --- /dev/null +++ b/lib/perl/BEX/Job.pm @@ -0,0 +1,94 @@ +# Batch EXecutor 3.0 -- Jobs +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; + +package BEX::Job; + +use POSIX (); + +our $job_cnt = 0; + +sub check_id($) { + my ($id) = @_; + return $id =~ /^([0-9A-Za-z-]+)$/; +} + +sub new($;$) { + my ($class, $id) = @_; + my $job = { }; + bless $job; + if (defined $id) { + check_id($id) or die "Invalid job ID"; + $job->{'ID'} = $id; + } else { + $job_cnt++; + $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime); + } + $job->{'Subject'} = ''; + return $job; +} + +sub new_from_file($$;$) { + my ($class, $file, $header_only) = @_; + my $job = { }; + open T, '<', $file or die "Cannot open $file: $!"; + while () { + chomp; + /^$/ and last; + /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error"; + !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined"; + $job->{$1} = $2; + } + if (!$header_only) { + my @cmds = ; + $job->{'body'} = join("", @cmds); + } + close T; + $job->{'Subject'} //= ''; + $job->{'ID'} or die "Cannot load $file: Missing ID"; + check_id($job->{'ID'}) or die "Cannot load $file: Invalid ID syntax"; + return bless $job; +} + +sub id($) { + return $_[0]->{'ID'}; +} + +sub name($) { + my ($job) = @_; + my $name = $job->{'ID'}; + my $subj = $job->{'Subject'} // ""; + $name .= " ($subj)" if $subj !~ /^\s*$/; + return $name; +} + +sub attr($$;$) { + my ($job, $attr, $val) = @_; + $job->{$attr} = $val if defined $val; + return $job->{$attr}; +} + +sub dump($) { + my ($job) = @_; + for my $k (sort keys %$job) { + print "$k: ", $job->{$k}, "\n"; + } +} + +sub save($;$) { + my ($job, $fn) = @_; + -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!"; + $fn //= 'tmp/' . $job->id; + open T, '>', $fn or die "Cannot create $fn: $!"; + for my $k (sort grep { /^[A-Z]/ } keys %$job) { + print T "$k: ", $job->{$k}, "\n"; + } + print T "\n"; + print T $job->{'body'} if defined $job->{'body'}; + close T; + return $fn; +} + +42; diff --git a/lib/perl/BEX/Queue.pm b/lib/perl/BEX/Queue.pm new file mode 100644 index 0000000..6ae9240 --- /dev/null +++ b/lib/perl/BEX/Queue.pm @@ -0,0 +1,249 @@ +# Batch EXecutor 3.0 -- Queues +# (c) 2011-2012 Martin Mares + +use strict; +use warnings; +use feature 'switch'; + +package BEX::Queue; + +use IO::File; +use File::Path; +use Fcntl qw(:flock); +use POSIX (); + +sub new($;$) { + my ($class, $name) = @_; + $name //= 'queue'; + -d $name or die "Queue directory $name does not exist\n"; + for my $d ("hosts", "jobs") { + -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!"; + } + my $queue = { + 'Name' => $name, + 'MetaCache' => {}, + }; + return bless $queue; +} + +sub log_file($$) { + my ($queue, $machine, $jid) = @_; + return $queue->host_dir($machine) . '/' . $jid . '.log'; +} + +# Most actions have to be logged by the caller +sub log($$$$;$) { + my ($queue, $mach, $jid, $stat, $msg) = @_; + my $t = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime); + my $m = join(" ", $t, $mach, $jid, $stat); + $m .= " $msg" if defined $msg; + + my $fh = $queue->{'LogFH'} //= new IO::File $queue->{'Name'} . '/log', '>>' or die "Cannot open log: $!"; + print $fh "$m\n"; + + # Append to the per-job log file + if (open L, '>>', $queue->log_file($mach, $jid)) { + print L "### $m\n"; + close L; + } +} + +sub host_dir($$) { + my ($queue, $machine) = @_; + return $queue->{'Name'} . '/hosts/' . $machine; +} + +sub queue_file($$) { + my ($queue, $machine, $jid) = @_; + return $queue->host_dir($machine) . '/' . $jid . '.job'; +} + +sub status_file($$) { + my ($queue, $machine, $jid) = @_; + return $queue->host_dir($machine) . '/' . $jid . '.stat'; +} + +sub temp_file($$) { + my ($queue, $machine, $jid) = @_; + return $queue->host_dir($machine) . '/' . $jid . '.tmp'; +} + +sub job_file($$) { + my ($queue, $jid) = @_; + return $queue->{'Name'} . '/jobs/' . $jid. '.job'; +} + +sub enqueue($$$) { + my ($queue, $machine, $job) = @_; + my $qf = $queue->queue_file($machine, $job->id); + if (-f $qf) { + return 0; + } + my $fn = $queue->job_file($job->id); + -f $fn or $job->save($fn); + my $dir = $queue->host_dir($machine); + -d $dir or mkdir $dir or die "Cannot create directory $dir: $!"; + symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!"; + return 1; +} + +sub scan($$) { + my ($queue, $machine) = @_; + my @list = (); + if (opendir D, $queue->host_dir($machine)) { + while ($_ = readdir D) { + /^\./ and next; + s{\.job}{} or next; + push @list, $_; + } + closedir D; + } + return sort @list; +} + +sub remove($$;$) { + my ($queue, $machine, $jid, $force_remove) = @_; + if ($BEX::Config::keep_history && !$force_remove) { + my $s = $queue->{'Name'} . '/hosts/' . $machine; + my $d = $queue->{'Name'} . '/history/' . $machine; + File::Path::mkpath($d); + for my $suff ('job', 'stat', 'log') { + my $src = "$s/$jid.$suff"; + my $dst = "$d/$jid.$suff"; + if (-f $src) { + rename $src, $dst or die "Cannot rename $src to $dst: $!"; + } else { + # Might be present from the previous incarnation of the same job + unlink $dst; + } + } + } else { + unlink $queue->queue_file($machine, $jid); + unlink $queue->status_file($machine, $jid); + unlink $queue->log_file($machine, $jid); + } + unlink $queue->temp_file($machine, $jid); +} + +sub job_metadata($$) { + my ($queue, $jid) = @_; + my $cache = $queue->{'MetaCache'}; + if (!defined $cache->{$jid}) { + $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1); + } + return $cache->{$jid}; +} + +sub job_name($$) { + my ($queue, $jid) = @_; + return $queue->job_metadata($jid)->name; +} + +sub read_job_status($$$) { + my ($queue, $machine, $jid) = @_; + my %s = (); + my $sf = $queue->status_file($machine, $jid); + if (open S, '<', $sf) { + while () { + chomp; + /^(\w+):\s*(.*)/ or die "Parse error in $sf"; + $s{$1} = $2; + } + close S; + } + return \%s; +} + +sub write_job_status($$$$) { + my ($queue, $machine, $jid, $stat) = @_; + my $sf = $queue->status_file($machine, $jid); + open S, '>', "$sf.$$" or die "Cannot create $sf.$$: $!"; + for my $k (sort keys %$stat) { + print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k}; + } + close S; + rename "$sf.$$", $sf or die "Cannot rename $sf.$$ to $sf: $!"; +} + +sub update_job_status($$$$;$) { + my ($queue, $machine, $jid, $stat, $msg) = @_; + my $s = { + 'Time' => time, + 'Status' => $stat, + 'Message' => $msg, + }; + $queue->write_job_status($machine, $jid, $s); + $queue->log($machine, $jid, $stat, $msg); +} + +sub lock_name($$$) { + my ($queue, $machine, $jid) = @_; + my $lock = $queue->{'Name'}; + if (defined $jid) { + $lock .= "/hosts/$machine/$jid.lock"; + } elsif (defined $machine) { + $lock .= "/hosts/$machine/lock"; + } else { + $lock .= '/lock'; + } +} + +# Whenever we want to run a job on a machine, we must obtain a lock; +# at most one lock can be held at a time by a single BEX::Queue object. +# See the description of locking schemes in BEX::Config. +sub lock($$$) { + my ($queue, $machine, $jid) = @_; + my $lock; + given ($BEX::Config::locking_scheme) { + when ('queue') { + $lock = lock_name($queue, undef, undef); + } + when ('host') { + defined($machine) or return 1; + $lock = lock_name($queue, $machine, undef); + } + when ('job') { + defined($machine) && defined($jid) or return 1; + $lock = lock_name($queue, $machine, $jid); + } + when ('none') { return 1; } + default { die "Invalid BEX::Config::locking_scheme"; } + } + if (defined($queue->{'LockName'})) { + return 1 if ($queue->{'LockName'} eq $lock); + $queue->unlock; + } + open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!"; + if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) { + close $queue->{'LockHandle'}; + delete $queue->{'LockHandle'}; + return 0; + } + $queue->{'LockName'} = $lock; + return 1; +} + +sub unlock($) { + my ($queue) = @_; + defined $queue->{'LockName'} or return; + unlink $queue->{'LockName'}; + flock $queue->{'LockHandle'}, LOCK_UN; + close $queue->{'LockHandle'}; + delete $queue->{'LockHandle'}; + delete $queue->{'LockName'}; +} + +# Unsafe (does not check fcntl, only existence of a lock file), but should be enough for bq +sub is_locked($$$) { + my ($queue, $machine, $jid) = @_; + given ($BEX::Config::locking_scheme) { + # Shortcuts + when ('host') { return unless defined $machine; } + when ('jid') { return unless defined $jid; } + when ('none') { return; } + } + my $lock = lock_name($queue, $machine, $jid); + return -f $lock; +} + +42; diff --git a/prolog b/prolog deleted file mode 100644 index 3350a91..0000000 --- a/prolog +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh -# BEX prolog -set -e