Commit 128fc4da authored by Leo Gordon's avatar Leo Gordon
Browse files

only add partial timers' measurement if the job completed successfully

parent 07ad9326
......@@ -235,10 +235,15 @@ sub work_done {
return $self->{'work_done'} || 0;
}
sub more_work_done {
my $self = shift @_;
my ($self, $job_partial_timing) = @_;
$self->{'work_done'}++;
$self->{'work_done'}++;
while( my ($state, $partial_timing_in_state) = each %$job_partial_timing ) {
$self->{'_interval_partial_timing'}{$state} += $partial_timing_in_state;
}
}
......@@ -513,6 +518,10 @@ sub run {
if( $self->compile_module_once() ) {
$self->enter_status('COMPILATION');
my $runnable_object = $self->analysis->process or die "Unknown compilation error";
$runnable_object->db( $self->db );
$runnable_object->worker( $self );
$runnable_object->debug( $self->debug );
$self->runnable_object( $runnable_object );
$self->enter_status('READY');
}
......@@ -524,10 +533,8 @@ sub run {
do { # Worker's lifespan loop (ends only when the worker dies)
my $batches_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
$self->{'fetching_stopwatch'} = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
$self->{'running_stopwatch'} = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
$self->{'writing_stopwatch'} = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop
$self->{'_interval_partial_timing'} = {};
if( my $specific_job = $self->_specific_job() ) {
$jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->reclaim_job_for_worker($self, $specific_job) );
......@@ -566,9 +573,9 @@ sub run {
$self->analysis->dbID,
$jobs_done_by_batches_loop,
$batches_stopwatch->get_elapsed,
$self->{'fetching_stopwatch'}->get_elapsed,
$self->{'running_stopwatch'}->get_elapsed,
$self->{'writing_stopwatch'}->get_elapsed,
$self->{'_interval_partial_timing'}{'FETCH_INPUT'},
$self->{'_interval_partial_timing'}{'RUN'},
$self->{'_interval_partial_timing'}{'WRITE_OUTPUT'},
);
}
......@@ -616,10 +623,13 @@ sub run_one_batch {
print "claimed ".scalar(@{$jobs})." jobs to process\n";
}
my $job_partial_timing;
foreach my $job (@{$jobs}) {
$job->print_job if($self->debug);
my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
$job_partial_timing = {};
$self->start_job_output_redirection($job); # switch logging into job's STDERR
eval { # capture any throw/die
......@@ -632,18 +642,24 @@ sub run_one_batch {
} else {
$self->enter_status('COMPILATION', $job);
$runnable_object = $self->analysis->process or die "Unknown compilation error";
$runnable_object->db( $self->db );
$runnable_object->worker( $self );
$runnable_object->debug( $self->debug );
}
$self->db->dbc->query_count(0);
$job_stopwatch->restart();
$self->run_module_with_job($runnable_object, $job);
$job->param_init( $runnable_object->strict_hash_format(), $runnable_object->param_defaults(), $self->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), $job->input_id() );
$runnable_object->input_job( $job );
$job_partial_timing = $self->life_cycle($runnable_object);
$job->incomplete(0);
};
my $msg_thrown = $@;
$job->runtime_msec( $job_stopwatch->get_elapsed );
$job->runtime_msec( $job_stopwatch->get_elapsed ); # whether successful or not
$job->query_count( $self->db->dbc->query_count );
my $job_id = $job->dbID();
......@@ -679,7 +695,7 @@ sub run_one_batch {
return $jobs_done_here;
}
} else { # job successfully completed:
$self->more_work_done;
$self->more_work_done( $job_partial_timing );
$jobs_done_here++;
$job->update_status('DONE');
......@@ -696,15 +712,13 @@ sub run_one_batch {
}
sub run_module_with_job {
my ($self, $runnable_object, $job) = @_;
sub life_cycle {
my ($self, $runnable_object) = @_;
$runnable_object->input_job( $job );
$runnable_object->db( $self->db );
$runnable_object->worker( $self );
$runnable_object->debug( $self->debug );
my $job = $runnable_object->input_job();
my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
my %job_partial_timing = ();
$job->param_init( $runnable_object->strict_hash_format(), $runnable_object->param_defaults(), $self->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), $job->input_id() );
$job->autoflow(1);
if( $runnable_object->can('pre_cleanup') and $job->retry_count()>0 ) {
......@@ -713,20 +727,20 @@ sub run_module_with_job {
}
$self->enter_status('FETCH_INPUT', $job);
$self->{'fetching_stopwatch'}->continue();
$partial_stopwatch->restart();
$runnable_object->fetch_input;
$self->{'fetching_stopwatch'}->pause();
$job_partial_timing{'FETCH_INPUT'} = $partial_stopwatch->get_elapsed();
$self->enter_status('RUN', $job);
$self->{'running_stopwatch'}->continue();
$partial_stopwatch->restart();
$runnable_object->run;
$self->{'running_stopwatch'}->pause();
$job_partial_timing{'RUN'} = $partial_stopwatch->get_elapsed();
if($self->execute_writes) {
$self->enter_status('WRITE_OUTPUT', $job);
$self->{'writing_stopwatch'}->continue();
$partial_stopwatch->restart();
$runnable_object->write_output;
$self->{'writing_stopwatch'}->pause();
$job_partial_timing{'WRITE_OUTPUT'} = $partial_stopwatch->get_elapsed();
if( $job->autoflow ) {
print STDERR "\njob ".$job->dbID." : AUTOFLOW input->output\n" if($self->debug);
......@@ -747,6 +761,7 @@ sub run_module_with_job {
die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") has never been dataflown";
}
return \%job_partial_timing;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment