my ($queen, $valley, $filter_analysis) = @_;
my $submit_capacity = $valley->config_get('SubmitWorkersMax');
my $default_meadow_type = $valley->get_default_meadow()->type;
my $meadow_capacity_limiter_hashed_by_type = $valley->get_meadow_capacity_hash_by_meadow_type();
my $analysis_id2rc_id = $queen->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
my $rc_id2name = $queen->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
# combined mapping (analysis_id => resource class name):
my $analysis_id2rc_name = { map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id };
my ($workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
=
schedule_workers($queen, $submit_capacity, $default_meadow_type, undef, undef, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
print $log_buffer;
unless( $total_extra_workers_required ) {
print "\nScheduler: according to analysis_stats no workers are required... let's see if resync can fix it.\n" ;
# FIXME: this makes the (optimistic) assumption that all Workers the DB knows about are reachable from the Valley:
if( $queen->db->get_RoleAdaptor->count_active_roles() != $valley->count_running_workers ) {
print "Scheduler: mismatch between DB's active Roles and Valley's running Workers detected, checking for dead workers...\n";
$queen->check_for_dead_workers($valley, 1);
}
print "Scheduler: re-synchronizing the Hive...\n";
$queen->synchronize_hive($filter_analysis);
if( $queen->db->hive_auto_rebalance_semaphores ) { # make sure rebalancing only ever happens for the pipelines that asked for it
if( $queen->check_nothing_to_run_but_semaphored ) { # and double-check on our side
print "Scheduler: looks like we may need re-balancing semaphore_counts...\n";
if( my $rebalanced_jobs_counter = $queen->db->get_AnalysisJobAdaptor->balance_semaphores($filter_analysis && $filter_analysis->dbID) ) {
print "Scheduler: re-balanced $rebalanced_jobs_counter jobs, going through another re-synchronization of the Hive...\n";
$queen->synchronize_hive($filter_analysis);
} else {
print "Scheduler: hmmm... managed to re-balance 0 jobs, you may need to investigate further.\n";
}
} else {
print "Scheduler: apparently there are no semaphored jobs that may need to be re-balanced at this time.\n";
}
} else {
print "Scheduler: automatic re-balancing of semaphore_counts is off by default. If you think your pipeline might benefit from it, set hive_auto_rebalance_semaphores => 1 in the PipeConfig's hive_meta_table.\n";
}
($workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
=
schedule_workers($queen, $submit_capacity, $default_meadow_type, undef, undef, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
print $log_buffer;
}
# adjustment for pending workers:
my ($pending_worker_counts_by_meadow_type_rc_name, $total_pending_all_meadows) = $valley->get_pending_worker_counts_by_meadow_type_rc_name();
while( my ($this_meadow_type, $partial_workers_to_submit_by_rc_name) = each %$workers_to_submit_by_meadow_type_rc_name) {
while( my ($this_rc_name, $workers_to_submit_this_group) = each %$partial_workers_to_submit_by_rc_name) {
if(my $pending_this_group = $pending_worker_counts_by_meadow_type_rc_name->{ $this_meadow_type }{ $this_rc_name }) {
print "Scheduler was thinking of submitting $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers when it detected $pending_this_group pending in this group, ";
if( $workers_to_submit_this_group > $pending_this_group) {
$workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type}{$this_rc_name} -= $pending_this_group; # adjust the hashed value
print "so is going to submit only ".$workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type}{$this_rc_name}." extra\n";
} else {
delete $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type}{$this_rc_name}; # avoid leaving an empty group in the hash
print "so is not going to submit any extra\n";
}
} else {
print "Scheduler is going to submit $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers\n";
}
}
unless(keys %{ $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type} }) { # if nothing has been scheduled for a meadow,
delete $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type}; # do not mention the meadow in the hash
}
}
return $workers_to_submit_by_meadow_type_rc_name;
}