1 # Batch EXecutor 2.0 -- Queues
2 # (c) 2011 Martin Mares <mj@ucw.cz>
16 my ($class, $name) = @_;
18 -d $name or die "Queue directory $name does not exist\n";
19 for my $d ("hosts", "jobs") {
20 -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
30 my ($queue, $machine, $jid) = @_;
31 return $queue->host_dir($machine) . '/' . $jid . '.log';
34 # Most actions have to be logged by the caller
36 my ($queue, $mach, $jid, $stat, $msg) = @_;
37 my $t = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime);
38 my $m = join(" ", $t, $mach, $jid, $stat);
39 $m .= " $msg" if defined $msg;
41 my $fh = $queue->{'LogFH'} //= new IO::File $queue->{'Name'} . '/log', '>>' or die "Cannot open log: $!";
44 # Append to the per-job log file
45 if (open L, '>>', $queue->log_file($mach, $jid)) {
52 my ($queue, $machine) = @_;
53 return $queue->{'Name'} . '/hosts/' . $machine;
57 my ($queue, $machine, $jid) = @_;
58 return $queue->host_dir($machine) . '/' . $jid . '.job';
62 my ($queue, $machine, $jid) = @_;
63 return $queue->host_dir($machine) . '/' . $jid . '.stat';
67 my ($queue, $machine, $jid) = @_;
68 return $queue->host_dir($machine) . '/' . $jid . '.tmp';
72 my ($queue, $jid) = @_;
73 return $queue->{'Name'} . '/jobs/' . $jid. '.job';
77 my ($queue, $machine, $job) = @_;
78 my $qf = $queue->queue_file($machine, $job->id);
80 $queue->log($machine, $job->id, 'REQUEUE');
83 my $fn = $queue->job_file($job->id);
84 -f $fn or $job->save($fn);
85 my $dir = $queue->host_dir($machine);
86 -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
87 symlink '../../jobs/' . $job->id . '.job', $qf or die "Cannot create $qf: $!";
88 $queue->log($machine, $job->id, 'QUEUE');
93 my ($queue, $machine) = @_;
95 if (opendir D, $queue->host_dir($machine)) {
96 while ($_ = readdir D) {
107 my ($queue, $machine, $jid) = @_;
108 if ($BEX::Config::keep_history) {
109 my $s = $queue->{'Name'} . '/hosts/' . $machine;
110 my $d = $queue->{'Name'} . '/history/' . $machine;
111 File::Path::mkpath($d);
112 for my $suff ('job', 'stat', 'log') {
113 my $src = "$s/$jid.$suff";
114 my $dst = "$d/$jid.$suff";
116 rename $src, $dst or die "Cannot rename $src to $dst: $!";
120 unlink $queue->queue_file($machine, $jid);
121 unlink $queue->status_file($machine, $jid);
122 unlink $queue->log_file($machine, $jid);
124 unlink $queue->temp_file($machine, $jid);
127 sub job_metadata($$) {
128 my ($queue, $jid) = @_;
129 my $cache = $queue->{'MetaCache'};
130 if (!defined $cache->{$jid}) {
131 $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
133 return $cache->{$jid};
136 sub read_job_status($$$) {
137 my ($queue, $machine, $jid) = @_;
139 my $sf = $queue->status_file($machine, $jid);
140 if (open S, '<', $sf) {
143 /^(\w+):\s*(.*)/ or die "Parse error in $sf";
151 sub write_job_status($$$$) {
152 my ($queue, $machine, $jid, $stat) = @_;
153 my $sf = $queue->status_file($machine, $jid);
154 open S, '>', "$sf.$$" or die "Cannot create $sf.$$: $!";
155 for my $k (sort keys %$stat) {
156 print S "$k: ", $stat->{$k}, "\n" if defined $stat->{$k};
159 rename "$sf.$$", $sf or die "Cannot rename $sf.$$ to $sf: $!";
163 my ($queue, $machine, $jid) = @_;
164 my $lock = $queue->{'Name'};
166 $lock .= "/hosts/$machine/$jid.lock";
167 } elsif (defined $machine) {
168 $lock .= "/hosts/$machine/lock";
174 # Whenever we want to run a job on a machine, we must obtain a lock;
175 # at most one lock can be held at a time by a single BEX::Queue object.
176 # See the description of locking schemes in BEX::Config.
178 my ($queue, $machine, $jid) = @_;
180 given ($BEX::Config::locking_scheme) {
182 $lock = lock_name($queue, undef, undef);
185 defined($machine) or return 1;
186 $lock = lock_name($queue, $machine, undef);
189 defined($machine) && defined($jid) or return 1;
190 $lock = lock_name($queue, $machine, $jid);
192 when ('none') { return 1; }
193 default { die "Invalid BEX::Config::locking_scheme"; }
195 if (defined($queue->{'LockName'})) {
196 return 1 if ($queue->{'LockName'} eq $lock);
199 open $queue->{'LockHandle'}, '>>', $lock or die "Cannot create $lock: $!";
200 if (!flock($queue->{'LockHandle'}, LOCK_EX | LOCK_NB)) {
201 close $queue->{'LockHandle'};
202 delete $queue->{'LockHandle'};
205 $queue->{'LockName'} = $lock;
211 defined $queue->{'LockName'} or return;
212 unlink $queue->{'LockName'};
213 flock $queue->{'LockHandle'}, LOCK_UN;
214 close $queue->{'LockHandle'};
215 delete $queue->{'LockHandle'};
216 delete $queue->{'LockName'};
219 # Unsafe (does not check fcntl, only existence of a lock file), but should be enough for bq
221 my ($queue, $machine, $jid) = @_;
222 given ($BEX::Config::locking_scheme) {
224 when ('host') { return unless defined $machine; }
225 when ('jid') { return unless defined $jid; }
226 when ('none') { return; }
228 my $lock = lock_name($queue, $machine, $jid);