2 # (c) 2011 Martin Mares <mj@ucw.cz>
18 $log_file //= new IO::File '>>log' or die "Cannot open log: $!";
19 print $log_file POSIX::strftime("%Y-%m-%d %H:%M:%S ", localtime), $_[0], "\n";
29 my ($class, $id) = @_;
36 $job->{'ID'} = POSIX::strftime("%Y%m%d-%H%M%S-$$-$job_cnt", localtime);
38 $job->{'Subject'} = '(no subject)';
# Class-method style loader: reads the job stored in $file into $job.
#   $class        - invocant (the sub is called as BEX::Job->new_from_file(...))
#   $file         - path of the job file to read
#   $header_only  - optional third argument (the prototype marks it optional);
#                   presumably limits loading to the headers - TODO confirm
# Dies with "Cannot open ..." or "Cannot load ..." when the file is unreadable
# or malformed.
42 sub new_from_file($$;$) {
43 my ($class, $file, $header_only) = @_;
# Low-precedence "or" applies to the whole open(); $! carries the OS error.
45 open T, '<', $file or die "Cannot open $file: $!";
# $_ must be a "Name: value" header: the name starts with an uppercase ASCII
# letter followed by letters, digits or '-'; whitespace after the colon is
# dropped and the rest of the line becomes the value.
49 /^([A-Z][A-Za-z0-9-]*):\s*(.*)/ or die "Cannot load $file: Header syntax error";
# Refuse a header whose key already has a defined value in $job.
50 !defined $job->{$1} or die "Cannot load $file: Header $1 re-defined";
# The body is the plain concatenation of everything collected in @cmds.
55 $job->{'body'} = join("", @cmds);
# Fall back to '?' only when no Subject is defined.
58 $job->{'Subject'} //= '?';
# Truthiness test: an undefined ID, "" and "0" are all rejected.
59 $job->{'ID'} or die "Cannot load $file: Missing ID";
64 my ($job, $attr, $val) = @_;
65 $job->{$attr} = $val if defined $val;
71 for my $k (sort keys %$job) {
72 print "$k: ", $job->{$k}, "\n";
78 -d "tmp" or mkdir "tmp" or die "Cannot create directory tmp: $!";
79 $fn //= 'tmp/' . $job->{'ID'};
80 open T, '>', $fn or die "Cannot create $fn: $!";
81 for my $k (sort grep { /^[A-Z]/ } keys %$job) {
82 print T "$k: ", $job->{$k}, "\n";
85 print T $job->{'body'} if defined $job->{'body'};
93 my ($class, $name) = @_;
95 -d $name or die "Queue directory $name does not exist\n";
96 for my $d ("hosts", "jobs") {
97 -d "$name/$d" or mkdir "$name/$d" or die "Cannot create directory $name/$d: $!";
107 my ($queue, $machine) = @_;
108 return $queue->{'Name'} . '/hosts/' . $machine;
112 my ($queue, $machine, $jid) = @_;
113 return $queue->host_dir($machine) . '/' . $jid . '.job';
# Return the path of the status file for job $jid on $machine, i.e.
# "<host_dir($machine)>/<jid>.stat".
# NOTE(review): the prototype ($$) declares two arguments, but three are
# unpacked from @_. Perl does not apply prototypes to method calls, and the
# callers in this file use $queue->status_file(...), so this is harmless
# there; ($$$) would match the argument list - confirm before changing.
116 sub status_file($$) {
117 my ($queue, $machine, $jid) = @_;
118 return $queue->host_dir($machine) . '/' . $jid . '.stat';
122 my ($queue, $machine, $jid) = @_;
123 return $queue->host_dir($machine) . '/' . $jid . '.tmp';
127 my ($queue, $jid) = @_;
128 return $queue->{'Name'} . '/jobs/' . $jid. '.job';
132 my ($queue, $machine, $job) = @_;
133 my $qf = $queue->queue_file($machine, $job->{'ID'});
134 if (-f $qf) { return 0; }
135 my $fn = $queue->job_file($job->{'ID'});
136 -f $fn or $job->save($fn);
137 my $dir = $queue->host_dir($machine);
138 -d $dir or mkdir $dir or die "Cannot create directory $dir: $!";
139 symlink '../../jobs/' . $job->{'ID'} . '.job', $qf or die "Cannot create $qf: $!";
144 my ($queue, $machine) = @_;
146 if (opendir D, $queue->host_dir($machine)) {
147 while ($_ = readdir D) {
158 my ($queue, $machine, $jid) = @_;
159 unlink $queue->queue_file($machine, $jid);
160 unlink $queue->status_file($machine, $jid);
161 unlink $queue->temp_file($machine, $jid);
# Return the job object for $jid, loading it on first request and serving
# later requests from the per-queue cache $queue->{'MetaCache'} (keyed by
# job ID).
#   $queue - queue object
#   $jid   - job ID
164 sub job_metadata($$) {
165 my ($queue, $jid) = @_;
# NOTE(review): assumes $queue->{'MetaCache'} already holds a hash reference;
# if it were undefined, the hash autovivified through $cache below would not
# be stored back into $queue - TODO confirm against the constructor.
166 my $cache = $queue->{'MetaCache'};
# Cache miss: load from the job file, passing 1 as the loader's third
# argument (presumably its $header_only flag - confirm).
167 if (!defined $cache->{$jid}) {
168 $cache->{$jid} = BEX::Job->new_from_file($queue->job_file($jid), 1);
170 return $cache->{$jid};
# Read the status file of job $jid on $machine.
#   $queue   - queue object
#   $machine - host name used to locate the host directory
#   $jid     - job ID
# Dies with "Parse error in <file>" on a malformed entry.
173 sub read_job_status($$$) {
174 my ($queue, $machine, $jid) = @_;
176 my $sf = $queue->status_file($machine, $jid);
# A status file that cannot be opened is not an error here: no die is
# attached to this open, it only guards the parsing block.
177 if (open S, '<', $sf) {
# $_ must look like "key: value", where the key consists of word characters
# only; whitespace after the colon is not part of the value.
180 /^(\w+):\s*(.*)/ or die "Parse error in $sf";
# Write the status hash $stat of job $jid on $machine to its status file,
# one "key: value" line per entry, in sorted key order.
#   $queue   - queue object
#   $machine - host name used to locate the host directory
#   $jid     - job ID
#   $stat    - hash reference holding the status fields
# Dies with "Cannot create <file>" when the file cannot be opened for writing.
188 sub write_job_status($$$$) {
189 my ($queue, $machine, $jid, $stat) = @_;
190 my $sf = $queue->status_file($machine, $jid);
# Mode '>' truncates any existing status file before writing.
191 open S, '>', $sf or die "Cannot create $sf: $!";
# Sorting the keys makes the output order deterministic.
192 for my $k (sort keys %$stat) {
# NOTE(review): read_job_status only accepts keys matching \w+, so keys with
# other characters would presumably not round-trip - confirm.
193 print S "$k: ", $stat->{$k}, "\n";