From 4956d67260f4b3fc9c7c1828e947fcf0dd47273b Mon Sep 17 00:00:00 2001
From: Leo Gordon <lg4@ebi.ac.uk>
Date: Tue, 19 Nov 2013 11:17:38 +0000
Subject: [PATCH] use OFFSET to separate jobs being claimed into ranges

---
 .../EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm  | 47 ++++++++++---------
 modules/Bio/EnsEMBL/Hive/Queen.pm             | 16 +++----
 modules/Bio/EnsEMBL/Hive/Worker.pm            |  3 +-
 3 files changed, 34 insertions(+), 32 deletions(-)

diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
index 39bbaf0d3..96d8cf29f 100644
--- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
+++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
@@ -545,30 +545,35 @@ sub reset_or_grab_job_by_dbID {
 =cut
 
 sub grab_jobs_for_worker {
-    my ($self, $worker, $how_many_this_batch) = @_;
+    my ($self, $worker, $how_many_this_batch, $workers_rank) = @_;
   
-  my $analysis_id = $worker->analysis_id();
-  my $worker_id   = $worker->dbID();
-
-  my $update_sql            = "UPDATE job SET worker_id='$worker_id', status='CLAIMED'";
-  my $selection_start_sql   = " WHERE analysis_id='$analysis_id' AND status='READY'";
-
-  my $virgin_selection_sql  = $selection_start_sql . " AND retry_count=0 LIMIT $how_many_this_batch";
-  my $any_selection_sql     = $selection_start_sql . " LIMIT $how_many_this_batch";
+    my $analysis_id = $worker->analysis_id();
+    my $worker_id   = $worker->dbID();
+    my $offset      = $how_many_this_batch*$workers_rank;
+
+    my $prefix_sql = qq{
+         UPDATE job j
+           JOIN (
+                            SELECT job_id
+                              FROM job
+                             WHERE analysis_id='$analysis_id'
+                               AND status='READY'
+    };
+    my $suffix_sql = qq{
+                             LIMIT $how_many_this_batch
+                            OFFSET $offset
+                 ) as x
+         USING (job_id)
+           SET j.worker_id='$worker_id', j.status='CLAIMED'
+           WHERE j.status='READY'
+    };
 
-  if($self->dbc->driver eq 'mysql') {
-            # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
-      if( (my $claim_count = $self->dbc->do( $update_sql . $virgin_selection_sql )) == 0 ) {
-            $claim_count = $self->dbc->do( $update_sql . $any_selection_sql );
-      }
-  } else {
-            # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
-      if( (my $claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $virgin_selection_sql) AND status='READY'" )) == 0 ) {
-            $claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $any_selection_sql) AND status='READY'" );
-      }
-  }
+        # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
+    if( (my $claim_count = $self->dbc->do( $prefix_sql . ' AND retry_count=0 '. $suffix_sql)) == 0 ) {
+        $claim_count = $self->dbc->do( $prefix_sql . $suffix_sql);
+    }
 
-#  my $constraint = "j.analysis_id='$analysis_id' AND j.worker_id='$worker_id' AND j.status='CLAIMED'";
+#   my $constraint = "j.analysis_id='$analysis_id' AND j.worker_id='$worker_id' AND j.status='CLAIMED'";
     my $constraint = "j.worker_id='$worker_id' AND j.status='CLAIMED'";
     return $self->_generic_fetch($constraint);
 }
diff --git a/modules/Bio/EnsEMBL/Hive/Queen.pm b/modules/Bio/EnsEMBL/Hive/Queen.pm
index 31f17a950..29501ad5a 100644
--- a/modules/Bio/EnsEMBL/Hive/Queen.pm
+++ b/modules/Bio/EnsEMBL/Hive/Queen.pm
@@ -692,18 +692,14 @@ sub get_hive_current_load {
 sub count_running_workers {
     my ($self, $analysis_id) = @_;
 
-    my $sql = qq{
-            SELECT count(*)
-            FROM worker
-            WHERE status!='DEAD'
-        } . ($analysis_id ? " AND analysis_id='$analysis_id'" : '');
+    return $self->count_all( "status!='DEAD'".($analysis_id ? " AND analysis_id=$analysis_id" : '') );
+}
 
-    my $sth = $self->prepare($sql);
-    $sth->execute();
-    (my $queen_running_workers)=$sth->fetchrow_array();
-    $sth->finish();
 
-    return $queen_running_workers || 0;
+sub get_workers_rank {
+    my ($self, $worker) = @_;
+
+    return $self->count_all( "status!='DEAD' AND analysis_id=".$worker->analysis_id." AND worker_id<".$worker->dbID );
 }
 
 
diff --git a/modules/Bio/EnsEMBL/Hive/Worker.pm b/modules/Bio/EnsEMBL/Hive/Worker.pm
index 20dd11506..8492487fe 100644
--- a/modules/Bio/EnsEMBL/Hive/Worker.pm
+++ b/modules/Bio/EnsEMBL/Hive/Worker.pm
@@ -523,7 +523,8 @@ sub run {
                     my $desired_batch_size = $self->analysis->stats->get_or_estimate_batch_size();
                     $desired_batch_size = $self->job_limiter->preliminary_offer( $desired_batch_size );
 
-                    my $actual_batch = $job_adaptor->grab_jobs_for_worker( $self, $desired_batch_size );
+                    my $workers_rank = $self->adaptor->get_workers_rank( $self );
+                    my $actual_batch = $job_adaptor->grab_jobs_for_worker( $self, $desired_batch_size, $workers_rank );
                     if(scalar(@$actual_batch)) {
                         my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
                         $jobs_done_by_batches_loop += $jobs_done_by_this_batch;
-- 
GitLab