=pod

=head1 NAME

    Bio::EnsEMBL::Hive::Scheduler

=head1 DESCRIPTION

    Scheduler starts with the numbers of required workers for unblocked analyses,
    then goes through several kinds of restrictions (submit_limit, meadow_limits, hive_capacity, etc)
    that act as limiters and may cap the original numbers in several ways.
    The capped numbers are then grouped by meadow_type and rc_name and returned in a two-level hash.

=head1 LICENSE

    Copyright [1999-2014] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

  Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=cut


package Bio::EnsEMBL::Hive::Scheduler;

use strict;
use warnings;

    # Project classes loaded for the scheduling routines below
    # (Limiter objects are constructed directly via Bio::EnsEMBL::Hive::Limiter->new):
use Bio::EnsEMBL::Hive::Analysis;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Valley;
use Bio::EnsEMBL::Hive::Limiter;


# Decides how many extra Workers to submit, grouped by meadow type and resource class name.
#
# Steps:
#   1. Runs schedule_workers() with the Valley's 'SubmitWorkersMax' setting and per-meadow limiters.
#   2. If that pass reports no workers required at all, checks for dead workers (only when the
#      Queen's and the Valley's running-worker counts differ), re-balances semaphore counts,
#      re-synchronizes the Hive and runs schedule_workers() once more.
#   3. Subtracts the Workers that the Valley reports as pending from each (meadow_type, rc_name) group.
#
# Arguments:
#   $queen            - object giving access to the db adaptors, worker counting/checking and synchronize_hive()
#   $valley           - object providing config_get(), the default meadow, meadow capacity limiters
#                       and pending worker counts
#   $filter_analysis  - optional analysis object; when given it is passed on to schedule_workers(),
#                       balance_semaphores() (as its dbID) and synchronize_hive()
#
# Returns:
#   a hashref { meadow_type => { rc_name => number_of_workers_to_submit } };
#   groups fully covered by pending Workers, and meadows left without any group, are removed.
#
# Side effects: prints the scheduling log and its own decisions to the currently selected output handle.
sub schedule_workers_resync_if_necessary {
    my ($queen, $valley, $filter_analysis) = @_;

    my $submit_capacity                         = $valley->config_get('SubmitWorkersMax');
    my $default_meadow_type                     = $valley->get_default_meadow()->type;
    my $meadow_capacity_limiter_hashed_by_type  = $valley->get_meadow_capacity_hash_by_meadow_type();

    my $analysis_id2rc_id                       = $queen->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
    my $rc_id2name                              = $queen->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
        # combined mapping:
    my $analysis_id2rc_name                     = { map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id };

    my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
        = schedule_workers($queen, $submit_capacity, $default_meadow_type, undef, undef, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
    print $log_buffer;

    unless( $total_workers_required ) {
        print "\nScheduler: according to analysis_stats no workers are required... let's see if resync can fix it.\n" ;

            # FIXME: here is an (optimistic) assumption all Workers the Queen can see are reachable from the Valley:
        if( $queen->count_running_workers() != $valley->count_running_workers ) {
            print "Scheduler: mismatch between Queen's workers and Valley's workers detected, checking for dead workers...\n";
            $queen->check_for_dead_workers($valley, 1);
        }
        print "Scheduler: re-balancing of semaphore_counts...\n";
        $queen->db->get_AnalysisJobAdaptor->balance_semaphores($filter_analysis && $filter_analysis->dbID);
        print "Scheduler: re-synchronizing the Hive...\n";
        $queen->synchronize_hive($filter_analysis);

        ($workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
            = schedule_workers($queen, $submit_capacity, $default_meadow_type, undef, undef, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
        print $log_buffer;
    }

        # adjustment for pending workers:
    my ($pending_worker_counts_by_meadow_type_rc_name, $total_pending_all_meadows)  = $valley->get_pending_worker_counts_by_meadow_type_rc_name();

        # Both levels are walked over a sorted snapshot of their keys: entries get deleted inside the loops,
        # and a snapshot keeps that independent of any hash iterator state while making the log order stable.
    foreach my $this_meadow_type (sort keys %$workers_to_submit_by_meadow_type_rc_name) {
        my $partial_workers_to_submit_by_rc_name = $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type};

        foreach my $this_rc_name (sort keys %$partial_workers_to_submit_by_rc_name) {
            my $workers_to_submit_this_group = $partial_workers_to_submit_by_rc_name->{$this_rc_name};

            if(my $pending_this_group = $pending_worker_counts_by_meadow_type_rc_name->{ $this_meadow_type }{ $this_rc_name }) {

                print "Scheduler was thinking of submitting $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers when it detected $pending_this_group pending in this group, ";

                if( $workers_to_submit_this_group > $pending_this_group) {
                    $partial_workers_to_submit_by_rc_name->{$this_rc_name}  -= $pending_this_group;    # adjust the hashed value
                    print "so is going to submit only ".$partial_workers_to_submit_by_rc_name->{$this_rc_name}." extra\n";
                } else {
                    delete $partial_workers_to_submit_by_rc_name->{$this_rc_name};                      # avoid leaving an empty group in the hash
                    print "so is not going to submit any extra\n";
                }
            } else {
                print "Scheduler is going to submit $workers_to_submit_this_group x $this_meadow_type:$this_rc_name workers\n";
            }
        }

        unless(keys %$partial_workers_to_submit_by_rc_name) {                               # if nothing has been scheduled for a meadow,
            delete $workers_to_submit_by_meadow_type_rc_name->{$this_meadow_type};          # do not mention the meadow in the hash
        }
    }

    return $workers_to_submit_by_meadow_type_rc_name;
}


# "Suggest analysis" entry point: asks schedule_workers() for at most one Worker,
# restricting the candidate analyses by resource class id and meadow type.
# The given meadow type is also used as the default meadow type for analyses that do not name one.
# No meadow limiters and no analysis_id-to-rc_name mapping are passed, so schedule_workers()
# runs in its "suggest analysis" mode; whatever it returns is handed back to the caller as is.
sub suggest_analysis_to_specialize_by_rc_id_meadow_type {
    my $queen               = shift;
    my $filter_rc_id        = shift;
    my $filter_meadow_type  = shift;

    my $single_worker_capacity  = 1;    # only one Worker is being placed
    my $default_meadow_type     = $filter_meadow_type;

        # the call stays in the return position so that the caller's context reaches schedule_workers():
    return schedule_workers(
        $queen,
        $single_worker_capacity,
        $default_meadow_type,
        $filter_rc_id,
        $filter_meadow_type,
    );
}


# Core scheduling routine, used in two modes.
#
# Arguments:
#   $queen                                   - object providing db adaptors, get_hive_current_load()
#                                              and safe_synchronize_AnalysisStats()
#   $submit_capacity                         - capacity of the 'Max number of Workers scheduled this time' limiter
#   $default_meadow_type                     - meadow type used for analyses whose own meadow_type is false
#   $filter_rc_id, $filter_meadow_type       - passed to fetch_all_by_suitability_rc_id_meadow_type()
#                                              when no $filter_analysis is given
#   $filter_analysis                         - optional; when given, only its stats object is considered
#   $meadow_capacity_limiter_hashed_by_type  - optional hashref { meadow_type => limiter object }
#   $analysis_id2rc_name                     - optional hashref { analysis_id => rc_name }; its presence
#                                              selects the "scheduling" mode
#
# Returns:
#   "scheduling" mode (with $analysis_id2rc_name):
#       ( { meadow_type => { rc_name => workers } }, $total_workers_required, $log_buffer )
#   "suggest analysis" mode (without $analysis_id2rc_name):
#       the first analysis_stats object for which the limiters agree on a positive number of Workers,
#       or undef if there is none.
#   FIXME: the two modes return data in different formats.
#
# Analyses are skipped when their meadow's limiter is missing or already reached, when they stay
# BLOCKED/SYNCHING after an optional sync, or when no positive number of Workers can be agreed on.
sub schedule_workers {
    my ($queen, $submit_capacity, $default_meadow_type, $filter_rc_id, $filter_meadow_type, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name) = @_;

    my @suitable_analyses   = $filter_analysis
                                ? ( $filter_analysis->stats )
                                : @{ $queen->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id_meadow_type($filter_rc_id, $filter_meadow_type) };

    unless(@suitable_analyses) {
        return $analysis_id2rc_name ? ({}, 0, "Scheduler could not find any suitable analyses to start with\n") : undef;    # FIXME: returns data in different format in "suggest analysis" mode
    }

        # the pre-pending-adjusted outcome will be stored here:
    my %workers_to_submit_by_meadow_type_rc_name    = ();
    my $total_workers_required                      = 0;
    my $log_buffer                                  = '';

    my $submit_capacity_limiter                     = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers scheduled this time', $submit_capacity );
    my $queen_capacity_limiter                      = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->get_hive_current_load() );

    foreach my $analysis_stats (@suitable_analyses) {
        last if( $submit_capacity_limiter->reached );

        my $analysis            = $analysis_stats->get_analysis;    # FIXME: if it proves too expensive we may need to consider caching
        my $this_meadow_type    = $analysis->meadow_type || $default_meadow_type;

            # When meadow limiters were supplied, an analysis can only be scheduled on a meadow that has one;
            # calling a method on a missing entry would die, so such analyses are skipped instead:
        my $meadow_capacity_limiter;
        if( $meadow_capacity_limiter_hashed_by_type ) {
            $meadow_capacity_limiter = $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type};

            unless( $meadow_capacity_limiter ) {
                $log_buffer .= "Scheduler: analysis '".$analysis->logic_name."' needs meadow type '$this_meadow_type' for which no capacity limiter is available, skipping it\n";
                next;
            }
            next if( $meadow_capacity_limiter->reached );
        }

            #digging deeper under the surface so need to sync:
        if( $analysis_stats->status =~ /^(LOADING|ALL_CLAIMED|BLOCKED|SYNCHING)$/ ) {
            $queen->safe_synchronize_AnalysisStats($analysis_stats);
        }
        next if( $analysis_stats->status =~ /^(BLOCKED|SYNCHING)$/ );

            # getting the initial worker requirement for this analysis (may be stale if not sync'ed recently)
        my $extra_workers_this_analysis = $analysis_stats->num_required_workers;

            # if this analysis doesn't require any extra workers - just skip it:
        next if ($extra_workers_this_analysis <= 0);

        $total_workers_required += $extra_workers_this_analysis;    # also keep the total number required so far (if nothing required we may need a resync later)

            # setting up all negotiating limiters:
        $queen_capacity_limiter->multiplier( $analysis_stats->hive_capacity );
        my @limiters = (
            $submit_capacity_limiter,
            $queen_capacity_limiter,
            $meadow_capacity_limiter
                ? $meadow_capacity_limiter
                : (),
            defined($analysis->analysis_capacity)
                ? Bio::EnsEMBL::Hive::Limiter->new( "Number of Workers working at '".$analysis->logic_name."' analysis",
                                                    $analysis->analysis_capacity - $analysis_stats->num_running_workers )
                : (),
        );

            # negotiations:
        foreach my $limiter (@limiters) {
            $extra_workers_this_analysis = $limiter->preliminary_offer( $extra_workers_this_analysis );
        }

            # do not continue with this analysis if limiters haven't agreed on a positive number:
        next if ($extra_workers_this_analysis <= 0);

            # let all parties know the final decision of negotiations:
        foreach my $limiter (@limiters) {
            $limiter->final_decision( $extra_workers_this_analysis );
        }

        if($analysis_id2rc_name) {
            my $this_rc_name    = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
            $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
            $log_buffer .= $analysis_stats->toString . "\n";
                # all variable parts go in as sprintf arguments, so a '%' in a name cannot be taken for a conversion:
            $log_buffer .= sprintf("Before checking the Valley for pending jobs, Scheduler allocated %s x %s:%s extra workers for '%s' [%.4f hive_load remaining]\n",
                $extra_workers_this_analysis,
                $this_meadow_type,
                $this_rc_name,
                $analysis->logic_name,
                $queen_capacity_limiter->available_capacity,
            );
        } else {
            return $analysis_stats;     # FIXME: returns data in different format in "suggest analysis" mode
        }
    }

        # In "suggest analysis" mode reaching this point means nothing could be suggested,
        # which is reported as undef exactly like the "no suitable analyses" case above:
    return $analysis_id2rc_name
        ? (\%workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
        : undef;
}


1;