Commit 57398320 authored by Leo Gordon's avatar Leo Gordon
Browse files

merge reset_and_grab into one subroutine; pre-increment dependent semaphore if...

merge reset_and_grab into one subroutine; pre-increment dependent semaphore if re-running a DONE job; use -force flag for force-running an individual job
parent 4dba7d72
...@@ -473,6 +473,47 @@ sub store_out_files { ...@@ -473,6 +473,47 @@ sub store_out_files {
} }
=head2 reset_or_grab_job_by_dbID
Arg [1] : int $job_id
Arg [2] : int $worker_id (optional)
Description: resets a job to 'READY' (if no $worker_id given) or directly to 'CLAIMED' so it can be run again, and fetches it.
NB: Will also reset a previously 'SEMAPHORED' job to READY.
The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
but will not change retry_count if a job has never *really* started.
Returntype : Bio::EnsEMBL::Hive::AnalysisJob or undef
=cut
sub reset_or_grab_job_by_dbID {
    my ($self, $job_id, $worker_id) = @_;

    # Without a worker the job is only made runnable again ('READY');
    # with a worker it is handed to that worker straight away ('CLAIMED').
    my $new_status = $worker_id ? 'CLAIMED' : 'READY';

    # Note: the order of the fields being updated is critical!
    # The CASE expression reads the job's status, so retry_count has to be
    # assigned before status is overwritten (presumably relying on the
    # database evaluating the assignments left to right -- confirm for the
    # backend in use).
    # Jobs that never really started (COMPILATION/READY/CLAIMED) keep their
    # retry_count; any other status gets retry_count=1.
    my $sql = qq{
        UPDATE job
           SET retry_count = (CASE WHEN (status='COMPILATION' OR status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END)
             , status=?
             , worker_id=?
         WHERE job_id=?
    };
    my @values = ($new_status, $worker_id, $job_id);

    my $sth = $self->prepare( $sql );

    # $worker_id is legitimately undef in 'READY' mode, so undefined values are
    # rendered as NULL in the error message instead of triggering
    # "uninitialized" warnings and printing an ambiguous empty field.
    $sth->execute( @values )
        or die "Could not run\n\t$sql\nwith data:\n\t(".join(',', map { defined($_) ? $_ : 'NULL' } @values).')';
    $sth->finish;

    # Fetch the job back only if it really ended up in the requested status;
    # the caller gets undef when no such row exists.
    # NOTE(review): $job_id is interpolated into the constraint string, so
    # callers are expected to pass a plain integer dbID.
    my $constraint = "j.job_id='$job_id' AND j.status='$new_status'";
    my ($job) = @{ $self->_generic_fetch($constraint) };

    return $job;
}
=head2 grab_jobs_for_worker =head2 grab_jobs_for_worker
Arg [1] : Bio::EnsEMBL::Hive::Worker object $worker Arg [1] : Bio::EnsEMBL::Hive::Worker object $worker
...@@ -495,7 +536,7 @@ sub grab_jobs_for_worker { ...@@ -495,7 +536,7 @@ sub grab_jobs_for_worker {
my $worker_id = $worker->dbID(); my $worker_id = $worker->dbID();
my $update_sql = "UPDATE job SET worker_id='$worker_id', status='CLAIMED'"; my $update_sql = "UPDATE job SET worker_id='$worker_id', status='CLAIMED'";
my $selection_start_sql = " WHERE analysis_id='$analysis_id' AND status='READY' AND semaphore_count<=0"; my $selection_start_sql = " WHERE analysis_id='$analysis_id' AND status='READY'";
my $virgin_selection_sql = $selection_start_sql . " AND retry_count=0 LIMIT $how_many_this_batch"; my $virgin_selection_sql = $selection_start_sql . " AND retry_count=0 LIMIT $how_many_this_batch";
my $any_selection_sql = $selection_start_sql . " LIMIT $how_many_this_batch"; my $any_selection_sql = $selection_start_sql . " LIMIT $how_many_this_batch";
...@@ -512,48 +553,8 @@ sub grab_jobs_for_worker { ...@@ -512,48 +553,8 @@ sub grab_jobs_for_worker {
} }
} }
my $constraint = "j.analysis_id='$analysis_id' AND j.worker_id='$worker_id' AND j.status='CLAIMED'"; # my $constraint = "j.analysis_id='$analysis_id' AND j.worker_id='$worker_id' AND j.status='CLAIMED'";
return $self->_generic_fetch($constraint); my $constraint = "j.worker_id='$worker_id' AND j.status='CLAIMED'";
}
=head2 reset_job_by_dbID
Arg [1] : int $job_id
Example :
Description: Forces a job to be reset to 'READY' so it can be run again.
FIXME: Will also reset a previously 'SEMAPHORED' job to READY.
The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
but will not change retry_count if a job has never *really* started.
Exceptions : $job_id must not be false or zero
Caller : user process
=cut
sub reset_job_by_dbID {
    my ($self, $job_id) = @_;

    # Enforce the documented precondition instead of sending broken SQL
    # (undef/empty) or silently matching nothing (zero).
    die "reset_job_by_dbID: job_id must not be false or zero" unless $job_id;

    # Note: the order of the fields being updated is critical!
    # The CASE expression reads the job's status, so retry_count has to be
    # assigned before status is overwritten (presumably relying on the
    # database evaluating the assignments left to right -- confirm for the
    # backend in use).
    # Jobs that never really started (COMPILATION/READY/CLAIMED) keep their
    # retry_count; any other status gets retry_count=1.
    my $sql = qq{
        UPDATE job
           SET retry_count = (CASE WHEN (status='COMPILATION' OR status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END)
             , status='READY'
         WHERE job_id=?
    };

    # Bind the id as a placeholder value rather than interpolating it into the
    # statement text, the same way the other job-claiming subs in this file do.
    my $sth = $self->prepare( $sql );
    my $rows_affected = $sth->execute( $job_id )
        or die "Could not run\n\t$sql\nwith data:\n\t($job_id)";
    $sth->finish;

    return $rows_affected;
}
sub reclaim_job_for_worker {
my ($self, $job_id, $worker_id) = @_;
my $sql = "UPDATE job SET status='CLAIMED', worker_id=? WHERE job_id=? AND status='READY'";
my $sth = $self->prepare($sql);
$sth->execute($worker_id, $job_id);
$sth->finish;
my $constraint = "j.job_id='$job_id' AND j.worker_id='$worker_id' AND j.status='CLAIMED'";
return $self->_generic_fetch($constraint); return $self->_generic_fetch($constraint);
} }
...@@ -587,8 +588,8 @@ sub release_undone_jobs_from_worker { ...@@ -587,8 +588,8 @@ sub release_undone_jobs_from_worker {
$self->dbc->do( qq{ $self->dbc->do( qq{
UPDATE job UPDATE job
SET status='READY', worker_id=NULL SET status='READY', worker_id=NULL
WHERE status='CLAIMED' WHERE worker_id='$worker_id'
AND worker_id='$worker_id' AND status='CLAIMED'
} ); } );
my $sth = $self->prepare( qq{ my $sth = $self->prepare( qq{
......
...@@ -194,15 +194,28 @@ sub specialize_new_worker { ...@@ -194,15 +194,28 @@ sub specialize_new_worker {
print "resetting and fetching job for job_id '$job_id'\n"; print "resetting and fetching job for job_id '$job_id'\n";
my $job_adaptor = $self->db->get_AnalysisJobAdaptor; my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
$job_adaptor->reset_job_by_dbID($job_id);
my $worker_id = $worker->dbID; my $job = $job_adaptor->fetch_by_dbID( $job_id )
$special_batch = $job_adaptor->reclaim_job_for_worker($job_id, $worker_id); or die "Could not fetch job with dbID='$job_id'";
my $job_status = $job->status();
if($job_status =~/(CLAIMED|PRE_CLEANUP|FETCH_INPUT|RUN|WRITE_OUTPUT|POST_CLEANUP)/ ) {
die "Job with dbID='$job_id' is already in progress, cannot run"; # FIXME: try GC first, then complain
} elsif($job_status =~/(DONE|SEMAPHORED)/ and !$force) {
die "Job with dbID='$job_id' is $job_status, please use -force 1 to override";
}
if(my $job = $special_batch->[0]) { if(($job_status eq 'DONE') and $job->semaphored_job_id) {
warn "Increasing the semaphore count of the dependent job";
$job_adaptor->increase_semaphore_count_for_jobid( $job->semaphored_job_id );
}
my $worker_id = $worker->dbID;
if($job = $job_adaptor->reset_or_grab_job_by_dbID($job_id, $worker_id)) {
$special_batch = [ $job ];
$analysis_id = $job->analysis_id; $analysis_id = $job->analysis_id;
} else { } else {
die "Could not reclaim job with dbID='$job_id' for worker with dbID='$worker_id'"; die "Could not claim job with dbID='$job_id' for worker with dbID='$worker_id'";
} }
} }
...@@ -429,9 +442,8 @@ sub reset_job_by_dbID_and_sync { ...@@ -429,9 +442,8 @@ sub reset_job_by_dbID_and_sync {
my ($self, $job_id) = @_; my ($self, $job_id) = @_;
my $job_adaptor = $self->db->get_AnalysisJobAdaptor; my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
$job_adaptor->reset_job_by_dbID($job_id); my $job = $job_adaptor->reset_or_grab_job_by_dbID($job_id);
my $job = $job_adaptor->fetch_by_dbID($job_id);
my $stats = $self->db->get_AnalysisStatsAdaptor->fetch_by_analysis_id($job->analysis_id); my $stats = $self->db->get_AnalysisStatsAdaptor->fetch_by_analysis_id($job->analysis_id);
$self->synchronize_AnalysisStats($stats); $self->synchronize_AnalysisStats($stats);
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment