Commit f0edaecf authored by Leo Gordon

improved STDOUT/STDERR redirection into files; removal of job logs on success

parent 2edfa351
@@ -463,7 +463,7 @@ sub update_status {
Arg [1] : Bio::EnsEMBL::Hive::AnalysisJob $job
Example :
- Description: if files are non-zero size, will update DB with location
+ Description: update locations of log files, if present
Returntype :
Exceptions :
Caller : Bio::EnsEMBL::Hive::Worker
@@ -471,28 +471,17 @@ sub update_status {
=cut
sub store_out_files {
- my ($self,$job) = @_;
- return unless($job);
- my $sql = sprintf("DELETE from job_file WHERE worker_id=%d and job_id=%d",
- $job->worker_id, $job->dbID);
- $self->dbc->do($sql);
- return unless($job->stdout_file or $job->stderr_file);
- my $insert_sql = 'INSERT INTO job_file (job_id, worker_id, retry, type, path) VALUES (?,?,?,?,?)';
+ my ($self, $job) = @_;
- my $sth = $self->dbc()->prepare($insert_sql);
- my @params = ($job->dbID(), $job->worker_id(), $job->retry_count());
- if($job->stdout_file()) {
- $sth->execute(@params, 'STDOUT', $job->stdout_file());
- }
- if($job->stderr_file()) {
- $sth->execute(@params, 'STDERR', $job->stderr_file());
- }
- $sth->finish();
- return;
+ if($job->stdout_file or $job->stderr_file) {
+ my $insert_sql = 'REPLACE INTO job_file (job_id, retry, worker_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)';
+ my $sth = $self->dbc()->prepare($insert_sql);
+ $sth->execute($job->dbID(), $job->retry_count(), $job->worker_id(), $job->stdout_file(), $job->stderr_file());
+ $sth->finish();
+ } else {
+ my $sql = 'DELETE from job_file WHERE worker_id='.$job->worker_id.' AND job_id='.$job->dbID;
+ $self->dbc->do($sql);
+ }
}
......
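Note on the rewritten store_out_files above: it relies on REPLACE INTO together with the new UNIQUE KEY job_retry (job_id, retry), so re-storing the log locations for the same retry simply overwrites the earlier row, and the explicit DELETE is only issued when there is nothing left to record. A minimal, self-contained sketch of that upsert behaviour (illustration only: it assumes DBD::SQLite is installed, uses an in-memory table as a stand-in for job_file, and the IDs and paths are made up):

    use strict;
    use warnings;
    use DBI;

    # throw-away in-memory database, shaped like the new job_file table
    my $dbh = DBI->connect('dbi:SQLite:dbname=:memory:', '', '', { RaiseError => 1 });
    $dbh->do(q{
        CREATE TABLE job_file (
            job_id      INTEGER NOT NULL,
            retry       INTEGER NOT NULL,
            worker_id   INTEGER NOT NULL,
            stdout_file VARCHAR(255),
            stderr_file VARCHAR(255)
        )
    });
    $dbh->do('CREATE UNIQUE INDEX job_retry ON job_file (job_id, retry)');

    my $sth = $dbh->prepare('REPLACE INTO job_file (job_id, retry, worker_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)');

    $sth->execute(42, 0, 7, '/tmp/job_id_42_0.out', '/tmp/job_id_42_0.err');   # first store
    $sth->execute(42, 0, 7, undef, '/tmp/job_id_42_0.err');                    # same (job_id, retry): overwrites, does not duplicate

    my ($rows) = $dbh->selectrow_array('SELECT count(*) FROM job_file');
    print "job_file rows: $rows\n";    # prints 1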
@@ -77,6 +77,7 @@ use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Extensions;
use Bio::EnsEMBL::Hive::Process;
+ use Bio::EnsEMBL::Hive::Utils::RedirectStack;
use Bio::EnsEMBL::Hive::Utils ('dir_revhash'); # import dir_revhash
@@ -367,6 +368,7 @@ sub worker_output_dir {
my $worker_id = $self->dbID();
- my $dir_revhash = dir_revhash($worker_id);
+ $worker_output_dir = join('/', $hive_output_dir, dir_revhash($worker_id), 'worker_id_'.$worker_id );
}
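The join above spreads each worker's output directory across nested subdirectories derived from the reversed digits of its dbID, so a run with thousands of workers does not dump every worker_id_NNN directory into one folder. A rough sketch of the idea (my_dir_revhash is a hypothetical stand-in written for illustration; the real dir_revhash in Bio::EnsEMBL::Hive::Utils may differ in detail):

    use strict;
    use warnings;

    sub my_dir_revhash {                       # hypothetical helper, for illustration only
        my $id     = shift;
        my @digits = reverse split //, $id;    # least significant digit first
        pop @digits;                           # keep the top-level fan-out modest
        return join('/', @digits);
    }

    my $worker_id = 12345;
    print my_dir_revhash($worker_id), "\n";    # 5/4/3/2
    print join('/', '/hive_output', my_dir_revhash($worker_id), "worker_id_$worker_id"), "\n";
    # /hive_output/5/4/3/2/worker_id_12345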
@@ -379,6 +381,18 @@ sub worker_output_dir {
return $self->{'_worker_output_dir'};
}
+ sub get_stdout_redirector {
+ my $self = shift;
+ return $self->{_stdout_redirector} ||= Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDOUT);
+ }
+ sub get_stderr_redirector {
+ my $self = shift;
+ return $self->{_stderr_redirector} ||= Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDERR);
+ }
sub perform_cleanup {
my $self = shift;
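The two getters above lazily create one RedirectStack per stream. The point of a stack, compared with the one-shot OLDOUT/WORKER_STDOUT filehandle juggling removed further down, is that redirections can nest: the worker pushes worker.out/worker.err for its own lifetime, pushes per-job files around each job, and every pop restores whatever destination was active before. A self-contained sketch of that idea (My::RedirectStack and the /tmp path are invented for illustration; the real Bio::EnsEMBL::Hive::Utils::RedirectStack may be implemented differently):

    package My::RedirectStack;    # hypothetical name; sketch of the idea only

    use strict;
    use warnings;

    sub new {
        my ($class, $fh) = @_;
        return bless { fh => $fh, stack => [] }, $class;
    }

    sub push {    # redirect the handle into a file, remembering where it pointed before
        my ($self, $filename) = @_;

        open(my $saved, '>&', $self->{fh})      or die "Cannot save the filehandle: $!";
        CORE::push @{ $self->{stack} }, $saved; # CORE:: calls the builtin, not this method
        open($self->{fh}, '>', $filename)       or die "Cannot redirect into '$filename': $!";
    }

    sub pop {     # restore the previous destination of the handle
        my ($self) = @_;

        my $saved = CORE::pop @{ $self->{stack} } or return;
        open($self->{fh}, '>&', $saved) or die "Cannot restore the filehandle: $!";
        close $saved;
    }

    package main;

    my $rs = My::RedirectStack->new(\*STDOUT);
    $rs->push('/tmp/worker.out');                      # made-up path
    print "this line goes into /tmp/worker.out\n";
    $rs->pop();
    print "and this one is back on the original STDOUT\n";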
@@ -394,13 +408,13 @@ sub print_worker {
" host=",$self->host,
" pid=",$self->process_id,
"\n");
print(" batch_size = ", $self->analysis->stats->get_or_estimate_batch_size(),"\n");
print(" job_limit = ", $self->job_limit,"\n") if(defined($self->job_limit));
print(" life_span = ", $self->life_span,"\n") if(defined($self->life_span));
print("\tbatch_size = ", $self->analysis->stats->get_or_estimate_batch_size(),"\n");
print("\tjob_limit = ", $self->job_limit,"\n") if(defined($self->job_limit));
print("\tlife_span = ", $self->life_span,"\n") if(defined($self->life_span));
if(my $worker_output_dir = $self->worker_output_dir) {
print(" worker_output_dir = $worker_output_dir\n");
print("\tworker_output_dir = $worker_output_dir\n");
} else {
print(" worker_output_dir = STDOUT/STDERR\n");
print("\tworker_output_dir = STDOUT/STDERR\n");
}
}
@@ -461,14 +475,8 @@ sub run {
$self->print_worker();
if( my $worker_output_dir = $self->worker_output_dir ) {
open OLDOUT, ">&STDOUT";
open OLDERR, ">&STDERR";
open WORKER_STDOUT, ">${worker_output_dir}/worker.out";
open WORKER_STDERR, ">${worker_output_dir}/worker.err";
close STDOUT;
close STDERR;
open STDOUT, ">&WORKER_STDOUT";
open STDERR, ">&WORKER_STDERR";
$self->get_stdout_redirector->push( $worker_output_dir.'/worker.out' );
$self->get_stderr_redirector->push( $worker_output_dir.'/worker.err' );
$self->print_worker();
}
@@ -549,12 +557,8 @@ sub run {
print("total jobs completed : ", $self->work_done, "\n");
if( $self->worker_output_dir() ) {
- close STDOUT;
- close STDERR;
- close WORKER_STDOUT;
- close WORKER_STDERR;
- open STDOUT, ">&", \*OLDOUT;
- open STDERR, ">&", \*OLDERR;
+ $self->get_stdout_redirector->pop();
+ $self->get_stderr_redirector->pop();
}
}
@@ -572,27 +576,31 @@ sub run_one_batch {
if($self->debug) {
$self->analysis->stats->print_stats;
print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n");
print "claimed ".scalar(@{$jobs})." jobs to process\n";
}
foreach my $job (@{$jobs}) {
$job->print_job if($self->debug);
- $self->start_job_output_redirection($job);
+ $self->start_job_output_redirection($job); # switch logging into job's STDERR
eval { # capture any throw/die
$self->run_module_with_job($job);
};
- my $msg_thrown = $@;
- $self->stop_job_output_redirection($job);
+ my $msg_thrown = $@;
+ my $job_id = $job->dbID();
+ my $job_completion_line = "\njob $job_id : complete\n";
if($msg_thrown) { # record the message - whether it was a success or failure:
- my $job_id = $job->dbID();
my $job_status_at_the_moment = $job->status();
my $action = $job->incomplete ? 'died' : 'exited';
warn "Job with id=$job_id $action in status '$job_status_at_the_moment' for the following reason: $msg_thrown\n";
$job_completion_line = "\njob $job_id : $action in status '$job_status_at_the_moment' for the following reason: $msg_thrown\n";
$self->db()->get_JobMessageAdaptor()->register_message($job_id, $msg_thrown, $job->incomplete );
}
+ print STDERR $job_completion_line if($self->worker_output_dir and ($self->debug or $job->incomplete)); # one copy goes to the job's STDERR
+ $self->stop_job_output_redirection($job); # and then we switch back to worker's STDERR
+ print STDERR $job_completion_line; # one copy goes to the worker's STDERR
if($job->incomplete) {
# If the job specifically said what to do next, respect that last wish.
# Otherwise follow the default behaviour set by the beekeeper in $worker:
@@ -612,7 +620,6 @@ sub run_one_batch {
return $jobs_done_here;
}
} else { # job successfully completed:
$self->more_work_done;
$jobs_done_here++;
$job->update_status('DONE');
@@ -678,11 +685,11 @@ sub run_module_with_job {
$self->{'writing_stopwatch'}->pause();
if( $job->autoflow ) {
printf("AUTOFLOW input->output\n") if($self->debug);
print STDERR "\njob ".$job->dbID." : AUTOFLOW input->output\n" if($self->debug);
$job->dataflow_output_id();
}
} else {
print("\n!!! *no* WRITE_OUTPUT and *no* AUTOFLOW\n") if($self->debug);
print STDERR "\n!!! *no* WRITE_OUTPUT requested, so there will be no AUTOFLOW\n" if($self->debug);
}
my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
@@ -701,7 +708,7 @@ sub enter_status {
my ($self, $status, $job) = @_;
if($self->debug) {
printf("\n%s : $status\n", $job ? 'job '.$job->dbID : 'worker');
print STDERR "\n". ($job ? 'job '.$job->dbID : 'worker'). " : $status\n";
}
if($job) {
@@ -712,52 +719,46 @@ sub enter_status {
}
sub start_job_output_redirection {
- my $self = shift;
- my $job = shift or return;
- my $job_adaptor = $job->adaptor or return;
- if( my $worker_output_dir = $self->worker_output_dir ) {
- $job->stdout_file( $worker_output_dir . '/job_id_' . $job->dbID . '.out' );
- $job->stderr_file( $worker_output_dir . '/job_id_' . $job->dbID . '.err' );
- close STDOUT;
- open STDOUT, ">".$job->stdout_file;
+ my ($self, $job, $worker_output_dir) = @_;
- close STDERR;
- open STDERR, ">".$job->stderr_file;
+ if(my $worker_output_dir = $self->worker_output_dir) {
+ $self->get_stdout_redirector->push( $job->stdout_file( $worker_output_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.out' ) );
+ $self->get_stderr_redirector->push( $job->stderr_file( $worker_output_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.err' ) );
- $job_adaptor->store_out_files($job);
+ if(my $job_adaptor = $job->adaptor) {
+ $job_adaptor->store_out_files($job);
+ }
}
}
sub stop_job_output_redirection {
- my $self = shift;
- my $job = shift or return;
- my $job_adaptor = $job->adaptor or return;
+ my ($self, $job) = @_;
- if( $self->worker_output_dir ) {
+ if($self->worker_output_dir) {
+ $self->get_stdout_redirector->pop();
+ $self->get_stderr_redirector->pop();
- # the following flushes $job->stderr_file and $job->stdout_file
- open STDOUT, ">&WORKER_STDOUT";
- open STDERR, ">&WORKER_STDERR";
+ my $force_cleanup = !($self->debug || $job->incomplete);
- if(-z $job->stdout_file) {
+ if($force_cleanup or -z $job->stdout_file) {
warn "Deleting '".$job->stdout_file."' file\n";
unlink $job->stdout_file;
- $job->stdout_file('');
+ $job->stdout_file(undef);
}
- if(-z $job->stderr_file) {
+ if($force_cleanup or -z $job->stderr_file) {
warn "Deleting '".$job->stderr_file."' file\n";
unlink $job->stderr_file;
- $job->stderr_file('');
+ $job->stderr_file(undef);
}
- $job_adaptor->store_out_files($job);
+ if(my $job_adaptor = $job->adaptor) {
+ $job_adaptor->store_out_files($job);
+ }
}
}
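Two behavioural changes are bundled into the redirection subs above: job log files now carry the retry count in their names, and they are removed not only when empty but also whenever the job completed successfully on a non-debugging worker (the "removal of job logs on success" from the commit message). A small illustration of that rule, with made-up paths and IDs:

    use strict;
    use warnings;

    my ($worker_output_dir, $job_id, $retry_count) = ('/hive_output/5/4/3/2/worker_id_12345', 42, 1);

    # per-retry log names, as built in start_job_output_redirection:
    my $stdout_file = $worker_output_dir.'/job_id_'.$job_id.'_'.$retry_count.'.out';
    my $stderr_file = $worker_output_dir.'/job_id_'.$job_id.'_'.$retry_count.'.err';

    my ($debug, $job_incomplete) = (0, 0);                # a job that succeeded, worker not in debug mode
    my $force_cleanup = !($debug || $job_incomplete);     # true => logs go even if non-empty

    foreach my $log_file ($stdout_file, $stderr_file) {
        if($force_cleanup or (-e $log_file and -z $log_file)) {
            print "would delete $log_file\n";
        } else {
            print "would keep   $log_file\n";
        }
    }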
sub _specific_job {
my $self = shift;
$self->{'_specific_job'} = shift if(@_);
......
@@ -290,17 +290,17 @@ CREATE TABLE job_message (
-- job_id - foreign key
-- worker_id - link to worker table to define which worker claimed this job
-- retry - copy of retry_count of job as it was run
- -- type - type of file e.g. STDOUT, STDERR, TMPDIR, ...
- -- path - path to file or directory
+ -- stdout_file - path to the job's STDOUT log
+ -- stderr_file - path to the job's STDERR log
CREATE TABLE job_file (
- job_id int(10) NOT NULL,
- worker_id int(10) unsigned NOT NULL,
- retry int(10) NOT NULL,
- type varchar(16) NOT NULL default '',
- path varchar(255) NOT NULL,
+ job_id int(10) NOT NULL,
+ retry int(10) NOT NULL,
+ worker_id int(10) unsigned NOT NULL,
+ stdout_file varchar(255),
+ stderr_file varchar(255),
- UNIQUE KEY job_worker_type (job_id, worker_id, type),
+ UNIQUE KEY job_retry (job_id, retry),
INDEX worker_id (worker_id)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
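With the new layout the table holds one row per (job_id, retry) instead of one row per file, so the most recent log locations of a job come back in a single lookup. A hedged example of such a query (the database name, credentials and the fetch_job_logs helper are invented for illustration):

    use strict;
    use warnings;
    use DBI;

    sub fetch_job_logs {                   # hypothetical helper
        my ($dbh, $job_id) = @_;
        my $sql = 'SELECT retry, worker_id, stdout_file, stderr_file FROM job_file WHERE job_id = ? ORDER BY retry DESC LIMIT 1';
        return $dbh->selectrow_hashref($sql, undef, $job_id);
    }

    my $dbh = DBI->connect('dbi:mysql:database=my_hive_db;host=localhost', 'readonly_user', '', { RaiseError => 1 });   # made-up connection details

    if(my $logs = fetch_job_logs($dbh, 42)) {
        printf "retry %d on worker %d: stdout=%s stderr=%s\n",
            $logs->{retry}, $logs->{worker_id},
            defined($logs->{stdout_file}) ? $logs->{stdout_file} : '(removed)',
            defined($logs->{stderr_file}) ? $logs->{stderr_file} : '(removed)';
    }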
......
@@ -267,17 +267,18 @@ CREATE INDEX IF NOT EXISTS job_idx ON job_message (job_id);
-- job_id - foreign key
-- worker_id - link to worker table to define which worker claimed this job
-- retry - copy of retry_count of job as it was run
- -- type - type of file e.g. STDOUT, STDERR, TMPDIR, ...
- -- path - path to file or directory
+ -- stdout_file - path to the job's STDOUT log
+ -- stderr_file - path to the job's STDERR log
CREATE TABLE job_file (
- job_id INTEGER NOT NULL,
- worker_id INTEGER NOT NULL,
- retry int(10) NOT NULL,
- type varchar(16) NOT NULL default '',
- path varchar(255) NOT NULL
+ job_id INTEGER NOT NULL,
+ retry int(10) NOT NULL,
+ worker_id INTEGER NOT NULL,
+ stdout_file varchar(255),
+ stderr_file varchar(255)
);
- CREATE UNIQUE INDEX IF NOT EXISTS job_worker_type_idx ON job_file (job_id, worker_id, type);
+ CREATE UNIQUE INDEX IF NOT EXISTS job_retry ON job_file (job_id, retry);
CREATE INDEX IF NOT EXISTS worker_idx ON job_file (worker_id);
......