Commit d5f534d8 authored by Leo Gordon's avatar Leo Gordon
Browse files

adding ability to jobs to throw and record messages on success (plus other cleanup bits)

parent df8280ab
......@@ -141,12 +141,13 @@ sub stderr_file {
##-----------------[indicators to the Worker]--------------------------------
sub lethal_for_worker { # Job should set this to 1 prior to dying (or before running code that might cause death - such as RunnableDB's compilation)
# if it believes that the state of things will not allow the Worker to continue normally.
# The Worker will check the flag and commit suicide if it is set to true.
my $self = shift;
$self->{'_lethal_for_worker'} = shift if(@_);
return $self->{'_lethal_for_worker'};
my $self = shift;
$self->{'_lethal_for_worker'} = shift if(@_);
return $self->{'_lethal_for_worker'};
}
sub transient_error { # Job should set this to 1 prior to dying (or before running code that might cause death)
......@@ -154,9 +155,16 @@ sub transient_error { # Job should set this to 1 prior to dying (or before
# It may also set it to 0 prior to dying (or before running code that might cause death)
# if it believes that there is no point in re-trying (say, if the parameters are wrong).
# The Worker will check the flag and make necessary adjustments to the database state.
my $self = shift;
$self->{'_transient_error'} = shift if(@_);
return $self->{'_transient_error'};
my $self = shift;
$self->{'_transient_error'} = shift if(@_);
return $self->{'_transient_error'};
}
sub incomplete { # Job should set this to 0 prior to throwing if the job is done,
# but it wants the thrown message to be recorded with is_error=0.
my $self = shift;
$self->{'_incomplete'} = shift if(@_);
return $self->{'_incomplete'};
}
##-----------------[/indicators to the Worker]-------------------------------
......
......@@ -43,8 +43,9 @@ use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::DBSQL::BaseAdaptor;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
=head2 fetch_by_analysis_id
......@@ -242,7 +243,11 @@ sub update_status
Arg [1] : int $analysis_id
Arg [2] : int $jobs_done_in_interval
Arg [3] : int $interval_msec
Example : $statsDBA->incremental_update_work_done($analysis_id, $jobs_done, $interval_msecs);
Arg [4] : int $fetching_msec
Arg [5] : int $running_msec
Arg [6] : int $writing_msec
Arg [7] : real $weight_factor [optional]
Example : $statsDBA->interval_update_work_done($analysis_id, $jobs_done, $interval_msec, $fetching_msec, $running_msec, $writing_msec);
Description : does a database update to recalculate the avg_msec_per_job and done_job_count
does an interval equation by multiplying out the previous done_job_count with the
previous avg_msec_per_job and then expanding by new interval values to give a better average.
......@@ -250,23 +255,21 @@ sub update_status
=cut
sub interval_update_work_done
{
my ($self, $analysis_id, $job_count, $interval, $worker, $weight_factor) = @_;
sub interval_update_work_done {
my ($self, $analysis_id, $job_count, $interval_msec, $fetching_msec, $running_msec, $writing_msec, $weight_factor) = @_;
$weight_factor ||= 3; # makes it more sensitive to the dynamics of the farm
my $sql = "UPDATE analysis_stats SET ".
"unclaimed_job_count = unclaimed_job_count - $job_count, ".
"avg_msec_per_job = (((done_job_count*avg_msec_per_job)/$weight_factor + $interval) / (done_job_count/$weight_factor + $job_count)), ".
"avg_input_msec_per_job = (((done_job_count*avg_input_msec_per_job)/$weight_factor + ".
($worker->{fetch_time}).") / (done_job_count/$weight_factor + $job_count)), ".
"avg_run_msec_per_job = (((done_job_count*avg_run_msec_per_job)/$weight_factor + ".
($worker->{run_time}).") / (done_job_count/$weight_factor + $job_count)), ".
"avg_output_msec_per_job = (((done_job_count*avg_output_msec_per_job)/$weight_factor + ".
($worker->{write_time}).") / (done_job_count/$weight_factor + $job_count)), ".
"done_job_count = done_job_count + $job_count ".
" WHERE analysis_id= $analysis_id";
my $sql = qq{
UPDATE analysis_stats SET
unclaimed_job_count = unclaimed_job_count - $job_count,
avg_msec_per_job = (((done_job_count*avg_msec_per_job)/$weight_factor + $interval_msec) / (done_job_count/$weight_factor + $job_count)),
avg_input_msec_per_job = (((done_job_count*avg_input_msec_per_job)/$weight_factor + $fetching_msec) / (done_job_count/$weight_factor + $job_count)),
avg_run_msec_per_job = (((done_job_count*avg_run_msec_per_job)/$weight_factor + $running_msec) / (done_job_count/$weight_factor + $job_count)),
avg_output_msec_per_job = (((done_job_count*avg_output_msec_per_job)/$weight_factor + $writing_msec) / (done_job_count/$weight_factor + $job_count)),
done_job_count = done_job_count + $job_count ".
WHERE analysis_id= $analysis_id
};
$self->dbc->do($sql);
}
......@@ -324,6 +327,16 @@ sub decrease_running_workers
$self->dbc->do($sql);
}
sub decrease_running_workers_on_hive_overload {
my $self = shift;
my $analysis_id = shift;
my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
"WHERE num_running_workers > hive_capacity AND analysis_id = $analysis_id ";
my $row_count = $self->dbc->do($sql);
return $row_count;
}
sub decrease_needed_workers
{
......
......@@ -63,7 +63,7 @@ sub get_available_adaptors {
'DataflowRule' => 'Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor',
'ResourceDescription' => 'Bio::EnsEMBL::Hive::DBSQL::ResourceDescriptionAdaptor',
'NakedTable' => 'Bio::EnsEMBL::Hive::DBSQL::NakedTableAdaptor',
'JobError' => 'Bio::EnsEMBL::Hive::DBSQL::JobErrorAdaptor',
'JobMessage' => 'Bio::EnsEMBL::Hive::DBSQL::JobMessageAdaptor',
);
return (\%pairs);
}
......
......@@ -2,15 +2,15 @@
=head1 NAME
Bio::EnsEMBL::Hive::DBSQL::JobErrorAdaptor
Bio::EnsEMBL::Hive::DBSQL::JobMessageAdaptor
=head1 SYNOPSIS
$dba->get_JobErrorAdaptor->register_error($job_id, $error_msg);
$dba->get_JobMessageAdaptor->register_message($job_id, $msg, $is_error);
=head1 DESCRIPTION
This is currently an "objectless" adaptor that simply helps to store job death events into job_error table.
This is currently an "objectless" adaptor that simply helps to store messages thrown by jobs into job_message table.
=head1 CONTACT
......@@ -19,26 +19,26 @@
=cut
package Bio::EnsEMBL::Hive::DBSQL::JobErrorAdaptor;
package Bio::EnsEMBL::Hive::DBSQL::JobMessageAdaptor;
use strict;
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
sub register_error {
my ($self, $job_id, $error_msg) = @_;
sub register_message {
my ($self, $job_id, $msg, $is_error) = @_;
chomp $error_msg; # we don't want that last "\n" in the database
chomp $msg; # we don't want that last "\n" in the database
# (the timestamp column will be set automatically)
# (the timestamp 'moment' column will be set automatically)
my $sql = qq{
REPLACE INTO job_error (analysis_job_id, worker_id, retry_count, status, error_msg)
SELECT analysis_job_id, worker_id, retry_count, status, ?
REPLACE INTO job_error (analysis_job_id, worker_id, retry_count, status, msg, is_error)
SELECT analysis_job_id, worker_id, retry_count, status, ?, ?
FROM analysis_job WHERE analysis_job_id=?
};
my $sth = $self->prepare( $sql );
$sth->execute( $error_msg, $job_id );
$sth->execute( $msg, $is_error, $job_id );
$sth->finish();
}
......
......@@ -75,8 +75,6 @@ sub fetch_input {
# Store the value with parameter substitutions for the actual execution:
#
$self->param('cmd', $self->param_substitute($cmd));
return 1; # success
}
=head2 run
......@@ -94,8 +92,6 @@ sub run {
$return_value >>= 8;
die "system( $cmd ) failed: $return_value";
}
return 1; # success
}
=head2 write_output
......
=pod
=head1 NAME
Bio::EnsEMBL::Hive::Utils::Stopwatch
=head1 SYNOPSIS
my $total_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart;
my $fetching_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
my $running_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
$fetching_stopwatch->continue();
$runnable->fetch_input();
$fetching_stopwatch->pause();
$running_stopwatch->continue();
$runnable->run();
$running_stopwatch->pause();
# ...
my $only_fetches = $fetching_stopwatch->get_elapsed; # probably stopped
my $total_time_elapsed = $total_stopwatch->get_elapsed; # running through
=head1 DESCRIPTION
This is a standalone object used to time various events in the Hive.
=head1 CONTACT
Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
=cut
package Bio::EnsEMBL::Hive::Utils::Stopwatch;
use strict;
use Time::HiRes qw(time);
my $default_unit = 1000; # milliseconds
sub new {
my $class = shift @_;
my $self = bless {}, $class;
return $self;
}
sub _unit { # only set it once for each timer to avoid messing everything up
my $self = shift;
$self->{'_unit'} = shift if(@_);
return $self->{'_unit'} || $default_unit;
}
sub is_counting {
my $self = shift;
$self->{'_is_counting'} = shift if(@_);
return $self->{'_is_counting'} || 0;
}
sub accumulated {
my $self = shift;
$self->{'_accumulated'} = shift if(@_);
return $self->{'_accumulated'} || 0;
}
sub continue {
my $self = shift @_;
unless($self->is_counting) { # ignore if it was already running
$self->is_counting(1);
$self->{'_start'} = time() * $self->_unit
}
}
sub restart {
my $self = shift @_;
$self->accumulated(0);
$self->continue;
}
sub get_elapsed { # peek without stopping (in case it was running)
my $self = shift @_;
return ($self->accumulated + $self->is_counting * (time() * $self->_unit - $self->{'_start'}));
}
sub pause {
my $self = shift @_;
$self->accumulated( $self->get_elapsed );
$self->is_counting(0);
}
1;
......@@ -66,8 +66,7 @@ package Bio::EnsEMBL::Hive::Worker;
use strict;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
use Sys::Hostname;
use Time::HiRes qw(time);
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
use POSIX;
use Bio::EnsEMBL::Analysis;
......@@ -93,7 +92,12 @@ sub new {
sub init {
my $self = shift;
$self->start_time(time());
my $lifespan_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
$lifespan_stopwatch->_unit(1); # count in seconds (default is milliseconds)
$lifespan_stopwatch->restart;
$self->lifespan_stopwatch( $lifespan_stopwatch );
$self->debug(0);
return $self;
}
......@@ -167,26 +171,30 @@ sub analysis {
=cut
sub life_span { # default life_span = 60minutes
my( $self, $value ) = @_;
$self->{'_life_span'} = 60*60 unless(defined($self->{'_life_span'}));
$self->{'_life_span'} = $value if(defined($value));
return $self->{'_life_span'};
my ($self, $value) = @_;
if(defined($value)) { # you can still set it to 0 and avoid having the limit on lifespan
$self->{'_life_span'} = $value;
} elsif(!defined($self->{'_life_span'})) {
$self->{'_life_span'} = 60*60;
}
return $self->{'_life_span'};
}
sub start_time {
sub lifespan_stopwatch {
my $self = shift @_;
if(@_) {
$self->{'start_time'} = shift @_;
$self->{'_lifespan_stopwatch'} = shift @_;
}
return $self->{'start_time'};
return $self->{'_lifespan_stopwatch'};
}
sub life_span_limit_reached {
my $self = shift @_;
if( $self->life_span() ) {
my $alive_for_secs = time()-$self->start_time();
my $alive_for_secs = $self->lifespan_stopwatch->get_elapsed;
if($alive_for_secs > $self->life_span() ) {
return $alive_for_secs;
}
......@@ -240,6 +248,17 @@ sub job_limit_reached {
return 0;
}
# By maintaining this information we attempt to detect worker contamination without the user specifically telling us about it
#
# Ideally we should be doing an *ALIGNMENT* of error messages (allowing for some numerical IDs to differ),
# but at the moment we assume all errors identical. If the worker failed two jobs in a row - let him die.
sub prev_job_error {
my $self = shift @_;
$self->{'_prev_job_error'} = shift if(@_);
return $self->{'_prev_job_error'};
}
sub worker_id {
......@@ -485,11 +504,11 @@ sub run
$self->db->dbc->disconnect_when_inactive(0);
do { # Worker's lifespan loop (ends only when the worker dies)
my $batches_start = time() * 1000;
my $batches_end = $batches_start;
$self->{fetch_time} = 0;
$self->{run_time} = 0;
$self->{write_time} = 0;
my $batches_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
$self->{'fetching_stopwatch'} = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
$self->{'running_stopwatch'} = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
$self->{'writing_stopwatch'} = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop
BATCHES: { # note: in order to label a do{} loop you have to enclose it in an extra block
do { # Worker's "batches loop" exists to prevent logging the status too frequently.
......@@ -512,41 +531,50 @@ sub run
$job->print_job if($self->debug);
$self->start_job_output_redirection($job);
eval { # capture any death event
eval { # capture any throw/die
$self->run_module_with_job($job);
};
my $error_msg = $@;
my $msg_thrown = $@;
$self->stop_job_output_redirection($job);
if($error_msg) {
my $job_id = $job->dbID();
my $job_status_when_died = $job->status();
warn "Job with id=$job_id died in status '$job_status_when_died' for the following reason: $error_msg\n";
$self->db()->get_JobErrorAdaptor()->register_error($job_id, $error_msg);
if($msg_thrown) { # record the message - whether it was a success or failure:
my $job_id = $job->dbID();
my $job_status_at_the_moment = $job->status();
warn "Job with id=$job_id died in status '$job_status_at_the_moment' for the following reason: $msg_thrown\n";
$self->db()->get_JobMessageAdaptor()->register_message($job_id, $msg_thrown, $job->incomplete );
}
if($job->incomplete) {
# If the job specifically said what to do next, respect that last wish.
# Otherwise follow the default behaviour set by the beekeeper in $worker:
#
my $attempt_to_retry_this_job = defined($job->transient_error) ? $job->transient_error : $self->retry_throwing_jobs;
if($attempt_to_retry_this_job) {
$job->adaptor->reset_dead_job_by_dbID($job_id);
$job->adaptor->reset_dead_job_by_dbID($job->dbID);
} else {
$job->update_status('FAILED');
}
if($job->lethal_for_worker) { # either a compilation error or other job-sanctioned contamination
if( ($job->status eq 'COMPILATION') # if it failed to compile, there is no point in continuing as the code WILL be broken
or $self->prev_job_error # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
or $job->lethal_for_worker ) { # trust the job's expert knowledge
warn "Job's error has contaminated the Worker, so the Worker will now die\n";
$self->cause_of_death('CONTAMINATED');
last BATCHES;
}
} else {
} else { # job successfully completed:
if(my $semaphored_job_id = $job->semaphored_job_id) {
$job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # step-unblock the semaphore after job is (successfully) done
$job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # step-unblock the semaphore
}
$self->more_work_done;
$jobs_done_by_batches_loop++;
$job->update_status('DONE');
}
$self->prev_job_error( $job->incomplete );
$self->enter_status('READY');
}
$batches_end = time() * 1000;
if( $specific_job ) {
$self->cause_of_death('JOB_LIMIT');
......@@ -557,24 +585,27 @@ sub run
print "life_span limit reached (alive for $alive_for_secs secs)\n";
$self->cause_of_death('LIFESPAN');
}
} while (!$self->cause_of_death and $batches_end-$batches_start < $MIN_BATCH_TIME);
} while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $MIN_BATCH_TIME);
} # this is the extra block enclosing a labelled do{} loop
# The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
# so they are not allowed to happen too frequently (not before $MIN_BATCH_TIME of work has been done)
#
$self->db->get_AnalysisStatsAdaptor->interval_update_work_done($self->analysis->dbID,
$self->work_done, $batches_end-$batches_start, $self);
$self->db->get_AnalysisStatsAdaptor->interval_update_work_done(
$self->analysis->dbID,
$jobs_done_by_batches_loop,
$batches_stopwatch->get_elapsed,
$self->{'fetching_stopwatch'}->get_elapsed,
$self->{'running_stopwatch'}->get_elapsed,
$self->{'writing_stopwatch'}->get_elapsed,
);
if (!$self->cause_of_death
and $self->analysis->stats->hive_capacity >= 0
and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity) {
my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
"WHERE num_running_workers > hive_capacity AND analysis_id = " . $self->analysis->stats->analysis_id;
my $row_count = $self->queen->dbc->do($sql);
if ($row_count == 1) {
and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity
and $self->analysis->stats->adaptor->decrease_running_workers_on_hive_overload( $self->analysis->dbID ) # careful with order, this operation has side-effect
) {
$self->cause_of_death('HIVE_OVERLOAD');
}
}
} while (!$self->cause_of_death); # /Worker's lifespan loop
......@@ -606,17 +637,15 @@ sub run
sub run_module_with_job {
my ($self, $job) = @_;
my ($start_time, $end_time);
$job->incomplete(1);
$self->enter_status('COMPILATION');
$job->update_status('COMPILATION');
$job->lethal_for_worker(1); # if it dies in this state, it will kill the Worker
my $runObj = $self->analysis->process or die "Unknown compilation error";
$job->lethal_for_worker(0); # not dangerous anymore
my $native_hive_process = $runObj->isa("Bio::EnsEMBL::Hive::Process");
my $init_time = time() * 1000;
my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
$self->queen->dbc->query_count(0);
#pass the input_id from the job into the Process object
......@@ -630,55 +659,43 @@ sub run_module_with_job {
$runObj->db($self->db);
}
CARRY_ON: {
my $carry_on = 1;
$self->enter_status('GET_INPUT');
$job->update_status('GET_INPUT');
print("\nGET_INPUT\n") if($self->debug);
$self->enter_status('GET_INPUT');
$job->update_status('GET_INPUT');
print("\nGET_INPUT\n") if($self->debug);
$self->{'fetching_stopwatch'}->continue();
$runObj->fetch_input;
$self->{'fetching_stopwatch'}->pause();
$start_time = time() * 1000;
$carry_on = $runObj->fetch_input;
$end_time = time() * 1000;
$self->{fetch_time} += $end_time - $start_time;
$self->enter_status('RUN');
$job->update_status('RUN');
print("\nRUN\n") if($self->debug);
last CARRY_ON if(defined($carry_on) and $carry_on==0); # if 0 is returned, leave early
$self->{'running_stopwatch'}->continue();
$runObj->run;
$self->{'running_stopwatch'}->pause();
$self->enter_status('RUN');
$job->update_status('RUN');
print("\nRUN\n") if($self->debug);
if($self->execute_writes) {
$self->enter_status('WRITE_OUTPUT');
$job->update_status('WRITE_OUTPUT');
print("\nWRITE_OUTPUT\n") if($self->debug);
$start_time = time() * 1000;
$carry_on = $runObj->run;
$end_time = time() * 1000;
$self->{run_time} += $end_time - $start_time;
$self->{'writing_stopwatch'}->continue();
$runObj->write_output;
$self->{'writing_stopwatch'}->pause();
last CARRY_ON if(defined($carry_on) and $carry_on==0); # if 0 is returned, leave early
if($self->execute_writes) {
$self->enter_status('WRITE_OUTPUT');
$job->update_status('WRITE_OUTPUT');
print("\nWRITE_OUTPUT\n") if($self->debug);
$start_time = time() * 1000;
$runObj->write_output;
$end_time = time() * 1000;
$self->{write_time} += $end_time - $start_time;
if( $native_hive_process and $runObj->autoflow_inputjob ) {
printf("AUTOFLOW input->output\n") if($self->debug);
$runObj->dataflow_output_id();
}
} else {
print("\n\n!!!! NOT write_output\n\n\n") if($self->debug);
if( $native_hive_process and $runObj->autoflow_inputjob ) {
printf("AUTOFLOW input->output\n") if($self->debug);
$runObj->dataflow_output_id();
}
} else {
print("\n\n!!!! NOT write_output\n\n\n") if($self->debug);
}
$job->query_count($self->queen->dbc->query_count);
$job->runtime_msec(time()*1000 - $init_time);
$job->runtime_msec( $job_stopwatch->get_elapsed );
$job->update_status('DONE');
$self->enter_status('READY');
$job->incomplete(0);
}
sub enter_status {
......
......@@ -148,26 +148,29 @@ CREATE TABLE analysis_job (