From 76b1900d4364cbfc8c54abf9662eb7c39ac54f91 Mon Sep 17 00:00:00 2001
From: Leo Gordon <lg4@ebi.ac.uk>
Date: Thu, 17 Nov 2011 14:56:20 +0000
Subject: [PATCH] make num_running_workers updatable by triggers + better
 updates during worker check-in

---
 .../Hive/DBSQL/AnalysisStatsAdaptor.pm        | 21 ++-----
 modules/Bio/EnsEMBL/Hive/Queen.pm             | 56 +++++++++----------
 modules/Bio/EnsEMBL/Hive/Worker.pm            | 49 ++++++++--------
 sql/triggers.mysql                            | 34 +++++++++--
 sql/triggers.sqlite                           | 38 ++++++++++---
 5 files changed, 118 insertions(+), 80 deletions(-)
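
Reviewer note: the unifying idea of the patch is that the num_running_workers
counter is maintained either explicitly by the Perl layer or by database
triggers, never by both at once. A minimal sketch of the guard pattern, using
only names that appear in the patch (hive_use_triggers(), get_AnalysisStatsAdaptor,
increase/decrease_running_workers); the wrapper sub itself is hypothetical and
purely illustrative:

    # hypothetical helper, not part of the patch
    sub adjust_running_workers {
        my ($self, $analysis_id, $delta) = @_;

        return if $self->db->hive_use_triggers();   # the triggers maintain the counter themselves

        my $stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
        if ($delta > 0) {
            $stats_adaptor->increase_running_workers($analysis_id);
        } else {
            $stats_adaptor->decrease_running_workers($analysis_id);
        }
    }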

diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm
index 611ae91e7..5bfa656b6 100644
--- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm
+++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm
@@ -203,11 +203,11 @@ sub update {
       $sql .= ",unclaimed_job_count=" . $stats->unclaimed_job_count();
       $sql .= ",done_job_count=" . $stats->done_job_count();
       $sql .= ",failed_job_count=" . $stats->failed_job_count();
-  }
 
-  $stats->num_running_workers( $self->db->get_Queen->count_running_workers( $stats->analysis_id() ) );
+      $stats->num_running_workers( $self->db->get_Queen->count_running_workers( $stats->analysis_id() ) );
+      $sql .= ",num_running_workers=" . $stats->num_running_workers();
+  }
 
-  $sql .= ",num_running_workers=" . $stats->num_running_workers();
   $sql .= ",num_required_workers=" . $stats->num_required_workers();
   $sql .= ",last_update=CURRENT_TIMESTAMP";
   $sql .= ",sync_lock='0'";
@@ -308,18 +308,7 @@ sub decrease_running_workers
   $self->dbc->do($sql);
 }
 
-sub decrease_running_workers_on_hive_overload {
-    my $self        = shift;
-    my $analysis_id = shift;
-
-    my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
-              "WHERE num_running_workers > hive_capacity AND analysis_id = $analysis_id ";
-
-    my $row_count = $self->dbc->do($sql);
-    return $row_count;
-}
-
-sub decrease_needed_workers
+sub decrease_required_workers
 {
   my $self = shift;
   my $analysis_id = shift;
@@ -331,7 +320,7 @@ sub decrease_needed_workers
 }
 
 
-sub increase_needed_workers
+sub increase_required_workers
 {
   my $self = shift;
   my $analysis_id = shift;
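
Reviewer note (AnalysisStatsAdaptor.pm): num_running_workers is now recomputed and
written only inside the full-update branch, so a value maintained by triggers is never
clobbered by a partial update; decrease_running_workers_on_hive_overload() is dropped
because the eager decrement on overload goes away (the counter now only drops when the
worker dies, see Queen.pm and the update_worker trigger below); and the *_needed_workers
methods are renamed *_required_workers to match the num_required_workers column. A
hedged usage sketch of the renamed calls (method names come from the patch, the
surrounding variables are illustrative):

    my $stats_adaptor = $hive_dba->get_AnalysisStatsAdaptor;    # $hive_dba: an assumed DBAdaptor handle
    $stats_adaptor->decrease_required_workers($analysis_id);    # formerly decrease_needed_workers()
    $stats_adaptor->increase_required_workers($analysis_id);    # formerly increase_needed_workers()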
diff --git a/modules/Bio/EnsEMBL/Hive/Queen.pm b/modules/Bio/EnsEMBL/Hive/Queen.pm
index 26e77ac79..3fe913e5f 100755
--- a/modules/Bio/EnsEMBL/Hive/Queen.pm
+++ b/modules/Bio/EnsEMBL/Hive/Queen.pm
@@ -178,11 +178,21 @@ sub create_new_worker {
       return;
     }
 
-    $analysis_stats_adaptor->decrease_needed_workers($analysisStats->analysis_id);
-    $analysis_stats_adaptor->increase_running_workers($analysisStats->analysis_id);
+    $analysis_stats_adaptor->decrease_required_workers($analysisStats->analysis_id);
+
     $analysisStats->print_stats;
   }
   
+    # The following increment used to be done only when no specific task was given to the worker,
+    # thereby excluding such "special task" workers from being counted in num_running_workers.
+    #
+    # However, this may be tricky to emulate with triggers that know nothing about "special tasks",
+    # so I am (temporarily?) simplifying the accounting algorithm.
+    #
+  unless( $self->db->hive_use_triggers() ) {
+    $analysis_stats_adaptor->increase_running_workers($analysisStats->analysis_id);
+  }
+
   my $sql = q{INSERT INTO worker 
               (born, last_check_in, meadow_type, process_id, host, analysis_id)
               VALUES (CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, ?,?,?,?)};
@@ -243,11 +253,6 @@ sub register_worker_death {
 
   my $cod = $worker->cause_of_death();
 
-  unless ($cod eq 'HIVE_OVERLOAD') {
-    ## HIVE_OVERLOAD occurs after a successful update of the analysis_stats teble. (c.f. Worker.pm)
-    $worker->analysis->stats->adaptor->decrease_running_workers($worker->analysis->stats->analysis_id);
-  }
-
   my $sql = "UPDATE worker SET died=CURRENT_TIMESTAMP, last_check_in=CURRENT_TIMESTAMP";
   $sql .= " ,status='DEAD'";
   $sql .= " ,work_done='" . $worker->work_done . "'";
@@ -256,6 +261,10 @@ sub register_worker_death {
 
   $self->dbc->do( $sql );
 
+  unless( $self->db->hive_use_triggers() ) {
+    $worker->analysis->stats->adaptor->decrease_running_workers($worker->analysis->stats->analysis_id);
+  }
+
   if($cod eq 'NO_WORK') {
     $self->db->get_AnalysisStatsAdaptor->update_status($worker->analysis->dbID, 'ALL_CLAIMED');
   }
@@ -270,7 +279,7 @@ sub register_worker_death {
   if($self->safe_synchronize_AnalysisStats($worker->analysis->stats)->status ne 'DONE') {
     # since I'm dying I should make sure there is someone to take my place after I'm gone ...
     # above synch still sees me as a 'living worker' so I need to compensate for that
-    $self->db->get_AnalysisStatsAdaptor->increase_needed_workers($worker->analysis->dbID);
+    $self->db->get_AnalysisStatsAdaptor->increase_required_workers($worker->analysis->dbID);
   }
 
 }
@@ -334,17 +343,12 @@ sub check_for_dead_workers {    # a bit counter-intuitively only looks for curre
     }
 }
 
-sub worker_check_in {
-  my ($self, $worker) = @_;
 
-  return unless($worker);
-  my $sql = "UPDATE worker SET last_check_in=CURRENT_TIMESTAMP";
-  $sql .= " ,work_done='" . $worker->work_done . "'";
-  $sql .= " WHERE worker_id='" . $worker->dbID ."'";
+    # a new version of worker_check_in() that both checks the worker in and records its current status
+sub check_in_worker {
+    my ($self, $worker) = @_;
 
-  my $sth = $self->prepare($sql);
-  $sth->execute();
-  $sth->finish;
+    $self->dbc->do("UPDATE worker SET last_check_in=CURRENT_TIMESTAMP, status='".$worker->status."', work_done='".$worker->work_done."' WHERE worker_id='".$worker->dbID."'");
 }
 
 
@@ -527,10 +531,10 @@ sub synchronize_AnalysisStats {
 
                 # adjust_stats_for_living_workers:
             if($hive_capacity > 0) {
-                my $capacity_allows_to_add = $hive_capacity - $self->count_running_workers( $analysisStats->analysis_id() );
+                my $unfulfilled_capacity = $hive_capacity - $analysisStats->num_running_workers();
 
-                if($capacity_allows_to_add < $required_workers ) {
-                    $required_workers = (0 < $capacity_allows_to_add) ? $capacity_allows_to_add : 0;
+                if($unfulfilled_capacity < $required_workers ) {
+                    $required_workers = (0 < $unfulfilled_capacity) ? $unfulfilled_capacity : 0;
                 }
             }
             $analysisStats->num_required_workers( $required_workers );
@@ -563,10 +567,10 @@ sub synchronize_AnalysisStats {
 
                 # adjust_stats_for_living_workers:
             if($hive_capacity > 0) {
-                my $capacity_allows_to_add = $hive_capacity - $self->count_running_workers( $analysisStats->analysis_id() );
+                my $unfulfilled_capacity = $hive_capacity - $self->count_running_workers( $analysisStats->analysis_id() );
 
-                if($capacity_allows_to_add < $required_workers ) {
-                    $required_workers = (0 < $capacity_allows_to_add) ? $capacity_allows_to_add : 0;
+                if($unfulfilled_capacity < $required_workers ) {
+                    $required_workers = (0 < $unfulfilled_capacity) ? $unfulfilled_capacity : 0;
                 }
             }
             $analysisStats->num_required_workers( $required_workers );
@@ -653,12 +657,6 @@ sub count_running_workers {
 }
 
 
-sub enter_status {
-  my ($self, $worker, $status) = @_;
-
-  $self->dbc->do("UPDATE worker SET status = '$status' WHERE worker_id = ".$worker->dbID);
-}
-
 =head2 get_num_needed_workers
 
   Arg[1]     : Bio::EnsEMBL::Analysis object (optional)
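
Reviewer note (Queen.pm): worker_check_in() and Queen::enter_status() are folded into a
single check_in_worker() that refreshes last_check_in, status and work_done in one UPDATE.
The patch interpolates the values straight into the SQL string; a prepared-statement
variant is sketched below, reusing the prepare()/execute() pattern of the old
worker_check_in(); this is only an illustration, not what the patch installs:

    sub check_in_worker {
        my ($self, $worker) = @_;

        my $sth = $self->prepare(
            "UPDATE worker SET last_check_in=CURRENT_TIMESTAMP, status=?, work_done=? WHERE worker_id=?"
        );
        $sth->execute( $worker->status, $worker->work_done, $worker->dbID );
        $sth->finish;
    }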
diff --git a/modules/Bio/EnsEMBL/Hive/Worker.pm b/modules/Bio/EnsEMBL/Hive/Worker.pm
index 128a3a6f9..0b9388feb 100755
--- a/modules/Bio/EnsEMBL/Hive/Worker.pm
+++ b/modules/Bio/EnsEMBL/Hive/Worker.pm
@@ -532,12 +532,12 @@ sub run {
     }
 
     if (!$self->cause_of_death
-    and $self->analysis->stats->hive_capacity >= 0
-    and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity
-    and $self->analysis->stats->adaptor->decrease_running_workers_on_hive_overload( $self->analysis->dbID ) # careful with order, this operation has side-effect
+    and 0 <= $self->analysis->stats->hive_capacity
+    and $self->analysis->stats->hive_capacity < $self->analysis->stats->num_running_workers
     ) {
         $self->cause_of_death('HIVE_OVERLOAD');
     }
+
   } while (!$self->cause_of_death); # /Worker's lifespan loop
 
   if($self->perform_cleanup) {
@@ -569,7 +569,7 @@ sub run_one_batch {
 
     my $max_retry_count = $self->analysis->stats->max_retry_count();  # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
 
-    $self->queen->worker_check_in($self);
+    $self->queen->check_in_worker( $self );
     $self->queen->safe_synchronize_AnalysisStats($self->analysis->stats);
 
     $self->cause_of_death('NO_WORK') unless(scalar @{$jobs});
@@ -609,20 +609,21 @@ sub run_one_batch {
             or $self->prev_job_error                # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
             or $job->lethal_for_worker ) {          # trust the job's expert knowledge
                 my $reason = ($self->status eq 'COMPILATION') ? 'compilation error'
-                           : $self->prev_job_error           ? 'two failed jobs in a row'
-                           :                                   'suggested by job itself';
+                           : $self->prev_job_error            ? 'two failed jobs in a row'
+                           :                                    'suggested by job itself';
                 warn "Job's error has contaminated the Worker ($reason), so the Worker will now die\n";
                 $self->cause_of_death('CONTAMINATED');
                 return $jobs_done_here;
             }
         } else {    # job successfully completed:
 
-            if(my $semaphored_job_id = $job->semaphored_job_id) {
-                $job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );    # step-unblock the semaphore
-            }
             $self->more_work_done;
             $jobs_done_here++;
             $job->update_status('DONE');
+
+            if(my $semaphored_job_id = $job->semaphored_job_id) {
+                $job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );    # step-unblock the semaphore
+            }
         }
 
         $self->prev_job_error( $job->incomplete );
@@ -639,8 +640,7 @@ sub run_module_with_job {
   $job->incomplete(1);
   $job->autoflow(1);
 
-  $self->enter_status('COMPILATION');
-  $job->update_status('COMPILATION');
+  $self->enter_status('COMPILATION', $job);
   my $runObj = $self->analysis->process or die "Unknown compilation error";
   
   my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
@@ -662,26 +662,20 @@ sub run_module_with_job {
     $job->param_init( 0, $self->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), $job->input_id() ); # Well, why not?
   }
 
-    $self->enter_status('GET_INPUT');
-    $job->update_status('GET_INPUT');
-    print("\nGET_INPUT\n") if($self->debug); 
+    $self->enter_status('GET_INPUT', $job);
 
     $self->{'fetching_stopwatch'}->continue();
     $runObj->fetch_input;
     $self->{'fetching_stopwatch'}->pause();
 
-    $self->enter_status('RUN');
-    $job->update_status('RUN');
-    print("\nRUN\n") if($self->debug); 
+    $self->enter_status('RUN', $job);
 
     $self->{'running_stopwatch'}->continue();
     $runObj->run;
     $self->{'running_stopwatch'}->pause();
 
     if($self->execute_writes) {
-        $self->enter_status('WRITE_OUTPUT');
-        $job->update_status('WRITE_OUTPUT');
-        print("\nWRITE_OUTPUT\n") if($self->debug); 
+        $self->enter_status('WRITE_OUTPUT', $job);
 
         $self->{'writing_stopwatch'}->continue();
         $runObj->write_output;
@@ -692,7 +686,7 @@ sub run_module_with_job {
             $job->dataflow_output_id();
         }
     } else {
-        print("\n\n!!!! NOT write_output\n\n\n") if($self->debug); 
+        print("\n!!! *no* WRITE_OUTPUT and *no* AUTOFLOW\n") if($self->debug); 
     }
 
     $job->query_count($self->queen->dbc->query_count);
@@ -702,8 +696,17 @@ sub run_module_with_job {
 }
 
 sub enter_status {
-  my ($self, $status) = @_;
-  return $self->queen->enter_status($self, $status);
+    my ($self, $status, $job) = @_;
+
+    if($self->debug) {
+        printf("\n%s : %s\n", $job ? 'job '.$job->dbID : 'worker', $status);
+    }
+
+    if($job) {
+        $job->update_status( $status );
+    }
+    $self->status( $status );
+    $self->queen->check_in_worker( $self );
 }
 
 sub start_job_output_redirection {
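
Reviewer note (Worker.pm): enter_status() now optionally takes the current job, so a single
call replaces the previous trio of job-status update, worker-status update and ad-hoc debug
print, and it also checks the worker in with the Queen. HIVE_OVERLOAD is now detected purely
by comparing num_running_workers against hive_capacity, without the side-effecting
decrease_running_workers_on_hive_overload() call. Call sites change as in this sketch
(taken directly from the hunks above):

    # before this patch: three statements per state change
    #   $self->enter_status('RUN');
    #   $job->update_status('RUN');
    #   print("\nRUN\n") if($self->debug);
    #
    # after this patch: one statement (the job argument is optional)
    $self->enter_status('RUN', $job);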
diff --git a/sql/triggers.mysql b/sql/triggers.mysql
index 3d4c55086..5de3c1b81 100644
--- a/sql/triggers.mysql
+++ b/sql/triggers.mysql
@@ -18,16 +18,40 @@ FOR EACH ROW
         failed_job_count    = failed_job_count    - (CASE OLD.status WHEN 'FAILED' THEN 1                         ELSE 0 END)
     WHERE analysis_id = OLD.analysis_id;
 
+DELIMITER $$
+
 CREATE TRIGGER update_job AFTER UPDATE ON job
 FOR EACH ROW
+BEGIN
+    UPDATE analysis_stats SET
+        total_job_count     = total_job_count     - 1,
+        unclaimed_job_count = unclaimed_job_count - (CASE OLD.status WHEN 'READY'  THEN 1                          ELSE 0 END),
+        done_job_count      = done_job_count      - (CASE OLD.status WHEN 'DONE'   THEN 1 WHEN 'PASSED_ON' THEN  1 ELSE 0 END),
+        failed_job_count    = failed_job_count    - (CASE OLD.status WHEN 'FAILED' THEN 1                          ELSE 0 END)
+    WHERE analysis_id = OLD.analysis_id;
     UPDATE analysis_stats SET
-        unclaimed_job_count = unclaimed_job_count + (CASE NEW.status WHEN 'READY'  THEN 1                          ELSE 0 END)
-                                                  - (CASE OLD.status WHEN 'READY'  THEN 1                          ELSE 0 END),
-        done_job_count      = done_job_count      + (CASE NEW.status WHEN 'DONE'   THEN 1 WHEN 'PASSED_ON' THEN  1 ELSE 0 END)
-                                                  - (CASE OLD.status WHEN 'DONE'   THEN 1 WHEN 'PASSED_ON' THEN  1 ELSE 0 END),
+        total_job_count     = total_job_count     + 1,
+        unclaimed_job_count = unclaimed_job_count + (CASE NEW.status WHEN 'READY'  THEN 1                          ELSE 0 END),
+        done_job_count      = done_job_count      + (CASE NEW.status WHEN 'DONE'   THEN 1 WHEN 'PASSED_ON' THEN  1 ELSE 0 END),
         failed_job_count    = failed_job_count    + (CASE NEW.status WHEN 'FAILED' THEN 1                          ELSE 0 END)
-                                                  - (CASE OLD.status WHEN 'FAILED' THEN 1                          ELSE 0 END)
     WHERE analysis_id = NEW.analysis_id;
+END$$
+
+DELIMITER ;
+
+CREATE TRIGGER add_worker AFTER INSERT ON worker
+FOR EACH ROW
+    UPDATE analysis_stats SET
+        num_running_workers = num_running_workers + 1
+    WHERE analysis_id = NEW.analysis_id;
+
+CREATE TRIGGER update_worker AFTER UPDATE ON worker
+FOR EACH ROW
+    UPDATE analysis_stats SET
+        num_running_workers = num_running_workers - 1
+    WHERE analysis_id = NEW.analysis_id
+      AND OLD.status <> 'DEAD'
+      AND NEW.status =  'DEAD';
 
 
     # inform the runtime part of the system that triggers are in place:
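
Reviewer note (triggers.mysql): add_worker bumps num_running_workers on every worker INSERT,
and update_worker decrements it when a worker's status flips to 'DEAD'; this is what lets
Queen.pm skip its explicit counter updates when hive_use_triggers() is true. A small sanity
check one could run against a trigger-enabled MySQL database is sketched below (the DSN,
credentials, $analysis_id and the 'LOCAL' meadow_type are assumptions; table and column
names follow the INSERT used in Queen.pm above):

    use DBI;

    my $dbh = DBI->connect('dbi:mysql:database=my_hive;host=localhost', 'user', 'pass',
                           { RaiseError => 1 });
    my $analysis_id = 1;    # pick an analysis_id that exists in analysis_stats

    my ($before) = $dbh->selectrow_array(
        'SELECT num_running_workers FROM analysis_stats WHERE analysis_id=?', undef, $analysis_id);

    $dbh->do(q{INSERT INTO worker (born, last_check_in, meadow_type, process_id, host, analysis_id)
               VALUES (CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 'LOCAL', ?, ?, ?)},
             undef, $$, 'localhost', $analysis_id);    # add_worker trigger fires: counter + 1

    my ($after) = $dbh->selectrow_array(
        'SELECT num_running_workers FROM analysis_stats WHERE analysis_id=?', undef, $analysis_id);

    # expect $after == $before + 1; updating that worker's status to 'DEAD' should
    # bring the counter back down via the update_worker trigger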
diff --git a/sql/triggers.sqlite b/sql/triggers.sqlite
index 58587941a..a04c96f9f 100644
--- a/sql/triggers.sqlite
+++ b/sql/triggers.sqlite
@@ -26,14 +26,38 @@ CREATE TRIGGER update_job AFTER UPDATE ON job
 FOR EACH ROW
 BEGIN
     UPDATE analysis_stats SET
-        unclaimed_job_count = unclaimed_job_count + (CASE NEW.status WHEN 'READY'  THEN 1                          ELSE 0 END)
-                                                  - (CASE OLD.status WHEN 'READY'  THEN 1                          ELSE 0 END),
-        done_job_count      = done_job_count      + (CASE NEW.status WHEN 'DONE'   THEN 1 WHEN 'PASSED_ON' THEN  1 ELSE 0 END) 
-                                                  - (CASE OLD.status WHEN 'DONE'   THEN 1 WHEN 'PASSED_ON' THEN  1 ELSE 0 END),
-        failed_job_count    = failed_job_count    + (CASE NEW.status WHEN 'FAILED' THEN 1                          ELSE 0 END)
-                                                  - (CASE OLD.status WHEN 'FAILED' THEN 1                          ELSE 0 END)
+        total_job_count     = total_job_count     - 1,
+        unclaimed_job_count = unclaimed_job_count - (CASE OLD.status WHEN 'READY'  THEN 1                         ELSE 0 END),
+        done_job_count      = done_job_count      - (CASE OLD.status WHEN 'DONE'   THEN 1 WHEN 'PASSED_ON' THEN 1 ELSE 0 END),
+        failed_job_count    = failed_job_count    - (CASE OLD.status WHEN 'FAILED' THEN 1                         ELSE 0 END)
+    WHERE analysis_id = OLD.analysis_id;
+    UPDATE analysis_stats SET
+        total_job_count     = total_job_count     + 1,
+        unclaimed_job_count = unclaimed_job_count + (CASE NEW.status WHEN 'READY'  THEN 1                         ELSE 0 END),
+        done_job_count      = done_job_count      + (CASE NEW.status WHEN 'DONE'   THEN 1 WHEN 'PASSED_ON' THEN 1 ELSE 0 END),
+        failed_job_count    = failed_job_count    + (CASE NEW.status WHEN 'FAILED' THEN 1                         ELSE 0 END)
+    WHERE analysis_id = NEW.analysis_id;
+END;
+
+
+CREATE TRIGGER add_worker AFTER INSERT ON worker
+FOR EACH ROW
+BEGIN
+    UPDATE analysis_stats SET
+        num_running_workers = num_running_workers + 1
     WHERE analysis_id = NEW.analysis_id;
 END;
 
-    # inform the runtime part of the system that triggers are in place:
+CREATE TRIGGER update_worker AFTER UPDATE ON worker
+FOR EACH ROW
+BEGIN
+    UPDATE analysis_stats SET
+        num_running_workers = num_running_workers - 1
+    WHERE analysis_id = NEW.analysis_id
+      AND OLD.status <> 'DEAD'
+      AND NEW.status =  'DEAD';
+END;
+
+
+    -- inform the runtime part of the system that triggers are in place:
 INSERT INTO meta (meta_key, meta_value) VALUES ('hive_use_triggers', '1');
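
Reviewer note (triggers.sqlite): the worker triggers mirror the MySQL ones, the trailing
comment is fixed to use SQL's '--' syntax instead of '#', and the final INSERT records the
hive_use_triggers flag that the Perl layer consults. How the runtime is expected to pick the
flag up, as a sketch (given a DBI handle $dbh connected to the hive database; the
hive_use_triggers() accessor is assumed to read this meta row, as implied by the Queen.pm
hunks above):

    my ($use_triggers) = $dbh->selectrow_array(
        "SELECT meta_value FROM meta WHERE meta_key='hive_use_triggers'");
    $use_triggers ||= 0;    # an absent row means the schema was loaded without triggers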
-- 
GitLab