From 63375cdb50b931016abf05432901ca581c116f0f Mon Sep 17 00:00:00 2001
From: Leo Gordon <lg4@ebi.ac.uk>
Date: Fri, 16 Nov 2012 10:36:49 +0000
Subject: [PATCH] aggregate meadow stats collection in the Valley

---
 modules/Bio/EnsEMBL/Hive/Meadow/LOCAL.pm |  7 ++++++
 modules/Bio/EnsEMBL/Hive/Meadow/LSF.pm   | 10 +++++---
 modules/Bio/EnsEMBL/Hive/Queen.pm        | 23 ++++++++++--------
 modules/Bio/EnsEMBL/Hive/Valley.pm       | 31 ++++++++++++++++++++++++
 4 files changed, 57 insertions(+), 14 deletions(-)

diff --git a/modules/Bio/EnsEMBL/Hive/Meadow/LOCAL.pm b/modules/Bio/EnsEMBL/Hive/Meadow/LOCAL.pm
index 99ee24ca2..65f775403 100644
--- a/modules/Bio/EnsEMBL/Hive/Meadow/LOCAL.pm
+++ b/modules/Bio/EnsEMBL/Hive/Meadow/LOCAL.pm
@@ -21,6 +21,13 @@ sub get_current_worker_process_id {
 }
 
 
