Commit 59e5c2e6 authored by Leo Gordon's avatar Leo Gordon
Browse files

moved special-job-reset and special-job-reclaim into the same call, removed...

moved special-job-reset and special-job-reclaim into the same call, removed the unnecessary fetch in between
parent 3892e180
...@@ -501,12 +501,12 @@ sub grab_jobs_for_worker { ...@@ -501,12 +501,12 @@ sub grab_jobs_for_worker {
my $any_selection_sql = $selection_start_sql . " LIMIT $how_many_this_batch"; my $any_selection_sql = $selection_start_sql . " LIMIT $how_many_this_batch";
if($self->dbc->driver eq 'sqlite') { if($self->dbc->driver eq 'sqlite') {
# we have to be explicitly numereic here because of '0E0' value returned by DBI if "no rows have been affected": # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
if( (my $claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $virgin_selection_sql) AND status='READY'" )) == 0 ) { if( (my $claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $virgin_selection_sql) AND status='READY'" )) == 0 ) {
$claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $any_selection_sql) AND status='READY'" ); $claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $any_selection_sql) AND status='READY'" );
} }
} else { } else {
# we have to be explicitly numereic here because of '0E0' value returned by DBI if "no rows have been affected": # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
if( (my $claim_count = $self->dbc->do( $update_sql . $virgin_selection_sql )) == 0 ) { if( (my $claim_count = $self->dbc->do( $update_sql . $virgin_selection_sql )) == 0 ) {
$claim_count = $self->dbc->do( $update_sql . $any_selection_sql ); $claim_count = $self->dbc->do( $update_sql . $any_selection_sql );
} }
...@@ -517,11 +517,35 @@ sub grab_jobs_for_worker { ...@@ -517,11 +517,35 @@ sub grab_jobs_for_worker {
} }
sub reclaim_job_for_worker { =head2 reset_job_by_dbID
my ($self, $worker, $job) = @_;
my $worker_id = $worker->dbID(); Arg [1] : int $job_id
my $job_id = $job->dbID; Example :
Description: Forces a job to be reset to 'READY' so it can be run again.
FIXME: Will also reset a previously 'SEMAPHORED' jobs to READY.
The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
but will not change retry_count if a job has never *really* started.
Exceptions : $job_id must not be false or zero
Caller : user process
=cut
sub reset_job_by_dbID {
my $self = shift;
my $job_id = shift;
# Note: the order of the fields being updated is critical!
$self->dbc->do( qq{
UPDATE job
SET retry_count = (CASE WHEN (status='COMPILATION' OR status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END)
, status='READY'
WHERE job_id=$job_id
} );
}
sub reclaim_job_for_worker {
my ($self, $job_id, $worker_id) = @_;
my $sql = "UPDATE job SET status='CLAIMED', worker_id=? WHERE job_id=? AND status='READY'"; my $sql = "UPDATE job SET status='CLAIMED', worker_id=? WHERE job_id=? AND status='READY'";
...@@ -652,33 +676,6 @@ sub gc_dataflow { ...@@ -652,33 +676,6 @@ sub gc_dataflow {
} }
=head2 reset_job_by_dbID
Arg [1] : int $job_id
Example :
Description: Forces a job to be reset to 'READY' so it can be run again.
Will also reset a previously 'SEMAPHORED' jobs to READY (FIXME?).
The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
but will not change retry_count if a job has never *really* started.
Exceptions : $job_id must not be false or zero
Caller : user process
=cut
sub reset_job_by_dbID {
my $self = shift;
my $job_id = shift or throw("job_id of the job to be reset is undefined");
# Note: the order of the fields being updated is critical!
$self->dbc->do( qq{
UPDATE job
SET retry_count = (CASE WHEN (status='COMPILATION' OR status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END)
, status='READY'
WHERE job_id=$job_id
} );
}
=head2 reset_jobs_for_analysis_id =head2 reset_jobs_for_analysis_id
Arg [1] : int $analysis_id Arg [1] : int $analysis_id
......
...@@ -171,8 +171,7 @@ sub create_new_worker { ...@@ -171,8 +171,7 @@ sub create_new_worker {
Description: If analysis_id or logic_name is specified it will try to specialize the Worker into this analysis. Description: If analysis_id or logic_name is specified it will try to specialize the Worker into this analysis.
If not specified the Queen will analyze the hive and pick the most suitable analysis. If not specified the Queen will analyze the hive and pick the most suitable analysis.
Returntype : Bio::EnsEMBL::Hive::Worker Caller : Bio::EnsEMBL::Hive::Worker
Caller : runWorker.pl
=cut =cut
...@@ -186,7 +185,7 @@ sub specialize_new_worker { ...@@ -186,7 +185,7 @@ sub specialize_new_worker {
die "At most one of the options {-analysis_id, -logic_name, -job_id} can be set to pre-specialize a Worker"; die "At most one of the options {-analysis_id, -logic_name, -job_id} can be set to pre-specialize a Worker";
} }
my ($job, $analysis, $stats); my ($analysis, $stats, $special_batch);
my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor; my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
if($job_id or $analysis_id or $logic_name) { # probably pre-specialized from command-line if($job_id or $analysis_id or $logic_name) { # probably pre-specialized from command-line
...@@ -197,10 +196,14 @@ sub specialize_new_worker { ...@@ -197,10 +196,14 @@ sub specialize_new_worker {
my $job_adaptor = $self->db->get_AnalysisJobAdaptor; my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
$job_adaptor->reset_job_by_dbID($job_id); $job_adaptor->reset_job_by_dbID($job_id);
$job = $job_adaptor->fetch_by_dbID($job_id) my $worker_id = $worker->dbID;
or die "job with dbID='$job_id' could not be fetched from the database"; $special_batch = $job_adaptor->reclaim_job_for_worker($job_id, $worker_id);
$analysis_id = $job->analysis_id; if(my $job = $special_batch->[0]) {
$analysis_id = $job->analysis_id;
} else {
die "Could not reclaim job with dbID='$job_id' for worker with dbID='$worker_id'";
}
} }
if($logic_name) { if($logic_name) {
...@@ -222,7 +225,7 @@ sub specialize_new_worker { ...@@ -222,7 +225,7 @@ sub specialize_new_worker {
$stats = $analysis_stats_adaptor->fetch_by_analysis_id($analysis_id); $stats = $analysis_stats_adaptor->fetch_by_analysis_id($analysis_id);
$self->safe_synchronize_AnalysisStats($stats); $self->safe_synchronize_AnalysisStats($stats);
unless($job or $force) { # do we really need to run this analysis? unless($special_batch or $force) { # do we really need to run this analysis?
if($self->get_hive_current_load() >= 1.1) { if($self->get_hive_current_load() >= 1.1) {
die "Hive is overloaded, can't specialize a worker"; die "Hive is overloaded, can't specialize a worker";
} }
...@@ -255,8 +258,8 @@ sub specialize_new_worker { ...@@ -255,8 +258,8 @@ sub specialize_new_worker {
$sth_update_analysis_id->execute($worker->analysis_id, $worker->dbID); $sth_update_analysis_id->execute($worker->analysis_id, $worker->dbID);
$sth_update_analysis_id->finish; $sth_update_analysis_id->finish;
if($job) { if($special_batch) {
$worker->_specific_job($job); $worker->special_batch( $special_batch );
} else { # count it as autonomous worker sharing the load of that analysis: } else { # count it as autonomous worker sharing the load of that analysis:
$stats->update_status('WORKING'); $stats->update_status('WORKING');
...@@ -273,8 +276,6 @@ sub specialize_new_worker { ...@@ -273,8 +276,6 @@ sub specialize_new_worker {
unless( $self->db->hive_use_triggers() ) { unless( $self->db->hive_use_triggers() ) {
$analysis_stats_adaptor->increase_running_workers($worker->analysis_id); $analysis_stats_adaptor->increase_running_workers($worker->analysis_id);
} }
return $worker;
} }
......
...@@ -245,6 +245,12 @@ sub execute_writes { ...@@ -245,6 +245,12 @@ sub execute_writes {
} }
sub special_batch {
my $self = shift;
$self->{'_special_batch'} = shift if(@_);
return $self->{'_special_batch'};
}
=head2 analysis =head2 analysis
Arg [1] : (optional) Bio::EnsEMBL::Hive::Analysis $value Arg [1] : (optional) Bio::EnsEMBL::Hive::Analysis $value
...@@ -534,8 +540,8 @@ sub run { ...@@ -534,8 +540,8 @@ sub run {
my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop
$self->{'_interval_partial_timing'} = {}; $self->{'_interval_partial_timing'} = {};
if( my $specific_job = $self->_specific_job() ) { if( my $special_batch = $self->special_batch() ) {
$jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->reclaim_job_for_worker($self, $specific_job) ); $jobs_done_by_batches_loop += $self->run_one_batch( $special_batch );
$self->cause_of_death('JOB_LIMIT'); $self->cause_of_death('JOB_LIMIT');
} else { # a proper "BATCHES" loop } else { # a proper "BATCHES" loop
...@@ -779,11 +785,4 @@ sub stop_job_output_redirection { ...@@ -779,11 +785,4 @@ sub stop_job_output_redirection {
} }
sub _specific_job {
my $self = shift;
$self->{'_specific_job'} = shift if(@_);
return $self->{'_specific_job'};
}
1; 1;
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment