diff --git a/modules/Bio/EnsEMBL/Hive/Queen.pm b/modules/Bio/EnsEMBL/Hive/Queen.pm index 36d7b995c0b6dd2149e04220cb6f05ee1d132e47..6a3d148cf9e91cf9e3dd0b2647995af87d775d48 100755 --- a/modules/Bio/EnsEMBL/Hive/Queen.pm +++ b/modules/Bio/EnsEMBL/Hive/Queen.pm @@ -65,6 +65,7 @@ use Sys::Hostname; use Bio::EnsEMBL::Utils::Argument; use Bio::EnsEMBL::Utils::Exception; +use Bio::EnsEMBL::Hive::Utils 'destringify'; # import 'destringify()' use Bio::EnsEMBL::Hive::Worker; use Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor; @@ -96,11 +97,10 @@ use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor'); sub create_new_worker { my ($self, @args) = @_; - my ($rc_id, $analysis_id, $beekeeper ,$pid, $job, $no_write) = - rearrange([qw(rc_id analysis_id beekeeper process_id job no_write) ], @args); + my ($rc_id, $analysis_id, $beekeeper ,$process_id, $job, $no_write, $debug, $worker_output_dir, $hive_output_dir, $batch_size, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs) = + rearrange([qw(rc_id analysis_id beekeeper process_id job no_write debug worker_output_dir hive_output_dir batch_size job_limit life_span no_cleanup retry_throwing_jobs) ], @args); - my $analStatsDBA = $self->db->get_AnalysisStatsAdaptor; - return undef unless($analStatsDBA); + my $analStatsDBA = $self->db->get_AnalysisStatsAdaptor or return undef; $analysis_id = $job->analysis_id if(defined($job)); @@ -133,7 +133,7 @@ sub create_new_worker { } my $host = hostname; - $pid = $$ unless($pid); + $process_id ||= $$; $beekeeper = '' unless($beekeeper); my $sql = q{INSERT INTO hive @@ -141,7 +141,7 @@ sub create_new_worker { VALUES (NOW(), NOW(), ?,?,?,?)}; my $sth = $self->prepare($sql); - $sth->execute($pid, $analysisStats->analysis_id, $beekeeper, $host); + $sth->execute($process_id, $analysisStats->analysis_id, $beekeeper, $host); my $worker_id = $sth->{'mysql_insertid'}; $sth->finish; @@ -152,9 +152,37 @@ sub create_new_worker { $analysisStats->update_status('WORKING'); } - $worker->_specific_job($job) 
if(defined($job)); + $worker->_specific_job($job) if($job); $worker->execute_writes(0) if($no_write); - + + $worker->debug($debug) if($debug); + $worker->worker_output_dir($worker_output_dir) if(defined($worker_output_dir)); + + unless(defined($hive_output_dir)) { + my $arrRef = $self->db->get_MetaContainer->list_value_by_key( 'hive_output_dir' ); + if( @$arrRef ) { + $hive_output_dir = destringify($arrRef->[0]); + } + } + $worker->hive_output_dir($hive_output_dir); + + if($batch_size) { + $worker->set_worker_batch_size($batch_size); + } + if($job_limit) { + $worker->job_limit($job_limit); + $worker->life_span(0); + } + if($life_span) { + $worker->life_span($life_span * 60); + } + if($no_cleanup) { + $worker->perform_cleanup(0); + } + if(defined $retry_throwing_jobs) { + $worker->retry_throwing_jobs($retry_throwing_jobs); + } + return $worker; } @@ -211,13 +239,13 @@ sub check_for_dead_workers { # a bit counter-intuitively only looks for curre foreach my $worker (@$queen_worker_list) { next unless($meadow->responsible_for_worker($worker)); - my $worker_pid = $worker->process_id(); - if(my $status = $worker_status_hash->{$worker_pid}) { # can be RUN|PEND|xSUSP + my $process_id = $worker->process_id(); + if(my $status = $worker_status_hash->{$process_id}) { # can be RUN|PEND|xSUSP $worker_status_summary{$status}++; } else { $worker_status_summary{'AWOL'}++; - $gc_wpid_to_worker{$worker_pid} = $worker; + $gc_wpid_to_worker{$process_id} = $worker; } } print "\t".join(', ', map { "$_:$worker_status_summary{$_}" } keys %worker_status_summary)."\n\n"; @@ -233,8 +261,8 @@ sub check_for_dead_workers { # a bit counter-intuitively only looks for curre } warn "GarbageCollector: Releasing the jobs\n"; - while(my ($worker_pid, $worker) = each %gc_wpid_to_worker) { - $worker->cause_of_death( $wpid_to_cod->{$worker_pid} || 'FATALITY'); + while(my ($process_id, $worker) = each %gc_wpid_to_worker) { + $worker->cause_of_death( $wpid_to_cod->{$process_id} || 'FATALITY'); 
$self->register_worker_death($worker); } } diff --git a/scripts/beekeeper.pl b/scripts/beekeeper.pl index a8b4d936b49afeba03cc92009a11c0c8b7700b70..42e791c0f477a0e4344ee7e94f6e444c926005f0 100755 --- a/scripts/beekeeper.pl +++ b/scripts/beekeeper.pl @@ -91,9 +91,9 @@ sub main { 'meadow_options|lsf_options=s' => \$meadow_options, # 'lsf_options' is deprecated (please investigate the resource requirements, they may suit your needs way better) # worker control - 'job_limit|jlimit=i' => \$self->{'job_limit'}, - 'batch_size=i' => \$self->{'batch_size'}, - 'lifespan=i' => \$self->{'lifespan'}, + 'job_limit|jlimit=i' => \$self->{'job_limit'}, + 'batch_size=i' => \$self->{'batch_size'}, + 'life_span|lifespan=i' => \$self->{'life_span'}, 'logic_name=s' => \$self->{'logic_name'}, 'hive_output_dir=s' => \$self->{'hive_output_dir'}, 'maximise_concurrency=i' => \$self->{'maximise_concurrency'}, @@ -322,7 +322,7 @@ sub generate_worker_cmd { if ($self->{'run_job_id'}) { $worker_cmd .= " -job_id ".$self->{'run_job_id'}; } else { - foreach my $worker_option ('batch_size', 'job_limit', 'lifespan', 'logic_name', 'maximize_concurrency', 'retry_throwing_jobs', 'hive_output_dir') { + foreach my $worker_option ('batch_size', 'job_limit', 'life_span', 'logic_name', 'maximise_concurrency', 'retry_throwing_jobs', 'hive_output_dir') { if(defined(my $value = $self->{$worker_option})) { $worker_cmd .= " -${worker_option} $value"; } @@ -505,7 +505,7 @@ __DATA__ -job_limit <num> : #jobs to run before worker can die naturally -batch_size <num> : #jobs a worker can claim at once - -lifespan <num> : lifespan limit for each worker + -life_span <num> : life span limit for each worker -logic_name <string> : restrict the pipeline stat/runs to this analysis logic_name -maximise_concurrency 1 : try to run more different analyses at the same time -retry_throwing_jobs 0|1 : if a job dies *knowingly*, should we retry it by default? 
diff --git a/scripts/runWorker.pl b/scripts/runWorker.pl index c8711d45ff3c29e436dc4d4d3df407996bfda0c5..72806532e874a2bc73e34f46c044904ff2b1333c 100755 --- a/scripts/runWorker.pl +++ b/scripts/runWorker.pl @@ -7,7 +7,6 @@ use Getopt::Long; use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor; use Bio::EnsEMBL::Hive::Worker; use Bio::EnsEMBL::Hive::Queen; -use Bio::EnsEMBL::Hive::Utils 'destringify'; # import 'destringify()' use Bio::EnsEMBL::Registry; use Bio::EnsEMBL::Hive::Meadow::LSF; @@ -15,7 +14,7 @@ use Bio::EnsEMBL::Hive::Meadow::LOCAL; Bio::EnsEMBL::Registry->no_version_check(1); -# ok this is a hack, but I'm going to pretend I've got an object here +# I'm going to pretend I've got an object here # by creating a blessed hash ref and passing it around like an object # this is to avoid using global variables in functions, and to consolidate # the globals into a nice '$self' package @@ -68,11 +67,9 @@ GetOptions( 'logic_name=s' => \$self->{'logic_name'}, 'batch_size=i' => \$self->{'batch_size'}, 'job_limit|limit=i' => \$self->{'job_limit'}, - 'lifespan=i' => \$self->{'lifespan'}, + 'life_span|lifespan=i' => \$self->{'life_span'}, 'hive_output_dir|outdir=s' => \$self->{'hive_output_dir'}, # keep compatibility with the old name 'worker_output_dir=s' => \$self->{'worker_output_dir'}, # will take precedence over hive_output_dir if set - 'bk=s' => \$self->{'beekeeper'}, # deprecated and ignored - 'pid=s' => \$self->{'process_id'}, 'input_id=s' => \$self->{'input_id'}, 'no_cleanup' => \$self->{'no_cleanup'}, 'analysis_stats' => \$self->{'show_analysis_stats'}, @@ -116,19 +113,17 @@ unless($DBA and $DBA->isa("Bio::EnsEMBL::Hive::DBSQL::DBAdaptor")) { my $queen = $DBA->get_Queen(); $queen->{maximise_concurrency} = 1 if ($self->{maximise_concurrency}); -unless($self->{'process_id'}) { # do we really need this confusing feature - to be able to set the process_id externally? 
- eval { - $self->{'process_id'} = Bio::EnsEMBL::Hive::Meadow::LSF->get_current_worker_process_id(); - }; - if($@) { - $self->{'process_id'} = Bio::EnsEMBL::Hive::Meadow::LOCAL->get_current_worker_process_id(); - $self->{'beekeeper'} = 'LOCAL'; - } else { - $self->{'beekeeper'} = 'LSF'; - } +eval { + $self->{'process_id'} = Bio::EnsEMBL::Hive::Meadow::LSF->get_current_worker_process_id(); +}; +if($@) { + $self->{'process_id'} = Bio::EnsEMBL::Hive::Meadow::LOCAL->get_current_worker_process_id(); + $self->{'beekeeper'} = 'LOCAL'; +} else { + $self->{'beekeeper'} = 'LSF'; } -print("pid = ", $self->{'process_id'}, "\n") if($self->{'process_id'}); +print("process_id = ", $self->{'process_id'}, "\n") if($self->{'process_id'}); if($self->{'logic_name'}) { my $analysis = $queen->db->get_AnalysisAdaptor->fetch_by_logic_name($self->{'logic_name'}); @@ -159,50 +154,27 @@ if($self->{'job_id'}) { } my $worker = $queen->create_new_worker( - -rc_id => $self->{'rc_id'}, - -analysis_id => $self->{'analysis_id'}, - -beekeeper => $self->{'beekeeper'}, - -process_id => $self->{'process_id'}, - -job => $self->{'analysis_job'}, - -no_write => $self->{'no_write'}, - ); + -rc_id => $self->{'rc_id'}, + -analysis_id => $self->{'analysis_id'}, + -beekeeper => $self->{'beekeeper'}, + -process_id => $self->{'process_id'}, + -job => $self->{'analysis_job'}, + -no_write => $self->{'no_write'}, + -debug => $self->{'debug'}, + -batch_size => $self->{'batch_size'}, + -job_limit => $self->{'job_limit'}, + -life_span => $self->{'life_span'}, + -no_cleanup => $self->{'no_cleanup'}, + -worker_output_dir => $self->{'worker_output_dir'}, + -hive_output_dir => $self->{'hive_output_dir'}, + -retry_throwing_jobs => $self->{'retry_throwing_jobs'}, +); unless($worker) { $queen->print_analysis_status if($self->{'show_analysis_stats'}); print("\n=== COULDN'T CREATE WORKER ===\n"); exit(1); } -$worker->debug($self->{'debug'}) if($self->{'debug'}); - -if(defined($self->{'worker_output_dir'})) { - 
$worker->worker_output_dir($self->{'worker_output_dir'}); -} - -unless(defined($self->{'hive_output_dir'})) { - my $arrRef = $DBA->get_MetaContainer->list_value_by_key( 'hive_output_dir' ); - if( @$arrRef ) { - $self->{'hive_output_dir'} = destringify($arrRef->[0]); - } -} -$worker->hive_output_dir($self->{'hive_output_dir'}); - -if($self->{'batch_size'}) { - $worker->set_worker_batch_size($self->{'batch_size'}); -} -if($self->{'job_limit'}) { - $worker->job_limit($self->{'job_limit'}); - $worker->life_span(0); -} -if($self->{'lifespan'}) { - $worker->life_span($self->{'lifespan'} * 60); -} -if($self->{'no_cleanup'}) { - $worker->perform_cleanup(0); -} -if(defined $self->{'retry_throwing_jobs'}) { - $worker->retry_throwing_jobs($self->{'retry_throwing_jobs'}); -} - $worker->print_worker(); $worker->run(); @@ -302,11 +274,9 @@ __DATA__ -logic_name <string> : logic_name of analysis to make this worker -batch_size <num> : #jobs to claim at a time -job_limit <num> : #jobs to run before worker can die naturally - -lifespan <num> : number of minutes this worker is allowed to run + -life_span <num> : number of minutes this worker is allowed to run -hive_output_dir <path> : directory where stdout/stderr of the whole hive of workers is redirected -worker_output_dir <path> : directory where stdout/stderr of this particular worker is redirected - -bk <string> : beekeeper identifier (deprecated and ignored) - -pid <string> : externally set process_id descriptor (e.g. lsf job_id, array_id) -input_id <string> : test input_id on specified analysis (analysis_id or logic_name) -job_id <id> : run specific job defined by analysis_job_id -analysis_stats : show status of each analysis in hive