Skip to content
Snippets Groups Projects
Commit 78a2310d authored by Leo Gordon's avatar Leo Gordon
Browse files

Individual Worker's specialization is now routed through Scheduler's main...

Individual Worker's specialization is now routed through Scheduler's main subroutine, to enforce more checks
parent 4958f02f
No related branches found
No related tags found
No related merge requests found
......@@ -74,6 +74,7 @@ use Bio::EnsEMBL::Utils::Argument ('rearrange');
use Bio::EnsEMBL::Hive::Utils ('destringify', 'dir_revhash'); # NB: needed by invisible code
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::Scheduler;
use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
......@@ -274,7 +275,7 @@ sub specialize_new_worker {
}
}
# probably scheduled by beekeeper.pl:
} elsif( $stats = $self->suggest_analysis_to_specialize_by_rc_id_meadow_type($worker->resource_class_id, $worker->meadow_type) ) {
} elsif( $stats = Bio::EnsEMBL::Hive::Scheduler::suggest_analysis_to_specialize_by_rc_id_meadow_type($self, $worker->resource_class_id, $worker->meadow_type) ) {
$worker->analysis( undef ); # make sure we reset anything that was there before
$analysis_id = $stats->analysis_id;
......@@ -833,21 +834,4 @@ sub register_all_workers_dead {
}
sub suggest_analysis_to_specialize_by_rc_id_meadow_type {
my $self = shift;
my $rc_id = shift;
my $meadow_type = shift;
my @suitable_analyses = @{ $self->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id_meadow_type( $rc_id, $meadow_type ) };
foreach my $stats (@suitable_analyses) {
#synchronize and double check that it can be run:
$self->safe_synchronize_AnalysisStats($stats);
return $stats if( ($stats->status ne 'BLOCKED') and ($stats->status ne 'SYNCHING') and ($stats->num_required_workers > 0) );
}
return undef;
}
1;
......@@ -38,7 +38,6 @@ use warnings;
use Bio::EnsEMBL::Hive::Analysis;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Queen;
use Bio::EnsEMBL::Hive::Valley;
use Bio::EnsEMBL::Hive::Limiter;
......@@ -46,7 +45,9 @@ use Bio::EnsEMBL::Hive::Limiter;
sub schedule_workers_resync_if_necessary {
my ($queen, $valley, $filter_analysis) = @_;
my $meadow_capacity_by_type = $valley->get_meadow_capacity_hash_by_meadow_type();
my $submit_capacity = $valley->config_get('SubmitWorkersMax');
my $default_meadow_type = $valley->get_default_meadow()->type;
my $meadow_capacity_limiter_hashed_by_type = $valley->get_meadow_capacity_hash_by_meadow_type();
my $analysis_id2rc_id = $queen->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
my $rc_id2name = $queen->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
......@@ -54,12 +55,13 @@ sub schedule_workers_resync_if_necessary {
my $analysis_id2rc_name = { map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id };
my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
= schedule_workers($queen, $valley, $filter_analysis, $meadow_capacity_by_type, $analysis_id2rc_name);
= schedule_workers($queen, $submit_capacity, $default_meadow_type, undef, undef, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
print $log_buffer;
unless( $total_workers_required ) {
print "\nScheduler: according to analysis_stats no workers are required... let's see if resync can fix it.\n" ;
# FIXME: here is an (optimistic) assumption all Workers the Queen can see are reachable from the Valley:
if( $queen->count_running_workers() != $valley->count_running_workers ) {
print "Scheduler: mismatch between Queen's workers and Valley's workers detected, checking for dead workers...\n";
$queen->check_for_dead_workers($valley, 1);
......@@ -70,7 +72,7 @@ sub schedule_workers_resync_if_necessary {
$queen->synchronize_hive($filter_analysis);
($workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
= schedule_workers($queen, $valley, $filter_analysis, $meadow_capacity_by_type, $analysis_id2rc_name);
= schedule_workers($queen, $submit_capacity, $default_meadow_type, undef, undef, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
print $log_buffer;
}
......@@ -104,15 +106,22 @@ sub schedule_workers_resync_if_necessary {
}
sub suggest_analysis_to_specialize_by_rc_id_meadow_type {
my ($queen, $filter_rc_id, $filter_meadow_type) = @_;
return schedule_workers($queen, 1, $filter_meadow_type, $filter_rc_id, $filter_meadow_type);
}
sub schedule_workers {
my ($queen, $valley, $filter_analysis, $meadow_capacity_by_type, $analysis_id2rc_name) = @_;
my ($queen, $submit_capacity, $default_meadow_type, $filter_rc_id, $filter_meadow_type, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name) = @_;
my @suitable_analyses = $filter_analysis
? ( $filter_analysis->stats )
: @{ $queen->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id_meadow_type() };
: @{ $queen->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id_meadow_type($filter_rc_id, $filter_meadow_type) };
unless(@suitable_analyses) {
return ({}, 0, "Scheduler could not find any suitable analyses to start with\n");
return $analysis_id2rc_name ? ({}, 0, "Scheduler could not find any suitable analyses to start with\n") : undef; # FIXME: returns data in different format in "suggest analysis" mode
}
# the pre-pending-adjusted outcome will be stored here:
......@@ -120,24 +129,22 @@ sub schedule_workers {
my $total_workers_required = 0;
my $log_buffer = '';
my $default_meadow_type = $valley->get_default_meadow()->type;
my $submit_capacity = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers submitted this iteration', $valley->config_get('SubmitWorkersMax') );
my $queen_capacity = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->get_hive_current_load() );
my $submit_capacity_limiter = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers scheduled this time', $submit_capacity );
my $queen_capacity_limiter = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->get_hive_current_load() );
foreach my $analysis_stats (@suitable_analyses) {
last if( $submit_capacity->reached );
last if( $submit_capacity_limiter->reached );
my $analysis = $analysis_stats->get_analysis; # FIXME: if it proves too expensive we may need to consider caching
my $this_meadow_type = $analysis->meadow_type || $default_meadow_type;
next if( $meadow_capacity_by_type->{$this_meadow_type}->reached );
next if( $meadow_capacity_limiter_hashed_by_type && $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}->reached );
#digging deeper under the surface so need to sync:
if(($analysis_stats->status eq 'LOADING') or ($analysis_stats->status eq 'BLOCKED') or ($analysis_stats->status eq 'ALL_CLAIMED')) {
$queen->synchronize_AnalysisStats($analysis_stats);
if( $analysis_stats->status =~ /^(LOADING|ALL_CLAIMED|BLOCKED|SYNCHING)$/ ) {
$queen->safe_synchronize_AnalysisStats($analysis_stats);
}
next if($analysis_stats->status eq 'BLOCKED');
next if( $analysis_stats->status =~ /^(BLOCKED|SYNCHING)$/ );
# getting the initial worker requirement for this analysis (may be stale if not sync'ed recently)
my $extra_workers_this_analysis = $analysis_stats->num_required_workers;
......@@ -148,11 +155,13 @@ sub schedule_workers {
$total_workers_required += $extra_workers_this_analysis; # also keep the total number required so far (if nothing required we may need a resync later)
# setting up all negotiating limiters:
$queen_capacity->multiplier( $analysis_stats->hive_capacity );
$queen_capacity_limiter->multiplier( $analysis_stats->hive_capacity );
my @limiters = (
$submit_capacity,
$queen_capacity,
$meadow_capacity_by_type->{$this_meadow_type},
$submit_capacity_limiter,
$queen_capacity_limiter,
$meadow_capacity_limiter_hashed_by_type
? $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}
: (),
defined($analysis->analysis_capacity)
? Bio::EnsEMBL::Hive::Limiter->new( "Number of Workers working at '".$analysis->logic_name."' analysis",
$analysis->analysis_capacity - $analysis_stats->num_running_workers )
......@@ -172,13 +181,17 @@ sub schedule_workers {
$limiter->final_decision( $extra_workers_this_analysis );
}
my $this_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
$workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
$log_buffer .= $analysis_stats->toString . "\n";
$log_buffer .= sprintf("Before checking the Valley for pending jobs, Scheduler allocated $extra_workers_this_analysis x $this_meadow_type:$this_rc_name extra workers for '%s' [%.4f hive_load remaining]\n",
$analysis->logic_name,
$queen_capacity->available_capacity,
);
if($analysis_id2rc_name) {
my $this_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
$workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
$log_buffer .= $analysis_stats->toString . "\n";
$log_buffer .= sprintf("Before checking the Valley for pending jobs, Scheduler allocated $extra_workers_this_analysis x $this_meadow_type:$this_rc_name extra workers for '%s' [%.4f hive_load remaining]\n",
$analysis->logic_name,
$queen_capacity_limiter->available_capacity,
);
} else {
return $analysis_stats; # FIXME: returns data in different format in "suggest analysis" mode
}
}
return (\%workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer);
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment