Commit 64bb3c01 authored by Leo Gordon's avatar Leo Gordon

Modified the LongMult example to use accumulated dataflow

parent 30c092f5
@@ -14,7 +14,7 @@
 ('add_together' is blocked both by 'part_multiply').
 D) As filesystems are frequently a bottleneck for big pipelines, it is advised that eHive processes store intermediate
-   and final results in a database (in this pipeline, 'intermediate_result' and 'final_result' tables are used).
+   and final results in a database (in this pipeline, 'accu' and 'final_result' tables are used).
 4.2 The pipeline is defined in 4 files:
@@ -74,15 +74,15 @@
 5.5 Go to mysql window again and check the contents of job table. Keep checking as the worker may spend some time in 'pending' state.
 After the first worker is done you will see that 'take_b_apart' jobs are now done and new 'part_multiply' and 'add_together' jobs have been created.
-Also check the contents of 'intermediate_result' table, it should be empty at that moment:
+Also check the contents of 'accu' table, it should be empty at that moment:
-        MySQL> SELECT * from intermediate_result;
+        MySQL> SELECT * from accu;
 Go back to the beekeeper window and run the 'beekeeper.pl ... -run' for the second time.
 It will submit another worker to the farm that will at some point get the 'part_multiply' jobs.
-5.6 Now check both 'job' and 'intermediate_result' tables again.
-At some moment 'part_multiply' jobs will have been completed and the results will go into 'intermediate_result' table;
+5.6 Now check both 'job' and 'accu' tables again.
+At some moment 'part_multiply' jobs will have been completed and the results will go into 'accu' table;
 'add_together' jobs are still to be done.
 Check the contents of 'final_result' table (should be empty) and run the third and the last round of 'beekeeper.pl ... -run'
......
@@ -36,10 +36,10 @@ init_pipeline.pl Bio::EnsEMBL::Hive::PipeConfig::LongMult_conf -job_topup -passw
 takes the second one apart into digits, finds what _different_ digits are there,
 creates several jobs of the 'part_multiply' analysis and one job of 'add_together' analysis.
-* A 'part_multiply' job takes in 'a_multiplier' and 'digit', multiplies them and records the result in 'intermediate_result' table.
+* A 'part_multiply' job takes in 'a_multiplier' and 'digit', multiplies them and accumulates the result in 'partial_product' accumulator.
 * An 'add_together' job waits for the first two analyses to complete,
-  takes in 'a_multiplier', 'b_multiplier' and 'intermediate_result' table and produces the final result in 'final_result' table.
+  takes in 'a_multiplier', 'b_multiplier' and 'partial_product' hash and produces the final result in 'final_result' table.
 Please see the implementation details in Runnable modules themselves.
@@ -94,7 +94,6 @@ sub pipeline_create_commands {
         @{$self->SUPER::pipeline_create_commands},  # inheriting database and hive tables' creation
         # additional tables needed for long multiplication pipeline's operation:
-        $self->db_execute_command('pipeline_db', 'CREATE TABLE intermediate_result (a_multiplier char(40) NOT NULL, digit tinyint NOT NULL, result char(41) NOT NULL, PRIMARY KEY (a_multiplier, digit))'),
         $self->db_execute_command('pipeline_db', 'CREATE TABLE final_result (a_multiplier char(40) NOT NULL, b_multiplier char(40) NOT NULL, result char(80) NOT NULL, PRIMARY KEY (a_multiplier, b_multiplier))'),
     ];
 }
@@ -158,7 +157,7 @@ sub pipeline_analyses {
            -module             => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply',
            -analysis_capacity  => 4,   # use per-analysis limiter
            -flow_into => {
-               1 => [ ':////intermediate_result' ],
+               1 => [ ':////accu?partial_product={digit}' ],
            },
        },
......
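The replacement dataflow URL is worth unpacking: instead of routing branch-1 output into a table, `:////accu?partial_product={digit}` sends each job's output into a pipeline-wide accumulator keyed by `digit`, so the downstream 'add_together' job receives one merged hash. A minimal sketch of that merge semantics (illustrative Python, not eHive's implementation; the event shape mirrors the `write_output` payload of this commit):

```python
# Sketch of keyed accumulation: each 'part_multiply' job emits one
# (digit, partial_product) pair, and the accumulator merges the pairs
# into a single hash delivered to the waiting 'add_together' job.
def accumulate(flows):
    """Merge keyed dataflow events into one hash, keyed by 'digit'."""
    accu = {}
    for event in flows:
        accu[event['digit']] = event['partial_product']
    return accu

# e.g. partial products of a_multiplier=1234 for the digits 3 and 4:
events = [
    {'digit': 3, 'partial_product': '3702'},
    {'digit': 4, 'partial_product': '4936'},
]
print(accumulate(events))   # {3: '3702', 4: '4936'}
```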
@@ -20,14 +20,13 @@ and stores the result in 'final_result' database table.
 package Bio::EnsEMBL::Hive::RunnableDB::LongMult::AddTogether;
 use strict;
-use Bio::EnsEMBL::Hive::DBSQL::NakedTableAdaptor;   # to fetch data from 'intermediate_result' table
 use base ('Bio::EnsEMBL::Hive::Process');
 =head2 fetch_input
     Description : Implements fetch_input() interface method of Bio::EnsEMBL::Hive::Process that is used to read in parameters and load data.
-                  Here all relevant partial products are fetched from the 'intermediate_result' table and stored in a hash for future use.
+                  Here all relevant partial products are fetched from the 'partial_product' accumulator and stored in a hash for future use.
     param('a_multiplier'): The first long number (a string of digits - doesn't have to fit a register).
@@ -40,16 +39,11 @@ use base ('Bio::EnsEMBL::Hive::Process');
 sub fetch_input {   # fetch all the (relevant) precomputed products
     my $self = shift @_;
-    my $a_multiplier = $self->param_required('a_multiplier');
+    my $a_multiplier    = $self->param_required('a_multiplier');
+    my $partial_product = $self->param('partial_product');
-    my $adaptor = $self->db->get_NakedTableAdaptor();
-    $adaptor->table_name( 'intermediate_result' );
-    my $product_pair = $adaptor->fetch_by_a_multiplier_HASHED_FROM_digit_TO_result( $a_multiplier );
-    $product_pair->{1} = $a_multiplier;
-    $product_pair->{0} = 0;
-    $self->param('product_pair', $product_pair);
+    $partial_product->{1} = $a_multiplier;
+    $partial_product->{0} = 0;
 }
 =head2 run
@@ -62,10 +56,10 @@ sub fetch_input {   # fetch all the (relevant) precomputed products
 sub run {   # call the function that will compute the stuff
     my $self = shift @_;
-    my $b_multiplier = $self->param_required('b_multiplier');
-    my $product_pair = $self->param('product_pair');
+    my $b_multiplier    = $self->param_required('b_multiplier');
+    my $partial_product = $self->param('partial_product');
-    $self->param('result', _add_together($b_multiplier, $product_pair));
+    $self->param('result', _add_together($b_multiplier, $partial_product));
     sleep( $self->param('take_time') );
 }
@@ -95,14 +89,14 @@ sub write_output {  # store and dataflow
 =cut
 sub _add_together {
-    my ($b_multiplier, $product_pair) = @_;
+    my ($b_multiplier, $partial_product) = @_;
     my @accu = ();
     my @b_digits = reverse split(//, $b_multiplier);
     foreach my $b_index (0..(@b_digits-1)) {
        my $b_digit = $b_digits[$b_index];
-       my $product = $product_pair->{$b_digit};
+       my $product = $partial_product->{$b_digit};
        my @p_digits = reverse split(//, $product);
        foreach my $p_index (0..(@p_digits-1)) {
......
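The renamed parameter is the only change in `_add_together`; the column-wise addition itself is untouched. Its carry handling can be made explicit in a hedged Python sketch (not the module itself; `partial_product` maps each digit of 'b_multiplier' to the string value of 'a_multiplier' times that digit, seeded with the trivial entries for 0 and 1 just as `fetch_input` does):

```python
def add_together(b_multiplier, partial_product):
    """Assemble a_multiplier * b_multiplier from per-digit partial products."""
    accu = []   # accu[i] = pre-carry sum of decimal column i, least significant first
    for b_index, b_digit in enumerate(reversed(b_multiplier)):
        product = str(partial_product[int(b_digit)])
        for p_index, p_digit in enumerate(reversed(product)):
            col = b_index + p_index     # shift by the position of b_digit
            while len(accu) <= col:
                accu.append(0)
            accu[col] += int(p_digit)
    carry, digits = 0, []
    for value in accu:                  # propagate carries upwards
        value += carry
        digits.append(str(value % 10))
        carry = value // 10
    while carry:
        digits.append(str(carry % 10))
        carry //= 10
    return ''.join(reversed(digits)).lstrip('0') or '0'

# 123 * 21, with partial products for the distinct digits of '21':
partial_product = {0: 0, 1: '123', 2: '246'}
print(add_together('21', partial_product))      # '2583'
```

Working on strings end-to-end is what lets the pipeline multiply numbers far larger than any machine register, which is the point of the example.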
@@ -13,7 +13,7 @@ to understand how this particular example pipeline is configured and ran.
 =head1 DESCRIPTION
 'LongMult::PartMultiply' has a separate task of multiplying 'a_multiplier' by the given 'digit',
-then it stores the result into the 'intermetiate_result' database table.
+then it passes its partial_product on.
 =cut
@@ -52,7 +52,7 @@ sub run {   # call the recursive function that will compute the stuff
     my $a_multiplier = $self->param_required('a_multiplier');
     my $digit = $self->param_required('digit');
-    $self->param('result', _rec_multiply($a_multiplier, $digit, 0) || 0);
+    $self->param('partial_product', _rec_multiply($a_multiplier, $digit, 0) || 0);
     sleep( $self->param('take_time') );
 }
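The `_rec_multiply` helper that produces each partial_product multiplies an arbitrarily long decimal string by a single digit. A recursive Python equivalent, assuming the usual digit-at-a-time carry scheme (a sketch, not a translation of the Perl source):

```python
def rec_multiply(a_multiplier, digit, carry=0):
    """Multiply a decimal string by one digit, recursing from the least
    significant digit and propagating the carry upwards."""
    if not a_multiplier:
        return str(carry) if carry else ''
    value = int(a_multiplier[-1]) * digit + carry
    return rec_multiply(a_multiplier[:-1], digit, value // 10) + str(value % 10)

# one 'part_multiply' job's worth of work (the caller above guards
# the all-zero case with '|| 0'):
print(rec_multiply('9650156169', 7))    # '67551093183'
```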
@@ -60,7 +60,7 @@ sub run {   # call the recursive function that will compute the stuff
 =head2 write_output
     Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution.
-                  Dataflows the intermediate results down branch 1, which will be routed into 'intermediate_result' table.
+                  Dataflows the intermediate results down branch 1, which will be routed into 'partial_product' accumulator.
 =cut
@@ -68,9 +68,9 @@ sub write_output {  # but this time we have something to store
     my $self = shift @_;
     $self->dataflow_output_id( {
-        'a_multiplier' => $self->param('a_multiplier'),
-        'digit'        => $self->param('digit'),
-        'result'       => $self->param('result')
+        'a_multiplier'    => $self->param('a_multiplier'),
+        'digit'           => $self->param('digit'),
+        'partial_product' => $self->param('partial_product')
     }, 1);
 }
......