=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::Scheduler

=head1 DESCRIPTION

    Scheduler starts with the number of Workers required by each unblocked analysis,
    then applies several kinds of restrictions (submit_limit, meadow_limits, hive_capacity, etc.)
    that act as limiters and may cap the original numbers in several ways.
    The capped numbers are then grouped by meadow_type and rc_name and returned in a two-level hash.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2018] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

  Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates.

=cut


package Bio::EnsEMBL::Hive::Scheduler;

use strict;
use warnings;

use Bio::EnsEMBL::Hive::Analysis;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Valley;
use Bio::EnsEMBL::Hive::Limiter;


# Works out how many Workers should be submitted, grouped by meadow_type and resource class name.
#
# Arguments:
#   $queen           - the Queen: gives access to the database adaptors and is used to resynchronize the Hive
#   $valley          - the Valley: provides SubmitWorkersMax, the default meadow, the per-meadow capacity
#                      limiters and the counts of Workers already pending
#   $filter_analysis - (optional) restricts the scheduling, and any resync, to this single analysis
#
# If the first scheduling pass reports that no Workers are required, the Hive is resynchronized
# (dead-worker check when the Queen's and the Valley's running-worker counts disagree, semaphore
# re-balancing, synchronize_hive) and the scheduling pass is run once more.
# Workers already pending in the Valley are then subtracted from each (meadow_type, rc_name) group.
#
# Returns: a hashref { meadow_type => { rc_name => number_of_workers_to_submit } } from which
#          groups (and whole meadows) with nothing left to submit have been removed.
sub schedule_workers_resync_if_necessary {
    my ($queen, $valley, $filter_analysis) = @_;

    my $submit_capacity                         = $valley->config_get('SubmitWorkersMax');
    my $default_meadow_type                     = $valley->get_default_meadow()->type;
    my $meadow_capacity_limiter_hashed_by_type  = $valley->get_meadow_capacity_hash_by_meadow_type();

    my $analysis_id2rc_id                       = $queen->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
    my $rc_id2name                              = $queen->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
        # combined mapping:
    my $analysis_id2rc_name                     = { map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id };

    my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
        = schedule_workers($queen, $submit_capacity, $default_meadow_type, undef, undef, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
    print $log_buffer;

    unless( $total_workers_required ) {
        print "\nScheduler: according to analysis_stats no workers are required... let's see if resync can fix it.\n" ;

            # FIXME: here is an (optimistic) assumption all Workers the Queen can see are reachable from the Valley:
        if( $queen->count_running_workers() != $valley->count_running_workers ) {
            print "Scheduler: mismatch between Queen's workers and Valley's workers detected, checking for dead workers...\n";
            $queen->check_for_dead_workers($valley, 1);
        }
        print "Scheduler: re-balancing of semaphore_counts...\n";
        $queen->db->get_AnalysisJobAdaptor->balance_semaphores($filter_analysis && $filter_analysis->dbID);
        print "Scheduler: re-synchronizing the Hive...\n";
        $queen->synchronize_hive($filter_analysis);

        ($workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
            = schedule_workers($queen, $submit_capacity, $default_meadow_type, undef, undef, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
        print $log_buffer;
    }

        # adjustment for pending workers (only the per-group counts are needed here):
    my ($pending_worker_counts_by_meadow_type_rc_name) = $valley->get_pending_worker_counts_by_meadow_type_rc_name();

        # Iterate over snapshots of the keys rather than with each(): entries are deleted inside the loops,
        # and a keys() snapshot does not depend on the hashes' shared iterator state.
    foreach my $this_meadow_type (keys %$workers_to_submit_by_meadow_type_rc_name) {
        my $partial_workers_to_submit_by_rc_name = $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type};

        foreach my $this_rc_name (keys %$partial_workers_to_submit_by_rc_name) {
            my $workers_to_submit_this_group = $partial_workers_to_submit_by_rc_name->{$this_rc_name};

            if(my $pending_this_group = $pending_worker_counts_by_meadow_type_rc_name->{ $this_meadow_type }{ $this_rc_name }) {

                print "Scheduler was thinking of submitting $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers when it detected $pending_this_group pending in this group, ";

                if( $workers_to_submit_this_group > $pending_this_group) {
                    $partial_workers_to_submit_by_rc_name->{$this_rc_name} -= $pending_this_group;     # adjust the hashed value
                    print "so is going to submit only ".$partial_workers_to_submit_by_rc_name->{$this_rc_name}." extra\n";
                } else {
                    delete $partial_workers_to_submit_by_rc_name->{$this_rc_name};                     # avoid leaving an empty group in the hash
                    print "so is not going to submit any extra\n";
                }
            } else {
                print "Scheduler is going to submit $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers\n";
            }
        }

        unless(keys %$partial_workers_to_submit_by_rc_name) {                # if nothing has been scheduled for a meadow,
            delete $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type};  # do not mention the meadow in the hash
        }
    }

    return $workers_to_submit_by_meadow_type_rc_name;
}


# "Suggest analysis" entry point. It asks schedule_workers() for at most one Worker, restricted to the
# given resource class and meadow_type, and passes neither meadow limiters nor an rc_name map.
# The meadow_type filter also serves as the default meadow_type for analyses that do not declare one.
# Because no analysis_id -> rc_name map is passed, schedule_workers() runs in its "suggest" mode.
# The result is whatever it returns in that mode: normally a single AnalysisStats object, or a false value.
sub suggest_analysis_to_specialize_by_rc_id_meadow_type {
    my ($queen, $filter_rc_id, $filter_meadow_type) = @_;

    my $max_workers_to_consider = 1;

    return schedule_workers(
        $queen,
        $max_workers_to_consider,   # submit capacity
        $filter_meadow_type,        # doubles as the default meadow_type
        $filter_rc_id,
        $filter_meadow_type,
    );
}


# Core scheduling pass. It walks the suitable analyses and negotiates, through a chain of Limiters,
# how many extra Workers each of them may get.
#
# Arguments:
#   $queen                                  - the Queen (database access, current hive load, stats synchronization)
#   $submit_capacity                        - max number of Workers that may be scheduled in this pass
#   $default_meadow_type                    - meadow_type used for analyses that do not set their own
#   $filter_rc_id, $filter_meadow_type      - (optional) passed to fetch_all_by_suitability_rc_id_meadow_type()
#                                             to select the candidate analyses
#   $filter_analysis                        - (optional) consider only this analysis (the two filters above are then unused)
#   $meadow_capacity_limiter_hashed_by_type - (optional) meadow_type => Limiter; when given, analyses whose meadow_type
#                                             is missing from it, or whose Limiter has been reached, are skipped
#   $analysis_id2rc_name                    - (optional) analysis_id => resource class name; its presence selects the mode
#
# Returns:
#   - "scheduling" mode ($analysis_id2rc_name given):
#       ( { meadow_type => { rc_name => number_of_workers } }, $total_workers_required, $log_buffer )
#     where $total_workers_required sums num_required_workers (before limiting) over the analyses visited
#     before the submit capacity was reached.
#   - "suggest analysis" mode: the first AnalysisStats that was granted a Worker, or undef if none was.
sub schedule_workers {
    my ($queen, $submit_capacity, $default_meadow_type, $filter_rc_id, $filter_meadow_type, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name) = @_;

    my @suitable_analyses   = $filter_analysis
                                ? ( $filter_analysis->stats )
                                : @{ $queen->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id_meadow_type($filter_rc_id, $filter_meadow_type) };

    unless(@suitable_analyses) {
        return $analysis_id2rc_name ? ({}, 0, "Scheduler could not find any suitable analyses to start with\n") : undef;    # FIXME: returns data in different format in "suggest analysis" mode
    }

        # the pre-pending-adjusted outcome will be stored here:
    my %workers_to_submit_by_meadow_type_rc_name    = ();
    my $total_workers_required                      = 0;
    my $log_buffer                                  = '';

    my $submit_capacity_limiter                     = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers scheduled this time', $submit_capacity );
    my $queen_capacity_limiter                      = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->get_hive_current_load() );

    foreach my $analysis_stats (@suitable_analyses) {
        last if( $submit_capacity_limiter->reached );

        my $analysis            = $analysis_stats->get_analysis;    # FIXME: if it proves too expensive we may need to consider caching
        my $this_meadow_type    = $analysis->meadow_type || $default_meadow_type;

            # This meadow type is not available
        next if( $meadow_capacity_limiter_hashed_by_type && !$meadow_capacity_limiter_hashed_by_type->{$this_meadow_type} );
            # This meadow type is available but has reached its full capacity
        next if( $meadow_capacity_limiter_hashed_by_type && $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}->reached );

            #digging deeper under the surface so need to sync:
        if( $analysis_stats->status =~ /^(LOADING|ALL_CLAIMED|BLOCKED|SYNCHING)$/ ) {
            $queen->safe_synchronize_AnalysisStats($analysis_stats);
        }
        next if( $analysis_stats->status =~ /^(BLOCKED|SYNCHING)$/ );

            # getting the initial worker requirement for this analysis (may be stale if not sync'ed recently)
        my $extra_workers_this_analysis = $analysis_stats->num_required_workers;

            # if this analysis doesn't require any extra workers - just skip it:
        next if ($extra_workers_this_analysis <= 0);

        $total_workers_required += $extra_workers_this_analysis;    # also keep the total number required so far (if nothing required we may need a resync later)

            # setting up all negotiating limiters:
        $queen_capacity_limiter->multiplier( $analysis_stats->hive_capacity );
        my @limiters = (
            $submit_capacity_limiter,
            $queen_capacity_limiter,
            $meadow_capacity_limiter_hashed_by_type
                ? $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}
                : (),
            defined($analysis->analysis_capacity)
                ? Bio::EnsEMBL::Hive::Limiter->new( "Number of Workers working at '".$analysis->logic_name."' analysis",
                                                    $analysis->analysis_capacity - $analysis_stats->num_running_workers )
                : (),
        );

            # negotiations:
        foreach my $limiter (@limiters) {
            $extra_workers_this_analysis = $limiter->preliminary_offer( $extra_workers_this_analysis );
        }

            # do not continue with this analysis if limiters haven't agreed on a positive number:
        next if ($extra_workers_this_analysis <= 0);

            # let all parties know the final decision of negotiations:
        foreach my $limiter (@limiters) {
            $limiter->final_decision( $extra_workers_this_analysis );
        }

        if($analysis_id2rc_name) {
            my $this_rc_name    = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
            $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
            $log_buffer .= $analysis_stats->toString . "\n";
                # Data values go through %s placeholders rather than being interpolated into the format,
                # so that a '%' in a meadow or resource class name cannot be mistaken for a conversion.
            $log_buffer .= sprintf("Before checking the Valley for pending jobs, Scheduler allocated %s x %s:%s extra workers for '%s' [%.4f hive_load remaining]\n",
                $extra_workers_this_analysis,
                $this_meadow_type,
                $this_rc_name,
                $analysis->logic_name,
                $queen_capacity_limiter->available_capacity,
            );
        } else {
            return $analysis_stats;     # FIXME: returns data in different format in "suggest analysis" mode
        }
    }

        # In "suggest analysis" mode, getting here means no analysis was granted a Worker.
        # Report that as a plain undef, the same way as the "no suitable analyses" case above.
    return $analysis_id2rc_name
        ? (\%workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
        : undef;
}


1;