diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
index b7659575f840778b03970ebf041bc0c1809bbe16..6b3e2906e7726e7134c4a9307e13e2aef35d18be 100644
--- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
+++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm
@@ -84,36 +84,42 @@ sub default_overflow_limit {
 sub store_jobs_and_adjust_counters {
     my ($self, $jobs, $push_new_semaphore) = @_;
 
-    my $dbc = $self->dbc;
-
         # NB: our use patterns assume all jobs from the same storing batch share the same semaphored_job_id:
     my $semaphored_job_id = scalar(@$jobs) && $jobs->[0]->semaphored_job_id();
     my $need_to_increase_semaphore_count = ($semaphored_job_id && !$push_new_semaphore);
 
-    my @output_job_ids = ();
-    my $failed_to_store = 0;
+    my @output_job_ids = ();
+    my $failed_to_store_local_jobs = 0;
 
     foreach my $job (@$jobs) {
+
+        my $job_adaptor = $job->analysis->adaptor->db->get_AnalysisJobAdaptor;
+        my $local_job = $job_adaptor == $self;
+
            # avoid deadlocks when dataflowing under transactional mode (used in Ortheus Runnable for example):
-        $dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($need_to_increase_semaphore_count and ($dbc->driver ne 'sqlite'));
+        if($need_to_increase_semaphore_count and $local_job and ($job_adaptor->dbc->driver ne 'sqlite')) {
+            $job_adaptor->dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" );
+        }
+
+        $job->prev_job( undef ) unless( $local_job );   # break the link with the previous job if dataflowing across databases (current schema doesn't support URLs for job_ids)
 
-        my ($job, $stored_this_time) = $self->store( $job );
+        my ($job, $stored_this_time) = $job_adaptor->store( $job );
 
         if($stored_this_time) {
-            if($need_to_increase_semaphore_count) { # if we are not creating a new semaphore (where dependent jobs have already been counted),
-                                                    # but rather propagating an existing one (same or other level), we have to up-adjust the counter
+            if($need_to_increase_semaphore_count and $local_job) { # if we are not creating a new semaphore (where dependent jobs have already been counted),
+                                                                   # but rather propagating an existing one (same or other level), we have to up-adjust the counter
                 $self->increase_semaphore_count_for_jobid( $semaphored_job_id );
             }
 
-            unless($self->db->hive_use_triggers()) {
-                $dbc->do(qq{
+            unless($job_adaptor->db->hive_use_triggers()) {
+                $job_adaptor->dbc->do(qq{
                         UPDATE analysis_stats SET total_job_count=total_job_count+1
                     }
                     .(($job->status eq 'READY')
                         ? " ,ready_job_count=ready_job_count+1 "
                         : " ,semaphored_job_count=semaphored_job_count+1 "
-                    ).(($dbc->driver eq 'pgsql')
+                    ).(($job_adaptor->dbc->driver eq 'pgsql')
                         ? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
                         : " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
                     )." WHERE analysis_id=".$job->analysis_id
@@ -122,14 +128,14 @@ sub store_jobs_and_adjust_counters {
 
             push @output_job_ids, $job->dbID();
 
-        } else {
-            $failed_to_store++;
+        } elsif( $local_job ) {
+            $failed_to_store_local_jobs++;
         }
     }
 
         # adjust semaphore_count for jobs that failed to be stored (but have been pre-counted during funnel's creation):
-    if($push_new_semaphore and $failed_to_store) {
-        $self->decrease_semaphore_count_for_jobid( $semaphored_job_id, $failed_to_store );
+    if($push_new_semaphore and $failed_to_store_local_jobs) {
+        $self->decrease_semaphore_count_for_jobid( $semaphored_job_id, $failed_to_store_local_jobs );
     }
 
     return \@output_job_ids;
diff --git a/modules/Bio/EnsEMBL/Hive/PipeConfig/LongMultWf_conf.pm b/modules/Bio/EnsEMBL/Hive/PipeConfig/LongMultWf_conf.pm
index 6e58083c6aedd228b99aed316d47a08ca4683132..10bd01a1e7343ca73b5ac2fb773a54a77d7981e7 100644
--- a/modules/Bio/EnsEMBL/Hive/PipeConfig/LongMultWf_conf.pm
+++ b/modules/Bio/EnsEMBL/Hive/PipeConfig/LongMultWf_conf.pm
@@ -145,6 +145,7 @@ sub pipeline_analyses {
             -flow_into => {
                 1 => [ ':////intermediate_result' ],
             },
+            -can_be_empty => 1,
         },
 
         {   -logic_name => 'add_together',