Skip to content
Snippets Groups Projects
Commit b522a5ff authored by Leo Gordon's avatar Leo Gordon
Browse files

API change: schedule_workers() now also returns the "plan" used by...

API change: schedule_workers() now also returns the "plan" used by suggest_analysis_to_specialize_a_worker(); only one return format for schedule_workers().
parent 6451e1d3
No related branches found
No related tags found
No related merge requests found
......@@ -272,9 +272,8 @@ sub specialize_worker {
}
}
# probably scheduled by beekeeper.pl:
} elsif( $stats = Bio::EnsEMBL::Hive::Scheduler::suggest_analysis_to_specialize_a_worker($worker, $analyses_pattern) ) {
} elsif( $analysis = Bio::EnsEMBL::Hive::Scheduler::suggest_analysis_to_specialize_a_worker($worker, $analyses_pattern) ) {
$analysis = $stats->analysis;
} else {
$worker->cause_of_death('NO_ROLE');
die "No analysis suitable for the worker was found\n";
......
......@@ -56,7 +56,7 @@ sub schedule_workers_resync_if_necessary {
# combined mapping:
my $analysis_id2rc_name = { map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id };
my ($workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
my ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
= schedule_workers($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
print join("\n", @$log_buffer, '');
......@@ -88,7 +88,7 @@ sub schedule_workers_resync_if_necessary {
print "Scheduler: automatic re-balancing of semaphore_counts is off by default. If you think your pipeline might benefit from it, set hive_auto_rebalance_semaphores => 1 in the PipeConfig's hive_meta_table.\n";
}
($workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
= schedule_workers($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
print join("\n", @$log_buffer, '');
}
......@@ -136,90 +136,96 @@ sub suggest_analysis_to_specialize_a_worker {
# the corresponding checks should be added here
@{ $queen->db->get_AnalysisAdaptor->fetch_all_by_pattern( $analyses_pattern ) };
return schedule_workers( $queen, 1, $worker_meadow_type, \@list_of_analyses );
my ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
= schedule_workers( $queen, 1, $worker_meadow_type, \@list_of_analyses );
# take the first analysis from the "plan" if the "plan" was not empty:
return scalar(@$workers_to_submit_by_analysis) && $workers_to_submit_by_analysis->[0][0];
}
sub schedule_workers {
my ($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name) = @_;
my @stats_sorted_by_suitability = @{ Bio::EnsEMBL::Hive::Scheduler::sort_stats_by_suitability( $list_of_analyses ) };
unless(@stats_sorted_by_suitability) {
return $analysis_id2rc_name ? ({}, 0, "Scheduler could not find any suitable analyses to start with\n") : undef; # FIXME: returns data in different format in "suggest analysis" mode
}
# the pre-pending-adjusted outcome will be stored here:
my %workers_to_submit_by_meadow_type_rc_name = ();
my @workers_to_submit_by_analysis = (); # The down-to-analysis "plan" that may completely change by the time the Workers are born and specialized
my %workers_to_submit_by_meadow_type_rc_name = (); # Pre-pending-adjusted per-resource breakout
my $total_extra_workers_required = 0;
my @log_buffer = ();
my $submit_capacity_limiter = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers scheduled this time', $submit_capacity );
my $queen_capacity_limiter = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->db->get_RoleAdaptor->get_hive_current_load() );
if( my @stats_sorted_by_suitability = @{ Bio::EnsEMBL::Hive::Scheduler::sort_stats_by_suitability( $list_of_analyses ) } ) {
foreach my $analysis_stats (@stats_sorted_by_suitability) {
last if( $submit_capacity_limiter->reached );
my $submit_capacity_limiter = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers scheduled this time', $submit_capacity );
my $queen_capacity_limiter = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->db->get_RoleAdaptor->get_hive_current_load() );
my $analysis = $analysis_stats->analysis(); # FIXME: if it proves too expensive we may need to consider caching
my $this_meadow_type = $analysis->meadow_type || $default_meadow_type;
foreach my $analysis_stats (@stats_sorted_by_suitability) {
last if( $submit_capacity_limiter->reached );
next if( $meadow_capacity_limiter_hashed_by_type && $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}->reached );
my $analysis = $analysis_stats->analysis(); # FIXME: if it proves too expensive we may need to consider caching
my $this_meadow_type = $analysis->meadow_type || $default_meadow_type;
#digging deeper under the surface so need to sync:
if( $analysis_stats->status =~ /^(LOADING|ALL_CLAIMED|BLOCKED|SYNCHING)$/ ) {
$queen->safe_synchronize_AnalysisStats($analysis_stats);
}
next if( $analysis_stats->status =~ /^(BLOCKED|SYNCHING)$/ );
# getting the initial worker requirement for this analysis (may be stale if not sync'ed recently)
my $extra_workers_this_analysis = $analysis_stats->num_required_workers;
# if this analysis doesn't require any extra workers - just skip it:
next if ($extra_workers_this_analysis <= 0);
$total_extra_workers_required += $extra_workers_this_analysis; # also keep the total number required so far (if nothing required we may need a resync later)
# setting up all negotiating limiters:
$queen_capacity_limiter->multiplier( $analysis_stats->hive_capacity );
my @limiters = (
$submit_capacity_limiter,
$queen_capacity_limiter,
$meadow_capacity_limiter_hashed_by_type
? $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}
: (),
defined($analysis->analysis_capacity)
? Bio::EnsEMBL::Hive::Limiter->new( "Number of Workers working at '".$analysis->logic_name."' analysis",
$analysis->analysis_capacity - $analysis_stats->num_running_workers )
: (),
);
# negotiations:
foreach my $limiter (@limiters) {
$extra_workers_this_analysis = $limiter->preliminary_offer( $extra_workers_this_analysis );
}
next if( $meadow_capacity_limiter_hashed_by_type && $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}->reached );
# do not continue with this analysis if limiters haven't agreed on a positive number:
next if ($extra_workers_this_analysis <= 0);
#digging deeper under the surface so need to sync:
if( $analysis_stats->status =~ /^(LOADING|ALL_CLAIMED|BLOCKED|SYNCHING)$/ ) {
$queen->safe_synchronize_AnalysisStats($analysis_stats);
}
next if( $analysis_stats->status =~ /^(BLOCKED|SYNCHING)$/ );
# getting the initial worker requirement for this analysis (may be stale if not sync'ed recently)
my $extra_workers_this_analysis = $analysis_stats->num_required_workers;
# if this analysis doesn't require any extra workers - just skip it:
next if ($extra_workers_this_analysis <= 0);
$total_extra_workers_required += $extra_workers_this_analysis; # also keep the total number required so far (if nothing required we may need a resync later)
# setting up all negotiating limiters:
$queen_capacity_limiter->multiplier( $analysis_stats->hive_capacity );
my @limiters = (
$submit_capacity_limiter,
$queen_capacity_limiter,
$meadow_capacity_limiter_hashed_by_type
? $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}
: (),
defined($analysis->analysis_capacity)
? Bio::EnsEMBL::Hive::Limiter->new( "Number of Workers working at '".$analysis->logic_name."' analysis",
$analysis->analysis_capacity - $analysis_stats->num_running_workers )
: (),
);
# negotiations:
foreach my $limiter (@limiters) {
$extra_workers_this_analysis = $limiter->preliminary_offer( $extra_workers_this_analysis );
}
# let all parties know the final decision of negotiations:
foreach my $limiter (@limiters) {
$limiter->final_decision( $extra_workers_this_analysis );
}
# do not continue with this analysis if limiters haven't agreed on a positive number:
next if ($extra_workers_this_analysis <= 0);
if($analysis_id2rc_name) {
my $this_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
$workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
# let all parties know the final decision of negotiations:
foreach my $limiter (@limiters) {
$limiter->final_decision( $extra_workers_this_analysis );
}
push @workers_to_submit_by_analysis, [ $analysis, $extra_workers_this_analysis];
push @log_buffer, $analysis_stats->toString;
push @log_buffer, sprintf("Before checking the Valley for pending jobs, Scheduler allocated $extra_workers_this_analysis x $this_meadow_type:$this_rc_name extra workers for '%s' [%.4f hive_load remaining]\n",
$analysis->logic_name,
$queen_capacity_limiter->available_capacity,
);
} else {
return $analysis_stats; # FIXME: returns data in different format in "suggest analysis" mode
}
if($analysis_id2rc_name) {
my $this_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
$workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
push @log_buffer, sprintf("Before checking the Valley for pending jobs, Scheduler allocated $extra_workers_this_analysis x $this_meadow_type:$this_rc_name extra workers for '%s' [%.4f hive_load remaining]\n",
$analysis->logic_name,
$queen_capacity_limiter->available_capacity,
);
}
} # /foreach my $analysis_stats (@stats_sorted_by_suitability)
} else {
push @log_buffer, "Scheduler could not find any suitable analyses to start with";
}
return (\%workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, \@log_buffer);
return (\@workers_to_submit_by_analysis, \%workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, \@log_buffer);
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment