From 55de1985a56299e8eb97897ddaaa6d4f732fdca7 Mon Sep 17 00:00:00 2001
From: Leo Gordon <>
Date: Tue, 20 Nov 2012 15:35:44 +0000
Subject: [PATCH] separated the Scheduler's code into a separate module (not an
 object yet)

 modules/Bio/EnsEMBL/Hive/     | 137 +----------------------
 modules/Bio/EnsEMBL/Hive/ | 154 ++++++++++++++++++++++++++
 scripts/                  |   5 +-
 3 files changed, 159 insertions(+), 137 deletions(-)
 create mode 100644 modules/Bio/EnsEMBL/Hive/

diff --git a/modules/Bio/EnsEMBL/Hive/ b/modules/Bio/EnsEMBL/Hive/
index 00180209b..aa56dc62a 100644
--- a/modules/Bio/EnsEMBL/Hive/
+++ b/modules/Bio/EnsEMBL/Hive/
@@ -258,9 +258,9 @@ sub specialize_new_worker {
     } else {    # probably scheduled by
         $stats = $self->suggest_analysis_to_specialize_by_rc_id($worker->resource_class_id)
-            or die "Scheduler failed to pick an analysis for the worker";
+            or die "Queen failed to pick an analysis for the worker";
-        print "Scheduler picked analysis with dbID=".$stats->analysis_id." for the worker\n";
+        print "Queen picked analysis with dbID=".$stats->analysis_id." for the worker\n";
         $analysis_id = $stats->analysis_id;
@@ -725,139 +725,6 @@ sub count_running_workers {
-=head2 schedule_workers
-  Arg[1]     : Bio::EnsEMBL::Hive::Analysis object (optional)
-  Example    : $count = $queen->schedule_workers();
-  Description: Runs through the analyses in the system which are waiting
-               for workers to be created for them.  Calculates the maximum
-               number of workers needed to fill the current needs of the system
-               If Arg[1] is defined, does it only for the given analysis.
-  Exceptions : none
-  Caller     : beekeepers and other external processes
-sub schedule_workers {
-    my ($self, $filter_analysis, $available_submit_limit, $available_worker_slots_by_meadow_type, $orig_pending_worker_counts_by_meadow_type_rc_name, $analysis_id2rc_name, $default_meadow_type) = @_;
-    my @suitable_analyses   = $filter_analysis
-                                ? ( $filter_analysis->stats )
-                                : @{ $self->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id() };
-    unless(@suitable_analyses) {
-        print "Scheduler could not find any suitable analyses to start with\n";
-        return ({}, 0);
-    }
-    my %workers_to_submit_by_meadow_type_rc_name    = ();
-    my %total_workers_to_submit_by_meadow_type      = ();
-    my %pending_worker_counts_by_meadow_type_rc_name= %{ clone $orig_pending_worker_counts_by_meadow_type_rc_name };    # we need a deep disposable copy here
-    my $total_workers_to_submit                     = 0;
-    my $available_load                              = 1.0 - $self->get_hive_current_load();
-  foreach my $analysis_stats (@suitable_analyses) {
-    last if ($available_load <= 0.0);
-    my $analysis = $analysis_stats->get_analysis();         # FIXME: if it proves too expensive we may need to consider caching
-    my $this_meadow_type = $analysis->meadow_type || $default_meadow_type;
-    if( defined(my $meadow_limit = $available_worker_slots_by_meadow_type->{ $this_meadow_type }) ) {
-        $available_submit_limit = defined($available_submit_limit)
-                                    ? (($available_submit_limit<$meadow_limit) ? $available_submit_limit : $meadow_limit)
-                                    : $meadow_limit;
-    }
-    last if (defined($available_submit_limit) and !$available_submit_limit);
-        #digging deeper under the surface so need to sync
-    if(($analysis_stats->status eq 'LOADING') or ($analysis_stats->status eq 'BLOCKED') or ($analysis_stats->status eq 'ALL_CLAIMED')) {
-      $self->synchronize_AnalysisStats($analysis_stats);
-    }
-    next if($analysis_stats->status eq 'BLOCKED');
-        # FIXME: the following call *sometimes* returns a stale number greater than the number of workers actually needed for an analysis; -sync fixes it
-    my $workers_this_analysis = $analysis_stats->num_required_workers
-        or next;
-    if(defined($available_submit_limit)) {                              # available_submit_limit total capping, if available
-        if($workers_this_analysis > $available_submit_limit) {
-            $workers_this_analysis = $available_submit_limit;
-        }
-        $available_submit_limit -= $workers_this_analysis;
-    }
-    if((my $hive_capacity = $analysis_stats->hive_capacity) > 0) {      # per-analysis hive_capacity capping, if available
-        my $remaining_capacity_for_this_analysis = int($available_load * $hive_capacity);
-        if($workers_this_analysis > $remaining_capacity_for_this_analysis) {
-            $workers_this_analysis = $remaining_capacity_for_this_analysis;
-        }
-        $available_load -= 1.0*$workers_this_analysis/$hive_capacity;
-    }
-    my $curr_rc_name    = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
-    if(my $pending_this_meadow_type_and_rc_name = $pending_worker_counts_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name }) { # per-rc_name capping by pending processes, if available
-        my $pending_this_analysis = ($pending_this_meadow_type_and_rc_name < $workers_this_analysis) ? $pending_this_meadow_type_and_rc_name : $workers_this_analysis;
-        print "Scheduler detected $pending_this_analysis pending workers with resource_class_name=$curr_rc_name, adjusting for this value\n";
-        $pending_worker_counts_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name } -= $pending_this_analysis;
-        $workers_this_analysis                                                              -= $pending_this_analysis;
-    }
-    next unless($workers_this_analysis);    # do not autovivify the output hash by a zero
-    $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name } += $workers_this_analysis;
-    $total_workers_to_submit_by_meadow_type{ $this_meadow_type }                    += $workers_this_analysis;
-    $total_workers_to_submit                                                        += $workers_this_analysis;
-    $analysis_stats->print_stats();
-    printf("Scheduler suggests adding $workers_this_analysis x $this_meadow_type:$curr_rc_name workers for '%s' [%.3f hive_load remaining]\n", $analysis->logic_name, $available_load);
-  }
-    print ''.('-'x60)."\n";
-    foreach my $meadow_type (keys %total_workers_to_submit_by_meadow_type) {
-        print "Scheduler suggests submitting a total of $total_workers_to_submit_by_meadow_type{$meadow_type} workers to $meadow_type\n";
-    }
-    printf("The remaining hive_load after submitting these workers will be: %.3f\n", $available_load);
-    print ''.('='x60)."\n";
-    return (\%workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit);
-sub schedule_workers_resync_if_necessary {
-    my ($self, $valley, $analysis) = @_;
-    my $available_submit_limit                                                      = $valley->config_get('SubmitWorkersMax');
-    my $available_worker_slots_by_meadow_type                                       = $valley->get_available_worker_slots_by_meadow_type();
-    my ($pending_worker_counts_by_meadow_type_rc_name, $total_pending_all_meadows)  = $valley->get_pending_worker_counts_by_meadow_type_rc_name();
-    my $analysis_id2rc_id         = $self->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
-    my $rc_id2name                = $self->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
-        # combined mapping:
-    my %analysis_id2rc_name       = map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id;
-    my $default_meadow_type       = $valley->get_default_meadow()->type;
-    my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit)
-        = $self->schedule_workers($analysis, $available_submit_limit, $available_worker_slots_by_meadow_type, $pending_worker_counts_by_meadow_type_rc_name, \%analysis_id2rc_name, $default_meadow_type);
-    unless( $total_workers_to_submit or $self->get_hive_current_load() or $self->count_running_workers() ) {
-        print "\nScheduler: nothing is running and nothing to do (according to analysis_stats) => executing garbage collection and sync\n" ;
-        $self->check_for_dead_workers($valley, 1);
-        $self->synchronize_hive($analysis);
-        ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit)
-            = $self->schedule_workers($analysis, $available_submit_limit, $available_worker_slots_by_meadow_type, $pending_worker_counts_by_meadow_type_rc_name, \%analysis_id2rc_name, $default_meadow_type);
-    }
-    return ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit);
 sub get_remaining_jobs_show_hive_progress {
   my $self = shift;
   my $sql = "SELECT sum(done_job_count), sum(failed_job_count), sum(total_job_count), ".
diff --git a/modules/Bio/EnsEMBL/Hive/ b/modules/Bio/EnsEMBL/Hive/
new file mode 100644
index 000000000..b4996490a
--- /dev/null
+++ b/modules/Bio/EnsEMBL/Hive/
@@ -0,0 +1,154 @@
+=head1 NAME
+    Bio::EnsEMBL::Hive::Scheduler
+    Scheduler starts with the numbers of required workers for unblocked analyses,
+    then goes through several kinds of restrictions (submit_limit, meadow_limits, hive_capacity, etc)
+    that act as limiters and may cap the original numbers in several ways.
+    The capped numbers are then grouped by meadow_type and rc_name and returned in a two-level hash.
+=head1 CONTACT
+  Please contact mailing list with questions/suggestions.
+package Bio::EnsEMBL::Hive::Scheduler;
+use strict;
+use warnings;
+use Clone 'clone';
+use Bio::EnsEMBL::Hive::Analysis;
+use Bio::EnsEMBL::Hive::AnalysisStats;
+use Bio::EnsEMBL::Hive::Queen;
+use Bio::EnsEMBL::Hive::Valley;
+sub schedule_workers_resync_if_necessary {
+    my ($queen, $valley, $filter_analysis) = @_;
+    my $available_worker_slots_by_meadow_type                                       = $valley->get_available_worker_slots_by_meadow_type();
+    my ($pending_worker_counts_by_meadow_type_rc_name, $total_pending_all_meadows)  = $valley->get_pending_worker_counts_by_meadow_type_rc_name();
+    my $analysis_id2rc_id         = $queen->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
+    my $rc_id2name                = $queen->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
+        # combined mapping:
+    my $analysis_id2rc_name       = { map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id };
+    my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit)
+        = schedule_workers($queen, $valley, $filter_analysis, $available_worker_slots_by_meadow_type, $pending_worker_counts_by_meadow_type_rc_name, $analysis_id2rc_name);
+    unless( $total_workers_to_submit or $queen->get_hive_current_load() or $queen->count_running_workers() ) {
+        print "\nScheduler: nothing is running and nothing to do (according to analysis_stats) => executing garbage collection and sync\n" ;
+        $queen->check_for_dead_workers($valley, 1);
+        $queen->synchronize_hive($filter_analysis);
+        ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit)
+            = schedule_workers($queen, $valley, $filter_analysis, $available_worker_slots_by_meadow_type, $pending_worker_counts_by_meadow_type_rc_name, $analysis_id2rc_name);
+    }
+    return ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit);
+sub schedule_workers {
+    my ($queen, $valley, $filter_analysis, $available_worker_slots_by_meadow_type, $orig_pending_worker_counts_by_meadow_type_rc_name, $analysis_id2rc_name) = @_;
+    my @suitable_analyses   = $filter_analysis
+                                ? ( $filter_analysis->stats )
+                                : @{ $queen->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id() };
+    unless(@suitable_analyses) {
+        print "Scheduler could not find any suitable analyses to start with\n";
+        return ({}, 0);
+    }
+    my %workers_to_submit_by_meadow_type_rc_name    = ();
+    my %total_workers_to_submit_by_meadow_type      = ();
+    my %pending_worker_counts_by_meadow_type_rc_name= %{ clone $orig_pending_worker_counts_by_meadow_type_rc_name };    # we need a deep disposable copy here
+    my $total_workers_to_submit                     = 0;
+    my $default_meadow_type                         = $valley->get_default_meadow()->type;
+    my $available_submit_limit                      = $valley->config_get('SubmitWorkersMax');
+    my $available_load                              = 1.0 - $queen->get_hive_current_load();
+    foreach my $analysis_stats (@suitable_analyses) {
+        last if ($available_load <= 0.0);
+        my $analysis = $analysis_stats->get_analysis();         # FIXME: if it proves too expensive we may need to consider caching
+        my $this_meadow_type = $analysis->meadow_type || $default_meadow_type;
+        if( defined(my $meadow_limit = $available_worker_slots_by_meadow_type->{ $this_meadow_type }) ) {
+            $available_submit_limit = defined($available_submit_limit)
+                ? (($available_submit_limit<$meadow_limit) ? $available_submit_limit : $meadow_limit)
+                : $meadow_limit;
+        }
+        last if (defined($available_submit_limit) and !$available_submit_limit);
+        #digging deeper under the surface so need to sync
+        if(($analysis_stats->status eq 'LOADING') or ($analysis_stats->status eq 'BLOCKED') or ($analysis_stats->status eq 'ALL_CLAIMED')) {
+            $queen->synchronize_AnalysisStats($analysis_stats);
+        }
+        next if($analysis_stats->status eq 'BLOCKED');
+        # FIXME: the following call *sometimes* returns a stale number greater than the number of workers actually needed for an analysis; -sync fixes it
+        my $workers_this_analysis = $analysis_stats->num_required_workers
+            or next;
+        if(defined($available_submit_limit)) {                              # available_submit_limit total capping, if available
+            if($workers_this_analysis > $available_submit_limit) {
+                $workers_this_analysis = $available_submit_limit;
+            }
+            $available_submit_limit -= $workers_this_analysis;
+        }
+        if((my $hive_capacity = $analysis_stats->hive_capacity) > 0) {      # per-analysis hive_capacity capping, if available
+            my $remaining_capacity_for_this_analysis = int($available_load * $hive_capacity);
+            if($workers_this_analysis > $remaining_capacity_for_this_analysis) {
+                $workers_this_analysis = $remaining_capacity_for_this_analysis;
+            }
+            $available_load -= 1.0*$workers_this_analysis/$hive_capacity;
+        }
+        my $curr_rc_name    = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
+        if(my $pending_this_meadow_type_and_rc_name = $pending_worker_counts_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name }) { # per-rc_name capping by pending processes, if available
+            my $pending_this_analysis = ($pending_this_meadow_type_and_rc_name < $workers_this_analysis) ? $pending_this_meadow_type_and_rc_name : $workers_this_analysis;
+            print "Scheduler detected $pending_this_analysis pending workers with resource_class_name=$curr_rc_name, adjusting for this value\n";
+            $pending_worker_counts_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name } -= $pending_this_analysis;
+            $workers_this_analysis                                                              -= $pending_this_analysis;
+        }
+        next unless($workers_this_analysis);    # do not autovivify the output hash by a zero
+        $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $curr_rc_name } += $workers_this_analysis;
+        $total_workers_to_submit_by_meadow_type{ $this_meadow_type }                    += $workers_this_analysis;
+        $total_workers_to_submit                                                        += $workers_this_analysis;
+        $analysis_stats->print_stats();
+        printf("Scheduler suggests adding $workers_this_analysis x $this_meadow_type:$curr_rc_name workers for '%s' [%.4f hive_load remaining]\n", $analysis->logic_name, $available_load);
+    }
+    print ''.('-'x60)."\n";
+    foreach my $meadow_type (keys %total_workers_to_submit_by_meadow_type) {
+        print "Scheduler suggests submitting a total of $total_workers_to_submit_by_meadow_type{$meadow_type} workers to $meadow_type\n";
+    }
+    printf("The remaining hive_load after submitting these workers will be: %.4f\n", $available_load);
+    print ''.('='x60)."\n";
+    return (\%workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit);
diff --git a/scripts/ b/scripts/
index cf67e272e..e651b1f23 100755
--- a/scripts/
+++ b/scripts/
@@ -11,6 +11,7 @@ use Bio::EnsEMBL::Hive::URLFactory;
 use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
 use Bio::EnsEMBL::Hive::Queen;
 use Bio::EnsEMBL::Hive::Valley;
+use Bio::EnsEMBL::Hive::Scheduler;
@@ -232,7 +233,7 @@ sub main {
-        $queen->schedule_workers_resync_if_necessary($valley, $analysis);   # show what would be submitted, but do not actually submit
+        Bio::EnsEMBL::Hive::Scheduler::schedule_workers_resync_if_necessary($queen, $valley, $analysis);   # show what would be submitted, but do not actually submit
         if($show_failed_jobs) {
@@ -332,7 +333,7 @@ sub run_autonomously {
         my ($workers_to_submit_by_meadow_type_rc_name, $total_workers_to_submit)
-            = $queen->schedule_workers_resync_if_necessary($valley, $run_analysis);
+            = Bio::EnsEMBL::Hive::Scheduler::schedule_workers_resync_if_necessary($queen, $valley, $run_analysis);
         if($total_workers_to_submit) {
             foreach my $meadow_type (keys %$workers_to_submit_by_meadow_type_rc_name) {