=pod

=head1 NAME

    Bio::EnsEMBL::Hive::Scheduler

=head1 DESCRIPTION

    Scheduler starts with the numbers of required workers for unblocked analyses,
    then goes through several kinds of restrictions (submit_limit, meadow_limits, hive_capacity, etc)
    that act as limiters and may cap the original numbers in several ways.
    The capped numbers are then grouped by meadow_type and rc_name and returned in a two-level hash.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

  Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates.

=cut


package Bio::EnsEMBL::Hive::Scheduler;

use strict;
use warnings;

use Bio::EnsEMBL::Hive::Analysis;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Valley;
use Bio::EnsEMBL::Hive::Limiter;


# schedule_workers_resync_if_necessary($queen, $valley, $filter_analysis)
#
# Works out how many extra Workers should be submitted, grouped by meadow type
# and resource class name.
#
#   1. Calls schedule_workers() with the Valley's 'SubmitWorkersMax' setting,
#      default meadow type and per-meadow capacity limiters.
#   2. If that pass reports that no workers are required at all, checks for dead
#      workers (only when the Queen's and the Valley's running worker counts
#      differ), re-balances semaphores, re-synchronizes the Hive and runs
#      schedule_workers() a second time.
#   3. Subtracts the workers the Valley reports as pending from each
#      meadow_type/rc_name group, dropping groups (and meadows) that end up
#      with nothing left to submit.
#
# Arguments:
#   $queen            - object used to reach the database adaptors and to resync the Hive
#   $valley           - object providing configuration, meadows and pending worker counts
#   $filter_analysis  - optional analysis object; when given, scheduling is restricted to it
#
# Returns a hashref { meadow_type => { rc_name => number_of_workers_to_submit } }.
# Progress messages are printed to the currently selected output handle.
sub schedule_workers_resync_if_necessary {
    my ($queen, $valley, $filter_analysis) = @_;

    my $submit_capacity                         = $valley->config_get('SubmitWorkersMax');
    my $default_meadow_type                     = $valley->get_default_meadow()->type;
    my $meadow_capacity_limiter_hashed_by_type  = $valley->get_meadow_capacity_hash_by_meadow_type();

    my $analysis_id2rc_id                       = $queen->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
    my $rc_id2name                              = $queen->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
        # combined mapping:
    my $analysis_id2rc_name                     = { map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id };

        # both scheduling passes use exactly the same arguments, so build the list once:
    my @schedule_workers_args = (
        $queen, $submit_capacity, $default_meadow_type,
        undef, undef,                                       # no rc_id / meadow_type filters in this mode
        $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name,
    );

    my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
        = schedule_workers( @schedule_workers_args );
    print $log_buffer;

    unless( $total_workers_required ) {
        print "\nScheduler: according to analysis_stats no workers are required... let's see if resync can fix it.\n" ;

            # FIXME: here is an (optimistic) assumption all Workers the Queen can see are reachable from the Valley:
        if( $queen->count_running_workers() != $valley->count_running_workers ) {
            print "Scheduler: mismatch between Queen's workers and Valley's workers detected, checking for dead workers...\n";
            $queen->check_for_dead_workers($valley, 1);
        }
        print "Scheduler: re-balancing of semaphore_counts...\n";
        $queen->db->get_AnalysisJobAdaptor->balance_semaphores($filter_analysis && $filter_analysis->dbID);
        print "Scheduler: re-synchronizing the Hive...\n";
        $queen->synchronize_hive($filter_analysis);

            # second attempt, now that the statistics have been refreshed:
        ($workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
            = schedule_workers( @schedule_workers_args );
        print $log_buffer;
    }

        # adjustment for pending workers
        # (the method returns a second value as well, which is not needed here):
    my ($pending_worker_counts_by_meadow_type_rc_name) = $valley->get_pending_worker_counts_by_meadow_type_rc_name();

        # Iterate over snapshots of the keys rather than with each(), because entries are deleted from both levels of the hash inside the loops.
    foreach my $this_meadow_type (keys %$workers_to_submit_by_meadow_type_rc_name) {
        my $workers_to_submit_by_rc_name = $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type};

        foreach my $this_rc_name (keys %$workers_to_submit_by_rc_name) {
            my $workers_to_submit_this_group = $workers_to_submit_by_rc_name->{$this_rc_name};

            if(my $pending_this_group = $pending_worker_counts_by_meadow_type_rc_name->{ $this_meadow_type }{ $this_rc_name }) {

                print "Scheduler was thinking of submitting $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers when it detected $pending_this_group pending in this group, ";

                if( $workers_to_submit_this_group > $pending_this_group) {
                    $workers_to_submit_by_rc_name->{$this_rc_name} -= $pending_this_group;     # adjust the hashed value
                    print "so is going to submit only ".$workers_to_submit_by_rc_name->{$this_rc_name}." extra\n";
                } else {
                    delete $workers_to_submit_by_rc_name->{$this_rc_name};                     # avoid leaving an empty group in the hash
                    print "so is not going to submit any extra\n";
                }
            } else {
                print "Scheduler is going to submit $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers\n";
            }
        }

        unless(keys %$workers_to_submit_by_rc_name) {                                  # if nothing has been scheduled for a meadow,
            delete $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type};      # do not mention the meadow in the hash
        }
    }

    return $workers_to_submit_by_meadow_type_rc_name;
}


