From 78a2310d45d493e895b4348c1fcd1040959183c2 Mon Sep 17 00:00:00 2001
From: Leo Gordon <lg4@ebi.ac.uk>
Date: Wed, 8 Jan 2014 11:30:32 +0000
Subject: [PATCH] Individual Worker's specialization is now routed through
 Scheduler's main subroutine, to enforce more checks

---
 modules/Bio/EnsEMBL/Hive/Queen.pm     | 20 +-------
 modules/Bio/EnsEMBL/Hive/Scheduler.pm | 67 ++++++++++++++++-----------
 2 files changed, 42 insertions(+), 45 deletions(-)

diff --git a/modules/Bio/EnsEMBL/Hive/Queen.pm b/modules/Bio/EnsEMBL/Hive/Queen.pm
index 57c7d8fee..6d11ca1da 100644
--- a/modules/Bio/EnsEMBL/Hive/Queen.pm
+++ b/modules/Bio/EnsEMBL/Hive/Queen.pm
@@ -74,6 +74,7 @@ use Bio::EnsEMBL::Utils::Argument ('rearrange');
 use Bio::EnsEMBL::Hive::Utils ('destringify', 'dir_revhash');  # NB: needed by invisible code
 use Bio::EnsEMBL::Hive::AnalysisJob;
 use Bio::EnsEMBL::Hive::Worker;
+use Bio::EnsEMBL::Hive::Scheduler;
 
 use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
 
@@ -274,7 +275,7 @@ sub specialize_new_worker {
             }
         }
             # probably scheduled by beekeeper.pl:
