Skip to content
Snippets Groups Projects
Commit e6823db4 authored by Leo Gordon's avatar Leo Gordon
Browse files

API change: synchronize_hive now takes an obligatory $list_of_analyses

parent c9c80323
No related branches found
No related tags found
No related merge requests found
......@@ -528,24 +528,21 @@ sub fetch_overdue_workers {
=head2 synchronize_hive
Arg [1] : $filter_analysis (optional)
Example : $queen->synchronize_hive();
Description: Runs through all analyses in the system and synchronizes
the analysis_stats summary with the states in the job
and worker tables. Then follows by checking all the blocking rules
and blocks/unblocks analyses as needed.
Arg [1] : $list_of_analyses
Example : $queen->synchronize_hive( [ $analysis_A, $analysis_B ] );
Description: Runs through all analyses in the given list and synchronizes
the analysis_stats summary with the states in the job and worker tables.
Then follows by checking all the blocking rules and blocks/unblocks analyses as needed.
Exceptions : none
Caller : general
=cut
sub synchronize_hive {
my ($self, $filter_analysis) = @_;
my ($self, $list_of_analyses) = @_;
my $start_time = time();
my $list_of_analyses = $filter_analysis ? [$filter_analysis] : $self->db->get_AnalysisAdaptor->fetch_all;
print STDERR "\nSynchronizing the hive (".scalar(@$list_of_analyses)." analyses this time):\n";
foreach my $analysis (@$list_of_analyses) {
$self->synchronize_AnalysisStats($analysis->stats);
......
......@@ -47,6 +47,9 @@ use Bio::EnsEMBL::Hive::Limiter;
sub schedule_workers_resync_if_necessary {
my ($queen, $valley, $filter_analysis) = @_;
my $list_of_analyses = $filter_analysis ? [ $filter_analysis ] : $queen->db->get_AnalysisAdaptor->fetch_all();
# $queen->db->get_AnalysisAdaptor->fetch_by_logic_name( $analyses_pattern )
my $submit_capacity = $valley->config_get('SubmitWorkersMax');
my $default_meadow_type = $valley->get_default_meadow()->type;
my $meadow_capacity_limiter_hashed_by_type = $valley->get_meadow_capacity_hash_by_meadow_type();
......@@ -69,15 +72,15 @@ sub schedule_workers_resync_if_necessary {
$queen->check_for_dead_workers($valley, 1);
}
print "Scheduler: re-synchronizing the Hive...\n";
$queen->synchronize_hive($filter_analysis);
print "Scheduler: re-synchronizing...\n";
$queen->synchronize_hive( $list_of_analyses );
if( $queen->db->hive_auto_rebalance_semaphores ) { # make sure rebalancing only ever happens for the pipelines that asked for it
if( $queen->check_nothing_to_run_but_semaphored ) { # and double-check on our side
print "Scheduler: looks like we may need re-balancing semaphore_counts...\n";
if( my $rebalanced_jobs_counter = $queen->db->get_AnalysisJobAdaptor->balance_semaphores($filter_analysis && [ $filter_analysis ] ) ) {
print "Scheduler: re-balanced $rebalanced_jobs_counter jobs, going through another re-synchronization of the Hive...\n";
$queen->synchronize_hive($filter_analysis);
if( my $rebalanced_jobs_counter = $queen->db->get_AnalysisJobAdaptor->balance_semaphores( $list_of_analyses ) ) {
print "Scheduler: re-balanced $rebalanced_jobs_counter jobs, going through another re-synchronization...\n";
$queen->synchronize_hive( $list_of_analyses );
} else {
print "Scheduler: hmmm... managed to re-balance 0 jobs, you may need to investigate further.\n";
}
......
......@@ -242,12 +242,14 @@ sub main {
}
my $analysis = $run_job_id
? $self->{'dba'}->get_AnalysisAdaptor->fetch_by_dbID( $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID( $run_job_id )->analysis_id )
? $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID( $run_job_id )->analysis
: ( $self->{'logic_name'} && $self->{'dba'}->get_AnalysisAdaptor->fetch_by_logic_name($self->{'logic_name'}) );
my $list_of_analyses = $analysis ? [ $analysis ] : $self->{'dba'}->get_AnalysisAdaptor->fetch_all();
if($all_dead) { $queen->register_all_workers_dead(); }
if($check_for_dead) { $queen->check_for_dead_workers($valley, 1); }
if($balance_semaphores) { $self->{'dba'}->get_AnalysisJobAdaptor->balance_semaphores( $analysis && [ $analysis ] ); }
if($balance_semaphores) { $self->{'dba'}->get_AnalysisJobAdaptor->balance_semaphores( $list_of_analyses ); }
if ($max_loops) { # positive $max_loop means limited, negative means unlimited
......@@ -257,7 +259,7 @@ sub main {
# the output of several methods will look differently depending on $analysis being [un]defined
if($sync) {
$queen->synchronize_hive($analysis);
$queen->synchronize_hive( $list_of_analyses );
}
$queen->print_analysis_status($analysis) unless($self->{'no_analysis_stats'});
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment