Skip to content
Snippets Groups Projects
Commit d045a873 authored by Leo Gordon's avatar Leo Gordon
Browse files

Dataflowing minimal information out of Runnables, relying on templates in...

Dataflowing minimal information out of Runnables, relying on templates in PipeConfig file to extend it if needed
parent 711ca555
No related branches found
No related tags found
No related merge requests found
......@@ -150,7 +150,7 @@ sub pipeline_analyses {
],
-flow_into => {
'2->A' => { 'part_multiply' => { 'a_multiplier' => '#a_multiplier#', 'digit' => '#digit#' } }, # will create a semaphored fan of jobs; will use a template to top-up the hashes
'A->1' => [ 'add_together' ], # will create a semaphored funnel job to wait for the fan to complete and add the results
'A->1' => [ 'add_together' ], # will create a semaphored funnel job to wait for the fan to complete and add the results
},
},
......@@ -158,7 +158,7 @@ sub pipeline_analyses {
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply',
-analysis_capacity => 4, # use per-analysis limiter
-flow_into => {
1 => [ ':////accu?partial_product={digit}' ],
1 => { ':////accu?partial_product={digit}' => { 'a_multiplier' => '#a_multiplier#', 'digit' => '#digit#', 'partial_product' => '#partial_product#' } },
},
},
......@@ -166,7 +166,7 @@ sub pipeline_analyses {
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::AddTogether',
# -analysis_capacity => 0, # this is a way to temporarily block a given analysis
-flow_into => {
1 => [ ':////final_result' ],
1 => { ':////final_result' => { 'a_multiplier' => '#a_multiplier#', 'b_multiplier' => '#b_multiplier#', 'result' => '#result#' } },
},
},
];
......
......@@ -91,8 +91,6 @@ sub write_output { # store and dataflow
my $self = shift @_;
$self->dataflow_output_id({
'a_multiplier' => $self->param('a_multiplier'),
'b_multiplier' => $self->param('b_multiplier'),
'result' => $self->param('result'),
}, 1);
}
......
......@@ -65,11 +65,11 @@ sub fetch_input {
$digit_hash{$digit}++;
}
# output_ids of partial multiplications to be computed:
my @output_ids = map { { 'digit' => $_ } } keys %digit_hash;
# parameter hashes of partial multiplications to be computed:
my @sub_tasks = map { { 'digit' => $_ } } keys %digit_hash;
# store them for future use:
$self->param('output_ids', \@output_ids);
$self->param('sub_tasks', \@sub_tasks);
}
......@@ -98,15 +98,15 @@ sub run {
sub write_output { # nothing to write out, but some dataflow to perform:
my $self = shift @_;
my $output_ids = $self->param('output_ids');
my $sub_tasks = $self->param('sub_tasks');
# "fan out" into branch#2 first
$self->dataflow_output_id($output_ids, 2);
# "fan out" into branch#2 first, branch#1 will be created if we wire it (and we do)
$self->dataflow_output_id($sub_tasks, 2);
$self->warning(scalar(@$output_ids).' multiplication jobs have been created'); # warning messages get recorded into 'log_message' table
$self->warning(scalar(@$sub_tasks).' multiplication jobs have been created'); # warning messages get recorded into 'log_message' table
# then flow into the branch#1 funnel; input_id would flow into branch#1 by default anyway, but we request it here explicitly:
$self->dataflow_output_id($self->input_id, 1);
## extra information sent to the funnel will extend its stack:
# $self->dataflow_output_id( { 'different_digits' => scalar(@$sub_tasks) } , 1);
}
1;
......
......@@ -83,8 +83,6 @@ sub write_output { # but this time we have something to store
my $self = shift @_;
$self->dataflow_output_id( {
'a_multiplier' => $self->param('a_multiplier'),
'digit' => $self->param('digit'),
'partial_product' => $self->param('partial_product')
}, 1);
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment