Commit 90bd8f85 authored by Leo Gordon's avatar Leo Gordon
Browse files

simplified logic that controls how semaphores are propagates (preparing for...

simplified logic that controls how semaphores are propagates (preparing for semaphore escaping rule support)
parent a3bbdb8d
......@@ -299,12 +299,12 @@ sub dataflow_output_id {
} else {
if(my $funnel_dataflow_rule_id = $rule->funnel_dataflow_rule_id()) { # a semaphored fan: they will have to wait in cache until the funnel is created
if(my $funnel_dataflow_rule_id = $rule->funnel_dataflow_rule_id()) { # members of a semaphored fan will have to wait in cache until the funnel is created:
my $fan_cache_this_branch = $self->fan_cache()->{$funnel_dataflow_rule_id} ||= [];
push @$fan_cache_this_branch, map { [$_, $target_analysis_or_table] } @$output_ids_for_this_rule;
} else {
} else { # either a semaphored funnel or a non-semaphored dataflow:
my $fan_cache = delete $self->fan_cache()->{$rule->dbID}; # clear the cache at the same time
......@@ -315,14 +315,17 @@ sub dataflow_output_id {
$self->transient_error(0);
die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1";
} elsif($funnel_job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( # if a semaphored funnel job creation succeeded,
} elsif($funnel_job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( # if a semaphored funnel job creation succeeded, ...
-input_id => $output_ids_for_this_rule->[0],
-analysis => $target_analysis_or_table,
-prev_job => $self,
-semaphore_count => scalar(@$fan_cache),
)) { # then create the fan out of the cache:
-semaphore_count => scalar(@$fan_cache), # "pre-increase" the semaphore count before creating the dependent jobs
-semaphored_job_id => $self->semaphored_job_id(), # propagate parent's semaphore if any
)) { # ... then create the fan out of the cache:
push @output_job_ids, $funnel_job_id;
my $failed_to_create = 0;
foreach my $pair ( @$fan_cache ) {
my ($output_id, $fan_analysis) = @$pair;
if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
......@@ -330,10 +333,17 @@ sub dataflow_output_id {
-analysis => $fan_analysis,
-prev_job => $self,
-semaphored_job_id => $funnel_job_id, # by passing this parameter we request not to propagate semaphores
-push_new_semaphore => 1, # inform the adaptor that semaphore count doesn't need up-adjustment
)) {
push @output_job_ids, $job_id;
} else { # count all dependent jobs that failed to create
$failed_to_create++;
}
}
if($failed_to_create) { # adjust semaphore_count for jobs that failed to create:
$self->adaptor->decrease_semaphore_count_for_jobid( $funnel_job_id, $failed_to_create );
}
} else {
die "Could not create a funnel job";
}
......@@ -343,9 +353,10 @@ sub dataflow_output_id {
foreach my $output_id ( @$output_ids_for_this_rule ) {
if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => $output_id,
-analysis => $target_analysis_or_table,
-prev_job => $self,
-input_id => $output_id,
-analysis => $target_analysis_or_table,
-prev_job => $self,
-semaphored_job_id => $self->semaphored_job_id(), # propagate parent's semaphore if any
)) {
# only add the ones that were indeed created:
push @output_job_ids, $job_id;
......
......@@ -69,6 +69,7 @@ use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
Also updates corresponding analysis_stats by incrementing total_job_count,
unclaimed_job_count and flagging the incremental update by changing the status
to 'LOADING' (but only if the analysis is not blocked).
NOTE: no AnalysisJob object is created in memory as the result of this call; it is simply a "fast store".
Returntype : int job_id on database analysis is from.
Exceptions : thrown if either -input_id or -analysis are not properly defined
Caller : general
......@@ -80,8 +81,8 @@ sub CreateNewJob {
return undef unless(scalar @args);
my ($input_id, $analysis, $prev_job, $prev_job_id, $blocked, $semaphore_count, $semaphored_job_id) =
rearrange([qw(INPUT_ID ANALYSIS PREV_JOB INPUT_JOB_ID BLOCK SEMAPHORE_COUNT SEMAPHORED_JOB_ID)], @args);
my ($input_id, $analysis, $prev_job, $prev_job_id, $blocked, $semaphore_count, $semaphored_job_id, $push_new_semaphore) =
rearrange([qw(INPUT_ID ANALYSIS PREV_JOB INPUT_JOB_ID BLOCK SEMAPHORE_COUNT SEMAPHORED_JOB_ID PUSH_NEW_SEMAPHORE)], @args);
throw("must define input_id") unless($input_id);
throw("must define analysis") unless($analysis);
......@@ -93,12 +94,6 @@ sub CreateNewJob {
$prev_job_id = $prev_job && $prev_job->dbID();
# if the user did not specifically ask for a new fan, consider propagation:
my $propagate_semaphore = !defined($semaphored_job_id);
# if nothing is supplied, semaphored_job_id will be propagated from the parent job:
$semaphored_job_id ||= $prev_job && $prev_job->semaphored_job_id();
if(ref($input_id)) { # let's do the Perl hash stringification centrally rather than in many places:
$input_id = stringify($input_id);
}
......@@ -123,15 +118,16 @@ sub CreateNewJob {
my $return_code = $sth->execute(@values)
# using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
or die "Coule not run\n\t$sql\nwith data:\n\t(".join(',', @values).')';
or die "Could not run\n\t$sql\nwith data:\n\t(".join(',', @values).')';
my $job_id;
if($return_code > 0) { # <--- for the same reason we have to be explicitly numeric here:
$job_id = $dbc->db_handle->last_insert_id(undef, undef, 'job', 'job_id');
$sth->finish;
if($semaphored_job_id and $propagate_semaphore) { # ready to propagate and something to propagate
$prev_job->adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id ); # propagate the semaphore
if($semaphored_job_id and !$push_new_semaphore) { # if we are not creating a new semaphore (where dependent jobs have already been counted),
# but rather propagating an existing one (same or other level), we have to up-adjust the counter
$prev_job->adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id );
}
unless($dba->hive_use_triggers()) {
......@@ -143,8 +139,6 @@ sub CreateNewJob {
WHERE analysis_id=$analysis_id
});
}
} elsif($semaphored_job_id and !$propagate_semaphore) { # if we didn't succeed in creating the job, fix the semaphore
$prev_job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );
}
return $job_id;
......@@ -389,7 +383,7 @@ sub _objs_from_sth {
sub decrease_semaphore_count_for_jobid { # used in semaphore annihilation or unsuccessful creation
my $self = shift @_;
my $jobid = shift @_;
my $jobid = shift @_ or return;
my $dec = shift @_ || 1;
my $sql = "UPDATE job SET semaphore_count=semaphore_count-? WHERE job_id=?";
......@@ -401,7 +395,7 @@ sub decrease_semaphore_count_for_jobid { # used in semaphore annihilation or
sub increase_semaphore_count_for_jobid { # used in semaphore propagation
my $self = shift @_;
my $jobid = shift @_;
my $jobid = shift @_ or return;
my $inc = shift @_ || 1;
my $sql = "UPDATE job SET semaphore_count=semaphore_count+? WHERE job_id=?";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment