Commit 24bfdb58 authored by Leo Gordon's avatar Leo Gordon
Browse files

switch the Scheduler to using universal Limiter objects (cleaner code, more...

switch the Scheduler to using universal Limiter objects (cleaner code, more precise computation and should allow for expansion)
parent 62bda079
package Bio::EnsEMBL::Hive::Limiter;
sub new {
my ($class, $available_capacity) = @_;
my $self = bless {}, $class;
$self->available_capacity( $available_capacity );
# we fix the multiplier at 1 for direct limiters, but expect it to be (re)set later by reciprocal limiters:
$self->multiplier( 1 );
return $self;
}
sub available_capacity {
my $self = shift @_;
if(@_) {
$self->{_available_capacity} = shift @_;
}
return $self->{_available_capacity};
}
sub multiplier {
my $self = shift @_;
if(@_) {
$self->{_multiplier} = shift @_;
}
return $self->{_multiplier};
}
sub reached {
my $self = shift @_;
return defined($self->available_capacity) && ($self->available_capacity <= 0.0);
}
sub preliminary_offer {
my ($self, $slots_asked) = @_;
if( defined($self->available_capacity) and (my $multiplier = $self->multiplier) > 0.0 ) { # if multiplier is not positive, capacity stays unaffected
my $slots_available = int($self->available_capacity * $multiplier);
return ($slots_available<$slots_asked) ? $slots_available : $slots_asked;
}
return $slots_asked;
}
sub final_decision {
my ($self, $slots_agreed) = @_;
if( defined($self->available_capacity) and (my $multiplier = $self->multiplier) > 0.0 ) { # if multiplier is not positive, capacity stays unaffected
$self->available_capacity( $self->available_capacity - $slots_agreed/$multiplier );
}
}
1;
......@@ -29,6 +29,7 @@ use Bio::EnsEMBL::Hive::Analysis;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Queen;
use Bio::EnsEMBL::Hive::Valley;
use Bio::EnsEMBL::Hive::Limiter;
sub schedule_workers_resync_if_necessary {
......@@ -101,57 +102,53 @@ sub schedule_workers {
my $default_meadow_type = $valley->get_default_meadow()->type;
my $available_submit_limit = $valley->config_get('SubmitWorkersMax');
my $available_load = 1.0 - $queen->get_hive_current_load();
foreach my $analysis_stats (@suitable_analyses) {
last if ($available_load <= 0.0);
my $submit_capacity = Bio::EnsEMBL::Hive::Limiter->new( $valley->config_get('SubmitWorkersMax') );
my $queen_capacity = Bio::EnsEMBL::Hive::Limiter->new( 1.0 - $queen->get_hive_current_load() );
my %meadow_capacity = map { $_ => Bio::EnsEMBL::Hive::Limiter->new( $available_worker_slots_by_meadow_type->{$_} ) } keys %$available_worker_slots_by_meadow_type;
my $analysis = $analysis_stats->get_analysis(); # FIXME: if it proves too expensive we may need to consider caching
foreach my $analysis_stats (@suitable_analyses) {
last if( $submit_capacity->reached or $queen_capacity->reached);
my $this_meadow_type = $analysis->meadow_type || $default_meadow_type;
my $analysis = $analysis_stats->get_analysis; # FIXME: if it proves too expensive we may need to consider caching
my $this_meadow_type = $analysis->meadow_type || $default_meadow_type;
if( defined(my $meadow_limit = $available_worker_slots_by_meadow_type->{ $this_meadow_type }) ) {
$available_submit_limit = defined($available_submit_limit)
? (($available_submit_limit<$meadow_limit) ? $available_submit_limit : $meadow_limit)
: $meadow_limit;
}
last if (defined($available_submit_limit) and !$available_submit_limit);
next if( $meadow_capacity{$this_meadow_type}->reached );
#digging deeper under the surface so need to sync
#digging deeper under the surface so need to sync:
if(($analysis_stats->status eq 'LOADING') or ($analysis_stats->status eq 'BLOCKED') or ($analysis_stats->status eq 'ALL_CLAIMED')) {
$queen->synchronize_AnalysisStats($analysis_stats);
}
next if($analysis_stats->status eq 'BLOCKED');
# FIXME: the following call *sometimes* returns a stale number greater than the number of workers actually needed for an analysis; -sync fixes it
# getting the initial worker requirement for this analysis (may be stale if not sync'ed recently)
my $workers_this_analysis = $analysis_stats->num_required_workers
or next;
if(defined($available_submit_limit)) { # available_submit_limit total capping, if available
if($workers_this_analysis > $available_submit_limit) {
$workers_this_analysis = $available_submit_limit;
}
$available_submit_limit -= $workers_this_analysis;
}
# setting up all negotiating limiters:
$queen_capacity->multiplier( $analysis_stats->hive_capacity );
if((my $hive_capacity = $analysis_stats->hive_capacity) > 0) { # per-analysis hive_capacity capping, if available
my $remaining_capacity_for_this_analysis = int($available_load * $hive_capacity);
# negotiations:
$workers_this_analysis = $submit_capacity->preliminary_offer( $workers_this_analysis );
$workers_this_analysis = $queen_capacity->preliminary_offer( $workers_this_analysis );
$workers_this_analysis = $meadow_capacity{$this_meadow_type}->preliminary_offer( $workers_this_analysis );
if($workers_this_analysis > $remaining_capacity_for_this_analysis) {
$workers_this_analysis = $remaining_capacity_for_this_analysis;
}
$available_load -= 1.0*$workers_this_analysis/$hive_capacity;
}
# do not continue with this analysis if haven't agreed on a positive number:
next unless($workers_this_analysis);
my $this_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
# let all parties know the final decision of negotiations:
$submit_capacity->final_decision( $workers_this_analysis );
$queen_capacity->final_decision( $workers_this_analysis );
$meadow_capacity{$this_meadow_type}->final_decision( $workers_this_analysis );
my $this_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
$workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $workers_this_analysis;
$total_workers_to_submit += $workers_this_analysis;
$analysis_stats->print_stats();
printf("Before checking the Valley for pending jobs, Scheduler allocated $workers_this_analysis x $this_meadow_type:$this_rc_name workers for '%s' [%.4f hive_load remaining]\n", $analysis->logic_name, $available_load);
printf("Before checking the Valley for pending jobs, Scheduler allocated $workers_this_analysis x $this_meadow_type:$this_rc_name workers for '%s' [%.4f hive_load remaining]\n",
$analysis->logic_name,
$queen_capacity->available_capacity,
);
}
return (\%workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment