Commit d6bf4791 authored by Leo Gordon's avatar Leo Gordon
Browse files

bugfix (long-standing) : make sure foreign jobs are created in the right database

parent 64420ecf
......@@ -84,36 +84,42 @@ sub default_overflow_limit {
sub store_jobs_and_adjust_counters {
my ($self, $jobs, $push_new_semaphore) = @_;
my $dbc = $self->dbc;
# NB: our use patterns assume all jobs from the same storing batch share the same semaphored_job_id:
my $semaphored_job_id = scalar(@$jobs) && $jobs->[0]->semaphored_job_id();
my $need_to_increase_semaphore_count = ($semaphored_job_id && !$push_new_semaphore);
my @output_job_ids = ();
my $failed_to_store = 0;
my @output_job_ids = ();
my $failed_to_store_local_jobs = 0;
foreach my $job (@$jobs) {
my $job_adaptor = $job->analysis->adaptor->db->get_AnalysisJobAdaptor;
my $local_job = $job_adaptor == $self;
# avoid deadlocks when dataflowing under transactional mode (used in Ortheus Runnable for example):
$dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($need_to_increase_semaphore_count and ($dbc->driver ne 'sqlite'));
if($need_to_increase_semaphore_count and $local_job and ($job_adaptor->dbc->driver ne 'sqlite')) {
$job_adaptor->dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" );
}
$job->prev_job( undef ) unless( $local_job ); # break the link with the previous job if dataflowing across databases (current schema doesn't support URLs for job_ids)
my ($job, $stored_this_time) = $self->store( $job );
my ($job, $stored_this_time) = $job_adaptor->store( $job );
if($stored_this_time) {
if($need_to_increase_semaphore_count) { # if we are not creating a new semaphore (where dependent jobs have already been counted),
# but rather propagating an existing one (same or other level), we have to up-adjust the counter
if($need_to_increase_semaphore_count and $local_job) { # if we are not creating a new semaphore (where dependent jobs have already been counted),
# but rather propagating an existing one (same or other level), we have to up-adjust the counter
$self->increase_semaphore_count_for_jobid( $semaphored_job_id );
}
unless($self->db->hive_use_triggers()) {
$dbc->do(qq{
unless($job_adaptor->db->hive_use_triggers()) {
$job_adaptor->dbc->do(qq{
UPDATE analysis_stats
SET total_job_count=total_job_count+1
}
.(($job->status eq 'READY')
? " ,ready_job_count=ready_job_count+1 "
: " ,semaphored_job_count=semaphored_job_count+1 "
).(($dbc->driver eq 'pgsql')
).(($job_adaptor->dbc->driver eq 'pgsql')
? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
: " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
)." WHERE analysis_id=".$job->analysis_id
......@@ -122,14 +128,14 @@ sub store_jobs_and_adjust_counters {
push @output_job_ids, $job->dbID();
} else {
$failed_to_store++;
} elsif( $local_job ) {
$failed_to_store_local_jobs++;
}
}
# adjust semaphore_count for jobs that failed to be stored (but have been pre-counted during funnel's creation):
if($push_new_semaphore and $failed_to_store) {
$self->decrease_semaphore_count_for_jobid( $semaphored_job_id, $failed_to_store );
if($push_new_semaphore and $failed_to_store_local_jobs) {
$self->decrease_semaphore_count_for_jobid( $semaphored_job_id, $failed_to_store_local_jobs );
}
return \@output_job_ids;
......
......@@ -145,6 +145,7 @@ sub pipeline_analyses {
-flow_into => {
1 => [ ':////intermediate_result' ],
},
-can_be_empty => 1,
},
{ -logic_name => 'add_together',
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment