Commit e108b22e authored by Leo Gordon

more cleanup and shuffling around of the code

parent 140d7389
@@ -65,6 +65,7 @@ use Sys::Hostname;
 use Bio::EnsEMBL::Utils::Argument;
 use Bio::EnsEMBL::Utils::Exception;
+use Bio::EnsEMBL::Hive::Utils 'destringify';  # import 'destringify()'
 use Bio::EnsEMBL::Hive::Worker;
 use Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor;
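For context, destringify() from Bio::EnsEMBL::Hive::Utils turns a stringified Perl value stored in the database back into live Perl data; a minimal sketch of the round-trip, assuming the meta table stores the value in its quoted, stringified form:

    use Bio::EnsEMBL::Hive::Utils 'destringify';

    # a value as it might sit in the hive meta table (assumed quoted form);
    # destringify also handles stringified hashrefs/arrayrefs
    my $stored = q{'/my/hive/output'};
    my $dir    = destringify($stored);    # eval's the string back into Perl data
    print "$dir\n";                       # /my/hive/output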
@@ -96,11 +97,10 @@ use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
 sub create_new_worker {
   my ($self, @args) = @_;

-  my ($rc_id, $analysis_id, $beekeeper ,$pid, $job, $no_write) =
-     rearrange([qw(rc_id analysis_id beekeeper process_id job no_write) ], @args);
+  my ($rc_id, $analysis_id, $beekeeper ,$process_id, $job, $no_write, $debug, $worker_output_dir, $hive_output_dir, $batch_size, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs) =
+     rearrange([qw(rc_id analysis_id beekeeper process_id job no_write debug worker_output_dir hive_output_dir batch_size job_limit life_span no_cleanup retry_throwing_jobs) ], @args);

-  my $analStatsDBA = $self->db->get_AnalysisStatsAdaptor;
-  return undef unless($analStatsDBA);
+  my $analStatsDBA = $self->db->get_AnalysisStatsAdaptor or return undef;

   $analysis_id = $job->analysis_id if(defined($job));
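rearrange() from Bio::EnsEMBL::Utils::Argument is what lets callers pass all of these options by name, in any order; a minimal sketch of its behaviour:

    use Bio::EnsEMBL::Utils::Argument;    # exports rearrange()

    # named args (leading dash, case-insensitive) are mapped back to the
    # positional order given in the template arrayref:
    my ($batch_size, $job_limit) =
        rearrange( [qw(batch_size job_limit)], -JOB_LIMIT => 100, -batch_size => 5 );
    # now $batch_size == 5 and $job_limit == 100; unmentioned names come back undef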
@@ -133,7 +133,7 @@ sub create_new_worker {
   }

   my $host = hostname;
-  $pid = $$ unless($pid);
+  $process_id ||= $$;
   $beekeeper = '' unless($beekeeper);

   my $sql = q{INSERT INTO hive
@@ -141,7 +141,7 @@ sub create_new_worker {
              VALUES (NOW(), NOW(), ?,?,?,?)};

   my $sth = $self->prepare($sql);
-  $sth->execute($pid, $analysisStats->analysis_id, $beekeeper, $host);
+  $sth->execute($process_id, $analysisStats->analysis_id, $beekeeper, $host);
   my $worker_id = $sth->{'mysql_insertid'};
   $sth->finish;
@@ -152,9 +152,37 @@ sub create_new_worker {
     $analysisStats->update_status('WORKING');
   }

-  $worker->_specific_job($job) if(defined($job));
+  $worker->_specific_job($job) if($job);
   $worker->execute_writes(0) if($no_write);

+  $worker->debug($debug) if($debug);
+  $worker->worker_output_dir($worker_output_dir) if(defined($worker_output_dir));
+
+  unless(defined($hive_output_dir)) {
+    my $arrRef = $self->db->get_MetaContainer->list_value_by_key( 'hive_output_dir' );
+    if( @$arrRef ) {
+      $hive_output_dir = destringify($arrRef->[0]);
+    }
+  }
+  $worker->hive_output_dir($hive_output_dir);
+
+  if($batch_size) {
+    $worker->set_worker_batch_size($batch_size);
+  }
+  if($job_limit) {
+    $worker->job_limit($job_limit);
+    $worker->life_span(0);
+  }
+  if($life_span) {
+    $worker->life_span($life_span * 60);
+  }
+  if($no_cleanup) {
+    $worker->perform_cleanup(0);
+  }
+  if(defined $retry_throwing_jobs) {
+    $worker->retry_throwing_jobs($retry_throwing_jobs);
+  }
+
   return $worker;
 }
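With this change all per-worker tuning happens inside create_new_worker(), so a caller needs only one call; a minimal sketch with placeholder values, assuming $dba is a connected Bio::EnsEMBL::Hive::DBSQL::DBAdaptor:

    my $worker = $dba->get_Queen->create_new_worker(
        -analysis_id => 42,     # placeholder: analysis to specialize in
        -batch_size  => 5,      # jobs claimed per claim cycle
        -job_limit   => 100,    # note: also zeroes life_span, as shown above
        -life_span   => 30,     # minutes; ineffective here since job_limit is set
    );
    die "could not create worker\n" unless $worker;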
@@ -211,13 +239,13 @@ sub check_for_dead_workers {   # a bit counter-intuitively only looks for current meadow's workers
   foreach my $worker (@$queen_worker_list) {
     next unless($meadow->responsible_for_worker($worker));

-    my $worker_pid = $worker->process_id();
-    if(my $status = $worker_status_hash->{$worker_pid}) { # can be RUN|PEND|xSUSP
+    my $process_id = $worker->process_id();
+    if(my $status = $worker_status_hash->{$process_id}) { # can be RUN|PEND|xSUSP
       $worker_status_summary{$status}++;
     } else {
       $worker_status_summary{'AWOL'}++;

-      $gc_wpid_to_worker{$worker_pid} = $worker;
+      $gc_wpid_to_worker{$process_id} = $worker;
     }
   }
   print "\t".join(', ', map { "$_:$worker_status_summary{$_}" } keys %worker_status_summary)."\n\n";
@@ -233,8 +261,8 @@ sub check_for_dead_workers {
   }

   warn "GarbageCollector: Releasing the jobs\n";
-  while(my ($worker_pid, $worker) = each %gc_wpid_to_worker) {
-    $worker->cause_of_death( $wpid_to_cod->{$worker_pid} || 'FATALITY');
+  while(my ($process_id, $worker) = each %gc_wpid_to_worker) {
+    $worker->cause_of_death( $wpid_to_cod->{$process_id} || 'FATALITY');
     $self->register_worker_death($worker);
   }
 }
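For illustration, the garbage-collection pass above pairs each vanished process_id with its Worker and falls back to 'FATALITY' when the meadow reports no cause of death; a standalone sketch of the same pattern, with hypothetical data:

    my %gc_wpid_to_worker = ( '12345' => 'WorkerA', '67890[3]' => 'WorkerB' );
    my $wpid_to_cod       = { '12345' => 'MEMLIMIT' };   # hypothetical meadow report

    while (my ($process_id, $worker) = each %gc_wpid_to_worker) {
        # same fallback as above when the meadow could not report a cause:
        my $cause = $wpid_to_cod->{$process_id} || 'FATALITY';
        print "$worker ($process_id) died of $cause\n";
    }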
......
@@ -91,9 +91,9 @@ sub main {
                    'meadow_options|lsf_options=s' => \$meadow_options, # 'lsf_options' is deprecated (please investigate the resource requirements, they may suit your needs way better)

         # worker control
-                   'job_limit|jlimit=i'     => \$self->{'job_limit'},
-                   'batch_size=i'           => \$self->{'batch_size'},
-                   'lifespan=i'             => \$self->{'lifespan'},
+                   'job_limit|jlimit=i'     => \$self->{'job_limit'},
+                   'batch_size=i'           => \$self->{'batch_size'},
+                   'life_span|lifespan=i'   => \$self->{'life_span'},
                    'logic_name=s'           => \$self->{'logic_name'},
                    'hive_output_dir=s'      => \$self->{'hive_output_dir'},
                    'maximise_concurrency=i' => \$self->{'maximise_concurrency'},
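The 'life_span|lifespan=i' spec uses Getopt::Long's alias syntax, so the old spelling keeps working while the canonical name changes; a minimal sketch:

    use Getopt::Long;

    my %opt;
    GetOptions( 'life_span|lifespan=i' => \$opt{'life_span'} );   # primary|alias, integer-valued
    # both spellings now end up in $opt{'life_span'}:
    #     beekeeper.pl -life_span 30
    #     beekeeper.pl -lifespan 30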
@@ -322,7 +322,7 @@ sub generate_worker_cmd {
     if ($self->{'run_job_id'}) {
         $worker_cmd .= " -job_id ".$self->{'run_job_id'};
     } else {
-        foreach my $worker_option ('batch_size', 'job_limit', 'lifespan', 'logic_name', 'maximize_concurrency', 'retry_throwing_jobs', 'hive_output_dir') {
+        foreach my $worker_option ('batch_size', 'job_limit', 'life_span', 'logic_name', 'maximize_concurrency', 'retry_throwing_jobs', 'hive_output_dir') {
            if(defined(my $value = $self->{$worker_option})) {
                $worker_cmd .= " -${worker_option} $value";
            }
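The rename matters here because the loop turns each hash key directly into a command-line flag; a self-contained sketch with hypothetical settings:

    my %self = ( 'batch_size' => 5, 'life_span' => 30 );   # hypothetical settings
    my $worker_cmd = 'runWorker.pl';
    foreach my $worker_option ('batch_size', 'job_limit', 'life_span') {
        if (defined(my $value = $self{$worker_option})) {
            $worker_cmd .= " -${worker_option} $value";
        }
    }
    print "$worker_cmd\n";    # runWorker.pl -batch_size 5 -life_span 30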
@@ -505,7 +505,7 @@ __DATA__
    -job_limit <num>         : #jobs to run before worker can die naturally
    -batch_size <num>        : #jobs a worker can claim at once
-   -lifespan <num>          : lifespan limit for each worker
+   -life_span <num>         : life_span limit for each worker
    -logic_name <string>     : restrict the pipeline stat/runs to this analysis logic_name
    -maximise_concurrency 1  : try to run more different analyses at the same time
    -retry_throwing_jobs 0|1 : if a job dies *knowingly*, should we retry it by default?
......
@@ -7,7 +7,6 @@ use Getopt::Long;
 use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
 use Bio::EnsEMBL::Hive::Worker;
 use Bio::EnsEMBL::Hive::Queen;
-use Bio::EnsEMBL::Hive::Utils 'destringify'; # import 'destringify()'
 use Bio::EnsEMBL::Registry;
 use Bio::EnsEMBL::Hive::Meadow::LSF;
@@ -15,7 +14,7 @@ use Bio::EnsEMBL::Hive::Meadow::LOCAL;
 Bio::EnsEMBL::Registry->no_version_check(1);

-# ok this is a hack, but I'm going to pretend I've got an object here
+# I'm going to pretend I've got an object here
 # by creating a blessed hash ref and passing it around like an object
 # this is to avoid using global variables in functions, and to consolidate
 # the globals into a nice '$self' package
@@ -68,11 +67,9 @@ GetOptions(
            'logic_name=s'             => \$self->{'logic_name'},
            'batch_size=i'             => \$self->{'batch_size'},
            'job_limit|limit=i'        => \$self->{'job_limit'},
-           'lifespan=i'               => \$self->{'lifespan'},
+           'life_span|lifespan=i'     => \$self->{'life_span'},
            'hive_output_dir|outdir=s' => \$self->{'hive_output_dir'}, # keep compatibility with the old name
            'worker_output_dir=s'      => \$self->{'worker_output_dir'}, # will take precedence over hive_output_dir if set
-           'bk=s'                     => \$self->{'beekeeper'}, # deprecated and ignored
-           'pid=s'                    => \$self->{'process_id'},
            'input_id=s'               => \$self->{'input_id'},
            'no_cleanup'               => \$self->{'no_cleanup'},
            'analysis_stats'           => \$self->{'show_analysis_stats'},
@@ -116,19 +113,17 @@ unless($DBA and $DBA->isa("Bio::EnsEMBL::Hive::DBSQL::DBAdaptor")) {
 my $queen = $DBA->get_Queen();
 $queen->{maximise_concurrency} = 1 if ($self->{maximise_concurrency});

-unless($self->{'process_id'}) { # do we really need this confusing feature - to be able to set the process_id externally?
-    eval {
-        $self->{'process_id'} = Bio::EnsEMBL::Hive::Meadow::LSF->get_current_worker_process_id();
-    };
-    if($@) {
-        $self->{'process_id'} = Bio::EnsEMBL::Hive::Meadow::LOCAL->get_current_worker_process_id();
-        $self->{'beekeeper'} = 'LOCAL';
-    } else {
-        $self->{'beekeeper'} = 'LSF';
-    }
-}
+eval {
+    $self->{'process_id'} = Bio::EnsEMBL::Hive::Meadow::LSF->get_current_worker_process_id();
+};
+if($@) {
+    $self->{'process_id'} = Bio::EnsEMBL::Hive::Meadow::LOCAL->get_current_worker_process_id();
+    $self->{'beekeeper'} = 'LOCAL';
+} else {
+    $self->{'beekeeper'} = 'LSF';
+}

-print("pid = ", $self->{'process_id'}, "\n") if($self->{'process_id'});
+print("process_id = ", $self->{'process_id'}, "\n") if($self->{'process_id'});

 if($self->{'logic_name'}) {
     my $analysis = $queen->db->get_AnalysisAdaptor->fetch_by_logic_name($self->{'logic_name'});
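The eval{} probe above relies on LSF's get_current_worker_process_id() dying when the script is not running under LSF, which routes execution to the LOCAL branch; a standalone sketch of the same fallback pattern (probe_lsf_id is a hypothetical stand-in for the Meadow call):

    # hypothetical stand-in for Bio::EnsEMBL::Hive::Meadow::LSF->get_current_worker_process_id()
    sub probe_lsf_id {
        die "not running under LSF\n" unless $ENV{'LSB_JOBID'};   # LSF sets this for jobs
        return $ENV{'LSB_JOBID'};
    }

    my $process_id = eval { probe_lsf_id() };
    if ($@) {              # the probe died: not an LSF environment
        $process_id = $$;  # fall back to the plain OS pid, as LOCAL would report
    }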
@@ -159,50 +154,27 @@ if($self->{'job_id'}) {
 }

 my $worker = $queen->create_new_worker(
-     -rc_id       => $self->{'rc_id'},
-     -analysis_id => $self->{'analysis_id'},
-     -beekeeper   => $self->{'beekeeper'},
-     -process_id  => $self->{'process_id'},
-     -job         => $self->{'analysis_job'},
-     -no_write    => $self->{'no_write'},
-);
+     -rc_id               => $self->{'rc_id'},
+     -analysis_id         => $self->{'analysis_id'},
+     -beekeeper           => $self->{'beekeeper'},
+     -process_id          => $self->{'process_id'},
+     -job                 => $self->{'analysis_job'},
+     -no_write            => $self->{'no_write'},
+     -debug               => $self->{'debug'},
+     -batch_size          => $self->{'batch_size'},
+     -job_limit           => $self->{'job_limit'},
+     -life_span           => $self->{'life_span'},
+     -no_cleanup          => $self->{'no_cleanup'},
+     -worker_output_dir   => $self->{'worker_output_dir'},
+     -hive_output_dir     => $self->{'hive_output_dir'},
+     -retry_throwing_jobs => $self->{'retry_throwing_jobs'},
+);

 unless($worker) {
     $queen->print_analysis_status if($self->{'show_analysis_stats'});
     print("\n=== COULDN'T CREATE WORKER ===\n");
     exit(1);
 }

-$worker->debug($self->{'debug'}) if($self->{'debug'});
-
-if(defined($self->{'worker_output_dir'})) {
-    $worker->worker_output_dir($self->{'worker_output_dir'});
-}
-
-unless(defined($self->{'hive_output_dir'})) {
-    my $arrRef = $DBA->get_MetaContainer->list_value_by_key( 'hive_output_dir' );
-    if( @$arrRef ) {
-        $self->{'hive_output_dir'} = destringify($arrRef->[0]);
-    }
-}
-$worker->hive_output_dir($self->{'hive_output_dir'});
-
-if($self->{'batch_size'}) {
-    $worker->set_worker_batch_size($self->{'batch_size'});
-}
-if($self->{'job_limit'}) {
-    $worker->job_limit($self->{'job_limit'});
-    $worker->life_span(0);
-}
-if($self->{'lifespan'}) {
-    $worker->life_span($self->{'lifespan'} * 60);
-}
-if($self->{'no_cleanup'}) {
-    $worker->perform_cleanup(0);
-}
-if(defined $self->{'retry_throwing_jobs'}) {
-    $worker->retry_throwing_jobs($self->{'retry_throwing_jobs'});
-}
-
 $worker->print_worker();
 $worker->run();
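With the option plumbing moved into the Queen, runWorker.pl now simply forwards the flags to create_new_worker() instead of configuring the Worker itself; on the command line the renamed flag is used like this (connection values are hypothetical):

    runWorker.pl -url mysql://user:pass@host/my_hive_db -logic_name blast -life_span 30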
@@ -302,11 +274,9 @@ __DATA__
   -logic_name <string>      : logic_name of analysis to make this worker
   -batch_size <num>         : #jobs to claim at a time
   -job_limit <num>          : #jobs to run before worker can die naturally
-  -lifespan <num>           : number of minutes this worker is allowed to run
+  -life_span <num>          : number of minutes this worker is allowed to run
   -hive_output_dir <path>   : directory where stdout/stderr of the whole hive of workers is redirected
   -worker_output_dir <path> : directory where stdout/stderr of this particular worker is redirected
-  -bk <string>              : beekeeper identifier (deprecated and ignored)
-  -pid <string>             : externally set process_id descriptor (e.g. lsf job_id, array_id)
   -input_id <string>        : test input_id on specified analysis (analysis_id or logic_name)
   -job_id <id>              : run specific job defined by analysis_job_id
   -analysis_stats           : show status of each analysis in hive
......