# suggest_analysis_to_specialize_by_rc_id_meadow_type($queen, $filter_rc_id, $filter_meadow_type)
#
# "Suggest analysis" mode of the scheduler: runs schedule_workers() with a
# submit capacity of 1, using $filter_meadow_type both as the default meadow
# type and as the meadow filter, and without a meadow capacity hash or an
# analysis_id-to-rc_name mapping.
#
# Returns whatever schedule_workers() returns in that mode: the first
# analysis stats object that the limiters allow at least one extra worker for,
# or undef when there are no suitable analyses.
# NOTE(review): when suitable analyses exist but none of them gets a positive
# allocation, schedule_workers() falls through to its three-element list return -
# callers should confirm they handle that.
sub suggest_analysis_to_specialize_by_rc_id_meadow_type {
    my ($queen, $filter_rc_id, $filter_meadow_type) = @_;

    return schedule_workers($queen, 1, $filter_meadow_type, $filter_rc_id, $filter_meadow_type);
}


# schedule_workers($queen, $submit_capacity, $default_meadow_type, $filter_rc_id, $filter_meadow_type,
#                  $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name)
#
# Core of the scheduler: walks through the suitable analyses and lets a set of
# limiters negotiate how many extra workers each of them may get.
#
# Arguments:
#   $queen                                   - used to fetch analysis stats, to sync them and to get the current hive load
#   $submit_capacity                         - maximum number of workers to schedule in this call
#   $default_meadow_type                     - meadow type used for analyses that do not define their own
#   $filter_rc_id, $filter_meadow_type       - passed to fetch_all_by_suitability_rc_id_meadow_type() (ignored when $filter_analysis is given)
#   $filter_analysis                         - optional; when given, only this analysis' stats are considered
#   $meadow_capacity_limiter_hashed_by_type  - optional hashref { meadow_type => limiter object }
#   $analysis_id2rc_name                     - optional hashref { analysis_id => rc_name }; its presence selects the return format
#
# Returns:
#   * with $analysis_id2rc_name ("scheduling" mode): a three-element list
#       ( { meadow_type => { rc_name => workers } }, $total_workers_required, $log_buffer )
#   * without it ("suggest analysis" mode): the first analysis stats object that
#     gets a positive allocation, or undef if there were no suitable analyses.
#     FIXME: if suitable analyses exist but none is allocated anything, the
#     three-element list is still returned in this mode.
sub schedule_workers {
    my ($queen, $submit_capacity, $default_meadow_type, $filter_rc_id, $filter_meadow_type, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name) = @_;

    my @suitable_analyses   = $filter_analysis
                                ? ( $filter_analysis->stats )
                                : @{ $queen->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id_meadow_type($filter_rc_id, $filter_meadow_type) };

    unless(@suitable_analyses) {
        return $analysis_id2rc_name ? ({}, 0, "Scheduler could not find any suitable analyses to start with\n") : undef;    # FIXME: returns data in different format in "suggest analysis" mode
    }

        # the pre-pending-adjusted outcome will be stored here:
    my %workers_to_submit_by_meadow_type_rc_name    = ();
    my $total_workers_required                      = 0;
    my $log_buffer                                  = '';

    my $submit_capacity_limiter                     = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers scheduled this time', $submit_capacity );
    my $queen_capacity_limiter                      = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->get_hive_current_load() );

    foreach my $analysis_stats (@suitable_analyses) {
        last if( $submit_capacity_limiter->reached );

        my $analysis            = $analysis_stats->get_analysis;    # FIXME: if it proves too expensive we may need to consider caching
        my $this_meadow_type    = $analysis->meadow_type || $default_meadow_type;

            # Only look up the meadow limiter when a limiter hash has been supplied (it is not in "suggest analysis" mode).
        my $meadow_capacity_limiter;
        if( $meadow_capacity_limiter_hashed_by_type ) {
            $meadow_capacity_limiter = $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type};

                # A meadow type without a limiter would make every method call below die, so report it and move on to the next analysis instead.
            unless( $meadow_capacity_limiter ) {
                $log_buffer .= "Scheduler: no capacity limiter found for meadow type '"
                             . (defined($this_meadow_type) ? $this_meadow_type : '')
                             . "', skipping analysis '" . $analysis->logic_name . "'\n";
                next;
            }

            next if( $meadow_capacity_limiter->reached );
        }

            #digging deeper under the surface so need to sync:
        if( $analysis_stats->status =~ /^(?:LOADING|ALL_CLAIMED|BLOCKED|SYNCHING)$/ ) {
            $queen->safe_synchronize_AnalysisStats($analysis_stats);
        }
            # the status is read again because the sync above may have changed it:
        next if( $analysis_stats->status =~ /^(?:BLOCKED|SYNCHING)$/ );

            # getting the initial worker requirement for this analysis (may be stale if not sync'ed recently)
        my $extra_workers_this_analysis = $analysis_stats->num_required_workers;

            # if this analysis doesn't require any extra workers - just skip it:
        next if ($extra_workers_this_analysis <= 0);

        $total_workers_required += $extra_workers_this_analysis;    # also keep the total number required so far (if nothing required we may need a resync later)

            # setting up all negotiating limiters:
        $queen_capacity_limiter->multiplier( $analysis_stats->hive_capacity );
        my @limiters = (
            $submit_capacity_limiter,
            $queen_capacity_limiter,
            $meadow_capacity_limiter
                ? $meadow_capacity_limiter
                : (),
            defined($analysis->analysis_capacity)
                ? Bio::EnsEMBL::Hive::Limiter->new( "Number of Workers working at '".$analysis->logic_name."' analysis",
                                                    $analysis->analysis_capacity - $analysis_stats->num_running_workers )
                : (),
        );

            # negotiations: each limiter in turn gets to revise the number offered by the previous one
        foreach my $limiter (@limiters) {
            $extra_workers_this_analysis = $limiter->preliminary_offer( $extra_workers_this_analysis );
        }

            # do not continue with this analysis if limiters haven't agreed on a positive number:
        next if ($extra_workers_this_analysis <= 0);

            # let all parties know the final decision of negotiations:
        foreach my $limiter (@limiters) {
            $limiter->final_decision( $extra_workers_this_analysis );
        }

        if($analysis_id2rc_name) {
            my $this_rc_name    = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
            $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
            $log_buffer .= $analysis_stats->toString . "\n";
                # All values are passed as sprintf arguments rather than interpolated into the format, so that a '%' in a name cannot be misread as a conversion.
            $log_buffer .= sprintf("Before checking the Valley for pending jobs, Scheduler allocated %s x %s:%s extra workers for '%s' [%.4f hive_load remaining]\n",
                $extra_workers_this_analysis,
                $this_meadow_type,
                $this_rc_name,
                $analysis->logic_name,
                $queen_capacity_limiter->available_capacity,
            );
        } else {
            return $analysis_stats;     # FIXME: returns data in different format in "suggest analysis" mode
        }
    }

    return (\%workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer);
}


1;