From d67bcffb8877c0c175553e77fa048eabbc59ffd0 Mon Sep 17 00:00:00 2001
From: Leo Gordon <lg4@ebi.ac.uk>
Date: Mon, 21 Nov 2011 16:44:48 +0000
Subject: [PATCH] remove the per-worker batch_size method

---
 modules/Bio/EnsEMBL/Hive/AnalysisStats.pm     |  2 +-
 .../EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm  |  9 ++--
 modules/Bio/EnsEMBL/Hive/Queen.pm             | 29 +++++-------
 modules/Bio/EnsEMBL/Hive/Worker.pm            | 44 +++++++++----------
 4 files changed, 37 insertions(+), 47 deletions(-)

diff --git a/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm b/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm
index 9c86d1ad8..fd8fb9533 100755
--- a/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm
+++ b/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm
@@ -96,7 +96,7 @@ sub batch_size {
     my $self = shift;
 
     $self->{'_batch_size'} = shift if(@_);
-    $self->{'_batch_size'} = 1 unless(defined($self->{'_batch_size'})); # do we need to initialize it at all?
+    $self->{'_batch_size'} = 1 unless(defined($self->{'_batch_size'})); # only initialize when undefined, so a value explicitly set to 0 will stay 0
 
     return $self->{'_batch_size'};
 }
diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
index 327a783dc..79610cabb 100644
--- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
+++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
@@ -481,7 +481,7 @@ sub store_out_files {
     my $jobs  = $job_adaptor->grab_jobs_for_worker( $worker );
   Description: 
     For the specified worker, it will search available jobs, 
-    and using the workers requested batch_size, claim/fetch that
+    and using the how_many_this_batch parameter, claim/fetch that
     number of jobs, and then return them.
   Returntype : 
     reference to array of Bio::EnsEMBL::Hive::AnalysisJob objects
@@ -490,17 +490,16 @@ sub store_out_files {
 =cut
 
 sub grab_jobs_for_worker {
-    my ($self, $worker) = @_;
+    my ($self, $worker, $how_many_this_batch) = @_;
   
   my $analysis_id = $worker->analysis->dbID();
   my $worker_id   = $worker->dbID();
-  my $batch_size  = $worker->batch_size();
 
   my $update_sql            = "UPDATE job SET worker_id='$worker_id', status='CLAIMED'";
   my $selection_start_sql   = " WHERE analysis_id='$analysis_id' AND status='READY' AND semaphore_count<=0";
 
-  my $virgin_selection_sql  = $selection_start_sql . " AND retry_count=0 LIMIT $batch_size";
-  my $any_selection_sql     = $selection_start_sql . " LIMIT $batch_size";
+  my $virgin_selection_sql  = $selection_start_sql . " AND retry_count=0 LIMIT $how_many_this_batch";
+  my $any_selection_sql     = $selection_start_sql . " LIMIT $how_many_this_batch";
 
   if($self->dbc->driver eq 'sqlite') {
             # we have to be explicitly numereic here because of '0E0' value returned by DBI if "no rows have been affected":
diff --git a/modules/Bio/EnsEMBL/Hive/Queen.pm b/modules/Bio/EnsEMBL/Hive/Queen.pm
index 3fe913e5f..091360049 100755
--- a/modules/Bio/EnsEMBL/Hive/Queen.pm
+++ b/modules/Bio/EnsEMBL/Hive/Queen.pm
@@ -25,8 +25,8 @@
 
   Each worker is linked to an analysis_id, registers its self on creation
   into the Hive, creates a RunnableDB instance of the Analysis->module,
-  gets $worker->batch_size() jobs from the job table, does its
-  work, creates the next layer of job entries by interfacing to
+  gets $analysis->stats->batch_size jobs from the job table, does its work,
+  creates the next layer of job entries by interfacing to
   the DataflowRuleAdaptor to determine the analyses it needs to pass its
   output data to and creates jobs on the next analysis database.
   It repeats this cycle until it has lived its lifetime or until there are no
@@ -99,11 +99,11 @@ sub create_new_worker {
 
   my (  $meadow_type, $process_id, $exec_host,
         $rc_id, $logic_name, $analysis_id, $input_id, $job_id,
-        $no_write, $debug, $worker_output_dir, $hive_output_dir, $batch_size, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs) =
+        $no_write, $debug, $worker_output_dir, $hive_output_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs) =
 
  rearrange([qw(meadow_type process_id exec_host
         rc_id logic_name analysis_id input_id job_id
-        no_write debug worker_output_dir hive_output_dir batch_size job_limit life_span no_cleanup retry_throwing_jobs) ], @args);
+        no_write debug worker_output_dir hive_output_dir job_limit life_span no_cleanup retry_throwing_jobs) ], @args);
 
     if($logic_name) {
         if($analysis_id) {
@@ -223,15 +223,9 @@ sub create_new_worker {
     }
     $worker->hive_output_dir($hive_output_dir);
 
-    if($batch_size) {
-        $worker->batch_size($batch_size);
-    }
     if($job_limit) {
       $worker->job_limit($job_limit);
       $worker->life_span(0);
-      if($job_limit < $worker->batch_size()) {
-        $worker->batch_size( $job_limit );
-      }
     }
     if($life_span) {
       $worker->life_span($life_span * 60);
@@ -546,19 +540,19 @@ sub synchronize_AnalysisStats {
       $analysisStats->failed_job_count(0);
       $analysisStats->num_required_workers(0);
 
-      my $sql = "SELECT status, count(*), semaphore_count FROM job ".
+      my $sql = "SELECT status, semaphore_count, count(*) FROM job ".
                 "WHERE analysis_id=? GROUP BY status, semaphore_count";
       my $sth = $self->prepare($sql);
       $sth->execute($analysisStats->analysis_id);
 
 
-      my $done_here      = 0;
-      my $done_elsewhere = 0;
-      while (my ($status, $job_count, $semaphore_count)=$sth->fetchrow_array()) {
+      my $done_here       = 0;
+      my $done_elsewhere  = 0;
+      my $total_job_count = 0;
+      while (my ($status, $semaphore_count, $job_count)=$sth->fetchrow_array()) {
     # print STDERR "$status: $job_count\n";
 
-        my $curr_total = $analysisStats->total_job_count();
-        $analysisStats->total_job_count($curr_total + $job_count);
+        $total_job_count += $job_count;
 
         if(($status eq 'READY') and ($semaphore_count<=0)) {
             $analysisStats->unclaimed_job_count($job_count);
@@ -585,7 +579,8 @@ sub synchronize_AnalysisStats {
       }
       $sth->finish;
 
-      $analysisStats->done_job_count($done_here + $done_elsewhere);
+      $analysisStats->total_job_count( $total_job_count );
+      $analysisStats->done_job_count( $done_here + $done_elsewhere );
   } # /unless $self->{'_hive_use_triggers'}
 
   $analysisStats->check_blocking_control_rules();
diff --git a/modules/Bio/EnsEMBL/Hive/Worker.pm b/modules/Bio/EnsEMBL/Hive/Worker.pm
index 0b9388feb..5433b3e57 100755
--- a/modules/Bio/EnsEMBL/Hive/Worker.pm
+++ b/modules/Bio/EnsEMBL/Hive/Worker.pm
@@ -235,6 +235,22 @@ sub more_work_done {
   $self->{'work_done'}++;
 }
 
+
+sub next_batch_size {
+    my $self = shift @_;
+
+    my $batch_size = $self->analysis->stats->get_or_estimate_batch_size();
+
+    if(my $job_limit = $self->job_limit()) {               # if job_limit is set, it may influence batch_size
+        my $jobs_to_do = $job_limit - $self->work_done();
+        if($jobs_to_do < $batch_size) {
+            return $jobs_to_do;         # should return 0 when job_limit has been attained
+        }
+    }
+    return $batch_size;
+}
+
+
 sub job_limit_reached {
     my $self = shift @_;
 
@@ -378,7 +394,7 @@ sub print_worker {
      " host=",$self->host,
      " pid=",$self->process_id,
      "\n");
-  print("  batch_size = ", $self->batch_size,"\n");
+  print("  batch_size = ", $self->analysis->stats->get_or_estimate_batch_size(),"\n");
   print("  job_limit  = ", $self->job_limit,"\n") if(defined($self->job_limit));
   print("  life_span  = ", $self->life_span,"\n") if(defined($self->life_span));
   if(my $worker_output_dir = $self->worker_output_dir) {
@@ -418,28 +434,6 @@ sub cleanup_worker_process_temp_directory {
 #
 ###############################
 
-=head2 batch_size
-
-  Args    :   none
-  Title   :   batch_size
-  Usage   :   $value = $self->batch_size;
-              $self->batch_size($new_value);
-  Description: Defines the number of jobs that should run in batch
-               before querying the database for the next job batch.  Used by the
-               Hive system to manage the number of workers needed to complete a
-               particular job type.
-  DefaultValue : batch_size of analysis
-  Returntype : integer scalar
-
-=cut
-
-sub batch_size {
-    my $self = shift;
-
-    $self->{'_batch_size'} = shift if(@_);
-    return $self->{'_batch_size'} || $self->analysis->stats->get_or_estimate_batch_size();
-}
-
 
 =head2 run
 
@@ -503,7 +497,9 @@ sub run {
                 $self->cause_of_death('CONTAMINATED'); 
                 $job_adaptor->release_undone_jobs_from_worker($self, $msg);
             } else {
-                $jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->grab_jobs_for_worker( $self ) );
+                if(my $how_many_this_batch = $self->next_batch_size()) {
+                    $jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->grab_jobs_for_worker( $self, $how_many_this_batch ) );
+                }
 
                 if( my $jobs_completed = $self->job_limit_reached()) {
                     print "job_limit reached ($jobs_completed jobs completed)\n";
-- 
GitLab