From c8c45241b19246c7f9d54b827e5f981f8cafc6ba Mon Sep 17 00:00:00 2001
From: Jessica Severin <jessica@ebi.ac.uk>
Date: Mon, 13 Jun 2005 11:08:05 +0000
Subject: [PATCH] - moved worker/process code related to persistant
 /tmp/worker_## directory   into the Worker object (and out of the Process) -
 added Process::worker method so that running processes can talk to the  
 worker that is currently running itself. - modified system so that if a
 process subclass uses Process::dataflow_output_id   on branch_code 1, it will
 turn off the automatic of flowing of the input_job   out on branch_code 1. 
 This will make coding much cleaner so that processes   no longer need to
 modifiy the input_id of the input_job - added method
 Process::autoflow_inputjob which toggles this autoflow behaviour   if a
 subclass would like to modify this directly - auto_dataflow now happens right
 after the Process::write_output stage

---
 modules/Bio/EnsEMBL/Hive/Process.pm | 36 ++++++++++++++++++-----------
 modules/Bio/EnsEMBL/Hive/Queen.pm   | 18 ++++++++-------
 modules/Bio/EnsEMBL/Hive/Worker.pm  | 36 ++++++++++++++++++++++++++---
 3 files changed, 65 insertions(+), 25 deletions(-)

diff --git a/modules/Bio/EnsEMBL/Hive/Process.pm b/modules/Bio/EnsEMBL/Hive/Process.pm
index c94639885..edd6cfacb 100755
--- a/modules/Bio/EnsEMBL/Hive/Process.pm
+++ b/modules/Bio/EnsEMBL/Hive/Process.pm
@@ -69,6 +69,12 @@ sub queen {
   return $self->{'_queen'};
 }
 
+sub worker {
+  my $self = shift;
+  $self->{'_worker'} = shift if(@_);
+  return $self->{'_worker'};
+}
+
 =head2 db
 
     Title   :   db
@@ -131,6 +137,13 @@ sub input_job {
   return $self->{'_input_job'};
 }
 
+sub autoflow_inputjob {
+  my $self = shift;
+  $self->{'_autoflow_inputjob'} = shift if(@_);
+  $self->{'_autoflow_inputjob'}=1 unless(defined($self->{'_autoflow_inputjob'}));  
+  return $self->{'_autoflow_inputjob'};
+}
+
 =head2 dataflow_output_id
 
     Title        :  dataflow_output_id
@@ -152,10 +165,15 @@ sub dataflow_output_id {
   return unless($output_id);
   return unless($self->analysis);
 
+  $branch_code=1 unless(defined($branch_code));
+
   my $job = new Bio::EnsEMBL::Hive::AnalysisJob;
   $job->input_id($output_id);
   $job->analysis_id($self->analysis->dbID);
-  $job->branch_code($branch_code) if(defined($branch_code));
+  $job->branch_code($branch_code);
+  
+  #if process uses branch_code 1 explicitly, turn off automatic dataflow
+  $self->autoflow_inputjob(0) if($branch_code==1);
 
   $self->queen->flow_output_job($job);  
 }
@@ -188,12 +206,9 @@ sub encode_hash {
 
 
 sub worker_temp_directory {
-  unless(defined($g_hive_process_workdir) and (-e $g_hive_process_workdir)) {
-    #create temp directory to hold fasta databases
-    $g_hive_process_workdir = "/tmp/worker.$$/";
-    mkdir($g_hive_process_workdir, 0777);
-  }
-  return $g_hive_process_workdir;
+  my $self = shift;
+  return undef unless($self->worker);
+  return $self->worker->worker_process_temp_directory;
 }
 
 #################################################
@@ -238,13 +253,6 @@ sub write_output {
   return 1;
 }
 
-sub global_cleanup {
-  if($g_hive_process_workdir) {
-    unlink(<$g_hive_process_workdir/*>);
-    rmdir($g_hive_process_workdir);
-  }
-  return 1;
-}
 
 1;
 
diff --git a/modules/Bio/EnsEMBL/Hive/Queen.pm b/modules/Bio/EnsEMBL/Hive/Queen.pm
index e314a4ad1..57e34fff4 100755
--- a/modules/Bio/EnsEMBL/Hive/Queen.pm
+++ b/modules/Bio/EnsEMBL/Hive/Queen.pm
@@ -99,7 +99,7 @@ our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
 sub create_new_worker {
   my ($self, @args) = @_;
 
-  my ($analysis_id, $beekeeper ,$pid) =
+  my ($analysis_id, $beekeeper ,$pid, $job) =
      rearrange([qw(analysis_id beekeeper process_id) ], @args);
 
   my $analStatsDBA = $self->db->get_AnalysisStatsAdaptor;
@@ -111,7 +111,7 @@ sub create_new_worker {
   if($analysis_id) {
     $analysisStats = $analStatsDBA->fetch_by_analysis_id($analysis_id);
     $self->safe_synchronize_AnalysisStats($analysisStats);
-    return undef unless(($analysisStats->status ne 'BLOCKED') and ($analysisStats->num_required_workers > 0));
+    #return undef unless(($analysisStats->status ne 'BLOCKED') and ($analysisStats->num_required_workers > 0));
   } else {
     $analysisStats = $self->_pick_best_analysis_for_new_worker;
   }
@@ -285,8 +285,6 @@ sub worker_register_job_done {
   return unless($worker and $worker->analysis and $worker->analysis->dbID);
   
   $job->update_status('DONE');
-
-  $self->flow_output_job($job);
 }
 
 
@@ -618,18 +616,22 @@ sub get_num_needed_workers {
 sub get_hive_progress
 {
   my $self = shift;
-  my $sql = "SELECT sum(done_job_count), sum(failed_job_count), sum(total_job_count) FROM analysis_stats";
+  my $sql = "SELECT sum(done_job_count), sum(failed_job_count), sum(total_job_count), ".
+            "sum(unclaimed_job_count * analysis_stats.avg_msec_per_job)/1000/60/60 ".
+            "FROM analysis_stats";
   my $sth = $self->prepare($sql);
   $sth->execute();
-  my ($done, $failed, $total) = $sth->fetchrow_array();
+  my ($done, $failed, $total, $cpuhrs) = $sth->fetchrow_array();
   $sth->finish;
   $done=0 unless($done);
   $failed=0 unless($failed);
   $total=0 unless($total);
   my $completed=0.0;
   $completed = ((100.0 * ($done+$failed))/$total)  if($total>0);
-  printf("hive %1.3f%% complete (%d done + %d failed / %d total)\n", $completed, $done, $failed, $total);
-  return $done, $total;
+  my $remaining = $total - $done - $failed;
+  printf("hive %1.3f%% complete (< %1.3f CPU_hrs) (%d todo + %d done + %d failed = %d total)\n", 
+          $completed, $cpuhrs, $remaining, $done, $failed, $total);
+  return $done, $total, $cpuhrs;
 }
 
 sub print_hive_status
diff --git a/modules/Bio/EnsEMBL/Hive/Worker.pm b/modules/Bio/EnsEMBL/Hive/Worker.pm
index 0fd95d54f..24f254f4a 100755
--- a/modules/Bio/EnsEMBL/Hive/Worker.pm
+++ b/modules/Bio/EnsEMBL/Hive/Worker.pm
@@ -67,6 +67,7 @@
 
 =cut
 
+
 package Bio::EnsEMBL::Hive::Worker;
 
 use strict;
@@ -289,6 +290,28 @@ sub print_worker {
   }
 }
 
+
+sub worker_process_temp_directory {
+  my $self = shift;
+  
+  unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
+    #create temp directory to hold fasta databases
+    $self->{'_tmp_dir'} = "/tmp/worker.$$/";
+    mkdir($self->{'_tmp_dir'}, 0777);
+    throw("unable to create ".$self->{'_tmp_dir'}) unless(-e $self->{'_tmp_dir'});
+  }
+  return $self->{'_tmp_dir'};
+}
+
+
+sub cleanup_worker_process_temp_directory {
+  my $self = shift;
+  if($self->{'_tmp_dir'}) {
+    my $cmd = "rm -r ". $self->{'_tmp_dir'};
+    system($cmd);
+  }
+}
+
 ###############################
 #
 # WORK section
@@ -426,7 +449,7 @@ sub run
   
   if($self->perform_global_cleanup) {
     #have runnable cleanup any global/process files/data it may have created
-    $self->analysis->process->global_cleanup();
+    $self->cleanup_worker_process_temp_directory;
   }
 
   $self->queen->register_worker_death($self);
@@ -463,11 +486,12 @@ sub run_module_with_job
   if($runObj->isa("Bio::EnsEMBL::Hive::Process")) { 
     $runObj->input_job($job);
     $runObj->queen($self->queen);
+    $runObj->worker($self);
+    $runObj->debug($self->debug);
   } else {
     $runObj->input_id($job->input_id);
     $runObj->db($self->db);
   }
-  $runObj->debug($self->debug);
   
   $job->update_status('GET_INPUT');
   print("GET_INPUT\n") if($self->debug); 
@@ -480,9 +504,15 @@ sub run_module_with_job
   $job->update_status('WRITE_OUTPUT');
   print("WRITE_OUTPUT\n") if($self->debug); 
   $runObj->write_output;
-
+  
   $job->query_count($self->queen->dbc->query_count);
   $job->runtime_msec(time()*1000 - $start_time);
+  
+  unless($runObj->isa("Bio::EnsEMBL::Hive::Process") and
+         !($runObj->autoflow_inputjob)) {
+    printf("AUTOFLOW input->output\n") if($self->debug);
+    $self->queen->flow_output_job($job);
+  }
 
   return 1;
 }
-- 
GitLab