Commit 49e581d2 authored by Matthieu Muffato's avatar Matthieu Muffato
Browse files

Merge branch 'master' of git.internal.sanger.ac.uk:/repos/git/ensembl/compara/ensembl-hive

parents 3788c02d efacc250
......@@ -35,8 +35,8 @@ sub new {
my $self = $class->SUPER::new( @_ ); # deal with Storable stuff
my ($logic_name, $module, $parameters, $resource_class_id, $failed_job_tolerance, $max_retry_count, $can_be_empty, $priority) =
rearrange([qw(logic_name module parameters resource_class_id failed_job_tolerance max_retry_count can_be_empty priority) ], @_);
my ($logic_name, $module, $parameters, $resource_class_id, $failed_job_tolerance, $max_retry_count, $can_be_empty, $priority, $meadow_type) =
rearrange([qw(logic_name module parameters resource_class_id failed_job_tolerance max_retry_count can_be_empty priority meadow_type) ], @_);
$self->logic_name($logic_name) if($logic_name);
$self->module($module) if($module);
......@@ -46,6 +46,7 @@ sub new {
$self->max_retry_count($max_retry_count) if($max_retry_count);
$self->can_be_empty($can_be_empty) if($can_be_empty);
$self->priority($priority) if($priority);
$self->meadow_type($meadow_type) if($meadow_type);
return $self;
}
......@@ -111,6 +112,13 @@ sub priority {
}
sub meadow_type {
my $self = shift;
$self->{'_meadow_type'} = shift if(@_);
return $self->{'_meadow_type'};
}
=head2 process
Arg [1] : none
......
......@@ -329,9 +329,7 @@ sub dataflow_output_id {
if($failed_to_create) { # adjust semaphore_count for jobs that failed to create:
$self->adaptor->decrease_semaphore_count_for_jobid( $funnel_job_id, $failed_to_create );
}
} else {
die "Could not create a funnel job";
} else { # assume the whole semaphored group of jobs has been created already
}
} else { # non-semaphored dataflow (but potentially propagating any existing semaphores)
......
......@@ -142,6 +142,7 @@ sub CreateNewJob {
WHERE analysis_id=$analysis_id
});
}
} else { # if we got 0E0, it means "ignored insert collision" (job created previously), so we simply return an undef and deal with it outside
}
return $job_id;
......
......@@ -21,6 +21,13 @@ sub get_current_worker_process_id {
}
sub count_pending_workers_by_rc_name {
my ($self) = @_;
return ({}, 0); # LOCAL has no concept of pending workers
}
sub count_running_workers {
my $self = shift @_;
......
......@@ -39,17 +39,19 @@ sub count_pending_workers_by_rc_name {
my ($self) = @_;
my $jnp = $self->job_name_prefix();
my $cmd = qq{bjobs -w -J '${jnp}*' -u all 2>/dev/null | grep PEND};
my $cmd = "bjobs -w -J '${jnp}*' -u all 2>/dev/null | grep PEND";
my %pending_by_rc_name = ();
my %pending_this_meadow_by_rc_name = ();
my $total_pending_this_meadow = 0;
foreach my $line (qx/$cmd/) {
if($line=~/\b\Q$jnp\E(\S+)\-\d+(\[\d+\])?\b/) {
$pending_by_rc_name{$1}++;
$pending_this_meadow_by_rc_name{$1}++;
$total_pending_this_meadow++;
}
}
return \%pending_by_rc_name;
return (\%pending_this_meadow_by_rc_name, $total_pending_this_meadow);
}
......
......@@ -380,8 +380,8 @@ sub run {
my %seen_logic_name = ();
foreach my $aha (@{$self->pipeline_analyses}) {
my ($logic_name, $module, $parameters_hash, $input_ids, $blocked, $batch_size, $hive_capacity, $failed_job_tolerance, $max_retry_count, $can_be_empty, $rc_id, $rc_name, $priority) =
rearrange([qw(logic_name module parameters input_ids blocked batch_size hive_capacity failed_job_tolerance max_retry_count can_be_empty rc_id rc_name priority)], %$aha);
my ($logic_name, $module, $parameters_hash, $input_ids, $blocked, $batch_size, $hive_capacity, $failed_job_tolerance, $max_retry_count, $can_be_empty, $rc_id, $rc_name, $priority, $meadow_type) =
rearrange([qw(logic_name module parameters input_ids blocked batch_size hive_capacity failed_job_tolerance max_retry_count can_be_empty rc_id rc_name priority meadow_type)], %$aha);
unless($logic_name) {
die "logic_name' must be defined in every analysis";
......@@ -424,6 +424,7 @@ sub run {
-max_retry_count => $max_retry_count,
-can_be_empty => $can_be_empty,
-priority => $priority,
-meadow_type => $meadow_type,
);
$analysis_adaptor->store($analysis);
......
......@@ -142,6 +142,7 @@ sub pipeline_analyses {
return [
{ -logic_name => 'start',
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::Start',
-meadow_type=> 'LOCAL', # do not bother the farm with such a simple task (and get it done faster)
-parameters => {},
-input_ids => [
{ 'a_multiplier' => $self->o('first_mult'), 'b_multiplier' => $self->o('second_mult') },
......
......@@ -61,6 +61,7 @@ package Bio::EnsEMBL::Hive::Queen;
use strict;
use POSIX;
use Clone 'clone';
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
......@@ -738,27 +739,35 @@ sub count_running_workers {
=cut
sub schedule_workers {
my ($self, $filter_analysis, $orig_pending_by_rc_name, $available_submit_limit) = @_;
my ($self, $filter_analysis, $available_submit_limit, $available_worker_slots_by_meadow_type, $orig_pending_worker_counts_by_meadow_type_rc_name, $analysis_id2rc_name, $default_meadow_type) = @_;
my @suitable_analyses = $filter_analysis
? ( $filter_analysis->stats )
: @{ $self->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id() };
my @suitable_analyses = $filter_analysis
? ( $filter_analysis->stats )
: @{ $self->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id() };
my $analysis_id2rc_id = $self->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
my $rc_id2name = $self->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
# combined mapping
my %analysis_id2rc_name = map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id;
return {} unless(@suitable_analyses);
my %pending_by_rc_name = %{ $orig_pending_by_rc_name || {} }; # NB: make our own copy to be able to modify it
my $total_workers_to_run = 0;
my %workers_to_run_by_rc_name = ();
my $available_load = 1.0 - $self->get_hive_current_load();
unless(@suitable_analyses) {
print "Scheduler could not find any suitable analyses to start with\n";
return ({}, 0);
}
my %workers_to_submit_by_meadow_type_rc_name = ();
my %total_workers_to_submit_by_meadow_type = ();
my %pending_worker_counts_by_meadow_type_rc_name= %{ clone $orig_pending_worker_counts_by_meadow_type_rc_name }; # we need a deep disposable copy here
my $total_workers_to_submit = 0;
my $available_load = 1.0 - $self->get_hive_current_load();
foreach my $analysis_stats (@suitable_analyses) {
last if ($available_load <= 0.0);
my $analysis = $analysis_stats->get_analysis(); # FIXME: if it proves too expensive we may need to consider caching
my $this_meadow_type = $analysis->meadow_type || $default_meadow_type;
if( defined(my $meadow_limit = $available_worker_slots_by_meadow_type->{ $this_meadow_type }) ) {
$available_submit_limit = defined($available_submit_limit)
? (($available_submit_limit<$meadow_limit) ? $available_submit_limit : $meadow_limit)
: $meadow_limit;
}
last if (defined($available_submit_limit) and !$available_submit_limit);
#digging deeper under the surface so need to sync
......@@ -767,12 +776,12 @@ sub schedule_workers {
}
next if($analysis_stats->status eq 'BLOCKED');
next if($analysis_stats->num_required_workers == 0);
# FIXME: the following call *sometimes* returns a stale number greater than the number of workers actually needed for an analysis; -sync fixes it
my $workers_this_analysis = $analysis_stats->num_required_workers;
my $workers_this_analysis = $analysis_stats->num_required_workers
or next;
if(defined($available_submit_limit)) { # submit_limit total capping, if available
if(defined($available_submit_limit)) { # available_submit_limit total capping, if available
if($workers_this_analysis > $available_submit_limit) {
$workers_this_analysis = $available_submit_limit;
}
......@@ -789,54 +798,63 @@ sub schedule_workers {
$available_load -= 1.0*$workers_this_analysis/$hive_capacity;
}
my $curr_rc_name = $analysis_id2rc_name{ $analysis_stats->analysis_id };
my $curr_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
if($pending_by_rc_name{ $curr_rc_name }) { # per-rc_name capping by pending processes, if available
my $pending_this_analysis = ($pending_by_rc_name{ $curr_rc_name } < $workers_this_analysis) ? $pending_by_rc_name{ $curr_rc_name } : $workers_this_analysis;
if(my $pending_this_meadow_type_and_rc_name = $pending_worker_counts_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name }) { # per-rc_name capping by pending processes, if available
my $pending_this_analysis = ($pending_this_meadow_type_and_rc_name < $workers_this_analysis) ? $pending_this_meadow_type_and_rc_name : $workers_this_analysis;
print "Scheduler detected $pending_this_analysis pending workers with resource_class_name=$curr_rc_name\n";
$workers_this_analysis -= $pending_this_analysis;
$pending_by_rc_name{ $curr_rc_name } -= $pending_this_analysis;
print "Scheduler detected $pending_this_analysis pending workers with resource_class_name=$curr_rc_name, adjusting for this value\n";
$pending_worker_counts_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name } -= $pending_this_analysis;
$workers_this_analysis -= $pending_this_analysis;
}
next unless($workers_this_analysis); # do not autovivify the hash by a zero
next unless($workers_this_analysis); # do not autovivify the output hash by a zero
$total_workers_to_run += $workers_this_analysis;
$workers_to_run_by_rc_name{ $curr_rc_name } += $workers_this_analysis;
$workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name } += $workers_this_analysis;
$total_workers_to_submit_by_meadow_type{ $this_meadow_type } += $workers_this_analysis;
$total_workers_to_submit += $workers_this_analysis;
$analysis_stats->print_stats();
printf("Scheduler suggests adding %d more workers of resource_class_name=%s for analysis_id=%d [%1.3f hive_load remaining]\n", $workers_this_analysis, $curr_rc_name, $analysis_stats->analysis_id, $available_load);
printf("Scheduler suggests adding $workers_this_analysis x $this_meadow_type:$curr_rc_name workers for '%s' [%.3f hive_load remaining]\n", $analysis->logic_name, $available_load);
}
printf("Scheduler suggests adding a total of %d workers [%1.5f hive_load remaining]\n", $total_workers_to_run, $available_load);
return \%workers_to_run_by_rc_name;
print ''.('-'x60)."\n";
foreach my $meadow_type (keys %total_workers_to_submit_by_meadow_type) {
print "Scheduler suggests submitting a total of $total_workers_to_submit_by_meadow_type{$meadow_type} workers to $meadow_type\n";
}
printf("The remaining hive_load after submitting these workers will be: %.3f\n", $available_load);
print ''.('='x60)."\n";
return (\%workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit);
}
sub schedule_workers_resync_if_necessary {
my ($self, $valley, $analysis) = @_;
my $submit_limit = $valley->config_get('SubmitWorkersMax');
my $meadow = $valley->get_current_meadow();
my $available_submit_limit = $valley->config_get('SubmitWorkersMax');
my $available_worker_slots_by_meadow_type = $valley->get_available_worker_slots_by_meadow_type();
my ($pending_worker_counts_by_meadow_type_rc_name, $total_pending_all_meadows) = $valley->get_pending_worker_counts_by_meadow_type_rc_name();
my $pending_by_rc_name = $meadow->can('count_pending_workers_by_rc_name') ? $meadow->count_pending_workers_by_rc_name() : {};
my $meadow_limit = ($meadow->can('count_running_workers') and defined($meadow->config_get('TotalRunningWorkersMax'))) ? $meadow->config_get('TotalRunningWorkersMax') - $meadow->count_running_workers : undef;
my $analysis_id2rc_id = $self->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
my $rc_id2name = $self->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
# combined mapping:
my %analysis_id2rc_name = map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id;
my $available_submit_limit = ($submit_limit and $meadow_limit)
? (($submit_limit<$meadow_limit) ? $submit_limit : $meadow_limit)
: (defined($submit_limit) ? $submit_limit : $meadow_limit);
my $default_meadow_type = $valley->get_default_meadow()->type;
my $workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_by_rc_name, $available_submit_limit);
my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit)
= $self->schedule_workers($analysis, $available_submit_limit, $available_worker_slots_by_meadow_type, $pending_worker_counts_by_meadow_type_rc_name, \%analysis_id2rc_name, $default_meadow_type);
unless( keys %$workers_to_run_by_rc_name or $self->get_hive_current_load() or $self->count_running_workers() ) {
unless( $total_workers_to_submit or $self->get_hive_current_load() or $self->count_running_workers() ) {
print "\nScheduler: nothing is running and nothing to do (according to analysis_stats) => executing garbage collection and sync\n" ;
$self->check_for_dead_workers($valley, 1);
$self->synchronize_hive($analysis);
$workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_by_rc_name, $available_submit_limit);
($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit)
= $self->schedule_workers($analysis, $available_submit_limit, $available_worker_slots_by_meadow_type, $pending_worker_counts_by_meadow_type_rc_name, \%analysis_id2rc_name, $default_meadow_type);
}
return $workers_to_run_by_rc_name;
return ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit);
}
......
......@@ -36,14 +36,14 @@ sub meadow_class_path {
sub new {
my ($class, $config, $current_meadow_type, $pipeline_name) = @_;
my ($class, $config, $default_meadow_type, $pipeline_name) = @_;
my $self = bless {}, $class;
$self->config( $config );
$self->context( [ 'Valley' ] );
my $amch = $self->available_meadow_hash( {} );
my $amh = $self->available_meadow_hash( {} );
# make sure modules are loaded and available ones are checked prior to setting the current one
foreach my $meadow_class (@{ find_submodules( $self->meadow_class_path ) }) {
......@@ -53,11 +53,11 @@ sub new {
$meadow_object->pipeline_name( $pipeline_name ) if($pipeline_name);
$amch->{$meadow_class->type} = $meadow_object;
$amh->{$meadow_class->type} = $meadow_object;
}
}
$self->set_current_meadow_type($current_meadow_type); # run this method even if $current_meadow_type was not specified
$self->set_default_meadow_type($default_meadow_type); # run this method even if $default_meadow_type was not specified
return $self;
}
......@@ -82,25 +82,25 @@ sub get_available_meadow_list { # this beautiful one-liner pushes $local to
}
sub set_current_meadow_type {
my ($self, $current_meadow_type) = @_;
sub set_default_meadow_type {
my ($self, $default_meadow_type) = @_;
if($current_meadow_type) {
if( my $current_meadow = $self->available_meadow_hash->{$current_meadow_type} ) { # store if available
$self->{_current_meadow} = $current_meadow;
if($default_meadow_type) {
if( my $default_meadow = $self->available_meadow_hash->{$default_meadow_type} ) { # store if available
$self->{_default_meadow} = $default_meadow;
} else {
die "Meadow '$current_meadow_type' does not seem to be available on this machine, please investigate";
die "Meadow '$default_meadow_type' does not seem to be available on this machine, please investigate";
}
} else {
$self->{_current_meadow} = $self->get_available_meadow_list->[0]; # take the first from preference list
$self->{_default_meadow} = $self->get_available_meadow_list->[0]; # take the first from preference list
}
}
sub get_current_meadow {
sub get_default_meadow {
my $self = shift @_;
return $self->{_current_meadow};
return $self->{_default_meadow};
}
......@@ -140,5 +140,36 @@ sub whereami {
}
sub get_pending_worker_counts_by_meadow_type_rc_name {
my $self = shift @_;
my %pending_counts = ();
my $total_pending_all_meadows = 0;
foreach my $meadow (@{ $self->get_available_meadow_list }) {
my ($pending_this_meadow_by_rc_name, $total_pending_this_meadow) = ($meadow->count_pending_workers_by_rc_name());
$pending_counts{ $meadow->type } = $pending_this_meadow_by_rc_name;
$total_pending_all_meadows += $total_pending_this_meadow;
}
return (\%pending_counts, $total_pending_all_meadows);
}
sub get_available_worker_slots_by_meadow_type {
my $self = shift @_;
my %available_worker_slots = ();
foreach my $meadow (@{ $self->get_available_meadow_list }) {
if( $meadow->can('count_running_workers') and defined($meadow->config_get('TotalRunningWorkersMax'))) {
$available_worker_slots{ $meadow->type } = $meadow->config_get('TotalRunningWorkersMax') - $meadow->count_running_workers;
}
}
return \%available_worker_slots;
}
1;
......@@ -29,7 +29,7 @@ sub main {
my $sync = 0;
my $local = 0;
my $show_failed_jobs = 0;
my $meadow_type = undef;
my $default_meadow_type = undef;
my $submit_workers_max = undef;
my $total_running_workers_max = undef;
my $submission_options = undef;
......@@ -72,7 +72,7 @@ sub main {
# meadow control
'local!' => \$local,
'meadow_type=s' => \$meadow_type,
'meadow_type=s' => \$default_meadow_type,
'total_running_workers_max=i' => \$total_running_workers_max,
'submit_workers_max=i' => \$submit_workers_max,
'submission_options=s' => \$submission_options,
......@@ -154,15 +154,15 @@ sub main {
$submit_workers_max = 1;
}
$meadow_type = 'LOCAL' if($local);
my $valley = Bio::EnsEMBL::Hive::Valley->new( $config, $meadow_type, $pipeline_name );
$default_meadow_type = 'LOCAL' if($local);
my $valley = Bio::EnsEMBL::Hive::Valley->new( $config, $default_meadow_type, $pipeline_name );
$valley->config_set('SubmitWorkersMax', $submit_workers_max) if(defined $submit_workers_max);
my $current_meadow = $valley->get_current_meadow();
warn "Current meadow: ".$current_meadow->signature."\n\n";
my $default_meadow = $valley->get_default_meadow();
warn "Default meadow: ".$default_meadow->signature."\n\n";
$current_meadow->config_set('TotalRunningWorkersMax', $total_running_workers_max) if(defined $total_running_workers_max);
$current_meadow->config_set('SubmissionOptions', $submission_options) if(defined $submission_options);
$default_meadow->config_set('TotalRunningWorkersMax', $total_running_workers_max) if(defined $total_running_workers_max);
$default_meadow->config_set('SubmissionOptions', $submission_options) if(defined $submission_options);
if($reset_job_id) { $queen->reset_job_by_dbID_and_sync($reset_job_id); }
......@@ -301,13 +301,13 @@ sub generate_worker_cmd {
sub run_autonomously {
my ($self, $max_loops, $keep_alive, $queen, $valley, $run_analysis, $run_job_id, $force) = @_;
my $current_meadow = $valley->get_current_meadow();
my $default_meadow = $valley->get_default_meadow();
my $worker_cmd = generate_worker_cmd($self, $run_analysis, $run_job_id, $force);
my $special_task = $run_analysis || $run_job_id;
# first, fetch two resource-related mappings from the database:
my $rc_name2id = $self->{'dba'}->get_ResourceClassAdaptor->fetch_HASHED_FROM_name_TO_resource_class_id();
my $rc_id2xparams = $self->{'dba'}->get_ResourceDescriptionAdaptor->fetch_by_meadow_type_HASHED_FROM_resource_class_id_TO_parameters($current_meadow->type());
my $rc_id2xparams = $self->{'dba'}->get_ResourceDescriptionAdaptor->fetch_by_meadow_type_HASHED_FROM_resource_class_id_TO_parameters($default_meadow->type());
# now, chain these two mappings together:
# FIXME: in future this mapping should be obtainable from the adaptor in one go.
......@@ -331,15 +331,21 @@ sub run_autonomously {
$queen->print_analysis_status unless($self->{'no_analysis_stats'});
$queen->print_running_worker_counts;
my $workers_to_run_by_rc_name = $queen->schedule_workers_resync_if_necessary($valley, $run_analysis);
my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit)
= $queen->schedule_workers_resync_if_necessary($valley, $run_analysis);
if(keys %$workers_to_run_by_rc_name) {
foreach my $rc_name ( sort { $workers_to_run_by_rc_name->{$a}<=>$workers_to_run_by_rc_name->{$b} } keys %$workers_to_run_by_rc_name) {
my $this_rc_worker_count = $workers_to_run_by_rc_name->{$rc_name};
if($total_workers_to_submit) {
foreach my $meadow_type (keys %$workers_to_submit_by_meadow_type_rc_name) {
print "Submitting $this_rc_worker_count workers (rc_name=$rc_name) to ".$current_meadow->signature()."\n";
my $this_meadow = $valley->available_meadow_hash->{$meadow_type};
$current_meadow->submit_workers($worker_cmd.($special_task ? '' : " -rc_name $rc_name"), $this_rc_worker_count, $iteration, $rc_name, $rc_name2xparams{ $rc_name } || '');
foreach my $rc_name (keys %{ $workers_to_submit_by_meadow_type_rc_name->{$meadow_type} }) {
my $this_meadow_rc_worker_count = $workers_to_submit_by_meadow_type_rc_name->{$meadow_type}{$rc_name};
print "Submitting $this_meadow_rc_worker_count workers (rc_name=$rc_name) to ".$this_meadow->signature()."\n";
$this_meadow->submit_workers($worker_cmd.($special_task ? '' : " -rc_name $rc_name"), $this_meadow_rc_worker_count, $iteration, $rc_name, $rc_name2xparams{ $rc_name } || '');
}
}
} else {
print "Not submitting any workers this iteration\n";
......
......@@ -60,9 +60,9 @@ sub main {
min(swap), avg(swap), max(swap)
FROM analysis_base
JOIN resource_class rc USING(resource_class_id)
LEFT JOIN worker USING(analysis_id)
LEFT JOIN worker w USING(analysis_id)
LEFT JOIN lsf_report USING (process_id)
WHERE meadow_type='LSF'
WHERE w.meadow_type='LSF'
GROUP BY analysis_id
ORDER BY analysis_id;
});
......
# a new column for the multi-meadow scheduler:
ALTER TABLE analysis_base ADD COLUMN meadow_type varchar(40) DEFAULT NULL;
......@@ -50,6 +50,7 @@ CREATE TABLE analysis_base (
max_retry_count int(10) DEFAULT 3 NOT NULL,
can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL,
priority TINYINT DEFAULT 0 NOT NULL,
meadow_type varchar(40) DEFAULT NULL,
PRIMARY KEY (analysis_id),
UNIQUE KEY logic_name_idx (logic_name)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment