Commit f1603685 authored by Leo Gordon
Browse files

proof of concept: all structures passed into calls and back are now meadow-aware

parent 7e2f06f6
......@@ -61,6 +61,7 @@ package Bio::EnsEMBL::Hive::Queen;
use strict;
use POSIX;
use Clone 'clone';
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
......@@ -738,30 +739,33 @@ sub count_running_workers {
=cut
sub schedule_workers {
my ($self, $filter_analysis, $submit_limit, $available_worker_slots_by_meadow_type, $pending_worker_counts_by_meadow_type_rc_name, $analysis_id2rc_name, $default_meadow_type) = @_;
my ($self, $filter_analysis, $available_submit_limit, $available_worker_slots_by_meadow_type, $orig_pending_worker_counts_by_meadow_type_rc_name, $analysis_id2rc_name, $default_meadow_type) = @_;
my $meadow_limit = $available_worker_slots_by_meadow_type->{ $default_meadow_type };
my $orig_pending_this_meadow_by_rc_name = $pending_worker_counts_by_meadow_type_rc_name->{ $default_meadow_type };
my @suitable_analyses = $filter_analysis
? ( $filter_analysis->stats )
: @{ $self->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id() };
my $available_submit_limit = ($submit_limit and $meadow_limit)
? (($submit_limit<$meadow_limit) ? $submit_limit : $meadow_limit)
: (defined($submit_limit) ? $submit_limit : $meadow_limit);
my @suitable_analyses = $filter_analysis
? ( $filter_analysis->stats )
: @{ $self->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id() };
return {} unless(@suitable_analyses);
my %pending_this_meadow_by_rc_name = %{ $orig_pending_this_meadow_by_rc_name || {} }; # NB: make our own copy to be able to modify it
my $total_workers_to_run = 0;
my %workers_to_run_by_rc_name = ();
my $available_load = 1.0 - $self->get_hive_current_load();
unless(@suitable_analyses) {
print "Scheduler could not find any suitable analyses to start with\n";
return ({}, 0);
}
my %workers_to_submit_by_meadow_type_rc_name = ();
my %total_workers_to_submit_by_meadow_type = ();
my %pending_worker_counts_by_meadow_type_rc_name= %{ clone $orig_pending_worker_counts_by_meadow_type_rc_name }; # we need a deep disposable copy here
my $total_workers_to_submit = 0;
my $available_load = 1.0 - $self->get_hive_current_load();
foreach my $analysis_stats (@suitable_analyses) {
last if ($available_load <= 0.0);
my $this_meadow_type = $default_meadow_type; # this should be coming from each specific analysis (and only default if undef)
if( defined(my $meadow_limit = $available_worker_slots_by_meadow_type->{ $this_meadow_type }) ) {
$available_submit_limit = defined($available_submit_limit)
? (($available_submit_limit<$meadow_limit) ? $available_submit_limit : $meadow_limit)
: $meadow_limit;
}
last if (defined($available_submit_limit) and !$available_submit_limit);
#digging deeper under the surface so need to sync
......@@ -770,12 +774,12 @@ sub schedule_workers {
}
next if($analysis_stats->status eq 'BLOCKED');
next if($analysis_stats->num_required_workers == 0);
# FIXME: the following call *sometimes* returns a stale number greater than the number of workers actually needed for an analysis; -sync fixes it
my $workers_this_analysis = $analysis_stats->num_required_workers;
my $workers_this_analysis = $analysis_stats->num_required_workers
or next;
if(defined($available_submit_limit)) { # submit_limit total capping, if available
if(defined($available_submit_limit)) { # available_submit_limit total capping, if available
if($workers_this_analysis > $available_submit_limit) {
$workers_this_analysis = $available_submit_limit;
}
......@@ -794,31 +798,37 @@ sub schedule_workers {
my $curr_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
if($pending_this_meadow_by_rc_name{ $curr_rc_name }) { # per-rc_name capping by pending processes, if available
my $pending_this_analysis = ($pending_this_meadow_by_rc_name{ $curr_rc_name } < $workers_this_analysis) ? $pending_this_meadow_by_rc_name{ $curr_rc_name } : $workers_this_analysis;
if(my $pending_this_meadow_type_and_rc_name = $pending_worker_counts_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name }) { # per-rc_name capping by pending processes, if available
my $pending_this_analysis = ($pending_this_meadow_type_and_rc_name < $workers_this_analysis) ? $pending_this_meadow_type_and_rc_name : $workers_this_analysis;
print "Scheduler detected $pending_this_analysis pending workers with resource_class_name=$curr_rc_name\n";
$workers_this_analysis -= $pending_this_analysis;
$pending_this_meadow_by_rc_name{ $curr_rc_name } -= $pending_this_analysis;
print "Scheduler detected $pending_this_analysis pending workers with resource_class_name=$curr_rc_name, adjusting for this value\n";
$pending_worker_counts_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name } -= $pending_this_analysis;
$workers_this_analysis -= $pending_this_analysis;
}
next unless($workers_this_analysis); # do not autovivify the hash by a zero
next unless($workers_this_analysis); # do not autovivify the output hash by a zero
$total_workers_to_run += $workers_this_analysis;
$workers_to_run_by_rc_name{ $curr_rc_name } += $workers_this_analysis;
$workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name } += $workers_this_analysis;
$total_workers_to_submit_by_meadow_type{ $this_meadow_type } += $workers_this_analysis;
$total_workers_to_submit += $workers_this_analysis;
$analysis_stats->print_stats();
printf("Scheduler suggests adding %d more workers of resource_class_name=%s for analysis_id=%d [%1.3f hive_load remaining]\n", $workers_this_analysis, $curr_rc_name, $analysis_stats->analysis_id, $available_load);
printf("Scheduler suggests adding $workers_this_analysis more $this_meadow_type:$curr_rc_name workers for analysis_id=%d [%.3f hive_load remaining]\n", $analysis_stats->analysis_id, $available_load);
}
printf("Scheduler suggests adding a total of %d workers [%1.5f hive_load remaining]\n", $total_workers_to_run, $available_load);
return \%workers_to_run_by_rc_name;
print ''.('-'x60)."\n";
foreach my $meadow_type (keys %total_workers_to_submit_by_meadow_type) {
print "Scheduler suggests submitting a total of $total_workers_to_submit_by_meadow_type{$meadow_type} workers to $meadow_type\n";
}
printf("The remaining hive_load after submitting these workers will be: %.3f\n", $available_load);
print ''.('='x60)."\n";
return (\%workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit);
}
sub schedule_workers_resync_if_necessary {
my ($self, $valley, $analysis) = @_;
my $submit_limit = $valley->config_get('SubmitWorkersMax');
my $available_submit_limit = $valley->config_get('SubmitWorkersMax');
my $available_worker_slots_by_meadow_type = $valley->get_available_worker_slots_by_meadow_type();
my ($pending_worker_counts_by_meadow_type_rc_name, $total_pending_all_meadows) = $valley->get_pending_worker_counts_by_meadow_type_rc_name();
......@@ -827,20 +837,22 @@ sub schedule_workers_resync_if_necessary {
# combined mapping:
my %analysis_id2rc_name = map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id;
my $default_meadow_type = $valley->get_current_meadow()->type;
my $default_meadow_type = $valley->get_default_meadow()->type;
my $workers_to_run_by_rc_name = $self->schedule_workers($analysis, $submit_limit, $available_worker_slots_by_meadow_type, $pending_worker_counts_by_meadow_type_rc_name, \%analysis_id2rc_name, $default_meadow_type);
my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit)
= $self->schedule_workers($analysis, $available_submit_limit, $available_worker_slots_by_meadow_type, $pending_worker_counts_by_meadow_type_rc_name, \%analysis_id2rc_name, $default_meadow_type);
unless( keys %$workers_to_run_by_rc_name or $self->get_hive_current_load() or $self->count_running_workers() ) {
unless( $total_workers_to_submit or $self->get_hive_current_load() or $self->count_running_workers() ) {
print "\nScheduler: nothing is running and nothing to do (according to analysis_stats) => executing garbage collection and sync\n" ;
$self->check_for_dead_workers($valley, 1);
$self->synchronize_hive($analysis);
$workers_to_run_by_rc_name = $self->schedule_workers($analysis, $submit_limit, $available_worker_slots_by_meadow_type, $pending_worker_counts_by_meadow_type_rc_name, \%analysis_id2rc_name, $default_meadow_type);
($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit)
= $self->schedule_workers($analysis, $available_submit_limit, $available_worker_slots_by_meadow_type, $pending_worker_counts_by_meadow_type_rc_name, \%analysis_id2rc_name, $default_meadow_type);
}
return $workers_to_run_by_rc_name;
return ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit);
}
......
......@@ -36,14 +36,14 @@ sub meadow_class_path {
sub new {
my ($class, $config, $current_meadow_type, $pipeline_name) = @_;
my ($class, $config, $default_meadow_type, $pipeline_name) = @_;
my $self = bless {}, $class;
$self->config( $config );
$self->context( [ 'Valley' ] );
my $amch = $self->available_meadow_hash( {} );
my $amh = $self->available_meadow_hash( {} );
# make sure modules are loaded and available ones are checked prior to setting the current one
foreach my $meadow_class (@{ find_submodules( $self->meadow_class_path ) }) {
......@@ -53,11 +53,11 @@ sub new {
$meadow_object->pipeline_name( $pipeline_name ) if($pipeline_name);
$amch->{$meadow_class->type} = $meadow_object;
$amh->{$meadow_class->type} = $meadow_object;
}
}
$self->set_current_meadow_type($current_meadow_type); # run this method even if $current_meadow_type was not specified
$self->set_default_meadow_type($default_meadow_type); # run this method even if $default_meadow_type was not specified
return $self;
}
......@@ -82,25 +82,25 @@ sub get_available_meadow_list { # this beautiful one-liner pushes $local to
}
sub set_current_meadow_type {
my ($self, $current_meadow_type) = @_;
sub set_default_meadow_type {
my ($self, $default_meadow_type) = @_;
if($current_meadow_type) {
if( my $current_meadow = $self->available_meadow_hash->{$current_meadow_type} ) { # store if available
$self->{_current_meadow} = $current_meadow;
if($default_meadow_type) {
if( my $default_meadow = $self->available_meadow_hash->{$default_meadow_type} ) { # store if available
$self->{_default_meadow} = $default_meadow;
} else {
die "Meadow '$current_meadow_type' does not seem to be available on this machine, please investigate";
die "Meadow '$default_meadow_type' does not seem to be available on this machine, please investigate";
}
} else {
$self->{_current_meadow} = $self->get_available_meadow_list->[0]; # take the first from preference list
$self->{_default_meadow} = $self->get_available_meadow_list->[0]; # take the first from preference list
}
}
sub get_current_meadow {
sub get_default_meadow {
my $self = shift @_;
return $self->{_current_meadow};
return $self->{_default_meadow};
}
......
......@@ -29,7 +29,7 @@ sub main {
my $sync = 0;
my $local = 0;
my $show_failed_jobs = 0;
my $meadow_type = undef;
my $default_meadow_type = undef;
my $submit_workers_max = undef;
my $total_running_workers_max = undef;
my $submission_options = undef;
......@@ -72,7 +72,7 @@ sub main {
# meadow control
'local!' => \$local,
'meadow_type=s' => \$meadow_type,
'meadow_type=s' => \$default_meadow_type,
'total_running_workers_max=i' => \$total_running_workers_max,
'submit_workers_max=i' => \$submit_workers_max,
'submission_options=s' => \$submission_options,
......@@ -154,15 +154,15 @@ sub main {
$submit_workers_max = 1;
}
$meadow_type = 'LOCAL' if($local);
my $valley = Bio::EnsEMBL::Hive::Valley->new( $config, $meadow_type, $pipeline_name );
$default_meadow_type = 'LOCAL' if($local);
my $valley = Bio::EnsEMBL::Hive::Valley->new( $config, $default_meadow_type, $pipeline_name );
$valley->config_set('SubmitWorkersMax', $submit_workers_max) if(defined $submit_workers_max);
my $current_meadow = $valley->get_current_meadow();
warn "Current meadow: ".$current_meadow->signature."\n\n";
my $default_meadow = $valley->get_default_meadow();
warn "Default meadow: ".$default_meadow->signature."\n\n";
$current_meadow->config_set('TotalRunningWorkersMax', $total_running_workers_max) if(defined $total_running_workers_max);
$current_meadow->config_set('SubmissionOptions', $submission_options) if(defined $submission_options);
$default_meadow->config_set('TotalRunningWorkersMax', $total_running_workers_max) if(defined $total_running_workers_max);
$default_meadow->config_set('SubmissionOptions', $submission_options) if(defined $submission_options);
if($reset_job_id) { $queen->reset_job_by_dbID_and_sync($reset_job_id); }
......@@ -301,13 +301,13 @@ sub generate_worker_cmd {
sub run_autonomously {
my ($self, $max_loops, $keep_alive, $queen, $valley, $run_analysis, $run_job_id, $force) = @_;
my $current_meadow = $valley->get_current_meadow();
my $default_meadow = $valley->get_default_meadow();
my $worker_cmd = generate_worker_cmd($self, $run_analysis, $run_job_id, $force);
my $special_task = $run_analysis || $run_job_id;
# first, fetch two resource-related mappings from the database:
my $rc_name2id = $self->{'dba'}->get_ResourceClassAdaptor->fetch_HASHED_FROM_name_TO_resource_class_id();
my $rc_id2xparams = $self->{'dba'}->get_ResourceDescriptionAdaptor->fetch_by_meadow_type_HASHED_FROM_resource_class_id_TO_parameters($current_meadow->type());
my $rc_id2xparams = $self->{'dba'}->get_ResourceDescriptionAdaptor->fetch_by_meadow_type_HASHED_FROM_resource_class_id_TO_parameters($default_meadow->type());
# now, chain these two mappings together:
# FIXME: in future this mapping should be obtainable from the adaptor in one go.
......@@ -331,15 +331,21 @@ sub run_autonomously {
$queen->print_analysis_status unless($self->{'no_analysis_stats'});
$queen->print_running_worker_counts;
my $workers_to_run_by_rc_name = $queen->schedule_workers_resync_if_necessary($valley, $run_analysis);
my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit)
= $queen->schedule_workers_resync_if_necessary($valley, $run_analysis);
if(keys %$workers_to_run_by_rc_name) {
foreach my $rc_name ( sort { $workers_to_run_by_rc_name->{$a}<=>$workers_to_run_by_rc_name->{$b} } keys %$workers_to_run_by_rc_name) {
my $this_rc_worker_count = $workers_to_run_by_rc_name->{$rc_name};
if($total_workers_to_submit) {
foreach my $meadow_type (keys %$workers_to_submit_by_meadow_type_rc_name) {
print "Submitting $this_rc_worker_count workers (rc_name=$rc_name) to ".$current_meadow->signature()."\n";
my $this_meadow = $valley->available_meadow_hash->{$meadow_type};
$current_meadow->submit_workers($worker_cmd.($special_task ? '' : " -rc_name $rc_name"), $this_rc_worker_count, $iteration, $rc_name, $rc_name2xparams{ $rc_name } || '');
foreach my $rc_name (keys %{ $workers_to_submit_by_meadow_type_rc_name->{$meadow_type} }) {
my $this_meadow_rc_worker_count = $workers_to_submit_by_meadow_type_rc_name->{$meadow_type}{$rc_name};
print "Submitting $this_meadow_rc_worker_count workers (rc_name=$rc_name) to ".$this_meadow->signature()."\n";
$this_meadow->submit_workers($worker_cmd.($special_task ? '' : " -rc_name $rc_name"), $this_meadow_rc_worker_count, $iteration, $rc_name, $rc_name2xparams{ $rc_name } || '');
}
}
} else {
print "Not submitting any workers this iteration\n";
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment