Commit 2f733b8b authored by Leo Gordon
Browse files

moved life_cycle() from Worker.pm into Process.pm and now also calling it from standaloneJob.pl

parent 128fc4da
......@@ -91,17 +91,92 @@ use Bio::EnsEMBL::DBSQL::DBConnection;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception ('throw');
use Bio::EnsEMBL::Hive::Utils ('url2dbconn_hash');
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
use base ('Bio::EnsEMBL::Utils::Exception'); # provide these methods for deriving classes
# Constructor.
#
# Arguments : named-argument list; the ANALYSIS argument, if supplied, is
#             extracted with rearrange() and stored via $self->analysis().
# Returns   : a new object blessed into $class (an empty hash to start with).
sub new {
    my ($class, @args) = @_;

    my $self = bless {}, $class;

    # Only the ANALYSIS argument is looked at here; anything else in @args is ignored.
    my ($analysis) = rearrange([qw( ANALYSIS )], @args);
    $self->analysis($analysis) if($analysis);

    return $self;
}
# Runs one job through its stages on this runnable object:
# (optional) PRE_CLEANUP, FETCH_INPUT, RUN, (optional) WRITE_OUTPUT with
# autoflow, (optional) POST_CLEANUP.
#
# Returns : hashref mapping stage name ('FETCH_INPUT', 'RUN', 'WRITE_OUTPUT')
#           to the elapsed value reported by the stopwatch for that stage.
# Dies    : if, after all stages, the job's fan_cache still holds entries
#           (semaphored fan jobs whose funnel was never dataflown).
sub life_cycle {
    my ($self, $worker) = @_;    # NOTE: $worker is unpacked but not used in this body

    my $job         = $self->input_job();
    my $stage_timer = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my %elapsed_for;

    # Autoflow is switched on up front; it is checked again after write_output.
    $job->autoflow(1);

    # pre_cleanup is optional and only runs for retried jobs.
    if ( $self->can('pre_cleanup') && $job->retry_count() > 0 ) {
        $self->enter_status('PRE_CLEANUP');
        $self->pre_cleanup();
    }

    $self->enter_status('FETCH_INPUT');
    $stage_timer->restart();
    $self->fetch_input();
    $elapsed_for{'FETCH_INPUT'} = $stage_timer->get_elapsed();

    $self->enter_status('RUN');
    $stage_timer->restart();
    $self->run();
    $elapsed_for{'RUN'} = $stage_timer->get_elapsed();

    if ( $self->execute_writes ) {
        $self->enter_status('WRITE_OUTPUT');
        $stage_timer->restart();
        $self->write_output();
        $elapsed_for{'WRITE_OUTPUT'} = $stage_timer->get_elapsed();

        if ( $job->autoflow ) {
            if ( $self->debug ) {
                print STDERR "\njob ".$job->dbID." : AUTOFLOW input->output\n";
            }
            $job->dataflow_output_id();
        }
    }
    else {
        if ( $self->debug ) {
            print STDERR "\n!!! *no* WRITE_OUTPUT requested, so there will be no AUTOFLOW\n";
        }
    }

    # Todo: may need to run it after the eval, to clean up the memory even after partially failed attempts?
    if ( $self->can('post_cleanup') ) {
        $self->enter_status('POST_CLEANUP');
        $self->post_cleanup();
    }

    # Anything left in the fan cache means a funnel job was never dataflown.
    my @orphaned_rule_ids = keys %{ $job->fan_cache };
    if (@orphaned_rule_ids) {
        $job->transient_error(0);
        die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@orphaned_rule_ids).") has never been dataflown";
    }

    return \%elapsed_for;
}
# Records that processing has entered the given status.
#
# Arguments : $status - status name (e.g. 'FETCH_INPUT', 'RUN').
# Effects   : in debug mode prints a trace line to STDERR; updates the job's
#             status if there is an input job; if there is a worker, sets its
#             status and checks it in with its queen.
sub enter_status {
    my $self   = shift;
    my $status = shift;

    my $job    = $self->input_job();
    my $worker = $self->worker();

    if ( $self->debug ) {
        # With no worker attached the trace line shows '(standalone)' instead of an id.
        my $worker_label = $worker ? $worker->dbID : '(standalone)';
        my $job_label    = $job ? ', job_id='.$job->dbID : '';
        print STDERR "\nworker_id=".$worker_label.$job_label." : $status\n";
    }

    $job->update_status($status) if $job;

    if ($worker) {
        $worker->status($status);
        $worker->queen->check_in_worker($worker);
    }
}
......@@ -230,7 +305,7 @@ sub write_output {
Title : worker
Usage : my $worker = $self->worker;
Function: returns the Worker object this Process is run by
Returns : Bio::EnsEMBL::Hive::Worker
=cut
......@@ -242,6 +317,23 @@ sub worker {
}
=head2 execute_writes
Title : execute_writes
Usage : $self->execute_writes( 1 );
Function: getter/setter for whether we want the 'write_output' method to be run
Returns : boolean
=cut
# Getter/setter for the '_execute_writes' flag.
# Called with an argument, stores it (even if false or undef) and returns it;
# called without, returns the currently stored value.
sub execute_writes {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_execute_writes'} = $new_value[0];
    }

    return $self->{'_execute_writes'};
}
=head2 db
Title : db
......
......@@ -521,6 +521,7 @@ sub run {
$runnable_object->db( $self->db );
$runnable_object->worker( $self );
$runnable_object->debug( $self->debug );
$runnable_object->execute_writes( $self->execute_writes );
$self->runnable_object( $runnable_object );
$self->enter_status('READY');
......@@ -645,6 +646,7 @@ sub run_one_batch {
$runnable_object->db( $self->db );
$runnable_object->worker( $self );
$runnable_object->debug( $self->debug );
$runnable_object->execute_writes( $self->execute_writes );
}
$self->db->dbc->query_count(0);
......@@ -653,7 +655,7 @@ sub run_one_batch {
$job->param_init( $runnable_object->strict_hash_format(), $runnable_object->param_defaults(), $self->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), $job->input_id() );
$runnable_object->input_job( $job );
$job_partial_timing = $self->life_cycle($runnable_object);
$job_partial_timing = $runnable_object->life_cycle();
$job->incomplete(0);
};
......
......@@ -10,7 +10,7 @@ use Bio::EnsEMBL::Hive::Utils ('script_usage', 'load_file_or_module', 'parse_cmd
use Data::Dumper;
my ($reg_conf, $help, $debug, $no_write, $flow_into);
my ($reg_conf, $help, $debug, $no_write, $flow_into, $input_id);
my $module_or_file = shift @ARGV or script_usage();
......@@ -20,6 +20,7 @@ GetOptions(
'reg_conf|regfile=s' => \$reg_conf,
'no_write|nowrite' => \$no_write,
'flow_into|flow=s' => \$flow_into,
'input_id=s' => \$input_id,
);
if ($help or !$module_or_file) {
......@@ -33,9 +34,15 @@ if($reg_conf) {
}
my $runnable_object = $runnable_module->new();
my $job = Bio::EnsEMBL::Hive::AnalysisJob->new();
my ($param_hash, $param_list) = parse_cmdline_options();
$job->param_init( 1, $runnable_object->param_defaults(), $param_hash );
my $job = Bio::EnsEMBL::Hive::AnalysisJob->new( -dbID => -1 );
unless($input_id) {
my ($param_hash, $param_list) = parse_cmdline_options();
$input_id = stringify($param_hash);
}
$job->input_id( $input_id );
warn "\nRunning '$runnable_module' with input_id='$input_id' :\n";
$job->param_init( $runnable_object->strict_hash_format(), $runnable_object->param_defaults(), $job->input_id() );
$flow_into = $flow_into ? destringify($flow_into) : []; # empty dataflow for branch 1 by default
$flow_into = { 1 => $flow_into } unless(ref($flow_into) eq 'HASH'); # force non-hash into a hash
......@@ -49,35 +56,13 @@ foreach my $branch_code (keys %$flow_into) {
$job->dataflow_rules( $branch_code, \@dataflow_rules );
}
my $input_id = stringify($param_hash);
$job->autoflow(1);
$job->input_id( $input_id );
warn "\nRunning '$runnable_module' with '$input_id' :\n";
$runnable_object->input_job($job);
if($debug) {
$runnable_object->debug($debug);
}
$runnable_object->execute_writes( not $no_write );
# job's life cycle:
warn "\nFETCH_INPUT:\n";
$runnable_object->fetch_input();
warn "\nRUN:\n";
$runnable_object->run();
unless($no_write) {
warn "\nWRITE_OUTPUT:\n";
$runnable_object->write_output();
if( $job->autoflow ) {
warn "\nAUTOFLOW input->output\n";
$job->dataflow_output_id( $job->param() );
}
}
warn "\nDONE.\n";
exit(0);
$runnable_object->life_cycle();
__DATA__
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment