Commit f27c721c authored by Leo Gordon's avatar Leo Gordon
Browse files

job_error table now holds jobs' termination messages; workers do not die when...

job_error table now holds jobs' termination messages; workers do not die when jobs die, but can be instructed to do so
parent c1df377e
...@@ -139,6 +139,14 @@ sub stderr_file { ...@@ -139,6 +139,14 @@ sub stderr_file {
return $self->{'_stderr_file'}; return $self->{'_stderr_file'};
} }
# Accessor for the job's "lethal" flag.
#
# A Job should set this to 1 before dying (or before running code that might
# cause death, such as RunnableDB's compilation) if it believes that the state
# of things will not allow the Worker to continue normally. The Worker checks
# the flag and terminates itself when it is true.
#
# Called with an argument, stores that argument (even if undef) and returns it;
# called without one, returns the currently stored value.
sub lethal {
    my ($self, @new_value) = @_;

    if (scalar @new_value) {
        $self->{'_lethal'} = $new_value[0];
    }

    return $self->{'_lethal'};
}
sub print_job { sub print_job {
my $self = shift; my $self = shift;
my $logic_name = $self->adaptor() my $logic_name = $self->adaptor()
......
...@@ -63,6 +63,7 @@ sub get_available_adaptors { ...@@ -63,6 +63,7 @@ sub get_available_adaptors {
'DataflowRule' => 'Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor', 'DataflowRule' => 'Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor',
'ResourceDescription' => 'Bio::EnsEMBL::Hive::DBSQL::ResourceDescriptionAdaptor', 'ResourceDescription' => 'Bio::EnsEMBL::Hive::DBSQL::ResourceDescriptionAdaptor',
'NakedTable' => 'Bio::EnsEMBL::Hive::DBSQL::NakedTableAdaptor', 'NakedTable' => 'Bio::EnsEMBL::Hive::DBSQL::NakedTableAdaptor',
'JobError' => 'Bio::EnsEMBL::Hive::DBSQL::JobErrorAdaptor',
); );
return (\%pairs); return (\%pairs);
} }
......
=pod
=head1 NAME
Bio::EnsEMBL::Hive::DBSQL::JobErrorAdaptor
=head1 SYNOPSIS
$dba->get_JobErrorAdaptor->register_error($job_id, $error_msg);
=head1 DESCRIPTION
This is currently an "objectless" adaptor that simply helps to store job death events into job_error table.
=head1 CONTACT
Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
=cut
package Bio::EnsEMBL::Hive::DBSQL::JobErrorAdaptor;
use strict;
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
# Records a job's termination message in the job_error table.
#
#   $job_id    : analysis_job_id of the job that died
#   $error_msg : the message to store; one trailing newline is stripped first
#
# The remaining columns (worker_id, retry_count, status) are copied from the
# job's current row in analysis_job by the statement itself.
sub register_error {
    my $self      = shift;
    my $job_id    = shift;
    my $error_msg = shift;

    # we don't want that last "\n" in the database
    chomp $error_msg;

    # The timestamp column is not listed: it will be set automatically.
    # The statement text is kept flush-left so the string sent to the database is unchanged.
    my $insert_sql = qq{
REPLACE INTO job_error (analysis_job_id, worker_id, retry_count, status, error_msg)
SELECT analysis_job_id, worker_id, retry_count, status, ?
FROM analysis_job WHERE analysis_job_id=?
};

    my $statement = $self->prepare( $insert_sql );
    $statement->execute( $error_msg, $job_id );
    $statement->finish();
}
1;
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
package Bio::EnsEMBL::Hive::DBSQL::NakedTableAdaptor; package Bio::EnsEMBL::Hive::DBSQL::NakedTableAdaptor;
use strict; use strict;
use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::NakedTable; use Bio::EnsEMBL::Hive::NakedTable;
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor'); use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
......
...@@ -25,7 +25,6 @@ ...@@ -25,7 +25,6 @@
package Bio::EnsEMBL::Hive::DBSQL::ResourceDescriptionAdaptor; package Bio::EnsEMBL::Hive::DBSQL::ResourceDescriptionAdaptor;
use strict; use strict;
use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::ResourceDescription; use Bio::EnsEMBL::Hive::ResourceDescription;
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor'); use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
......
...@@ -49,9 +49,8 @@ use Bio::EnsEMBL::Hive::URLFactory; ...@@ -49,9 +49,8 @@ use Bio::EnsEMBL::Hive::URLFactory;
=cut =cut
sub Bio::EnsEMBL::Analysis::process sub Bio::EnsEMBL::Analysis::process {
{ my $self = shift; #self is an Analysis object
my $self = shift; #self is an Analysis object
return undef unless($self); return undef unless($self);
die("self must be a [Bio::EnsEMBL::Analysis] not a [$self]") die("self must be a [Bio::EnsEMBL::Analysis] not a [$self]")
...@@ -67,12 +66,6 @@ sub Bio::EnsEMBL::Analysis::process ...@@ -67,12 +66,6 @@ sub Bio::EnsEMBL::Analysis::process
require "$file.pm"; require "$file.pm";
print STDERR "creating runnable ".$file."\n" if($self->{'verbose'}); print STDERR "creating runnable ".$file."\n" if($self->{'verbose'});
#make copy of analysis ($self) to pass into the Process
#to insulate the infrastructure from any modification the Process may
#do to the analysis object
my $copy_self = new Bio::EnsEMBL::Analysis;
%$copy_self = %$self;
$process_class =~ s/\//::/g; $process_class =~ s/\//::/g;
my $runobj = "$process_class"->new( my $runobj = "$process_class"->new(
-db => $self->adaptor->db, -db => $self->adaptor->db,
......
...@@ -61,8 +61,9 @@ sub default_options { ...@@ -61,8 +61,9 @@ sub default_options {
-dbname => $ENV{USER}.'_'.$self->o('pipeline_name'), # a rule where a previously defined parameter is used (which makes both of them optional) -dbname => $ENV{USER}.'_'.$self->o('pipeline_name'), # a rule where a previously defined parameter is used (which makes both of them optional)
}, },
'job_count' => 50, # controls the total number of FailureTest jobs 'job_count' => 20, # controls the total number of FailureTest jobs
'failure_rate' => 2, # controls the rate of jobs that are programmed to fail 'failure_rate' => 3, # controls the rate of jobs that are programmed to fail
'state' => 'RUN', # controls in which state the jobs are programmed to fail
}; };
} }
...@@ -83,13 +84,14 @@ sub pipeline_analyses { ...@@ -83,13 +84,14 @@ sub pipeline_analyses {
{ -logic_name => 'generate_jobs', { -logic_name => 'generate_jobs',
-module => 'Bio::EnsEMBL::Hive::RunnableDB::JobFactory', -module => 'Bio::EnsEMBL::Hive::RunnableDB::JobFactory',
-parameters => { -parameters => {
'inputlist' => '#expr([1..$job_count])expr#', # this expression will evaluate into a listref 'inputlist' => '#expr([0..$job_count-1])expr#', # this expression will evaluate into a listref
}, },
-input_ids => [ -input_ids => [
{ {
'job_count' => $self->o('job_count'), # turn this option into a passable parameter 'job_count' => $self->o('job_count'), # turn this option into a passable parameter
'failure_rate' => $self->o('failure_rate'), # turn the other option into a passable parameter as well 'failure_rate' => $self->o('failure_rate'), # turn the other option into a passable parameter as well
'input_id' => { 'value' => '#_range_start#', 'divisor' => '#failure_rate#' }, 'state' => $self->o('state'), # turn the third option into a passable parameter too
'input_id' => { 'value' => '#_range_start#', 'divisor' => '#failure_rate#', 'state' => '#state#' },
}, },
], ],
-flow_into => { -flow_into => {
......
...@@ -324,19 +324,6 @@ sub worker_reclaim_job { ...@@ -324,19 +324,6 @@ sub worker_reclaim_job {
} }
sub worker_register_job_done {
my $self = shift;
my $worker = shift;
my $job = shift;
return unless($job);
return unless($job->dbID and $job->adaptor and $job->worker_id);
return unless($worker and $worker->analysis and $worker->analysis->dbID);
$job->update_status('DONE');
}
###################################### ######################################
# #
# Public API interface for beekeeper # Public API interface for beekeeper
......
...@@ -29,15 +29,17 @@ Available parameters: ...@@ -29,15 +29,17 @@ Available parameters:
param('divisor'): defines the failure rate for this particular analysis. If the modulo (value % divisor) is 0, the job will fail. param('divisor'): defines the failure rate for this particular analysis. If the modulo (value % divisor) is 0, the job will fail.
For example, if param('divisor')==5, jobs with 5, 10, 15, 20, 25,... param('value') will fail. For example, if param('divisor')==5, jobs with 5, 10, 15, 20, 25,... param('value') will fail.
param('time_fetching'): is time in seconds that the job will spend sleeping in FETCH_INPUT state. param('state'): defines the state in which the jobs of this analysis may be failing.
param('time_running'): is time in seconds that the job will spend sleeping in RUN state. param('time_GET_INPUT'): time in seconds that the job will spend sleeping in GET_INPUT state.
param('time_writing'): is time in seconds that the job will spend sleeping in WRITE_OUTPUT state. param('time_RUN'): time in seconds that the job will spend sleeping in RUN state.
param('time_WRITE_OUTPUT'): time in seconds that the job will spend sleeping in WRITE_OUTPUT state.
=head1 CONTACT =head1 CONTACT
Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions. Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
=cut =cut
...@@ -48,10 +50,14 @@ use strict; ...@@ -48,10 +50,14 @@ use strict;
use base ('Bio::EnsEMBL::Hive::ProcessWithParams'); use base ('Bio::EnsEMBL::Hive::ProcessWithParams');
# Compile-time block that currently does nothing: its only statement is commented out.
# NOTE(review): presumably the die is meant to be re-enabled by hand to make loading
# this module fail, so that compilation-failure handling can be exercised - confirm.
BEGIN {
# die "Could not compile this nonsense!";
}
=head2 fetch_input =head2 fetch_input
Description : Implements fetch_input() interface method of Bio::EnsEMBL::Hive::Process that is used to read in parameters and load data. Description : Implements fetch_input() interface method of Bio::EnsEMBL::Hive::Process that is used to read in parameters and load data.
Here it only sets default values of parameters and sleeps for param('time_fetching'). Here it sets default values of parameters and calls dangerous_math() subroutine.
=cut =cut
...@@ -59,53 +65,70 @@ sub fetch_input { ...@@ -59,53 +65,70 @@ sub fetch_input {
my $self = shift @_; my $self = shift @_;
$self->param_init( $self->param_init(
'value' => 1, # normally you generate a batch of jobs with different values of param('value') 'value' => 1, # normally you generate a batch of jobs with different values of param('value')
'divisor' => 2, # but the same param('divisor') and see how every param('divisor')'s job will crash 'divisor' => 2, # but the same param('divisor') and see how every param('divisor')'s job will crash
'state' => 'RUN', # the state in which the process may commit apoptosis
'time_fetching' => 0, # how much time fetch_input() will spend in sleeping state 'time_GET_INPUT' => 0, # how much time fetch_input() will spend in sleeping state
'time_running' => 0, # how much time run() will spend in sleeping state 'time_RUN' => 1, # how much time run() will spend in sleeping state
'time_writing' => 0, # how much time write_output() will spend in sleeping state 'time_WRITE_OUTPUT' => 0, # how much time write_output() will spend in sleeping state
); );
# Sleep as required: $self->dangerous_math('GET_INPUT');
sleep($self->param('time_fetching'));
} }
=head2 run =head2 run
Description : Implements run() interface method of Bio::EnsEMBL::Hive::Process that is used to perform the main bulk of the job (minus input and output). Description : Implements run() interface method of Bio::EnsEMBL::Hive::Process that is used to perform the main bulk of the job (minus input and output).
Here it sleeps for param('time_running') and then decides whether to fail or succeed depending on param('value') and param('divisor'). Here it only calls dangerous_math() subroutine.
=cut =cut
sub run { sub run {
my $self = shift @_; my $self = shift @_;
my $value = $self->param('value'); $self->dangerous_math('RUN');
my $divisor = $self->param('divisor');
# Sleep as required:
sleep($self->param('time_running'));
if(!$divisor or !$value) {
die "Wrong parameters: divisor = $divisor and value = $value\n";
} elsif ($value % $divisor == 0) {
die "$value % $divisor is 0 => die!\n";
}
} }
=head2 write_output =head2 write_output
Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution. Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution.
Here it only sleeps for param('time_writing'). Here it only calls dangerous_math() subroutine.
=cut =cut
sub write_output { sub write_output {
my $self = shift @_; my $self = shift @_;
# Sleep as required: $self->dangerous_math('WRITE_OUTPUT');
sleep($self->param('time_writing')); }
=head2 dangerous_math
Description: an internal subroutine that will first sleep for some predefined time,
and then either return or crash if $value is an integral multiple of $divisor.
=cut
# Sleeps for param('time_<current_state>') seconds and then, only if
# $current_state is the state named by param('state'), dies when param('value')
# is an integral multiple of param('divisor'). Otherwise it returns normally.
#
#   $current_state : name of the state being executed; the callers in this
#                    module pass 'GET_INPUT', 'RUN' or 'WRITE_OUTPUT'.
sub dangerous_math {
my ($self, $current_state) = @_;
# First, sleep as required:
sleep($self->param('time_'.$current_state));
# Failure is only programmed for one state; in every other state we are done after sleeping.
my $state = $self->param('state');
return if($current_state ne $state);
# Any false value (0, '' or undef) for either parameter is rejected with a die of its own.
my $value = $self->param('value') or die "param('value') has to be a nonzero integer";
my $divisor = $self->param('divisor') or die "param('divisor') has to be a nonzero integer";
if($value % $divisor == 0) {
# For values above 10 the job's lethal flag is raised before dying.
if($value>10) { # take the Worker with us into the grave
$self->input_job->lethal(1);
}
die "Preprogrammed death since $value is a multiple of $divisor";
}
} }
1; 1;
......
...@@ -479,13 +479,13 @@ sub run ...@@ -479,13 +479,13 @@ sub run
do { # Worker's lifespan loop (ends only when the worker dies) do { # Worker's lifespan loop (ends only when the worker dies)
my $batches_start = time() * 1000; my $batches_start = time() * 1000;
my $batches_end = $batches_start; my $batches_end = $batches_start;
my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop
$self->{fetch_time} = 0; $self->{fetch_time} = 0;
$self->{run_time} = 0; $self->{run_time} = 0;
$self->{write_time} = 0; $self->{write_time} = 0;
BATCHES: { # note: in order to label a do{} loop you have to enclose it in an extra block
do { # Worker's "batches loop" exists to prevent logging the status too frequently. do { # Worker's "batches loop" exists to prevent logging the status too frequently.
# If a batch took less than $MIN_BATCH_TIME to run, the Worker keeps taking&running more batches. # If a batch took less than $MIN_BATCH_TIME to run, the Worker keeps taking&running more batches.
my $jobs = $specific_job my $jobs = $specific_job
? [ $self->queen->worker_reclaim_job($self,$specific_job) ] ? [ $self->queen->worker_reclaim_job($self,$specific_job) ]
...@@ -503,20 +503,33 @@ sub run ...@@ -503,20 +503,33 @@ sub run
foreach my $job (@{$jobs}) { foreach my $job (@{$jobs}) {
$job->print_job if($self->debug); $job->print_job if($self->debug);
$self->redirect_job_output($job); $self->start_job_output_redirection($job);
$self->run_module_with_job($job); eval { # capture any death event
$self->close_and_update_job_output($job); $self->run_module_with_job($job);
};
$self->queen->worker_register_job_done($self, $job); my $error_msg = $@;
$self->stop_job_output_redirection($job);
if(my $semaphored_job_id = $job->semaphored_job_id) {
$job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # step-unblock the semaphore after job is (successfully) done if($error_msg) {
my $job_id = $job->dbID();
my $job_status_when_died = $job->status();
warn "Job with id=$job_id died in status '$job_status_when_died' for the following reason: $error_msg\n";
$self->db()->get_JobErrorAdaptor()->register_error($job_id, $error_msg);
$job->update_status('FAILED');
if($job->lethal) { # either a compilation error or other job-sanctioned contamination
warn "Job's error has contaminated the Worker, so the Worker will now die\n";
$self->cause_of_death('CONTAMINATED');
last BATCHES;
}
} else {
if(my $semaphored_job_id = $job->semaphored_job_id) {
$job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # step-unblock the semaphore after job is (successfully) done
}
$self->more_work_done;
} }
$self->more_work_done;
} }
$batches_end = time() * 1000; $batches_end = time() * 1000;
$jobs_done_by_batches_loop += scalar(@$jobs);
if( $specific_job ) { if( $specific_job ) {
$self->cause_of_death('JOB_LIMIT'); $self->cause_of_death('JOB_LIMIT');
...@@ -528,12 +541,13 @@ sub run ...@@ -528,12 +541,13 @@ sub run
$self->cause_of_death('LIFESPAN'); $self->cause_of_death('LIFESPAN');
} }
} while (!$self->cause_of_death and $batches_end-$batches_start < $MIN_BATCH_TIME); } while (!$self->cause_of_death and $batches_end-$batches_start < $MIN_BATCH_TIME);
} # this is the extra block enclosing a labelled do{} loop
# The following two database-updating operations are resource-expensive (all workers hammering the same database+tables), # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
# so they are not allowed to happen too frequently (not before $MIN_BATCH_TIME of work has been done) # so they are not allowed to happen too frequently (not before $MIN_BATCH_TIME of work has been done)
# #
$self->db->get_AnalysisStatsAdaptor->interval_update_work_done($self->analysis->dbID, $self->db->get_AnalysisStatsAdaptor->interval_update_work_done($self->analysis->dbID,
$jobs_done_by_batches_loop, $batches_end-$batches_start, $self); $self->work_done, $batches_end-$batches_start, $self);
if (!$self->cause_of_death if (!$self->cause_of_death
and $self->analysis->stats->hive_capacity >= 0 and $self->analysis->stats->hive_capacity >= 0
...@@ -577,15 +591,19 @@ sub run_module_with_job { ...@@ -577,15 +591,19 @@ sub run_module_with_job {
my ($start_time, $end_time); my ($start_time, $end_time);
my $runObj = $self->analysis->process; $self->enter_status('COMPILATION');
return 0 unless($runObj); $job->update_status('COMPILATION');
return 0 unless($job and ($job->worker_id eq $self->worker_id)); $job->lethal(1); # if it dies in this state, it will kill the Worker
my $runObj = $self->analysis->process or die "Unknown compilation error";
$job->lethal(0); # not dangerous anymore
my $native_hive_process = $runObj->isa("Bio::EnsEMBL::Hive::Process");
my $init_time = time() * 1000; my $init_time = time() * 1000;
$self->queen->dbc->query_count(0); $self->queen->dbc->query_count(0);
#pass the input_id from the job into the Process object #pass the input_id from the job into the Process object
if($runObj->isa("Bio::EnsEMBL::Hive::Process")) { if($native_hive_process) {
$runObj->input_job($job); $runObj->input_job($job);
$runObj->queen($self->queen); $runObj->queen($self->queen);
$runObj->worker($self); $runObj->worker($self);
...@@ -597,7 +615,7 @@ sub run_module_with_job { ...@@ -597,7 +615,7 @@ sub run_module_with_job {
my $analysis_stats = $self->analysis->stats; my $analysis_stats = $self->analysis->stats;
$self->enter_status("GET_INPUT"); $self->enter_status('GET_INPUT');
$job->update_status('GET_INPUT'); $job->update_status('GET_INPUT');
print("\nGET_INPUT\n") if($self->debug); print("\nGET_INPUT\n") if($self->debug);
...@@ -606,7 +624,7 @@ sub run_module_with_job { ...@@ -606,7 +624,7 @@ sub run_module_with_job {
$end_time = time() * 1000; $end_time = time() * 1000;
$self->{fetch_time} += $end_time - $start_time; $self->{fetch_time} += $end_time - $start_time;
$self->enter_status("RUN"); $self->enter_status('RUN');
$job->update_status('RUN'); $job->update_status('RUN');
print("\nRUN\n") if($self->debug); print("\nRUN\n") if($self->debug);
...@@ -616,7 +634,7 @@ sub run_module_with_job { ...@@ -616,7 +634,7 @@ sub run_module_with_job {
$self->{run_time} += $end_time - $start_time; $self->{run_time} += $end_time - $start_time;
if($self->execute_writes) { if($self->execute_writes) {
$self->enter_status("WRITE_OUTPUT"); $self->enter_status('WRITE_OUTPUT');
$job->update_status('WRITE_OUTPUT'); $job->update_status('WRITE_OUTPUT');
print("\nWRITE_OUTPUT\n") if($self->debug); print("\nWRITE_OUTPUT\n") if($self->debug);
...@@ -624,21 +642,20 @@ sub run_module_with_job { ...@@ -624,21 +642,20 @@ sub run_module_with_job {
$runObj->write_output; $runObj->write_output;
$end_time = time() * 1000; $end_time = time() * 1000;
$self->{write_time} += $end_time - $start_time; $self->{write_time} += $end_time - $start_time;
if( $native_hive_process and $runObj->autoflow_inputjob ) {
printf("AUTOFLOW input->output\n") if($self->debug);
$runObj->dataflow_output_id();
}
} else { } else {
print("\n\n!!!! NOT write_output\n\n\n") if($self->debug); print("\n\n!!!! NOT write_output\n\n\n") if($self->debug);
} }
$self->enter_status("READY");
$job->query_count($self->queen->dbc->query_count); $job->query_count($self->queen->dbc->query_count);
$job->runtime_msec(time()*1000 - $init_time); $job->runtime_msec(time()*1000 - $init_time);
if ($runObj->isa("Bio::EnsEMBL::Hive::Process") and $runObj->autoflow_inputjob $job->update_status('DONE');
and $self->execute_writes) { $self->enter_status('READY');
printf("AUTOFLOW input->output\n") if($self->debug);
$runObj->dataflow_output_id();
}
return 1;
} }
sub enter_status { sub enter_status {
...@@ -646,7 +663,7 @@ sub enter_status { ...@@ -646,7 +663,7 @@ sub enter_status {
return $self->queen->enter_status($self, $status); return $self->queen->enter_status($self, $status);
} }
sub redirect_job_output { sub start_job_output_redirection {
my $self = shift; my $self = shift;