Commit c8c45241 authored by Jessica Severin's avatar Jessica Severin
Browse files

- moved worker/process code related to persistant /tmp/worker_## directory

  into the Worker object (and out of the Process)
- added Process::worker method so that running processes can talk to the
  worker that is currently running itself.
- modified system so that if a process subclass uses Process::dataflow_output_id
  on branch_code 1, it will turn off the automatic of flowing of the input_job
  out on branch_code 1.  This will make coding much cleaner so that processes
  no longer need to modifiy the input_id of the input_job
- added method Process::autoflow_inputjob which toggles this autoflow behaviour
  if a subclass would like to modify this directly
- auto_dataflow now happens right after the Process::write_output stage
parent d2b10fe5
...@@ -69,6 +69,12 @@ sub queen { ...@@ -69,6 +69,12 @@ sub queen {
return $self->{'_queen'}; return $self->{'_queen'};
} }
sub worker {
my $self = shift;
$self->{'_worker'} = shift if(@_);
return $self->{'_worker'};
}
=head2 db =head2 db
Title : db Title : db
...@@ -131,6 +137,13 @@ sub input_job { ...@@ -131,6 +137,13 @@ sub input_job {
return $self->{'_input_job'}; return $self->{'_input_job'};
} }
sub autoflow_inputjob {
my $self = shift;
$self->{'_autoflow_inputjob'} = shift if(@_);
$self->{'_autoflow_inputjob'}=1 unless(defined($self->{'_autoflow_inputjob'}));
return $self->{'_autoflow_inputjob'};
}
=head2 dataflow_output_id =head2 dataflow_output_id
Title : dataflow_output_id Title : dataflow_output_id
...@@ -152,10 +165,15 @@ sub dataflow_output_id { ...@@ -152,10 +165,15 @@ sub dataflow_output_id {
return unless($output_id); return unless($output_id);
return unless($self->analysis); return unless($self->analysis);
$branch_code=1 unless(defined($branch_code));
my $job = new Bio::EnsEMBL::Hive::AnalysisJob; my $job = new Bio::EnsEMBL::Hive::AnalysisJob;
$job->input_id($output_id); $job->input_id($output_id);
$job->analysis_id($self->analysis->dbID); $job->analysis_id($self->analysis->dbID);
$job->branch_code($branch_code) if(defined($branch_code)); $job->branch_code($branch_code);
#if process uses branch_code 1 explicitly, turn off automatic dataflow
$self->autoflow_inputjob(0) if($branch_code==1);
$self->queen->flow_output_job($job); $self->queen->flow_output_job($job);
} }
...@@ -188,12 +206,9 @@ sub encode_hash { ...@@ -188,12 +206,9 @@ sub encode_hash {
sub worker_temp_directory { sub worker_temp_directory {
unless(defined($g_hive_process_workdir) and (-e $g_hive_process_workdir)) { my $self = shift;
#create temp directory to hold fasta databases return undef unless($self->worker);
$g_hive_process_workdir = "/tmp/worker.$$/"; return $self->worker->worker_process_temp_directory;
mkdir($g_hive_process_workdir, 0777);
}
return $g_hive_process_workdir;
} }
################################################# #################################################
...@@ -238,13 +253,6 @@ sub write_output { ...@@ -238,13 +253,6 @@ sub write_output {
return 1; return 1;
} }
sub global_cleanup {
if($g_hive_process_workdir) {
unlink(<$g_hive_process_workdir/*>);
rmdir($g_hive_process_workdir);
}
return 1;
}
1; 1;
...@@ -99,7 +99,7 @@ our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor); ...@@ -99,7 +99,7 @@ our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
sub create_new_worker { sub create_new_worker {
my ($self, @args) = @_; my ($self, @args) = @_;
my ($analysis_id, $beekeeper ,$pid) = my ($analysis_id, $beekeeper ,$pid, $job) =
rearrange([qw(analysis_id beekeeper process_id) ], @args); rearrange([qw(analysis_id beekeeper process_id) ], @args);
my $analStatsDBA = $self->db->get_AnalysisStatsAdaptor; my $analStatsDBA = $self->db->get_AnalysisStatsAdaptor;
...@@ -111,7 +111,7 @@ sub create_new_worker { ...@@ -111,7 +111,7 @@ sub create_new_worker {
if($analysis_id) { if($analysis_id) {
$analysisStats = $analStatsDBA->fetch_by_analysis_id($analysis_id); $analysisStats = $analStatsDBA->fetch_by_analysis_id($analysis_id);
$self->safe_synchronize_AnalysisStats($analysisStats); $self->safe_synchronize_AnalysisStats($analysisStats);
return undef unless(($analysisStats->status ne 'BLOCKED') and ($analysisStats->num_required_workers > 0)); #return undef unless(($analysisStats->status ne 'BLOCKED') and ($analysisStats->num_required_workers > 0));
} else { } else {
$analysisStats = $self->_pick_best_analysis_for_new_worker; $analysisStats = $self->_pick_best_analysis_for_new_worker;
} }
...@@ -285,8 +285,6 @@ sub worker_register_job_done { ...@@ -285,8 +285,6 @@ sub worker_register_job_done {
return unless($worker and $worker->analysis and $worker->analysis->dbID); return unless($worker and $worker->analysis and $worker->analysis->dbID);
$job->update_status('DONE'); $job->update_status('DONE');
$self->flow_output_job($job);
} }
...@@ -618,18 +616,22 @@ sub get_num_needed_workers { ...@@ -618,18 +616,22 @@ sub get_num_needed_workers {
sub get_hive_progress sub get_hive_progress
{ {
my $self = shift; my $self = shift;
my $sql = "SELECT sum(done_job_count), sum(failed_job_count), sum(total_job_count) FROM analysis_stats"; my $sql = "SELECT sum(done_job_count), sum(failed_job_count), sum(total_job_count), ".
"sum(unclaimed_job_count * analysis_stats.avg_msec_per_job)/1000/60/60 ".
"FROM analysis_stats";
my $sth = $self->prepare($sql); my $sth = $self->prepare($sql);
$sth->execute(); $sth->execute();
my ($done, $failed, $total) = $sth->fetchrow_array(); my ($done, $failed, $total, $cpuhrs) = $sth->fetchrow_array();
$sth->finish; $sth->finish;
$done=0 unless($done); $done=0 unless($done);
$failed=0 unless($failed); $failed=0 unless($failed);
$total=0 unless($total); $total=0 unless($total);
my $completed=0.0; my $completed=0.0;
$completed = ((100.0 * ($done+$failed))/$total) if($total>0); $completed = ((100.0 * ($done+$failed))/$total) if($total>0);
printf("hive %1.3f%% complete (%d done + %d failed / %d total)\n", $completed, $done, $failed, $total); my $remaining = $total - $done - $failed;
return $done, $total; printf("hive %1.3f%% complete (< %1.3f CPU_hrs) (%d todo + %d done + %d failed = %d total)\n",
$completed, $cpuhrs, $remaining, $done, $failed, $total);
return $done, $total, $cpuhrs;
} }
sub print_hive_status sub print_hive_status
......
...@@ -67,6 +67,7 @@ ...@@ -67,6 +67,7 @@
=cut =cut
package Bio::EnsEMBL::Hive::Worker; package Bio::EnsEMBL::Hive::Worker;
use strict; use strict;
...@@ -289,6 +290,28 @@ sub print_worker { ...@@ -289,6 +290,28 @@ sub print_worker {
} }
} }
sub worker_process_temp_directory {
my $self = shift;
unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
#create temp directory to hold fasta databases
$self->{'_tmp_dir'} = "/tmp/worker.$$/";
mkdir($self->{'_tmp_dir'}, 0777);
throw("unable to create ".$self->{'_tmp_dir'}) unless(-e $self->{'_tmp_dir'});
}
return $self->{'_tmp_dir'};
}
sub cleanup_worker_process_temp_directory {
my $self = shift;
if($self->{'_tmp_dir'}) {
my $cmd = "rm -r ". $self->{'_tmp_dir'};
system($cmd);
}
}
############################### ###############################
# #
# WORK section # WORK section
...@@ -426,7 +449,7 @@ sub run ...@@ -426,7 +449,7 @@ sub run
if($self->perform_global_cleanup) { if($self->perform_global_cleanup) {
#have runnable cleanup any global/process files/data it may have created #have runnable cleanup any global/process files/data it may have created
$self->analysis->process->global_cleanup(); $self->cleanup_worker_process_temp_directory;
} }
$self->queen->register_worker_death($self); $self->queen->register_worker_death($self);
...@@ -463,11 +486,12 @@ sub run_module_with_job ...@@ -463,11 +486,12 @@ sub run_module_with_job
if($runObj->isa("Bio::EnsEMBL::Hive::Process")) { if($runObj->isa("Bio::EnsEMBL::Hive::Process")) {
$runObj->input_job($job); $runObj->input_job($job);
$runObj->queen($self->queen); $runObj->queen($self->queen);
$runObj->worker($self);
$runObj->debug($self->debug);
} else { } else {
$runObj->input_id($job->input_id); $runObj->input_id($job->input_id);
$runObj->db($self->db); $runObj->db($self->db);
} }
$runObj->debug($self->debug);
$job->update_status('GET_INPUT'); $job->update_status('GET_INPUT');
print("GET_INPUT\n") if($self->debug); print("GET_INPUT\n") if($self->debug);
...@@ -484,6 +508,12 @@ sub run_module_with_job ...@@ -484,6 +508,12 @@ sub run_module_with_job
$job->query_count($self->queen->dbc->query_count); $job->query_count($self->queen->dbc->query_count);
$job->runtime_msec(time()*1000 - $start_time); $job->runtime_msec(time()*1000 - $start_time);
unless($runObj->isa("Bio::EnsEMBL::Hive::Process") and
!($runObj->autoflow_inputjob)) {
printf("AUTOFLOW input->output\n") if($self->debug);
$self->queen->flow_output_job($job);
}
return 1; return 1;
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment