Commit 01e034eb authored by Leo Gordon
Browse files

record each Workers log_dir in the database; simplified the log_dir code and...

record each Workers log_dir in the database; simplified the log_dir code and renamed cmdline options accordingly
parent 9e3508cc
......@@ -64,7 +64,7 @@ use POSIX;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::Utils 'destringify'; # import 'destringify()'
use Bio::EnsEMBL::Hive::Utils ('destringify', 'dir_revhash'); # import 'destringify()' and 'dir_revhash()'
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor;
......@@ -99,11 +99,11 @@ sub create_new_worker {
my ( $meadow_type, $meadow_name, $process_id, $exec_host,
$rc_id, $rc_name, $analysis_id, $logic_name, $job_id, $input_id,
$no_write, $debug, $worker_output_dir, $hive_output_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $compile_module_once) =
$no_write, $debug, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $compile_module_once) =
rearrange([qw(meadow_type meadow_name process_id exec_host
rc_id rc_name analysis_id logic_name job_id input_id
no_write debug worker_output_dir hive_output_dir job_limit life_span no_cleanup retry_throwing_jobs compile_module_once) ], @args);
no_write debug worker_log_dir hive_log_dir job_limit life_span no_cleanup retry_throwing_jobs compile_module_once) ], @args);
if($rc_name) {
if($rc_id) {
......@@ -142,7 +142,6 @@ sub create_new_worker {
print "creating a job outside the database\n";
$job->print_job;
$debug=1 unless(defined($debug));
$hive_output_dir='' unless(defined($hive_output_dir)); # make it defined but empty/false
} else {
die "For creating a job outside the database either -analysis_id or -logic_name must also be defined\n";
}
......@@ -211,37 +210,36 @@ sub create_new_worker {
$analysis_stats_adaptor->increase_running_workers($analysisStats->analysis_id);
}
my $sql = q{INSERT INTO worker
(born, last_check_in, meadow_type, meadow_name, process_id, host, analysis_id)
my $sql = q{INSERT INTO worker (born, last_check_in, meadow_type, meadow_name, process_id, host, analysis_id)
VALUES (CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, ?,?,?,?,?)};
my $sth = $self->prepare($sql);
$sth->execute($meadow_type, $meadow_name, $process_id, $exec_host, $analysisStats->analysis_id);
my $worker_id = $self->dbc->db_handle->last_insert_id(undef, undef, 'worker', 'worker_id');
$sth->finish;
my $sth = $self->prepare($sql);
$sth->execute($meadow_type, $meadow_name, $process_id, $exec_host, $analysisStats->analysis_id);
my $worker_id = $self->dbc->db_handle->last_insert_id(undef, undef, 'worker', 'worker_id');
$sth->finish;
my $worker = $self->fetch_by_dbID($worker_id);
$worker=undef unless($worker and $worker->analysis);
if($hive_log_dir or $worker_log_dir) {
my $dir_revhash = dir_revhash($worker_id);
$worker_log_dir ||= $hive_log_dir .'/'. ($dir_revhash ? "$dir_revhash/" : '') .'worker_id_'.$worker_id;
$worker->init;
# Note: the following die-message will not reach the log files for circular reason!
system("mkdir -p $worker_log_dir") && die "Could not create '$worker_log_dir' because: $!";
if($worker and $analysisStats) {
$analysisStats->update_status('WORKING');
}
$worker->_specific_job($job) if($job);
$worker->execute_writes(0) if($no_write);
my $sql_add_log = "UPDATE worker SET log_dir=? WHERE worker_id=?";
my $sth_add_log = $self->prepare($sql_add_log);
$sth_add_log->execute($worker_log_dir, $worker_id);
$sth_add_log->finish;
}
my $worker = $self->fetch_by_dbID($worker_id);
$worker->init;
$worker->_specific_job($job) if($job);
$worker->execute_writes(0) if($no_write);
$worker->debug($debug) if($debug);
$worker->worker_output_dir($worker_output_dir) if(defined($worker_output_dir));
unless(defined($hive_output_dir)) {
my $arrRef = $self->db->get_MetaContainer->list_value_by_key( 'hive_output_dir' );
if( @$arrRef ) {
$hive_output_dir = destringify($arrRef->[0]);
}
}
$worker->hive_output_dir($hive_output_dir);
if($job_limit) {
$worker->job_limit($job_limit);
......@@ -260,6 +258,8 @@ sub create_new_worker {
$worker->compile_module_once($compile_module_once);
}
$analysisStats->update_status('WORKING');
return $worker;
}
......@@ -1072,6 +1072,7 @@ sub _columns {
w.last_check_in
w.died
w.cause_of_death
w.log_dir
);
}
......@@ -1099,6 +1100,8 @@ sub _objs_from_sth {
$worker->last_check_in($column{'last_check_in'});
$worker->died($column{'died'});
$worker->cause_of_death($column{'cause_of_death'});
$worker->log_dir($column{'log_dir'});
$worker->db($self->db);
if($column{'analysis_id'} and $self->db->get_AnalysisAdaptor) {
......
......@@ -77,7 +77,6 @@ use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Utils::RedirectStack;
use Bio::EnsEMBL::Hive::Utils ('dir_revhash'); # import dir_revhash
use base ( 'Bio::EnsEMBL::Storable', # inherit dbID(), adaptor() and new() methods
);
......@@ -360,56 +359,29 @@ sub compile_module_once {
return $self->{'_compile_module_once'} ;
}
=head2 hive_output_dir
=head2 log_dir
Arg [1] : (optional) string directory path
Title : hive_output_dir
Usage : $hive_output_dir = $self->hive_output_dir;
$self->hive_output_dir($hive_output_dir);
Description: getter/setter for the directory where STDOUT and STDERR of the hive will be redirected to.
If it is "true", each worker will create its own subdirectory in it
where each job will have its own .out and .err files.
Title : log_dir
Usage : $worker_log_dir = $self->log_dir;
$self->log_dir($worker_log_dir);
Description: getter/setter for the directory where STDOUT and STDERR of the worker will be redirected to.
In this directory each job will have its own .out and .err files.
Returntype : string
=cut
# Getter/setter for the hive-wide output directory (stored on the object
# under the '_hive_output_dir' key). Called with an argument it stores the
# new value; in either case it returns the currently stored value.
sub hive_output_dir {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_hive_output_dir'} = $new_value[0];
    }

    return $self->{'_hive_output_dir'};
}
sub worker_output_dir {
sub log_dir {
my $self = shift @_;
if((my $worker_output_dir = $self->{'_worker_output_dir'}) and not @_) { # no need to set, just return:
return $worker_output_dir;
} else { # let's try to set first:
if(@_) { # setter mode ignores hive_output_dir
$worker_output_dir = shift @_;
} elsif( my $hive_output_dir = $self->hive_output_dir ) {
my $worker_id = $self->dbID;
my $dir_revhash = dir_revhash($worker_id);
$worker_output_dir = join('/', $hive_output_dir, dir_revhash($worker_id), 'worker_id_'.$worker_id );
}
if($worker_output_dir) { # will not attempt to create if set to false
system("mkdir -p $worker_output_dir") && die "Could not create '$worker_output_dir' because: $!";
}
$self->{'_worker_output_dir'} = $worker_output_dir;
if(@_) {
$self->{'_log_dir'} = shift @_;
}
return $self->{'_worker_output_dir'};
return $self->{'_log_dir'};
}
sub get_stdout_redirector {
my $self = shift;
......@@ -437,10 +409,10 @@ sub print_worker {
print("\tbatch_size = ", $self->analysis->stats->get_or_estimate_batch_size(),"\n");
print("\tjob_limit = ", $self->job_limit,"\n") if(defined($self->job_limit));
print("\tlife_span = ", $self->life_span,"\n") if(defined($self->life_span));
if(my $worker_output_dir = $self->worker_output_dir) {
print("\tworker_output_dir = $worker_output_dir\n");
if(my $worker_log_dir = $self->log_dir) {
print("\tworker_log_dir = $worker_log_dir\n");
} else {
print("\tworker_output_dir = STDOUT/STDERR\n");
print("\tworker_log_dir = STDOUT/STDERR\n");
}
}
......@@ -497,9 +469,9 @@ sub run {
my $self = shift;
$self->print_worker();
if( my $worker_output_dir = $self->worker_output_dir ) {
$self->get_stdout_redirector->push( $worker_output_dir.'/worker.out' );
$self->get_stderr_redirector->push( $worker_output_dir.'/worker.err' );
if( my $worker_log_dir = $self->log_dir ) {
$self->get_stdout_redirector->push( $worker_log_dir.'/worker.out' );
$self->get_stderr_redirector->push( $worker_log_dir.'/worker.err' );
$self->print_worker();
}
......@@ -594,7 +566,7 @@ sub run {
printf("dbc %d disconnect cycles\n", $self->db->dbc->disconnect_count);
print("total jobs completed : ", $self->work_done, "\n");
if( $self->worker_output_dir() ) {
if( $self->log_dir() ) {
$self->get_stdout_redirector->pop();
$self->get_stderr_redirector->pop();
}
......@@ -668,7 +640,7 @@ sub run_one_batch {
$self->db->get_JobMessageAdaptor()->register_message($job_id, $msg_thrown, $job->incomplete );
}
print STDERR $job_completion_line if($self->worker_output_dir and ($self->debug or $job->incomplete)); # one copy goes to the job's STDERR
print STDERR $job_completion_line if($self->log_dir and ($self->debug or $job->incomplete)); # one copy goes to the job's STDERR
$self->stop_job_output_redirection($job); # and then we switch back to worker's STDERR
print STDERR $job_completion_line; # one copy goes to the worker's STDERR
......@@ -730,11 +702,11 @@ sub enter_status {
sub start_job_output_redirection {
my ($self, $job, $worker_output_dir) = @_;
my ($self, $job) = @_;
if(my $worker_output_dir = $self->worker_output_dir) {
$self->get_stdout_redirector->push( $job->stdout_file( $worker_output_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.out' ) );
$self->get_stderr_redirector->push( $job->stderr_file( $worker_output_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.err' ) );
if(my $worker_log_dir = $self->log_dir) {
$self->get_stdout_redirector->push( $job->stdout_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.out' ) );
$self->get_stderr_redirector->push( $job->stderr_file( $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count . '.err' ) );
if(my $job_adaptor = $job->adaptor) {
$job_adaptor->store_out_files($job);
......@@ -746,7 +718,7 @@ sub start_job_output_redirection {
sub stop_job_output_redirection {
my ($self, $job) = @_;
if($self->worker_output_dir) {
if($self->log_dir) {
$self->get_stdout_redirector->pop();
$self->get_stderr_redirector->pop();
......
......@@ -62,7 +62,7 @@ sub main {
$self->{'verbose_stats'} = 1;
$self->{'retry_throwing_jobs'} = undef;
$self->{'compile_module_once'} = undef;
$self->{'hive_output_dir'} = undef;
$self->{'hive_log_dir'} = undef;
GetOptions(
# connection parameters
......@@ -95,7 +95,7 @@ sub main {
'job_limit|jlimit=i' => \$self->{'job_limit'},
'life_span|lifespan=i' => \$self->{'life_span'},
'logic_name=s' => \$self->{'logic_name'},
'hive_output_dir=s' => \$self->{'hive_output_dir'},
'hive_log_dir|hive_output_dir=s' => \$self->{'hive_log_dir'},
'retry_throwing_jobs=i' => \$self->{'retry_throwing_jobs'},
'compile_module_once=i' => \$self->{'compile_module_once'},
'debug=i' => \$self->{'debug'},
......@@ -300,7 +300,7 @@ sub generate_worker_cmd {
if ($run_job_id) {
$worker_cmd .= " -job_id $run_job_id";
} else {
foreach my $worker_option ('job_limit', 'life_span', 'logic_name', 'retry_throwing_jobs', 'compile_module_once', 'hive_output_dir', 'debug') {
foreach my $worker_option ('job_limit', 'life_span', 'logic_name', 'retry_throwing_jobs', 'compile_module_once', 'hive_log_dir', 'debug') {
if(defined(my $value = $self->{$worker_option})) {
$worker_cmd .= " -${worker_option} $value";
}
......@@ -458,7 +458,7 @@ __DATA__
-logic_name <string> : restrict the pipeline stat/runs to this analysis logic_name
-retry_throwing_jobs 0|1 : if a job dies *knowingly*, should we retry it by default?
-compile_module_once 0|1 : should we compile the module only once (desired future behaviour), or pretend to do it before every job (current behaviour)?
-hive_output_dir <path> : directory where stdout/stderr of the hive is redirected
-hive_log_dir <path> : directory where stdout/stderr of the hive is redirected
-debug <debug_level> : set debug level of the workers
=head2 Other commands/options
......
......@@ -21,7 +21,7 @@ my $db_conf = {
my ($reg_conf, $reg_alias, $url); # Connection parameters
my ($rc_id, $rc_name, $analysis_id, $logic_name, $job_id, $input_id); # Task specification parameters
my ($job_limit, $life_span, $no_cleanup, $no_write, $hive_output_dir, $worker_output_dir, $retry_throwing_jobs, $compile_module_once); # Worker control parameters
my ($job_limit, $life_span, $no_cleanup, $no_write, $hive_log_dir, $worker_log_dir, $retry_throwing_jobs, $compile_module_once); # Worker control parameters
my ($help, $debug, $show_analysis_stats);
GetOptions(
......@@ -49,8 +49,8 @@ GetOptions(
'life_span|lifespan=i' => \$life_span,
'no_cleanup' => \$no_cleanup,
'no_write' => \$no_write,
'hive_output_dir|outdir=s' => \$hive_output_dir, # keep compatibility with the old name
'worker_output_dir=s' => \$worker_output_dir, # will take precedence over hive_output_dir if set
'hive_log_dir|hive_output_dir=s' => \$hive_log_dir, # keep compatibility with the old name
'worker_log_dir|worker_output_dir=s' => \$worker_log_dir, # will take precedence over hive_log_dir if set
'retry_throwing_jobs=i' => \$retry_throwing_jobs,
'compile_module_once=i' => \$compile_module_once,
......@@ -115,8 +115,8 @@ eval {
-life_span => $life_span,
-no_cleanup => $no_cleanup,
-no_write => $no_write,
-worker_output_dir => $worker_output_dir,
-hive_output_dir => $hive_output_dir,
-worker_log_dir => $worker_log_dir,
-hive_log_dir => $hive_log_dir,
-retry_throwing_jobs => $retry_throwing_jobs,
-compile_module_once => $compile_module_once,
......@@ -210,8 +210,8 @@ __DATA__
-life_span <num> : number of minutes this worker is allowed to run
-no_cleanup : don't perform temp directory cleanup when worker exits
-no_write : don't write_output or auto_dataflow input_job
-hive_output_dir <path> : directory where stdout/stderr of the whole hive of workers is redirected
-worker_output_dir <path> : directory where stdout/stderr of this particular worker is redirected
-hive_log_dir <path> : directory where stdout/stderr of the whole hive of workers is redirected
-worker_log_dir <path> : directory where stdout/stderr of this particular worker is redirected
-retry_throwing_jobs <0|1> : if a job dies *knowingly*, should we retry it by default?
-compile_module_once 0|1 : should we compile the module only once (desired future behaviour), or pretend to do it before every job (current behaviour)?
......
# Record each Worker's log_dir in worker table (as opposed to recording each Job's output and error files)
ALTER TABLE worker ADD COLUMN log_dir varchar(80) DEFAULT NULL;
......@@ -78,6 +78,7 @@ CREATE TABLE worker (
last_check_in datetime NOT NULL,
died datetime DEFAULT NULL,
cause_of_death enum('', 'NO_WORK', 'JOB_LIMIT', 'HIVE_OVERLOAD', 'LIFESPAN', 'CONTAMINATED', 'KILLED_BY_USER', 'MEMLIMIT', 'RUNLIMIT', 'FATALITY') DEFAULT '' NOT NULL,
log_dir varchar(80) DEFAULT NULL,
PRIMARY KEY (worker_id),
INDEX analysis_status (analysis_id, status)
......
......@@ -70,7 +70,9 @@ CREATE TABLE worker (
born timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_check_in datetime NOT NULL,
died datetime DEFAULT NULL,
cause_of_death TEXT DEFAULT '' NOT NULL /* enum('', 'NO_WORK', 'JOB_LIMIT', 'HIVE_OVERLOAD', 'LIFESPAN', 'CONTAMINATED', 'KILLED_BY_USER', 'MEMLIMIT', 'RUNLIMIT', 'FATALITY') DEFAULT '' NOT NULL */
cause_of_death TEXT DEFAULT '' NOT NULL, /* enum('', 'NO_WORK', 'JOB_LIMIT', 'HIVE_OVERLOAD', 'LIFESPAN', 'CONTAMINATED', 'KILLED_BY_USER', 'MEMLIMIT', 'RUNLIMIT', 'FATALITY') DEFAULT '' NOT NULL */
log_dir varchar(80) DEFAULT NULL
);
CREATE INDEX IF NOT EXISTS analysis_id_status_idx ON worker (analysis_id, status);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment