Commit 81f9f84d authored by Leo Gordon's avatar Leo Gordon
Browse files

streamlined some code

parent 7299c868
......@@ -519,7 +519,7 @@ sub claim_jobs_for_worker {
}
=head2 reset_dead_jobs_for_worker
=head2 release_undone_jobs_from_worker
Arg [1] : Bio::EnsEMBL::Hive::Worker object
Example :
......@@ -535,117 +535,78 @@ sub claim_jobs_for_worker {
=cut
sub reset_dead_jobs_for_worker {
sub release_undone_jobs_from_worker {
my ($self, $worker) = @_;
#added worker_id index to analysis_job table which made this operation much faster
my $max_retry_count = $worker->analysis->stats->max_retry_count();
my $worker_id = $worker->worker_id();
my $max_retry_count = $worker->analysis->stats->max_retry_count();
my $worker_id = $worker->worker_id();
#first just reset the claimed jobs, these don't need a retry_count index increment
$self->dbc->do( qq{
#first just reset the claimed jobs, these don't need a retry_count index increment:
$self->dbc->do( qq{
UPDATE analysis_job
SET job_claim='', status='READY'
WHERE status='CLAIMED'
AND worker_id='$worker_id'
} );
if(0) {
# an update with select on status and worker_id took 4seconds per worker to complete,
# while doing a select followed by update on analysis_job_id returned almost instantly
$self->dbc->do( qq{
UPDATE analysis_job SET job_claim='', status='READY', retry_count=retry_count+1
WHERE status in ('COMPILATION','GET_INPUT','RUN','WRITE_OUTPUT')
AND retry_count<$max_retry_count
AND worker_id='$worker_id'
} );
$self->dbc->do( qq{
UPDATE analysis_job SET status='FAILED', retry_count=retry_count+1
WHERE status in ('COMPILATION','GET_INPUT','RUN','WRITE_OUTPUT')
AND retry_count>=$max_retry_count
AND worker_id='$worker_id'
} );
}
} );
my $sql = qq{
my $sth = $self->prepare( qq{
SELECT analysis_job_id
FROM analysis_job
WHERE worker_id='$worker_id'
AND status in ('COMPILATION','GET_INPUT','RUN','WRITE_OUTPUT')
};
my $sth = $self->prepare($sql);
} );
$sth->execute();
if(my ($job_id, $retry_count) = $sth->fetchrow_array()) {
my $cod = $worker->cause_of_death();
$self->db()->get_JobMessageAdaptor()->register_message($job_id, "GarbageCollected. The worker died because of $cod", 1 );
my $cod = $worker->cause_of_death();
my $msg = "GarbageCollector: The worker (worker_id=$worker_id) died because of $cod";
my $counter = 0;
while(my ($job_id, $retry_count) = $sth->fetchrow_array()) {
$self->db()->get_JobMessageAdaptor()->register_message($job_id, $msg, 1 );
if($cod eq 'MEMLIMIT'
or $cod eq 'RUNLIMIT') {
$self->dbc->do( qq{ UPDATE analysis_job SET status='FAILED' WHERE analysis_job_id=$job_id } );
} else {
$self->reset_dead_job_by_dbID($job_id);
}
my $may_retry = ($cod ne 'MEMLIMIT' and $cod ne 'RUNLIMIT');
$self->release_and_age_job( $job_id, $max_retry_count, $may_retry );
$counter++;
}
}
$sth->finish();
warn "$msg, released $counter job(s)\n";
}
sub reset_dead_job_by_dbID {
my $self = shift;
my $job_id = shift;
# an update with select on status and worker_id took 4seconds per worker to complete,
# while doing a select followed by update on analysis_job_id returned almost instantly
my $sql = "
UPDATE analysis_job, analysis_stats
SET job_claim='', analysis_job.status='READY', retry_count=retry_count+1
WHERE
analysis_job.status in ('COMPILATION','GET_INPUT','RUN','WRITE_OUTPUT')
AND analysis_job.analysis_id = analysis_stats.analysis_id
AND retry_count < max_retry_count
AND analysis_job_id=$job_id";
#print("$sql\n");
$self->dbc->do($sql);
sub release_and_age_job {
my ($self, $job_id, $max_retry_count, $may_retry) = @_;
$sql = "
UPDATE analysis_job, analysis_stats
SET job_claim='', analysis_job.status='FAILED', retry_count=retry_count+1
WHERE
analysis_job.status in ('COMPILATION','GET_INPUT','RUN','WRITE_OUTPUT')
AND analysis_job.analysis_id = analysis_stats.analysis_id
AND retry_count >= max_retry_count
AND analysis_job_id=$job_id";
#print("$sql\n");
$self->dbc->do($sql);
# NB: The order of updates IS important. Here we first find out the new status and then increment the retry_count:
$self->dbc->do( qq{
UPDATE analysis_job
SET worker_id=0, job_claim='', status=IF( $may_retry AND (retry_count<$max_retry_count), 'READY', 'FAILED'), retry_count=retry_count+1
WHERE status in ('COMPILATION','GET_INPUT','RUN','WRITE_OUTPUT')
AND analysis_job_id=$job_id
} );
}
=head2 reset_job_by_dbID
Arg [1] : int $analysis_job_id
Arg [1] : int $job_id
Example :
Description: Forces a job to be reset to 'READY' so it can be run again.
Will also reset a previously 'BLOCKED' jobs to READY.
Exceptions : $job must be defined
Exceptions : $job_id must not be false or zero
Caller : user process
=cut
sub reset_job_by_dbID {
my $self = shift;
my $analysis_job_id = shift;
throw("must define job") unless($analysis_job_id);
my $self = shift;
my $job_id = shift or throw("job_id of the job to be reset is undefined");
my ($sql, $sth);
#first just reset the claimed jobs, these don't need a retry_count index increment
$sql = "UPDATE analysis_job SET worker_id=0, job_claim='', status='READY', retry_count=0 WHERE analysis_job_id=?";
$sth = $self->prepare($sql);
$sth->execute($analysis_job_id);
$sth->finish;
#print(" done update CLAIMED\n");
$self->dbc->do( qq{
UPDATE analysis_job
SET worker_id=0, job_claim='', status='READY', retry_count=0
WHERE analysis_job_id=$job_id
} );
}
......
......@@ -186,7 +186,7 @@ sub register_worker_death {
or $cod eq 'MEMLIMIT'
or $cod eq 'RUNLIMIT'
or $cod eq 'KILLED_BY_USER') {
$self->db->get_AnalysisJobAdaptor->reset_dead_jobs_for_worker($worker);
$self->db->get_AnalysisJobAdaptor->release_undone_jobs_from_worker($worker);
}
# re-sync the analysis_stats when a worker dies as part of dynamic sync system
......@@ -231,7 +231,7 @@ sub check_for_dead_workers {
if($bih_number) {
my $job_adaptor = $self->db->get_AnalysisJobAdaptor();
foreach my $worker (@$buried_in_haste_list) {
$job_adaptor->reset_dead_jobs_for_worker($worker);
$job_adaptor->release_undone_jobs_from_worker($worker);
}
}
} else {
......
......@@ -484,8 +484,7 @@ sub batch_size {
=cut
sub run
{
sub run {
my $self = shift;
my $specific_job = $self->_specific_job;
......@@ -503,6 +502,8 @@ sub run
$self->db->dbc->disconnect_when_inactive(0);
my $max_retry_count = $self->analysis->stats->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
do { # Worker's lifespan loop (ends only when the worker dies)
my $batches_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
$self->{'fetching_stopwatch'} = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
......@@ -548,17 +549,14 @@ sub run
# If the job specifically said what to do next, respect that last wish.
# Otherwise follow the default behaviour set by the beekeeper in $worker:
#
my $attempt_to_retry_this_job = defined($job->transient_error) ? $job->transient_error : $self->retry_throwing_jobs;
if($attempt_to_retry_this_job) {
$job->adaptor->reset_dead_job_by_dbID($job->dbID);
} else {
$job->update_status('FAILED');
}
my $may_retry = defined($job->transient_error) ? $job->transient_error : $self->retry_throwing_jobs;
$job->adaptor->release_and_age_job( $job->dbID, $max_retry_count, $may_retry );
if( ($job->status eq 'COMPILATION') # if it failed to compile, there is no point in continuing as the code WILL be broken
if($self->status eq 'COMPILATION' # if it failed to compile, there is no point in continuing as the code WILL be broken
or $self->prev_job_error # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
or $job->lethal_for_worker ) { # trust the job's expert knowledge
my $reason = ($job->status eq 'COMPILATION') ? 'compilation error'
my $reason = ($self->status eq 'COMPILATION') ? 'compilation error'
: $self->prev_job_error ? 'two failed jobs in a row'
: 'suggested by job itself';
warn "Job's error has contaminated the Worker ($reason), so the Worker will now die\n";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment