Commit 1b89a412 authored by Leo Gordon's avatar Leo Gordon
Browse files

store_jobs_and_adjust_counters() now also deals with semaphore_count...

store_jobs_and_adjust_counters() now also deals with semaphore_count corrections. It is now the preferred job-creation interface.
parent 6f8ef59b
......@@ -303,7 +303,8 @@ sub dataflow_output_id {
my $param_id_stack = $self->param_id_stack();
my $accu_id_stack = $self->accu_id_stack();
my $hive_use_param_stack = $self->adaptor() && $self->adaptor->db->hive_use_param_stack();
my $job_adaptor = $self->adaptor();
my $hive_use_param_stack = $job_adaptor && $job_adaptor->db->hive_use_param_stack();
if($hive_use_param_stack) {
if($input_id and ($input_id ne '{}')) { # add the parent to the param_id_stack if it had non-trivial extra parameters
......@@ -347,80 +348,63 @@ sub dataflow_output_id {
$target_analysis_or_table->dataflow( $output_ids_for_this_rule, $self );
} else {
if(my $funnel_dataflow_rule_id = $rule->funnel_dataflow_rule_id()) { # members of a semaphored fan will have to wait in cache until the funnel is created:
} elsif(my $funnel_dataflow_rule_id = $rule->funnel_dataflow_rule_id()) { # members of a semaphored fan will have to wait in cache until the funnel is created:
my $fan_cache_this_branch = $self->fan_cache()->{$funnel_dataflow_rule_id} ||= [];
push @$fan_cache_this_branch, map { [$_, $target_analysis_or_table] } @$output_ids_for_this_rule;
push @$fan_cache_this_branch, map { Bio::EnsEMBL::Hive::AnalysisJob->new(
-prev_job_id => $self->dbID,
-analysis_id => $target_analysis_or_table->dbID, # expecting an Analysis
-input_id => $_,
-param_id_stack => $param_id_stack,
-accu_id_stack => $accu_id_stack,
# -semaphored_job_id => to be set when the $funnel_job has been stored
) } @$output_ids_for_this_rule;
} else { # either a semaphored funnel or a non-semaphored dataflow:
} else { # either a semaphored funnel or a non-semaphored dataflow:
my $fan_cache = delete $self->fan_cache()->{$rule->dbID}; # clear the cache at the same time
my $fan_jobs = delete $self->fan_cache()->{$rule->dbID}; # clear the cache at the same time
if($fan_jobs && @$fan_jobs) { # a semaphored funnel
if($fan_cache && @$fan_cache) { # a semaphored funnel
my $funnel_job_id;
if( (my $funnel_job_count = scalar(@$output_ids_for_this_rule)) !=1 ) {
$self->transient_error(0);
die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1";
} elsif($funnel_job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( # if a semaphored funnel job creation succeeded, ...
} else {
my $funnel_job = Bio::EnsEMBL::Hive::AnalysisJob->new(
-prev_job_id => $self->dbID,
-analysis_id => $target_analysis_or_table->dbID, # expecting an Analysis
-input_id => $output_ids_for_this_rule->[0],
-param_id_stack => $param_id_stack,
-accu_id_stack => $accu_id_stack,
-analysis => $target_analysis_or_table,
-prev_job => $self,
-semaphore_count => scalar(@$fan_cache), # "pre-increase" the semaphore count before creating the dependent jobs
-semaphore_count => scalar(@$fan_jobs), # "pre-increase" the semaphore count before creating the dependent jobs
-semaphored_job_id => $self->semaphored_job_id(), # propagate parent's semaphore if any
)) { # ... then create the fan out of the cache:
push @output_job_ids, $funnel_job_id;
my $failed_to_create = 0;
foreach my $pair ( @$fan_cache ) {
my ($output_id, $fan_analysis) = @$pair;
if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => $output_id,
-param_id_stack => $param_id_stack,
-accu_id_stack => $accu_id_stack,
-analysis => $fan_analysis,
-prev_job => $self,
-semaphored_job_id => $funnel_job_id, # by passing this parameter we request not to propagate semaphores
-push_new_semaphore => 1, # inform the adaptor that semaphore count doesn't need up-adjustment
)) {
push @output_job_ids, $job_id;
} else { # count all dependent jobs that failed to create
$failed_to_create++;
}
}
if($failed_to_create) { # adjust semaphore_count for jobs that failed to create:
$self->adaptor->decrease_semaphore_count_for_jobid( $funnel_job_id, $failed_to_create );
}
} else { # assume the whole semaphored group of jobs has been created already
}
);
} else { # non-semaphored dataflow (but potentially propagating any existing semaphores)
my ($funnel_job_id) = @{ $job_adaptor->store_jobs_and_adjust_counters( [ $funnel_job ], 0) };
if($funnel_job_id) { # if a semaphored funnel job creation succeeded, then store the fan out of the cache:
foreach my $output_id ( @$output_ids_for_this_rule ) {
if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => $output_id,
-param_id_stack => $param_id_stack,
-accu_id_stack => $accu_id_stack,
-analysis => $target_analysis_or_table,
-prev_job => $self,
-semaphored_job_id => $self->semaphored_job_id(), # propagate parent's semaphore if any
)) {
# only add the ones that were indeed created:
push @output_job_ids, $job_id;
foreach my $fan_job (@$fan_jobs) { # set the funnel in every fan's job:
$fan_job->semaphored_job_id( $funnel_job_id );
}
push @output_job_ids, $funnel_job_id, @{ $job_adaptor->store_jobs_and_adjust_counters( $fan_jobs, 1) };
}
}
} else { # non-semaphored dataflow (but potentially propagating any existing semaphores)
my @non_semaphored_jobs = map { Bio::EnsEMBL::Hive::AnalysisJob->new(
-prev_job_id => $self->dbID,
-analysis_id => $target_analysis_or_table->dbID, # expecting an Analysis
-input_id => $_,
-param_id_stack => $param_id_stack,
-accu_id_stack => $accu_id_stack,
-semaphored_job_id => $self->semaphored_job_id(), # propagate parent's semaphore if any
) } @$output_ids_for_this_rule;
push @output_job_ids, @{ $job_adaptor->store_jobs_and_adjust_counters( \@non_semaphored_jobs, 0) };
}
}
}
}
} # /if
} # /foreach my $rule
return \@output_job_ids;
}
......
......@@ -125,7 +125,9 @@ sub CreateNewJob {
-semaphored_job_id => $semaphored_job_id,
);
return $analysis->adaptor->db->get_AnalysisJobAdaptor->store_job_and_adjust_counters( $job, $push_new_semaphore );
my ($job_id) = @{ $analysis->adaptor->db->get_AnalysisJobAdaptor->store_jobs_and_adjust_counters( [ $job ], $push_new_semaphore ) };
return $job_id;
}
......@@ -136,41 +138,58 @@ sub CreateNewJob {
###############################################################################
sub store_job_and_adjust_counters {
my ($self, $job, $push_new_semaphore) = @_;
sub store_jobs_and_adjust_counters {
my ($self, $jobs, $push_new_semaphore) = @_;
my $dbc = $self->dbc;
my $semaphored_job_id = $job->semaphored_job_id();
# assume all jobs from the same storing batch share the same semaphored_job_id:
my $semaphored_job_id = scalar(@$jobs) && $jobs->[0]->semaphored_job_id();
my $need_to_increase_semaphore_count = ($semaphored_job_id && !$push_new_semaphore);
my $dbc = $self->dbc;
# avoid deadlocks when dataflowing under transactional mode (used in Ortheus Runnable for example):
$dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($need_to_increase_semaphore_count and ($dbc->driver ne 'sqlite'));
my @output_job_ids = ();
my $failed_to_store = 0;
my ($job, $stored_this_time) = $self->store( $job, 0 );
foreach my $job (@$jobs) {
# avoid deadlocks when dataflowing under transactional mode (used in Ortheus Runnable for example):
$dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($need_to_increase_semaphore_count and ($dbc->driver ne 'sqlite'));
if($stored_this_time) {
if($need_to_increase_semaphore_count) { # if we are not creating a new semaphore (where dependent jobs have already been counted),
# but rather propagating an existing one (same or other level), we have to up-adjust the counter
$self->increase_semaphore_count_for_jobid( $semaphored_job_id );
}
my ($job, $stored_this_time) = $self->store( $job, 0 );
if($stored_this_time) {
if($need_to_increase_semaphore_count) { # if we are not creating a new semaphore (where dependent jobs have already been counted),
# but rather propagating an existing one (same or other level), we have to up-adjust the counter
$self->increase_semaphore_count_for_jobid( $semaphored_job_id );
}
unless($self->db->hive_use_triggers()) {
$dbc->do(qq{
UPDATE analysis_stats
SET total_job_count=total_job_count+1
}
.(($job->status eq 'READY')
? " ,ready_job_count=ready_job_count+1 "
: " ,semaphored_job_count=semaphored_job_count+1 "
).(($dbc->driver eq 'pgsql')
? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
: " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
)." WHERE analysis_id=".$job->analysis_id
);
unless($self->db->hive_use_triggers()) {
$dbc->do(qq{
UPDATE analysis_stats
SET total_job_count=total_job_count+1
}
.(($job->status eq 'READY')
? " ,ready_job_count=ready_job_count+1 "
: " ,semaphored_job_count=semaphored_job_count+1 "
).(($dbc->driver eq 'pgsql')
? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
: " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
)." WHERE analysis_id=".$job->analysis_id
);
}
push @output_job_ids, $job->dbID();
} else {
$failed_to_store++;
}
}
return $job->dbID();
# adjust semaphore_count for jobs that failed to be stored (but have been pre-counted during funnel's creation):
if($push_new_semaphore and $failed_to_store) {
$self->decrease_semaphore_count_for_jobid( $semaphored_job_id, $failed_to_store );
}
return \@output_job_ids;
}
......
......@@ -68,6 +68,7 @@ use Bio::EnsEMBL::Hive::DBSQL::SqlSchemaAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::Analysis;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::Hive::Extensions;
use Bio::EnsEMBL::Hive::Valley;
......@@ -523,8 +524,9 @@ sub run {
warn "Done.\n\n";
}
my $analysis_adaptor = $hive_dba->get_AnalysisAdaptor;
my $analysis_stats_adaptor = $hive_dba->get_AnalysisStatsAdaptor;
my $analysis_adaptor = $hive_dba->get_AnalysisAdaptor;
my $analysis_stats_adaptor = $hive_dba->get_AnalysisStatsAdaptor;
my $job_adaptor = $hive_dba->get_AnalysisJobAdaptor;
my $valley = Bio::EnsEMBL::Hive::Valley->new( {}, 'LOCAL' );
......@@ -611,13 +613,14 @@ sub run {
}
# now create the corresponding jobs (if there are any):
foreach my $input_id_hash (@{$input_ids || []}) {
Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => $input_id_hash, # input_ids are now centrally stringified in the AnalysisJobAdaptor
-analysis => $analysis,
-prev_job_id => undef, # these jobs are created by the initialization script, not by another job
);
if($input_ids) {
my @jobs = map { Bio::EnsEMBL::Hive::AnalysisJob->new(
-prev_job_id => undef, # these jobs are created by the initialization script, not by another job
-analysis_id => $analysis->dbID,
-input_id => $_, # input_ids are now centrally stringified in the AnalysisJob itself
) } @$input_ids;
$job_adaptor->store_jobs_and_adjust_counters( \@jobs );
}
}
......
......@@ -13,6 +13,7 @@ BEGIN {
use Getopt::Long;
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::Utils ('destringify', 'stringify', 'script_usage');
......@@ -89,22 +90,21 @@ sub main {
warn "Since -input_id has not been set, assuming input_id='$input_id'\n";
}
# Make sure all job creations undergo re-stringification
# to avoid alternative "spellings" of the same input_id hash:
$input_id = stringify( destringify( $input_id ) );
my $job = Bio::EnsEMBL::Hive::AnalysisJob->new(
-prev_job_id => undef, # this job has been created by the initialization script, not by another job
-analysis_id => $analysis->dbID,
-input_id => destringify( $input_id ), # Make sure all job creations undergo re-stringification to avoid alternative "spellings" of the same input_id hash
);
my ($job_id) = @{ $hive_dba->get_AnalysisJobAdaptor->store_jobs_and_adjust_counters( [ $job ] ) };
if( my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-analysis => $analysis,
-input_id => $input_id,
-prev_job_id => undef, # this job has been created by the initialization script, not by another job
) ) {
if($job_id) {
print "Job $job_id [ ".$analysis->logic_name.'('.$analysis->dbID.")] : '$input_id'\n";
} else {
warn "Could not create job '$input_id' (it may have been created already)\n";
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment