]> mj.ucw.cz Git - bex.git/commitdiff
Implemented locking of queues, hosts and jobs
authorMartin Mares <mj@ucw.cz>
Mon, 31 Oct 2011 11:50:05 +0000 (12:50 +0100)
committerMartin Mares <mj@ucw.cz>
Mon, 31 Oct 2011 11:50:05 +0000 (12:50 +0100)
TODO
brun
lib/BEX/Config.pm
lib/BEX/Queue.pm

diff --git a/TODO b/TODO
index 7b7081e107361ce877695e355932be499e96705f..1c75f3870e0ff7c05ab18b3a0e0a789f329d05f2 100644 (file)
--- a/TODO
+++ b/TODO
@@ -4,9 +4,10 @@
 - bprun: option for setting max # of running jobs
 - bprun --job
 - bprun --curses
-- Locking
 - rsync, rsync-only
 - Terminology: machine vs. host
 - ssh options
 - Detector of orphans (unused queue dirs, jobs on non-existent machines, non-queued jobs)
 - job failed => give a more explanatory message
+- write_job_status should be atomic
+- change ((no subject))
diff --git a/brun b/brun
index 449f8a133f54fcc041d2a8a8e5430cc8250494ab..8b6daebae3d976a3529d85b23891f30e295e3144 100755 (executable)
--- a/brun
+++ b/brun
@@ -93,8 +93,16 @@ sub run_job($$$) {
 my @machines = BEX::Config::parse_machine_list(@ARGV ? @ARGV : '*');
 my $queue = BEX::Queue->new($queue_name);
 
+$queue->lock(undef, undef) or die "The queue is locked by another brun, cannot continue.\n";
+
 for my $mach (@machines) {
        my @q = $queue->scan($mach) or next;
+       if (!$queue->lock($mach, undef)) {
+               print "### Machine $mach is locked by another brun, skipping...\n";
+               update_status($mach, '-', 'LOCKED', undef);
+               update_status($mach, '-', 'DONE', undef);
+               next;
+       }
        update_status($mach, '-', 'INIT', undef);
        my $ping;
        for my $jid (@q) {
@@ -102,6 +110,11 @@ for my $mach (@machines) {
                        $jid eq $given_job or next;
                }
                my $job = BEX::Job->new_from_file($queue->job_file($jid));
+               if (!$queue->lock($mach, $jid)) {
+                       print "### Skipping locked $jid on $mach ###\n";
+                       update_status($mach, $jid, 'LOCKED', undef);
+                       next;
+               }
                my $stat = {
                        'Time' => time,
                };
@@ -113,6 +126,13 @@ for my $mach (@machines) {
                } else {
                        ($s, $msg) = run_job($job, $queue, $mach);
                }
+
+               $stat->{'Status'} = $s;
+               $stat->{'Message'} = $msg;
+               $queue->write_job_status($mach, $jid, $stat);
+
+               # Called after writing the status file, so that the front-end watching
+               # our status FIFO can see the new status file.
                update_status($mach, $jid, $s, $queue, $msg);
 
                if ($s eq 'OK') {
@@ -120,10 +140,8 @@ for my $mach (@machines) {
                        $queue->remove($mach, $jid);
                } else {
                        print "--- $s: $msg\n";
-                       $stat->{'Status'} = $s;
-                       $stat->{'Message'} = $msg;
-                       $queue->write_job_status($mach, $jid, $stat);
                }
        }
        update_status($mach, '-', 'DONE', undef);
 }
+$queue->unlock;
index 4526faeb3e9ab2d2fa0726e210f53d77cb3d175a..45eebf53e3447b67e9c23506c5baa364993e0a29 100644 (file)
@@ -28,6 +28,15 @@ our $keep_history = 1;
 # Before we try to connect to a host, ping it to check if it's alive
 our $ping_hosts = 1;
 
+# Whenever we want to run a job on a machine, we must obtain a lock.
+# Available locking schemes are:
+#      none - no locking takes place (dangerous!)
+#      job - obtain exclusive access to the job, other jobs on the same
+#            host can run in parallel
+#      host - obtain exclusive access to the host, other hosts can run
+#      queue - obtain exclusive access to the whole queue
+our $locking_scheme = 'host';
+
 # Various utility functions
 
 sub parse_machine_list(@);
index 712ecff20a6e928441cfaef05c1a16191fcf31c5..473549751aa52db4306be334cb55a7dec1b381bd 100644 (file)
@@ -3,11 +3,13 @@
 
 use strict;
 use warnings;
+use feature 'switch';
 
 package BEX::Queue;
 
 use IO::File;
 use File::Path;
+use Fcntl qw(:flock);
 use POSIX ();
 
 sub new($;$) {
@@ -151,9 +153,52 @@ sub write_job_status($$$$) {
        my $sf = $queue->status_file($machine, $jid);
        open S, '>', $sf or die "Cannot create $sf: $!";
        for my $k (sort keys %$stat) {
-               print S "$k: ", $stat->{$k}, "\n";
+               print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k};
        }
        close S;
 }
 
+# Whenever we want to run a job on a machine, we must obtain a lock;
+# at most one lock can be held at a time by a single BEX::Queue object.
+# See the description of locking schemes in BEX::Config.
+sub lock($$$) {
+       my ($queue, $machine, $jid) = @_;
+       my $lock = $queue->{'Name'};
+       given ($BEX::Config::locking_scheme) {
+               when ('queue') { $lock .= '/lock'; }
+               when ('host') {
+                       defined($machine) or return 1;
+                       $lock .= "/hosts/$machine/lock";
+               }
+               when ('job') {
+                       defined($machine) && defined($jid) or return 1;
+                       $lock .= "/hosts/$machine/$jid.lock";
+               }
+               when ('none') { return 1; }
+               default { die "Invalid BEX::Config::locking_scheme"; }
+       }
+       if (defined($queue->{'LockName'})) {
+               return 1 if ($queue->{'LockName'} eq $lock);
+               $queue->unlock;
+       }
+       open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!";
+       if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) {
+               close $queue->{'LockHandle'};
+               delete $queue->{'LockHandle'};
+               return 0;
+       }
+       $queue->{'LockName'} = $lock;
+       return 1;
+}
+
+sub unlock($) {
+       my ($queue) = @_;
+       defined $queue->{'LockName'} or return;
+       unlink $queue->{'LockName'};
+       flock $queue->{'LockHandle'}, LOCK_UN;
+       close $queue->{'LockHandle'};
+       delete $queue->{'LockHandle'};
+       delete $queue->{'LockName'};
+}
+
 42;