my ($self, $output_ids, $branch_name_or_code, $create_job_options) = @_;
my $input_id = $self->input_id();
my $param_id_stack = $self->param_id_stack();
my $accu_id_stack = $self->accu_id_stack();
my $job_adaptor = $self->adaptor();
my $hive_use_param_stack = $job_adaptor && $job_adaptor->db->hive_use_param_stack();
if($hive_use_param_stack) {
if($input_id and ($input_id ne
'{}')) { # add the parent to the
param_id_stack if it had non-trivial extra parameters
$param_id_stack = ($param_id_stack ? $param_id_stack.',' : '').$self->dbID();
}
if(scalar(keys %{$self->accu_hash()})) { # add the parent to the
accu_id_stack if it had
"own" accumulator
$accu_id_stack = ($accu_id_stack ? $accu_id_stack.',' : '').$self->dbID();
}
}
$output_ids ||= [ $hive_use_param_stack ? {} : $input_id ]; # by default replicate the parameters of the parent in the child
$output_ids = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY'); # force previously used single values into an arrayref
if($create_job_options) {
die "Please consider configuring semaphored dataflow from PipeConfig rather than setting it up manually";
}
# map branch names to numbers:
# if branch_code is set to 1 (explicitly or impliticly), turn off automatic dataflow:
$self->autoflow(0) if($branch_code == 1);
my @output_job_ids = ();
# sort rules to make sure the fan rules come before funnel rules for the same branch_code:
foreach my $rule (sort {($b->funnel_dataflow_rule_id||0) <=> ($a->funnel_dataflow_rule_id||0)} @{ $self->dataflow_rules( $branch_code ) }) {
# parameter substitution into input_id_template is rule-specific
my $output_ids_for_this_rule;
if(my $template_string = $rule->input_id_template()) {
my $template_hash = destringify($template_string);
$output_ids_for_this_rule = [ map { $self->param_substitute($template_hash, $_) } @$output_ids ];
} else {
$output_ids_for_this_rule = $output_ids;
}
my $target_analysis_or_table = $rule->to_analysis();
if($target_analysis_or_table->can('dataflow')) {
$target_analysis_or_table->dataflow( $output_ids_for_this_rule, $self );
} elsif(my $funnel_dataflow_rule_id = $rule->funnel_dataflow_rule_id()) { # members of a semaphored fan will have to wait in cache until the funnel is created:
my $fan_cache_this_branch = $self->fan_cache()->{$funnel_dataflow_rule_id} ||= [];
'prev_job' => $self,
'analysis' => $target_analysis_or_table, # expecting an Analysis
'input_id' => $_,
'param_id_stack' => $param_id_stack,
'accu_id_stack' => $accu_id_stack,
) } @$output_ids_for_this_rule;
} else { # either a semaphored funnel or a non-semaphored dataflow:
my $fan_jobs =
delete $self->fan_cache()->{$rule->
dbID}; # clear the cache at the same time
if($fan_jobs && @$fan_jobs) { # a semaphored funnel
if( (my $funnel_job_count = scalar(@$output_ids_for_this_rule)) !=1 ) {
$self->transient_error(0);
die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1";
} else {
'prev_job' => $self,
'analysis' => $target_analysis_or_table, # expecting an Analysis
'input_id' => $output_ids_for_this_rule->[0],
'param_id_stack' => $param_id_stack,
'accu_id_stack' => $accu_id_stack,
'semaphore_count' => scalar(@$fan_jobs), # "pre-increase" the semaphore count before creating the dependent jobs
'semaphored_job_id' => $self->semaphored_job_id(), # propagate parent's semaphore if any
);
my ($funnel_job_id) = @{ $job_adaptor->store_jobs_and_adjust_counters( [ $funnel_job ], 0) };
if($funnel_job_id) { # if a semaphored funnel job creation succeeded, then store the fan out of the cache:
foreach my $fan_job (@$fan_jobs) { # set the funnel in every fan's job:
$fan_job->semaphored_job_id( $funnel_job_id );
}
push @output_job_ids, $funnel_job_id, @{ $job_adaptor->store_jobs_and_adjust_counters( $fan_jobs, 1) };
}
}
} else { # non-semaphored dataflow (but potentially propagating any existing semaphores)
'prev_job' => $self,
'analysis' => $target_analysis_or_table, # expecting an Analysis
'input_id' => $_,
'param_id_stack' => $param_id_stack,
'accu_id_stack' => $accu_id_stack,
'semaphored_job_id' => $self->semaphored_job_id(), # propagate parent's semaphore if any
) } @$output_ids_for_this_rule;
push @output_job_ids, @{ $job_adaptor->store_jobs_and_adjust_counters( \@non_semaphored_jobs, 0) };
}
} # /if
} # /foreach my $rule
return \@output_job_ids;
}