+sub count_pending_workers_by_rc_name {
+    my ($self) = @_;
+
+    return ({}, 0);     # LOCAL has no concept of pending workers
+}
+
+
 sub count_running_workers {
     my $self = shift @_;
 
diff --git a/modules/Bio/EnsEMBL/Hive/Meadow/LSF.pm b/modules/Bio/EnsEMBL/Hive/Meadow/LSF.pm
index 96f589d06..0ef80c643 100644
--- a/modules/Bio/EnsEMBL/Hive/Meadow/LSF.pm
+++ b/modules/Bio/EnsEMBL/Hive/Meadow/LSF.pm
@@ -39,17 +39,19 @@ sub count_pending_workers_by_rc_name {
     my ($self) = @_;
 
     my $jnp = $self->job_name_prefix();
-    my $cmd = qq{bjobs -w -J '${jnp}*' -u all 2>/dev/null | grep PEND};
+    my $cmd = "bjobs -w -J '${jnp}*' -u all 2>/dev/null | grep PEND";
 
-    my %pending_by_rc_name = ();
+    my %pending_this_meadow_by_rc_name = ();
+    my $total_pending_this_meadow = 0;
 
     foreach my $line (qx/$cmd/) {
         if($line=~/\b\Q$jnp\E(\S+)\-\d+(\[\d+\])?\b/) {
-            $pending_by_rc_name{$1}++;
+            $pending_this_meadow_by_rc_name{$1}++;
+            $total_pending_this_meadow++;
         }
     }
 
-    return \%pending_by_rc_name;
+    return (\%pending_this_meadow_by_rc_name, $total_pending_this_meadow);
 }
 
 
diff --git a/modules/Bio/EnsEMBL/Hive/Queen.pm b/modules/Bio/EnsEMBL/Hive/Queen.pm
index ff377dff5..9b77ba2e0 100644
--- a/modules/Bio/EnsEMBL/Hive/Queen.pm
+++ b/modules/Bio/EnsEMBL/Hive/Queen.pm
@@ -738,7 +738,7 @@ sub count_running_workers {
 =cut
 
 sub schedule_workers {
-  my ($self, $filter_analysis, $orig_pending_by_rc_name, $available_submit_limit) = @_;
+  my ($self, $filter_analysis, $orig_pending_this_meadow_by_rc_name, $available_submit_limit) = @_;
 
   my @suitable_analyses         = $filter_analysis
                                     ? ( $filter_analysis->stats )
@@ -751,7 +751,7 @@ sub schedule_workers {
 
   return {} unless(@suitable_analyses);
 
-  my %pending_by_rc_name        = %{ $orig_pending_by_rc_name || {} };  # NB: make our own copy to be able to modify it
+  my %pending_this_meadow_by_rc_name    = %{ $orig_pending_this_meadow_by_rc_name || {} };  # NB: make our own copy to be able to modify it
   my $total_workers_to_run      = 0;
   my %workers_to_run_by_rc_name = ();
   my $available_load            = 1.0 - $self->get_hive_current_load();
@@ -791,12 +791,12 @@ sub schedule_workers {
 
     my $curr_rc_name    = $analysis_id2rc_name{ $analysis_stats->analysis_id };
 
-    if($pending_by_rc_name{ $curr_rc_name }) {                              # per-rc_name capping by pending processes, if available
-        my $pending_this_analysis = ($pending_by_rc_name{ $curr_rc_name } < $workers_this_analysis) ? $pending_by_rc_name{ $curr_rc_name } : $workers_this_analysis;
+    if($pending_this_meadow_by_rc_name{ $curr_rc_name }) {                              # per-rc_name capping by pending processes, if available
+        my $pending_this_analysis = ($pending_this_meadow_by_rc_name{ $curr_rc_name } < $workers_this_analysis) ? $pending_this_meadow_by_rc_name{ $curr_rc_name } : $workers_this_analysis;
 
         print "Scheduler detected $pending_this_analysis pending workers with resource_class_name=$curr_rc_name\n";
-        $workers_this_analysis                  -= $pending_this_analysis;
-        $pending_by_rc_name{ $curr_rc_name }    -= $pending_this_analysis;
+        $workers_this_analysis                              -= $pending_this_analysis;
+        $pending_this_meadow_by_rc_name{ $curr_rc_name }    -= $pending_this_analysis;
     }
 
     next unless($workers_this_analysis);    # do not autovivify the hash by a zero
@@ -818,14 +818,17 @@ sub schedule_workers_resync_if_necessary {
     my $submit_limit        = $valley->config_get('SubmitWorkersMax');
     my $meadow              = $valley->get_current_meadow();
 
-    my $pending_by_rc_name  = $meadow->can('count_pending_workers_by_rc_name') ? $meadow->count_pending_workers_by_rc_name() : {};
-    my $meadow_limit        = ($meadow->can('count_running_workers') and defined($meadow->config_get('TotalRunningWorkersMax'))) ? $meadow->config_get('TotalRunningWorkersMax') - $meadow->count_running_workers : undef;
+    my $available_worker_slots_by_meadow_type   = $valley->get_available_worker_slots_by_meadow_type();
+    my $meadow_limit                            = $available_worker_slots_by_meadow_type->{ $meadow->type };
+
+    my ($pending_worker_counts_by_meadow_type_rc_name, $total_pending_all_meadows)  = $valley->get_pending_worker_counts_by_meadow_type_rc_name();
+    my $pending_this_meadow_by_rc_name          = $pending_worker_counts_by_meadow_type_rc_name->{ $meadow->type };
 
     my $available_submit_limit = ($submit_limit and $meadow_limit)
                                     ? (($submit_limit<$meadow_limit) ? $submit_limit : $meadow_limit)
                                     : (defined($submit_limit) ? $submit_limit : $meadow_limit);
 
-    my $workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_by_rc_name, $available_submit_limit);
+    my $workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_this_meadow_by_rc_name, $available_submit_limit);
 
     unless( keys %$workers_to_run_by_rc_name or $self->get_hive_current_load() or $self->count_running_workers() ) {
         print "\nScheduler: nothing is running and nothing to do (according to analysis_stats) => executing garbage collection and sync\n" ;
@@ -833,7 +836,7 @@ sub schedule_workers_resync_if_necessary {
         $self->check_for_dead_workers($valley, 1);
         $self->synchronize_hive($analysis);
 
-        $workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_by_rc_name, $available_submit_limit);
+        $workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_this_meadow_by_rc_name, $available_submit_limit);
     }
 
     return $workers_to_run_by_rc_name;
diff --git a/modules/Bio/EnsEMBL/Hive/Valley.pm b/modules/Bio/EnsEMBL/Hive/Valley.pm
index 7c74fab8a..53c9a2e47 100644
--- a/modules/Bio/EnsEMBL/Hive/Valley.pm
+++ b/modules/Bio/EnsEMBL/Hive/Valley.pm
@@ -140,5 +140,36 @@ sub whereami {
 }
 
 
+sub get_pending_worker_counts_by_meadow_type_rc_name {
+    my $self = shift @_;
+
+    my %pending_counts = ();
+    my $total_pending_all_meadows = 0;
+
+    foreach my $meadow (@{ $self->get_available_meadow_list }) {
+        my ($pending_this_meadow_by_rc_name, $total_pending_this_meadow) = ($meadow->count_pending_workers_by_rc_name());
+        $pending_counts{ $meadow->type } = $pending_this_meadow_by_rc_name;
+        $total_pending_all_meadows += $total_pending_this_meadow;
+    }
+
+    return (\%pending_counts, $total_pending_all_meadows);
+}
+
+
+sub get_available_worker_slots_by_meadow_type {
+    my $self = shift @_;
+
+    my %available_worker_slots = ();
+
+    foreach my $meadow (@{ $self->get_available_meadow_list }) {
+        if( $meadow->can('count_running_workers') and defined($meadow->config_get('TotalRunningWorkersMax'))) {
+            $available_worker_slots{ $meadow->type } = $meadow->config_get('TotalRunningWorkersMax') - $meadow->count_running_workers;
+        }
+    }
+
+    return \%available_worker_slots;
+}
+
+
 1;
 
-- 
GitLab