diff --git a/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm b/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm index 407c90e342f06d1e0f81c6f727b3ab9c5a340025..8ca1dffdca57e5d271686676d1d49b8b6bfcd03f 100644 --- a/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm +++ b/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm @@ -303,7 +303,8 @@ sub dataflow_output_id { my $param_id_stack = $self->param_id_stack(); my $accu_id_stack = $self->accu_id_stack(); - my $hive_use_param_stack = $self->adaptor() && $self->adaptor->db->hive_use_param_stack(); + my $job_adaptor = $self->adaptor(); + my $hive_use_param_stack = $job_adaptor && $job_adaptor->db->hive_use_param_stack(); if($hive_use_param_stack) { if($input_id and ($input_id ne '{}')) { # add the parent to the param_id_stack if it had non-trivial extra parameters @@ -347,80 +348,63 @@ sub dataflow_output_id { $target_analysis_or_table->dataflow( $output_ids_for_this_rule, $self ); - } else { - - if(my $funnel_dataflow_rule_id = $rule->funnel_dataflow_rule_id()) { # members of a semaphored fan will have to wait in cache until the funnel is created: + } elsif(my $funnel_dataflow_rule_id = $rule->funnel_dataflow_rule_id()) { # members of a semaphored fan will have to wait in cache until the funnel is created: my $fan_cache_this_branch = $self->fan_cache()->{$funnel_dataflow_rule_id} ||= []; - push @$fan_cache_this_branch, map { [$_, $target_analysis_or_table] } @$output_ids_for_this_rule; + push @$fan_cache_this_branch, map { Bio::EnsEMBL::Hive::AnalysisJob->new( + -prev_job_id => $self->dbID, + -analysis_id => $target_analysis_or_table->dbID, # expecting an Analysis + -input_id => $_, + -param_id_stack => $param_id_stack, + -accu_id_stack => $accu_id_stack, + # -semaphored_job_id => to be set when the $funnel_job has been stored + ) } @$output_ids_for_this_rule; - } else { # either a semaphored funnel or a non-semaphored dataflow: + } else { # either a semaphored funnel or a non-semaphored dataflow: - my $fan_cache = delete $self->fan_cache()->{$rule->dbID}; # clear the cache at the same time + my $fan_jobs = delete $self->fan_cache()->{$rule->dbID}; # clear the cache at the same time + + if($fan_jobs && @$fan_jobs) { # a semaphored funnel - if($fan_cache && @$fan_cache) { # a semaphored funnel - my $funnel_job_id; if( (my $funnel_job_count = scalar(@$output_ids_for_this_rule)) !=1 ) { $self->transient_error(0); die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1"; - } elsif($funnel_job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( # if a semaphored funnel job creation succeeded, ... + } else { + my $funnel_job = Bio::EnsEMBL::Hive::AnalysisJob->new( + -prev_job_id => $self->dbID, + -analysis_id => $target_analysis_or_table->dbID, # expecting an Analysis -input_id => $output_ids_for_this_rule->[0], -param_id_stack => $param_id_stack, -accu_id_stack => $accu_id_stack, - -analysis => $target_analysis_or_table, - -prev_job => $self, - -semaphore_count => scalar(@$fan_cache), # "pre-increase" the semaphore count before creating the dependent jobs + -semaphore_count => scalar(@$fan_jobs), # "pre-increase" the semaphore count before creating the dependent jobs -semaphored_job_id => $self->semaphored_job_id(), # propagate parent's semaphore if any - )) { # ... then create the fan out of the cache: - push @output_job_ids, $funnel_job_id; - - my $failed_to_create = 0; - - foreach my $pair ( @$fan_cache ) { - my ($output_id, $fan_analysis) = @$pair; - if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( - -input_id => $output_id, - -param_id_stack => $param_id_stack, - -accu_id_stack => $accu_id_stack, - -analysis => $fan_analysis, - -prev_job => $self, - -semaphored_job_id => $funnel_job_id, # by passing this parameter we request not to propagate semaphores - -push_new_semaphore => 1, # inform the adaptor that semaphore count doesn't need up-adjustment - )) { - push @output_job_ids, $job_id; - } else { # count all dependent jobs that failed to create - $failed_to_create++; - } - } - if($failed_to_create) { # adjust semaphore_count for jobs that failed to create: - $self->adaptor->decrease_semaphore_count_for_jobid( $funnel_job_id, $failed_to_create ); - } - } else { # assume the whole semaphored group of jobs has been created already - } + ); - } else { # non-semaphored dataflow (but potentially propagating any existing semaphores) + my ($funnel_job_id) = @{ $job_adaptor->store_jobs_and_adjust_counters( [ $funnel_job ], 0) }; + if($funnel_job_id) { # if a semaphored funnel job creation succeeded, then store the fan out of the cache: - foreach my $output_id ( @$output_ids_for_this_rule ) { - - if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( - -input_id => $output_id, - -param_id_stack => $param_id_stack, - -accu_id_stack => $accu_id_stack, - -analysis => $target_analysis_or_table, - -prev_job => $self, - -semaphored_job_id => $self->semaphored_job_id(), # propagate parent's semaphore if any - )) { - # only add the ones that were indeed created: - push @output_job_ids, $job_id; + foreach my $fan_job (@$fan_jobs) { # set the funnel in every fan's job: + $fan_job->semaphored_job_id( $funnel_job_id ); + } + push @output_job_ids, $funnel_job_id, @{ $job_adaptor->store_jobs_and_adjust_counters( $fan_jobs, 1) }; } } + } else { # non-semaphored dataflow (but potentially propagating any existing semaphores) + my @non_semaphored_jobs = map { Bio::EnsEMBL::Hive::AnalysisJob->new( + -prev_job_id => $self->dbID, + -analysis_id => $target_analysis_or_table->dbID, # expecting an Analysis + -input_id => $_, + -param_id_stack => $param_id_stack, + -accu_id_stack => $accu_id_stack, + -semaphored_job_id => $self->semaphored_job_id(), # propagate parent's semaphore if any + ) } @$output_ids_for_this_rule; + + push @output_job_ids, @{ $job_adaptor->store_jobs_and_adjust_counters( \@non_semaphored_jobs, 0) }; } - - } - } - } + } # /if + } # /foreach my $rule return \@output_job_ids; } diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm index 444a9fb50dd82a3af9060cb07607f55d820384d4..cb79ff0838b0e7b3effb61427da2ec49a913c932 100644 --- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm +++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm @@ -125,7 +125,9 @@ sub CreateNewJob { -semaphored_job_id => $semaphored_job_id, ); - return $analysis->adaptor->db->get_AnalysisJobAdaptor->store_job_and_adjust_counters( $job, $push_new_semaphore ); + my ($job_id) = @{ $analysis->adaptor->db->get_AnalysisJobAdaptor->store_jobs_and_adjust_counters( [ $job ], $push_new_semaphore ) }; + + return $job_id; } @@ -136,41 +138,58 @@ sub CreateNewJob { ############################################################################### -sub store_job_and_adjust_counters { - my ($self, $job, $push_new_semaphore) = @_; +sub store_jobs_and_adjust_counters { + my ($self, $jobs, $push_new_semaphore) = @_; + + my $dbc = $self->dbc; - my $semaphored_job_id = $job->semaphored_job_id(); + # assume all jobs from the same storing batch share the same semaphored_job_id: + my $semaphored_job_id = scalar(@$jobs) && $jobs->[0]->semaphored_job_id(); my $need_to_increase_semaphore_count = ($semaphored_job_id && !$push_new_semaphore); - my $dbc = $self->dbc; - # avoid deadlocks when dataflowing under transactional mode (used in Ortheus Runnable for example): - $dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($need_to_increase_semaphore_count and ($dbc->driver ne 'sqlite')); + my @output_job_ids = (); + my $failed_to_store = 0; - my ($job, $stored_this_time) = $self->store( $job, 0 ); + foreach my $job (@$jobs) { + # avoid deadlocks when dataflowing under transactional mode (used in Ortheus Runnable for example): + $dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($need_to_increase_semaphore_count and ($dbc->driver ne 'sqlite')); - if($stored_this_time) { - if($need_to_increase_semaphore_count) { # if we are not creating a new semaphore (where dependent jobs have already been counted), - # but rather propagating an existing one (same or other level), we have to up-adjust the counter - $self->increase_semaphore_count_for_jobid( $semaphored_job_id ); - } + my ($job, $stored_this_time) = $self->store( $job, 0 ); + + if($stored_this_time) { + if($need_to_increase_semaphore_count) { # if we are not creating a new semaphore (where dependent jobs have already been counted), + # but rather propagating an existing one (same or other level), we have to up-adjust the counter + $self->increase_semaphore_count_for_jobid( $semaphored_job_id ); + } - unless($self->db->hive_use_triggers()) { - $dbc->do(qq{ - UPDATE analysis_stats - SET total_job_count=total_job_count+1 - } - .(($job->status eq 'READY') - ? " ,ready_job_count=ready_job_count+1 " - : " ,semaphored_job_count=semaphored_job_count+1 " - ).(($dbc->driver eq 'pgsql') - ? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) " - : " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END " - )." WHERE analysis_id=".$job->analysis_id - ); + unless($self->db->hive_use_triggers()) { + $dbc->do(qq{ + UPDATE analysis_stats + SET total_job_count=total_job_count+1 + } + .(($job->status eq 'READY') + ? " ,ready_job_count=ready_job_count+1 " + : " ,semaphored_job_count=semaphored_job_count+1 " + ).(($dbc->driver eq 'pgsql') + ? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) " + : " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END " + )." WHERE analysis_id=".$job->analysis_id + ); + } + + push @output_job_ids, $job->dbID(); + + } else { + $failed_to_store++; } + } - return $job->dbID(); + # adjust semaphore_count for jobs that failed to be stored (but have been pre-counted during funnel's creation): + if($push_new_semaphore and $failed_to_store) { + $self->decrease_semaphore_count_for_jobid( $semaphored_job_id, $failed_to_store ); } + + return \@output_job_ids; } diff --git a/modules/Bio/EnsEMBL/Hive/PipeConfig/HiveGeneric_conf.pm b/modules/Bio/EnsEMBL/Hive/PipeConfig/HiveGeneric_conf.pm index 1995f76d2c299ae551f9352e1e9220e434346b0d..196adc3721f067f99eb674db0e9bf422917e7d95 100644 --- a/modules/Bio/EnsEMBL/Hive/PipeConfig/HiveGeneric_conf.pm +++ b/modules/Bio/EnsEMBL/Hive/PipeConfig/HiveGeneric_conf.pm @@ -68,6 +68,7 @@ use Bio::EnsEMBL::Hive::DBSQL::SqlSchemaAdaptor; use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor; use Bio::EnsEMBL::Hive::Analysis; use Bio::EnsEMBL::Hive::AnalysisStats; +use Bio::EnsEMBL::Hive::AnalysisJob; use Bio::EnsEMBL::Hive::Extensions; use Bio::EnsEMBL::Hive::Valley; @@ -523,8 +524,9 @@ sub run { warn "Done.\n\n"; } - my $analysis_adaptor = $hive_dba->get_AnalysisAdaptor; - my $analysis_stats_adaptor = $hive_dba->get_AnalysisStatsAdaptor; + my $analysis_adaptor = $hive_dba->get_AnalysisAdaptor; + my $analysis_stats_adaptor = $hive_dba->get_AnalysisStatsAdaptor; + my $job_adaptor = $hive_dba->get_AnalysisJobAdaptor; my $valley = Bio::EnsEMBL::Hive::Valley->new( {}, 'LOCAL' ); @@ -611,13 +613,14 @@ sub run { } # now create the corresponding jobs (if there are any): - foreach my $input_id_hash (@{$input_ids || []}) { - - Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( - -input_id => $input_id_hash, # input_ids are now centrally stringified in the AnalysisJobAdaptor - -analysis => $analysis, - -prev_job_id => undef, # these jobs are created by the initialization script, not by another job - ); + if($input_ids) { + my @jobs = map { Bio::EnsEMBL::Hive::AnalysisJob->new( + -prev_job_id => undef, # these jobs are created by the initialization script, not by another job + -analysis_id => $analysis->dbID, + -input_id => $_, # input_ids are now centrally stringified in the AnalysisJob itself + ) } @$input_ids; + + $job_adaptor->store_jobs_and_adjust_counters( \@jobs ); } } diff --git a/scripts/seed_pipeline.pl b/scripts/seed_pipeline.pl index 70dc5f50325d62c4c33121d463eebb3ed47877f6..108a1ea9b26b5c9c9cdfdb4e368ef728f14bb24e 100755 --- a/scripts/seed_pipeline.pl +++ b/scripts/seed_pipeline.pl @@ -13,6 +13,7 @@ BEGIN { use Getopt::Long; +use Bio::EnsEMBL::Hive::AnalysisJob; use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor; use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor; use Bio::EnsEMBL::Hive::Utils ('destringify', 'stringify', 'script_usage'); @@ -89,22 +90,21 @@ sub main { warn "Since -input_id has not been set, assuming input_id='$input_id'\n"; } - # Make sure all job creations undergo re-stringification - # to avoid alternative "spellings" of the same input_id hash: - $input_id = stringify( destringify( $input_id ) ); + my $job = Bio::EnsEMBL::Hive::AnalysisJob->new( + -prev_job_id => undef, # this job has been created by the initialization script, not by another job + -analysis_id => $analysis->dbID, + -input_id => destringify( $input_id ), # Make sure all job creations undergo re-stringification to avoid alternative "spellings" of the same input_id hash + ); + + my ($job_id) = @{ $hive_dba->get_AnalysisJobAdaptor->store_jobs_and_adjust_counters( [ $job ] ) }; - if( my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( - -analysis => $analysis, - -input_id => $input_id, - -prev_job_id => undef, # this job has been created by the initialization script, not by another job - ) ) { + if($job_id) { print "Job $job_id [ ".$analysis->logic_name.'('.$analysis->dbID.")] : '$input_id'\n"; } else { warn "Could not create job '$input_id' (it may have been created already)\n"; - } }