Skip to content
Snippets Groups Projects
Commit 86f57415 authored by Leo Gordon
Browse files

bugfix: death in FETCH/RUN/WRITE should be reported separately from possible death in POST_CLEANUP

parent d30e1c1c
No related branches found
No related tags found
No related merge requests found
......@@ -226,6 +226,14 @@ sub incomplete { # Job should set this to 0 prior to throwing if the
return $self->{'_incomplete'};
}
sub died_somewhere {
    # Sticky "the job has died at some point" indicator.
    #
    # Called without an argument it simply reports the flag (0 when it has never been raised).
    # Called with an argument, the value is OR'd into the stored flag rather than assigned,
    # so a later false value can never clear an earlier true one.
    # NB: this OR-ing behaviour is specific to this accessor - do not copy it into ordinary getters/setters!
    my ($self, @new_value) = @_;

    my $flag_key = '_died_somewhere';

    if( @new_value ) {
        $self->{$flag_key} ||= $new_value[0];
    }

    # normalise any false/undefined state to a stored numeric 0 before reporting it
    $self->{$flag_key} ||= 0;

    return $self->{$flag_key};
}
##-----------------[/indicators to the Worker]-------------------------------
=head2 warning
......@@ -235,15 +243,22 @@ sub incomplete { # Job should set this to 0 prior to throwing if the
=cut
sub warning {
my ($self, $msg) = @_;
my ($self, $msg, $is_error) = @_;
if( my $job_adaptor = $self->adaptor ) {
$job_adaptor->db->get_LogMessageAdaptor()->store_job_message($self->dbID, $msg, 0);
} else {
print STDERR "Warning: $msg\n";
$is_error //= 0;
my $job_adaptor = $self->adaptor;
if( $is_error or !$job_adaptor) {
my $class = $is_error ? 'Error' : 'Warning';
print STDERR "Job${class}: $msg\n";
}
if( $job_adaptor ) {
$job_adaptor->db->get_LogMessageAdaptor()->store_job_message($self->dbID, $msg, $is_error);
}
}
sub fan_cache { # a self-initializing getter (no setting)
# Returns a hash-of-lists { 2 => [list of jobs waiting to be funneled into 2], 3 => [list of jobs waiting to be funneled into 3], etc}
my $self = shift;
......
......@@ -66,9 +66,7 @@ sub default_options {
return {
%{ $self->SUPER::default_options() }, # inherit other stuff from the base class
'pipeline_name' => 'failure_test', # name used by the beekeeper to prefix job names on the farm
'job_count' => 20, # controls the total number of FailureTest jobs
'job_count' => 10, # controls the total number of FailureTest jobs
'failure_rate' => 3, # controls the rate of jobs that are programmed to fail
'state' => 'RUN', # controls in which state the jobs are programmed to fail
'lethal_after' => 0,
......
......@@ -149,39 +149,40 @@ sub life_cycle {
}
};
my $error_msg = $@;
if(my $life_cycle_msg = $@) {
$job->died_somewhere( $job->incomplete ); # it will be OR'd inside
$self->warning( $life_cycle_msg, $job->incomplete );
}
if( $self->can('post_cleanup') ) { # may be run to clean up memory even after partially failed attempts
eval {
$job->incomplete(1); # it could have been reset by a previous call to complete_early
$self->enter_status('POST_CLEANUP');
$self->post_cleanup;
};
if($@) {
$error_msg .= $@;
$job->incomplete(1);
if(my $post_cleanup_msg = $@) {
$job->died_somewhere( $job->incomplete ); # it will be OR'd inside
$self->warning( $post_cleanup_msg, $job->incomplete );
}
}
if( $error_msg ) {
if( $job->incomplete ) { # retransmit the death message if it was not a suicide, continue otherwise
die $error_msg;
} else {
$self->warning( $error_msg );
unless( $job->died_somewhere ) {
if( $self->execute_writes and $job->autoflow ) { # AUTOFLOW doesn't have its own status so will have whatever previous state of the job
print STDERR "\njob ".$job->dbID." : AUTOFLOW input->output\n" if($self->debug);
$job->dataflow_output_id();
}
}
if( $self->execute_writes and $job->autoflow ) { # AUTOFLOW doesn't have its own status so will have whatever previous state of the job
print STDERR "\njob ".$job->dbID." : AUTOFLOW input->output\n" if($self->debug);
$job->dataflow_output_id();
}
my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
$job->transient_error(0);
die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") has never been dataflown";
}
my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
$job->transient_error(0);
die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") has never been dataflown";
}
$job->incomplete(0);
return \%job_partial_timing;
return \%job_partial_timing;
}
}
......
......@@ -161,6 +161,9 @@ sub write_output {
=cut
sub post_cleanup {
# Intentionally a no-op: every statement in the body is commented out.
# To simulate a failure during POST_CLEANUP, uncomment BOTH lines below
# (the second one relies on the $self declared by the first).
#
# my $self = shift @_;
#
# $self->dangerous_math( $self->param('state') ); # uncomment to simulate failures in POST_CLEANUP
}
......
......@@ -666,30 +666,26 @@ sub run_one_batch {
$runnable_object->input_job( $job ); # "take" the job
$job_partial_timing = $runnable_object->life_cycle();
$runnable_object->input_job( undef ); # release an extra reference to the job
$job->incomplete(0);
};
my $msg_thrown = $@;
if(my $msg = $@) {
$job->died_somewhere( $job->incomplete ); # it will be OR'd inside
$self->runnable_object->input_job->warning( $msg, $job->incomplete );
}
$job->runtime_msec( $job_stopwatch->get_elapsed ); # whether successful or not
# whether the job completed successfully or not:
$self->runnable_object->input_job( undef ); # release an extra reference to the job
$job->runtime_msec( $job_stopwatch->get_elapsed );
$job->query_count( $self->adaptor->db->dbc->query_count );
my $job_completion_line = "Job $job_id : complete";
if($msg_thrown) { # record the death message
my $job_status_at_the_moment = $job->status();
$job_completion_line = "Job $job_id : died in status '$job_status_at_the_moment' for the following reason: $msg_thrown";
$self->adaptor->db->get_LogMessageAdaptor()->store_job_message($job_id, $msg_thrown, $job->incomplete );
}
my $job_completion_line = "Job $job_id : ". ($job->died_somewhere ? 'died' : 'complete' );
print STDERR "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->incomplete)); # one copy goes to the job's STDERR
print STDERR "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->died_somewhere)); # one copy goes to the job's STDERR
$self->stop_job_output_redirection($job); # and then we switch back to worker's STDERR
$self->worker_say( $job_completion_line ); # one copy goes to the worker's STDERR
$self->current_role->register_attempt( ! $job->incomplete );
$self->current_role->register_attempt( ! $job->died_somewhere );
if($job->incomplete) {
if($job->died_somewhere) {
# If the job specifically said what to do next, respect that last wish.
# Otherwise follow the default behaviour set by the beekeeper in $worker:
#
......@@ -724,7 +720,7 @@ sub run_one_batch {
}
}
$self->prev_job_error( $job->incomplete );
$self->prev_job_error( $job->died_somewhere );
$self->enter_status('READY');
} # /while(my $job = shift @$jobs)
......@@ -767,7 +763,7 @@ sub stop_job_output_redirection {
$self->get_stdout_redirector->pop();
$self->get_stderr_redirector->pop();
my $force_cleanup = !($self->debug || $job->incomplete);
my $force_cleanup = !($self->debug || $job->died_somewhere);
if($force_cleanup or -z $job->stdout_file) {
$self->worker_say( "Deleting '".$job->stdout_file."' file" );
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment