From: Martin Mares Date: Mon, 31 Oct 2011 11:50:05 +0000 (+0100) Subject: Implemented locking of queues, hosts and jobs X-Git-Tag: v3.0~52 X-Git-Url: http://mj.ucw.cz/gitweb/?a=commitdiff_plain;h=d54535753059c37ab3c4fec006dd4616b3210ca6;p=bex.git Implemented locking of queues, hosts and jobs --- diff --git a/TODO b/TODO index 7b7081e..1c75f38 100644 --- a/TODO +++ b/TODO @@ -4,9 +4,10 @@ - bprun: option for setting max # of running jobs - bprun --job - bprun --curses -- Locking - rsync, rsync-only - Terminology: machine vs. host - ssh options - Detector of orphans (unused queue dirs, jobs on non-existent machines, non-queued jobs) - job failed => give a more explanatory message +- write_job_status should be atomic +- change ((no subject)) diff --git a/brun b/brun index 449f8a1..8b6daeb 100755 --- a/brun +++ b/brun @@ -93,8 +93,16 @@ sub run_job($$$) { my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*'); my $queue = BEX::Queue->new($queue_name); +$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n"; + for my $mach (@machines) { my @q = $queue->scan($mach) or next; + if (!$queue->lock($mach, undef)) { + print "### Machine $mach is locked by another brun, skipping...\n"; + update_status($mach, '-', 'LOCKED', undef); + update_status($mach, '-', 'DONE', undef); + next; + } update_status($mach, '-', 'INIT', undef); my $ping; for my $jid (@q) { @@ -102,6 +110,11 @@ for my $mach (@machines) { $jid eq $given_job or next; } my $job = BEX::Job->new_from_file($queue->job_file($jid)); + if (!$queue->lock($mach, $jid)) { + print "### Skipping locked $jid on $mach ###\n"; + update_status($mach, $jid, 'LOCKED', undef); + next; + } my $stat = { 'Time' => time, }; @@ -113,6 +126,13 @@ for my $mach (@machines) { } else { ($s, $msg) = run_job($job, $queue, $mach); } + + $stat->{'Status'} = $s; + $stat->{'Message'} = $msg; + $queue->write_job_status($mach, $jid, $stat); + + # Called after writing the status file, so that the 
front-end watching + # our status FIFO can see the new status file. update_status($mach, $jid, $s, $queue, $msg); if ($s eq 'OK') { @@ -120,10 +140,8 @@ for my $mach (@machines) { $queue->remove($mach, $jid); } else { print "--- $s: $msg\n"; - $stat->{'Status'} = $s; - $stat->{'Message'} = $msg; - $queue->write_job_status($mach, $jid, $stat); } } update_status($mach, '-', 'DONE', undef); } +$queue->unlock; diff --git a/lib/BEX/Config.pm b/lib/BEX/Config.pm index 4526fae..45eebf5 100644 --- a/lib/BEX/Config.pm +++ b/lib/BEX/Config.pm @@ -28,6 +28,15 @@ our $keep_history = 1; # Before we try to connect to a host, ping it to check if it's alive our $ping_hosts = 1; +# Whenever we want to run a job on a machine, we must obtain a lock. +# Available locking schemes are: +# none - no locking takes place (dangerous!) +# job - obtain exclusive access to the job, other jobs on the same +# host can run in parallel +# host - obtain exclusive access to the host, other hosts can run +# queue - obtain exclusive access to the whole queue +our $locking_scheme = 'host'; + # Various utility functions sub parse_machine_list(@); diff --git a/lib/BEX/Queue.pm b/lib/BEX/Queue.pm index 712ecff..4735497 100644 --- a/lib/BEX/Queue.pm +++ b/lib/BEX/Queue.pm @@ -3,11 +3,13 @@ use strict; use warnings; +use feature 'switch'; package BEX::Queue; use IO::File; use File::Path; +use Fcntl qw(:flock); use POSIX (); sub new($;$) { @@ -151,9 +153,52 @@ sub write_job_status($$$$) { my $sf = $queue->status_file($machine, $jid); open S, '>', $sf or die "Cannot create $sf: $!"; for my $k (sort keys %$stat) { - print S "$k: ", $stat->{$k}, "\n"; + print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k}; } close S; } +# Whenever we want to run a job on a machine, we must obtain a lock; +# at most one lock can be held at a time by a single BEX::Queue object. +# See the description of locking schemes in BEX::Config. 
+#
+# lock($queue, $machine, $jid)
+#	Try to obtain, without blocking, the lock selected by
+#	$BEX::Config::locking_scheme. $machine and $jid may be undef;
+#	if the scheme locks at a finer level than the arguments describe
+#	(e.g. scheme 'host' with no machine given), nothing is locked and
+#	the call succeeds. A lock on a different file that is already held
+#	by this object is released first.
+#	Returns 1 on success (or when no lock is needed) and 0 if the lock
+#	could not be taken. Dies if the lock file cannot be created or
+#	if the locking scheme is unknown.
+sub lock($$$) {
+	my ($queue, $machine, $jid) = @_;
+	my $scheme = $BEX::Config::locking_scheme // '';
+	my $lock = $queue->{'Name'};
+
+	# A plain if/elsif chain is used instead of given/when, because the
+	# "switch" feature is experimental and deprecated in newer Perls.
+	if ($scheme eq 'queue') {
+		$lock .= '/lock';
+	} elsif ($scheme eq 'host') {
+		defined($machine) or return 1;
+		$lock .= "/hosts/$machine/lock";
+	} elsif ($scheme eq 'job') {
+		defined($machine) && defined($jid) or return 1;
+		$lock .= "/hosts/$machine/$jid.lock";
+	} elsif ($scheme eq 'none') {
+		return 1;
+	} else {
+		die "Invalid BEX::Config::locking_scheme";
+	}
+
+	if (defined($queue->{'LockName'})) {
+		# Asking again for the lock we already hold is a no-op.
+		return 1 if ($queue->{'LockName'} eq $lock);
+		$queue->unlock;
+	}
+
+	for (;;) {
+		# Use a lexical handle and store it in the object only on success,
+		# so that a failure never leaves a half-initialized lock behind.
+		open my $fh, '>>', $lock or die "Cannot create $lock: $!";
+		if (!flock($fh, LOCK_EX | LOCK_NB)) {
+			close $fh;
+			return 0;
+		}
+
+		# unlock() removes the lock file while still holding the lock.
+		# We could therefore have opened the old file just before it was
+		# unlinked and now hold a lock on a file which no longer exists
+		# under this name, while somebody else creates and locks a new one.
+		# Check that the handle and the path refer to the same file
+		# (same device and inode); if they do not, start over.
+		my @held = stat($fh);
+		my @current = stat($lock);
+		if (@held && @current && $held[0] == $current[0] && $held[1] == $current[1]) {
+			$queue->{'LockHandle'} = $fh;
+			$queue->{'LockName'} = $lock;
+			return 1;
+		}
+		close $fh;
+	}
+}
+
+# unlock($queue)
+#	Release the lock held by this object, if any, and remove its lock file.
+#	Does nothing when no lock is held.
+sub unlock($) {
+	my ($queue) = @_;
+	defined $queue->{'LockName'} or return;
+	# The file is removed before the lock is dropped, so nobody can see
+	# an unlocked lock file which is about to disappear. Removal is
+	# best-effort: a failure to unlink only leaves an unlocked file behind.
+	unlink $queue->{'LockName'};
+	flock $queue->{'LockHandle'}, LOCK_UN;
+	close $queue->{'LockHandle'};
+	delete $queue->{'LockHandle'};
+	delete $queue->{'LockName'};
+}
+
 42;