-    } elsif( $stats = $self->suggest_analysis_to_specialize_by_rc_id_meadow_type($worker->resource_class_id, $worker->meadow_type) ) {
+    } elsif( $stats = Bio::EnsEMBL::Hive::Scheduler::suggest_analysis_to_specialize_by_rc_id_meadow_type($self, $worker->resource_class_id, $worker->meadow_type) ) {
 
         $worker->analysis( undef ); # make sure we reset anything that was there before
         $analysis_id = $stats->analysis_id;
@@ -833,21 +834,4 @@ sub register_all_workers_dead {
 }
 
 
-sub suggest_analysis_to_specialize_by_rc_id_meadow_type {
-    my $self                = shift;
-    my $rc_id               = shift;
-    my $meadow_type         = shift;
-
-    my @suitable_analyses = @{ $self->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id_meadow_type( $rc_id, $meadow_type ) };
-
-    foreach my $stats (@suitable_analyses) {
-
-            #synchronize and double check that it can be run:
-        $self->safe_synchronize_AnalysisStats($stats);
-        return $stats if( ($stats->status ne 'BLOCKED') and ($stats->status ne 'SYNCHING') and ($stats->num_required_workers > 0) );
-    }
-
-    return undef;
-}
-
 1;
diff --git a/modules/Bio/EnsEMBL/Hive/Scheduler.pm b/modules/Bio/EnsEMBL/Hive/Scheduler.pm
index 2aefd4e18..0dd068854 100644
--- a/modules/Bio/EnsEMBL/Hive/Scheduler.pm
+++ b/modules/Bio/EnsEMBL/Hive/Scheduler.pm
@@ -38,7 +38,6 @@ use warnings;
 
 use Bio::EnsEMBL::Hive::Analysis;
 use Bio::EnsEMBL::Hive::AnalysisStats;
-use Bio::EnsEMBL::Hive::Queen;
 use Bio::EnsEMBL::Hive::Valley;
 use Bio::EnsEMBL::Hive::Limiter;
 
@@ -46,7 +45,9 @@ use Bio::EnsEMBL::Hive::Limiter;
 sub schedule_workers_resync_if_necessary {
     my ($queen, $valley, $filter_analysis) = @_;
 
-    my $meadow_capacity_by_type                 = $valley->get_meadow_capacity_hash_by_meadow_type();
+    my $submit_capacity                         = $valley->config_get('SubmitWorkersMax');
+    my $default_meadow_type                     = $valley->get_default_meadow()->type;
+    my $meadow_capacity_limiter_hashed_by_type  = $valley->get_meadow_capacity_hash_by_meadow_type();
 
     my $analysis_id2rc_id                       = $queen->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
     my $rc_id2name                              = $queen->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
@@ -54,12 +55,13 @@ sub schedule_workers_resync_if_necessary {
     my $analysis_id2rc_name                     = { map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id };
 
     my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
-        = schedule_workers($queen, $valley, $filter_analysis, $meadow_capacity_by_type, $analysis_id2rc_name);
+        = schedule_workers($queen, $submit_capacity, $default_meadow_type, undef, undef, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
     print $log_buffer;
 
     unless( $total_workers_required ) {
         print "\nScheduler: according to analysis_stats no workers are required... let's see if resync can fix it.\n" ;
 
+            # FIXME: here is an (optimistic) assumption all Workers the Queen can see are reachable from the Valley:
         if( $queen->count_running_workers() != $valley->count_running_workers ) {
             print "Scheduler: mismatch between Queen's workers and Valley's workers detected, checking for dead workers...\n";
             $queen->check_for_dead_workers($valley, 1);
@@ -70,7 +72,7 @@ sub schedule_workers_resync_if_necessary {
         $queen->synchronize_hive($filter_analysis);
 
         ($workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer)
-            = schedule_workers($queen, $valley, $filter_analysis, $meadow_capacity_by_type, $analysis_id2rc_name);
+            = schedule_workers($queen, $submit_capacity, $default_meadow_type, undef, undef, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
         print $log_buffer;
     }
 
@@ -104,15 +106,22 @@ sub schedule_workers_resync_if_necessary {
 }
 
 
+sub suggest_analysis_to_specialize_by_rc_id_meadow_type {
+    my ($queen, $filter_rc_id, $filter_meadow_type) = @_;
+
+    return schedule_workers($queen, 1, $filter_meadow_type, $filter_rc_id, $filter_meadow_type);
+}
+
+
 sub schedule_workers {
-    my ($queen, $valley, $filter_analysis, $meadow_capacity_by_type, $analysis_id2rc_name) = @_;
+    my ($queen, $submit_capacity, $default_meadow_type, $filter_rc_id, $filter_meadow_type, $filter_analysis, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name) = @_;
 
     my @suitable_analyses   = $filter_analysis
                                 ? ( $filter_analysis->stats )
-                                : @{ $queen->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id_meadow_type() };
+                                : @{ $queen->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id_meadow_type($filter_rc_id, $filter_meadow_type) };
 
     unless(@suitable_analyses) {
-        return ({}, 0, "Scheduler could not find any suitable analyses to start with\n");
+        return $analysis_id2rc_name ? ({}, 0, "Scheduler could not find any suitable analyses to start with\n") : undef;    # FIXME: returns data in different format in "suggest analysis" mode
     }
 
         # the pre-pending-adjusted outcome will be stored here:
@@ -120,24 +129,22 @@ sub schedule_workers {
     my $total_workers_required                      = 0;
     my $log_buffer                                  = '';
 
-    my $default_meadow_type                         = $valley->get_default_meadow()->type;
-
-    my $submit_capacity                             = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers submitted this iteration', $valley->config_get('SubmitWorkersMax') );
-    my $queen_capacity                              = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->get_hive_current_load() );
+    my $submit_capacity_limiter                     = Bio::EnsEMBL::Hive::Limiter->new( 'Max number of Workers scheduled this time', $submit_capacity );
+    my $queen_capacity_limiter                      = Bio::EnsEMBL::Hive::Limiter->new( 'Total reciprocal capacity of the Hive', 1.0 - $queen->get_hive_current_load() );
 
     foreach my $analysis_stats (@suitable_analyses) {
-        last if( $submit_capacity->reached );
+        last if( $submit_capacity_limiter->reached );
 
         my $analysis            = $analysis_stats->get_analysis;    # FIXME: if it proves too expensive we may need to consider caching
         my $this_meadow_type    = $analysis->meadow_type || $default_meadow_type;
 
-        next if( $meadow_capacity_by_type->{$this_meadow_type}->reached );
+        next if( $meadow_capacity_limiter_hashed_by_type && $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}->reached );
 
             #digging deeper under the surface so need to sync:
-        if(($analysis_stats->status eq 'LOADING') or ($analysis_stats->status eq 'BLOCKED') or ($analysis_stats->status eq 'ALL_CLAIMED')) {
-            $queen->synchronize_AnalysisStats($analysis_stats);
+        if( $analysis_stats->status =~ /^(LOADING|ALL_CLAIMED|BLOCKED|SYNCHING)$/ ) {
+            $queen->safe_synchronize_AnalysisStats($analysis_stats);
         }
-        next if($analysis_stats->status eq 'BLOCKED');
+        next if( $analysis_stats->status =~ /^(BLOCKED|SYNCHING)$/ );
 
             # getting the initial worker requirement for this analysis (may be stale if not sync'ed recently)
         my $extra_workers_this_analysis = $analysis_stats->num_required_workers;
@@ -148,11 +155,13 @@ sub schedule_workers {
         $total_workers_required += $extra_workers_this_analysis;    # also keep the total number required so far (if nothing required we may need a resync later)
 
             # setting up all negotiating limiters:
-        $queen_capacity->multiplier( $analysis_stats->hive_capacity );
+        $queen_capacity_limiter->multiplier( $analysis_stats->hive_capacity );
         my @limiters = (
-            $submit_capacity,
-            $queen_capacity,
-            $meadow_capacity_by_type->{$this_meadow_type},
+            $submit_capacity_limiter,
+            $queen_capacity_limiter,
+            $meadow_capacity_limiter_hashed_by_type
+                ? $meadow_capacity_limiter_hashed_by_type->{$this_meadow_type}
+                : (),
             defined($analysis->analysis_capacity)
                 ? Bio::EnsEMBL::Hive::Limiter->new( "Number of Workers working at '".$analysis->logic_name."' analysis",
                                                     $analysis->analysis_capacity - $analysis_stats->num_running_workers )
@@ -172,13 +181,17 @@ sub schedule_workers {
             $limiter->final_decision( $extra_workers_this_analysis );
         }
 
-        my $this_rc_name    = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
-        $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
-        $log_buffer .= $analysis_stats->toString . "\n";
-        $log_buffer .= sprintf("Before checking the Valley for pending jobs, Scheduler allocated $extra_workers_this_analysis x $this_meadow_type:$this_rc_name extra workers for '%s' [%.4f hive_load remaining]\n",
-            $analysis->logic_name,
-            $queen_capacity->available_capacity,
-        );
+        if($analysis_id2rc_name) {
+            my $this_rc_name    = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
+            $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $extra_workers_this_analysis;
+            $log_buffer .= $analysis_stats->toString . "\n";
+            $log_buffer .= sprintf("Before checking the Valley for pending jobs, Scheduler allocated $extra_workers_this_analysis x $this_meadow_type:$this_rc_name extra workers for '%s' [%.4f hive_load remaining]\n",
+                $analysis->logic_name,
+                $queen_capacity_limiter->available_capacity,
+            );
+        } else {
+            return $analysis_stats;     # FIXME: returns data in different format in "suggest analysis" mode
+        }
     }
 
     return (\%workers_to_submit_by_meadow_type_rc_name, $total_workers_required, $log_buffer);
-- 
GitLab