Skip to content
Snippets Groups Projects
Commit 6f8eb067 authored by Jessica Severin's avatar Jessica Severin
Browse files

added methods related to fetching 'overdue' workers (potential lost/dead

workers) and added methods for reseting jobs for a lost worker.
$queen->register_worker_death($worker) without a defined cause_of_death
will register worker as a FATALITY and reset its unfinished jobs.
parent 689bce79
No related branches found
No related tags found
No related merge requests found
......@@ -105,7 +105,7 @@ sub status {
if(defined $value) {
$self->{'_status'} = $value;
$self->adaptor->update($self) if($self->adaptor);
$self->adaptor->update_status($self->dbID, $value) if($self->adaptor);
}
return $self->{'_status'};
}
......
......@@ -363,6 +363,38 @@ sub claim_jobs_for_worker {
return $claim;
}
sub reset_dead_jobs_for_worker {
my $self = shift;
my $worker = shift;
throw("must define worker") unless($worker);
# an update with select on status and hive_id took 4seconds per worker to complete,
# while doing a select followed by update on analysis_job_id returned almost instantly
my $sql = "select analysis_job_id from analysis_job ".
" WHERE status in ('CLAIMED','GET_INPUT','RUN','WRITE_OUTPUT')".
" AND hive_id='" . $worker->hive_id ."'";
#print("$sql\n");
my $sth = $self->prepare($sql);
$sth->execute();
my @jobIDS;
while (my ($job_id)=$sth->fetchrow_array()) { push @jobIDS, $job_id; }
$sth->finish;
#print("reset ", scalar(@jobIDS), " jobs\n");
foreach my $job_id (@jobIDS) {
my $sql = "UPDATE analysis_job SET job_claim='', hive_id=0, status='READY'".
" retry_count=retry_count+1".
" WHERE analysis_job_id=$job_id";
my $sth = $self->prepare($sql);
$sth->execute();
$sth->finish;
}
}
1;
......@@ -283,6 +283,19 @@ sub update {
}
sub update_status
{
my ($self, $analysis_id, $status) = @_;
my $sql = "UPDATE analysis_stats SET status='$status' ";
$sql .= " WHERE analysis_id='$analysis_id' ";
my $sth = $self->prepare($sql);
$sth->execute();
$sth->finish;
}
sub _create_new_for_analysis_id {
my ($self, $analysis_id) = @_;
......
......@@ -105,6 +105,10 @@ sub register_worker_death {
my ($self, $worker) = @_;
return unless($worker);
# if called without a defined cause_of_death, assume catastrophic failure
$worker->cause_of_death('FATALITY') unless(defined($worker->cause_of_death));
my $sql = "UPDATE hive SET died=now(), last_check_in=now()";
$sql .= " ,work_done='" . $worker->work_done . "'";
$sql .= " ,cause_of_death='". $worker->cause_of_death ."'";
......@@ -115,12 +119,16 @@ sub register_worker_death {
$sth->finish;
if($worker->cause_of_death eq "NO_WORK") {
$worker->analysis_stats->status("ALL_CLAIMED");
$self->db->get_AnalysisStatsAdaptor->update_status($worker->analysis->dbID, "ALL_CLAIMED");
}
if($worker->cause_of_death eq "FATALITY") {
#print("FATAL DEATH Arrrrgggghhhhhhhh (hive_id=",$worker->hive_id,")\n");
$self->db->get_AnalysisJobAdaptor->reset_dead_jobs_for_worker($worker);
}
}
sub check_in {
sub worker_check_in {
my ($self, $worker) = @_;
return unless($worker);
......@@ -133,6 +141,16 @@ sub check_in {
}
sub fetch_overdue_workers {
my ($self,$overdue_secs) = @_;
$overdue_secs = 3600 unless(defined($overdue_secs));
my $constraint = "h.cause_of_death='' ".
"AND (UNIX_TIMESTAMP()-UNIX_TIMESTAMP(h.last_check_in))>$overdue_secs";
return $self->_generic_fetch($constraint);
}
#
# INTERNAL METHODS
......@@ -282,7 +300,6 @@ sub _objs_from_sth {
if($column{'analysis_id'} and $self->db->get_AnalysisAdaptor) {
$worker->analysis($self->db->get_AnalysisAdaptor->fetch_by_dbID($column{'analysis_id'}));
$worker->analysis_stats($self->db->get_AnalysisStatsAdaptor->fetch_by_dbID($column{'analysis_id'}));
}
push @workers, $worker;
......
......@@ -66,6 +66,8 @@ use Bio::EnsEMBL::Analysis;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Pipeline::RunnableDB;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
use Bio::EnsEMBL::Hive::Extensions;
use vars qw(@ISA);
......@@ -112,19 +114,6 @@ sub analysis {
return $self->{'_analysis'};
}
sub analysis_stats {
my $self = shift;
my $analysisStats = shift;
if(defined($analysisStats)) {
$self->throw("arg must be a [Bio::EnsEMBL::Hive::AnalysisStats] not a [$analysisStats]")
unless($analysisStats->isa('Bio::EnsEMBL::Hive::AnalysisStats'));
$self->{'_analysis_stats'} = $analysisStats;
}
return $self->{'_analysis_stats'};
}
=head2 life_span
Arg [1] : (optional) integer $value (in seconds)
......@@ -292,7 +281,7 @@ sub run
my $claim = $jobDBA->claim_jobs_for_worker($self);
my $jobs = $jobDBA->fetch_by_claim_analysis($claim, $self->analysis->dbID);
$self->adaptor->check_in($self);
$self->adaptor->worker_check_in($self);
$self->cause_of_death('NO_WORK') unless(scalar @{$jobs});
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment