diff --git a/modules/Bio/EnsEMBL/Hive/Queen.pm b/modules/Bio/EnsEMBL/Hive/Queen.pm index f9a216a7564fc0c7ce7cc850088353cc7333ee37..88774d006bde799a0409304562be50f6e50e6543 100644 --- a/modules/Bio/EnsEMBL/Hive/Queen.pm +++ b/modules/Bio/EnsEMBL/Hive/Queen.pm @@ -272,9 +272,8 @@ sub specialize_worker { } } # probably scheduled by beekeeper.pl: - } elsif( $stats = Bio::EnsEMBL::Hive::Scheduler::suggest_analysis_to_specialize_a_worker($worker, $analyses_pattern) ) { + } elsif( $analysis = Bio::EnsEMBL::Hive::Scheduler::suggest_analysis_to_specialize_a_worker($worker, $analyses_pattern) ) { - $analysis = $stats->analysis; } else { $worker->cause_of_death('NO_ROLE'); die "No analysis suitable for the worker was found\n"; diff --git a/modules/Bio/EnsEMBL/Hive/Scheduler.pm b/modules/Bio/EnsEMBL/Hive/Scheduler.pm index 714a59e9aee39aa2fdacd177894f03da872b0d77..28e174f7b509882f0bb8b4e2690e6076a7733a41 100644 --- a/modules/Bio/EnsEMBL/Hive/Scheduler.pm +++ b/modules/Bio/EnsEMBL/Hive/Scheduler.pm @@ -56,7 +56,7 @@ sub schedule_workers_resync_if_necessary { # combined mapping: my $analysis_id2rc_name = { map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id }; - my ($workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer) + my ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer) = schedule_workers($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name); print join("\n", @$log_buffer, ''); @@ -88,7 +88,7 @@ sub schedule_workers_resync_if_necessary { print "Scheduler: automatic re-balancing of semaphore_counts is off by default. If you think your pipeline might benefit from it, set hive_auto_rebalance_semaphores => 1 in the PipeConfig's hive_meta_table.\n"; } - ($workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer) + ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer) = schedule_workers($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name); print join("\n", @$log_buffer, ''); } @@ -136,90 +136,96 @@ sub suggest_analysis_to_specialize_a_worker { # the corresponding checks should be added here @{ $queen->db->get_AnalysisAdaptor->fetch_all_by_pattern( $analyses_pattern ) }; - return schedule_workers( $queen, 1, $worker_meadow_type, \@list_of_analyses ); + my ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer) + = schedule_workers( $queen, 1, $worker_meadow_type, \@list_of_analyses ); + + # take the first analysis from the "plan" if the "plan" was not empty: + return scalar(@$workers_to_submit_by_analysis) && $workers_to_submit_by_analysis->[0][0]; } sub schedule_workers { my ($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name) = @_; - my @stats_sorted_by_suitability = @{ Bio::EnsEMBL::Hive::Scheduler::sort_stats_by_suitability( $list_of_analyses ) }; - - unless(@stats_sorted_by_suitability) { - return $analysis_id2rc_name ? ({}, 0, "Scheduler could not find any suitable analyses to start with\n") : undef; # FIXME: returns data in different format in "suggest analysis" mode - } - - # the pre-pending-adjusted outcome will be stored here: - my %workers_to_submit_by_meadow_type_rc_name = (); + my @workers_to_submit_by_analysis = (); # The down-to-analysis "plan" that may completely change by the time the Workers are born and specialized + my %workers_to_submit_by_meadow_type_rc_name = (); # Pre-pending-adjusted per-resource breakout my $total_extra_workers_required = 0; my @log_buffer = (); - my $submit_capacity_limiter = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers scheduled this time', $submit_capacity ); - my $queen_capacity_limiter = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->db->get_RoleAdaptor->get_hive_current_load() ); + if( my @stats_sorted_by_suitability = @{ Bio::EnsEMBL::Hive::Scheduler::sort_stats_by_suitability( $list_of_analyses ) } ) { - foreach my $analysis_stats (@stats_sorted_by_suitability) { - last if( $submit_capacity_limiter->reached ); + my $submit_capacity_limiter = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers scheduled this time', $submit_capacity ); + my $queen_capacity_limiter = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->db->get_RoleAdaptor->get_hive_current_load() ); - my $analysis = $analysis_stats->analysis(); # FIXME: if it proves too expensive we may need to consider caching - my $this_meadow_type = $analysis->meadow_type || $default_meadow_type; + foreach my $analysis_stats (@stats_sorted_by_suitability) { + last if( $submit_capacity_limiter->reached ); - next if( $meadow_capacity_limiter_hashed_by_type && $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}->reached ); + my $analysis = $analysis_stats->analysis(); # FIXME: if it proves too expensive we may need to consider caching + my $this_meadow_type = $analysis->meadow_type || $default_meadow_type; - #digging deeper under the surface so need to sync: - if( $analysis_stats->status =~ /^(LOADING|ALL_CLAIMED|BLOCKED|SYNCHING)$/ ) { - $queen->safe_synchronize_AnalysisStats($analysis_stats); - } - next if( $analysis_stats->status =~ /^(BLOCKED|SYNCHING)$/ ); - - # getting the initial worker requirement for this analysis (may be stale if not sync'ed recently) - my $extra_workers_this_analysis = $analysis_stats->num_required_workers; - - # if this analysis doesn't require any extra workers - just skip it: - next if ($extra_workers_this_analysis <= 0); - - $total_extra_workers_required += $extra_workers_this_analysis; # also keep the total number required so far (if nothing required we may need a resync later) - - # setting up all negotiating limiters: - $queen_capacity_limiter->multiplier( $analysis_stats->hive_capacity ); - my @limiters = ( - $submit_capacity_limiter, - $queen_capacity_limiter, - $meadow_capacity_limiter_hashed_by_type - ? $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type} - : (), - defined($analysis->analysis_capacity) - ? Bio::EnsEMBL::Hive::Limiter->new( "Number of Workers working at '".$analysis->logic_name."' analysis", - $analysis->analysis_capacity - $analysis_stats->num_running_workers ) - : (), - ); - - # negotiations: - foreach my $limiter (@limiters) { - $extra_workers_this_analysis = $limiter->preliminary_offer( $extra_workers_this_analysis ); - } + next if( $meadow_capacity_limiter_hashed_by_type && $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}->reached ); - # do not continue with this analysis if limiters haven't agreed on a positive number: - next if ($extra_workers_this_analysis <= 0); + #digging deeper under the surface so need to sync: + if( $analysis_stats->status =~ /^(LOADING|ALL_CLAIMED|BLOCKED|SYNCHING)$/ ) { + $queen->safe_synchronize_AnalysisStats($analysis_stats); + } + next if( $analysis_stats->status =~ /^(BLOCKED|SYNCHING)$/ ); + + # getting the initial worker requirement for this analysis (may be stale if not sync'ed recently) + my $extra_workers_this_analysis = $analysis_stats->num_required_workers; + + # if this analysis doesn't require any extra workers - just skip it: + next if ($extra_workers_this_analysis <= 0); + + $total_extra_workers_required += $extra_workers_this_analysis; # also keep the total number required so far (if nothing required we may need a resync later) + + # setting up all negotiating limiters: + $queen_capacity_limiter->multiplier( $analysis_stats->hive_capacity ); + my @limiters = ( + $submit_capacity_limiter, + $queen_capacity_limiter, + $meadow_capacity_limiter_hashed_by_type + ? $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type} + : (), + defined($analysis->analysis_capacity) + ? Bio::EnsEMBL::Hive::Limiter->new( "Number of Workers working at '".$analysis->logic_name."' analysis", + $analysis->analysis_capacity - $analysis_stats->num_running_workers ) + : (), + ); + + # negotiations: + foreach my $limiter (@limiters) { + $extra_workers_this_analysis = $limiter->preliminary_offer( $extra_workers_this_analysis ); + } - # let all parties know the final decision of negotiations: - foreach my $limiter (@limiters) { - $limiter->final_decision( $extra_workers_this_analysis ); - } + # do not continue with this analysis if limiters haven't agreed on a positive number: + next if ($extra_workers_this_analysis <= 0); - if($analysis_id2rc_name) { - my $this_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id }; - $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis; + # let all parties know the final decision of negotiations: + foreach my $limiter (@limiters) { + $limiter->final_decision( $extra_workers_this_analysis ); + } + + push @workers_to_submit_by_analysis, [ $analysis, $extra_workers_this_analysis]; push @log_buffer, $analysis_stats->toString; - push @log_buffer, sprintf("Before checking the Valley for pending jobs, Scheduler allocated $extra_workers_this_analysis x $this_meadow_type:$this_rc_name extra workers for '%s' [%.4f hive_load remaining]\n", - $analysis->logic_name, - $queen_capacity_limiter->available_capacity, - ); - } else { - return $analysis_stats; # FIXME: returns data in different format in "suggest analysis" mode - } + + if($analysis_id2rc_name) { + my $this_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id }; + $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis; + push @log_buffer, sprintf("Before checking the Valley for pending jobs, Scheduler allocated $extra_workers_this_analysis x $this_meadow_type:$this_rc_name extra workers for '%s' [%.4f hive_load remaining]\n", + $analysis->logic_name, + $queen_capacity_limiter->available_capacity, + ); + } + + } # /foreach my $analysis_stats (@stats_sorted_by_suitability) + + } else { + + push @log_buffer, "Scheduler could not find any suitable analyses to start with"; } - return (\%workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, \@log_buffer); + return (\@workers_to_submit_by_analysis, \%workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, \@log_buffer); }