Commit d87592d6 authored by Leo Gordon's avatar Leo Gordon
Browse files

a non-biological eHive pipeline example with dataflow-, control-rules and dynamic job creation

parent 42e389c4
=pod
=head1 NAME
Bio::EnsEMBL::Hive::RunnableDB::LongMult::AddTogether
=head1 DESCRIPTION
'LongMult::AddTogether' is the final step of the pipeline that, naturally, adds the products together
and stores the result in 'final_result' database table;
=cut
package Bio::EnsEMBL::Hive::RunnableDB::LongMult::AddTogether;
use strict;
use base ('Bio::EnsEMBL::Hive::ProcessWithParams');
sub fetch_input {   # fetch all the (relevant) precomputed products
    my $self = shift @_;

    my $a_multiplier = $self->param('a_multiplier')
        or die "'a_multiplier' is an obligatory parameter";

    # Pull every precomputed digit-product for this multiplier into a hash
    # keyed by digit (as written earlier by the 'part_multiply' jobs).
    my $sth = $self->db->dbc()->prepare(
        "SELECT digit, result FROM intermediate_result WHERE a_multiplier = ?"
    );
    $sth->execute($a_multiplier);

    my %product_pair;
    while (my ($digit, $result) = $sth->fetchrow_array()) {
        $product_pair{$digit} = $result;
    }

    # The trivial products are never stored in the table, so fill them in here:
    @product_pair{1, 0} = ($a_multiplier, 0);

    $self->param('product_pair', \%product_pair);
    return 1;
}
sub run {   # call the function that will compute the stuff
    my $self = shift @_;

    my $b_multiplier = $self->param('b_multiplier')
        or die "'b_multiplier' is an obligatory parameter";

    # Combine the per-digit products (fetched in fetch_input) into the
    # final long-multiplication result and cache it as a parameter:
    $self->param('result',
        add_together($b_multiplier, $self->param('product_pair')));
}
sub write_output {   # store the final result
    my $self = shift @_;

    # Gather the three values in column order, then store them.
    # REPLACE (rather than INSERT) keeps re-runs of this job from failing
    # on the (a_multiplier, b_multiplier) primary key.
    my @values = map { $self->param($_) } qw(a_multiplier b_multiplier result);

    my $sth = $self->db->dbc->prepare(
        "REPLACE INTO final_result (a_multiplier, b_multiplier, result) VALUES (?, ?, ?) "
    );
    $sth->execute(@values);

    return 1;
}
######################### do the maths ###############
sub add_together {
    # Sum the precomputed products of $a_multiplier x digit (given in
    # %$product_pair, keyed by digit) shifted by each digit's position in
    # $b_multiplier, yielding a_multiplier * b_multiplier as a string.
    # Works on arbitrarily long decimal strings.
    my ($b_multiplier, $product_pair) = @_;

    # Digit-wise accumulator of the result; index 0 holds the lowest digit.
    my @accu = ();

    my @b_digits = reverse split(//, $b_multiplier);
    foreach my $b_index (0..(@b_digits-1)) {
        my $product  = $product_pair->{ $b_digits[$b_index] };
        my @p_digits = reverse split(//, $product);
        foreach my $p_index (0..(@p_digits-1)) {
            $accu[$b_index + $p_index] += $p_digits[$p_index];
        }
    }

    # Normalize with full carry propagation. A while-loop is used instead of
    # a fixed 0..$#accu range so that any digit appended to @accu by a carry
    # is itself normalized too, should it ever reach 10 or more (the fixed
    # range would silently leave such a digit un-normalized).
    my $a_index = 0;
    while ($a_index < @accu) {
        my $a_digit = $accu[$a_index] || 0;
        $accu[$a_index] = $a_digit % 10;
        if (my $carry = int($a_digit / 10)) {
            $accu[$a_index + 1] += $carry;
        }
        $a_index++;
    }

    # Strip leading zeros (arbitrary inputs may produce more than one),
    # but always keep at least one digit so a zero result reads '0':
    while (@accu > 1 && !$accu[-1]) {
        pop @accu;
    }

    return join('', reverse @accu);
}
1;
=pod
=head1 NAME
Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply
=head1 DESCRIPTION
'LongMult::PartMultiply' has a separate task of multiplying 'a_multiplier' by the given 'digit',
then it stores the result into the 'intermediate_result' database table.
=cut
package Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply;
use strict;
use base ('Bio::EnsEMBL::Hive::ProcessWithParams');
sub fetch_input {   # again, nothing to fetch
    my $self = shift @_;
    # This analysis needs no pre-loaded data; run() reads its parameters directly.
    return 1;
}
sub run {   # call the recursive function that will compute the stuff
    my $self = shift @_;

    my $a_multiplier = $self->param('a_multiplier')
        or die "'a_multiplier' is an obligatory parameter";
    my $digit = $self->param('digit')
        or die "'digit' is an obligatory parameter";

    # rec_multiply() yields '' for a zero product, so coerce that to 0:
    my $result = rec_multiply($a_multiplier, $digit, 0) || 0;
    $self->param('result', $result);
}
sub write_output {   # but this time we have something to store
    my $self = shift @_;

    # REPLACE makes the write idempotent on the (a_multiplier, digit) key,
    # so a retried job does not fail:
    my $sth = $self->db->dbc->prepare(
        "REPLACE INTO intermediate_result (a_multiplier, digit, result) VALUES (?, ?, ?) "
    );
    $sth->execute(map { $self->param($_) } qw(a_multiplier digit result));

    return 1;
}
######################### long multiplication ###############
sub rec_multiply {
    # Multiply the decimal string $a_multiplier by a single $digit with an
    # incoming $carry, returning the product as a string ('' when both the
    # multiplier is exhausted/false and the carry is zero).
    my ($a_multiplier, $digit, $carry) = @_;

    # Base case kept from the original contract: a false multiplier yields
    # just the carry (or the empty string when there is none).
    return ($carry || '') unless $a_multiplier;

    die "'a_multiplier' has to be a decimal number"
        unless $a_multiplier =~ /^\d+$/;

    # Iterative equivalent of the original tail-building recursion:
    # peel digits off the low end, emit one result digit per step.
    my $result_tail = '';
    while (length($a_multiplier)) {
        my $last_digit = chop $a_multiplier;            # lowest remaining digit
        my $partial    = $last_digit * $digit + $carry;
        $result_tail   = ($partial % 10) . $result_tail;
        $carry         = int($partial / 10);
    }

    return ($carry || '') . $result_tail;
}
1;
############################################################################################################################
#
# Bio::EnsEMBL::Hive::RunnableDB::LongMult is an example eHive pipeline that demonstrates the following features:
#
# A) A pipeline can have multiple analyses (this one has three: 'start', 'part_multiply' and 'add_together').
#
# B) A job of one analysis can create jobs of another analysis (one 'start' job creates up to 8 'part_multiply' jobs).
#
# C) A job of one analysis can "flow the data" into another analysis (a 'start' job "flows into" an 'add_together' job).
#
# D) Execution of one analysis can be blocked until all jobs of another analysis have been successfully completed
# ('add_together' is blocked both by 'start' and 'part_multiply').
#
# E) As filesystems are frequently a bottleneck for big pipelines, it is advised that eHive processes store intermediate
# and final results in a database (in this pipeline, 'intermediate_result' and 'final_result' tables are used).
#
############################################################################################################################
# 1. Create an empty database:
mysql --defaults-group-suffix=_compara1 -e 'DROP DATABASE IF EXISTS long_mult_test'
mysql --defaults-group-suffix=_compara1 -e 'CREATE DATABASE long_mult_test'
# 2. Create eHive infrastructure:
mysql --defaults-group-suffix=_compara1 long_mult_test <~lg4/work/ensembl-hive/sql/tables.sql
# 3. Create analyses/control_rules/dataflow_rules of the LongMult pipeline:
mysql --defaults-group-suffix=_compara1 long_mult_test <~lg4/work/ensembl-hive/sql/create_long_mult.sql
# 4. "Load" the pipeline with a multiplication task:
mysql --defaults-group-suffix=_compara1 long_mult_test <~lg4/work/ensembl-hive/sql/load_long_mult.sql
#
# or you can add your own task(s). Several tasks can be added at once:
# (the heredoc below inserts the same multiplication twice with the operands swapped)
mysql --defaults-group-suffix=_compara1 long_mult_test <<EoF
INSERT INTO analysis_job (analysis_id, input_id) VALUES ( 1, "{ 'a_multiplier' => '9750516269', 'b_multiplier' => '327358788' }");
INSERT INTO analysis_job (analysis_id, input_id) VALUES ( 1, "{ 'a_multiplier' => '327358788', 'b_multiplier' => '9750516269' }");
EoF
# 5. Initialize the newly created eHive for the first time:
beekeeper.pl -url mysql://ensadmin:ensembl@compara1/long_mult_test -sync
# 6. You can either execute three individual workers (each picking one analysis of the pipeline):
runWorker.pl -url mysql://ensadmin:ensembl@compara1/long_mult_test
#
# (run the command above once per analysis — presumably three times in total)
#
# ... or run an automatic loop that will run workers for you:
beekeeper.pl -url mysql://ensadmin:ensembl@compara1/long_mult_test -loop
# 7. The results of the computations are to be found in 'final_result' table:
mysql --defaults-group-suffix=_compara1 long_mult_test -e 'SELECT * FROM final_result'
# 8. You can add more multiplication tasks by repeating from step 4.
=pod
=head1 NAME
Bio::EnsEMBL::Hive::RunnableDB::LongMult::Start
=head1 DESCRIPTION
'LongMult::Start' is the first step of the LongMult example pipeline that multiplies two long numbers.
It takes apart the second multiplier and creates several 'LongMult::PartMultiply' jobs
that correspond to the different digits of the second multiplier.
It also "flows into" one 'LongMult::AddTogether' job that will wait until 'LongMult::PartMultiply' jobs
complete and will arrive at the final result.
=cut
package Bio::EnsEMBL::Hive::RunnableDB::LongMult::Start;
use strict;
use base ('Bio::EnsEMBL::Hive::ProcessWithParams');
sub fetch_input {   # this time we have nothing to fetch
    my $self = shift @_;
    # Nothing to pre-load: run() takes everything from the job's parameters.
    return 1;
}
sub run {   # following the 'divide and conquer' principle, our job is to create jobs:
    my $self = shift @_;

    my $a_multiplier = $self->param('a_multiplier') || die "'a_multiplier' is an obligatory parameter";
    my $b_multiplier = $self->param('b_multiplier') || die "'b_multiplier' is an obligatory parameter";

    # Collect the distinct non-trivial digits of b_multiplier; digits 0 and 1
    # need no 'part_multiply' job (AddTogether fills those products in itself).
    my %digit_hash = ();
    foreach my $digit (split(//,$b_multiplier)) {
        next if (($digit eq '0') or ($digit eq '1'));
        $digit_hash{$digit}++;
    }

    # The adaptor class is used via a class-method call below but was never
    # loaded in this module; require it explicitly so we do not depend on
    # some other module having pulled it in first.
    require Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;

    my $pm_analysis    = $self->db->get_AnalysisAdaptor()->fetch_by_logic_name('part_multiply');
    my $current_job_id = $self->input_job->dbID();

    # Create one 'part_multiply' job per distinct digit:
    foreach my $digit (keys %digit_hash) {
        Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob (
            -input_id       => "{ 'a_multiplier' => '$a_multiplier', 'digit' => '$digit' }",
            -analysis       => $pm_analysis,
            -input_job_id   => $current_job_id,
        );
    }
}
sub write_output {   # and we have nothing to write out
    my $self = shift @_;
    # Nothing is stored in a table here; instead the job's own input is
    # flowed onwards so that an 'add_together' job is created with the
    # same parameters.
    $self->dataflow_output_id($self->input_id);   # flow into an 'add_together' job
    return 1;
}
1;
# create the 3 analyses we are going to use:
# (each 'module' names the RunnableDB class that workers of that analysis execute)
INSERT INTO analysis (created, logic_name, module) VALUES (NOW(), 'start', 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::Start');
INSERT INTO analysis (created, logic_name, module) VALUES (NOW(), 'part_multiply', 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply');
INSERT INTO analysis (created, logic_name, module) VALUES (NOW(), 'add_together', 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::AddTogether');
# link the analyses with control- and dataflow-rules:
# control-rules: 'add_together' is blocked until all jobs of the condition
# analyses ('start' and 'part_multiply') have completed successfully:
INSERT INTO analysis_ctrl_rule (condition_analysis_url, ctrled_analysis_id) VALUES ('start', (SELECT analysis_id FROM analysis WHERE logic_name='add_together'));
INSERT INTO analysis_ctrl_rule (condition_analysis_url, ctrled_analysis_id) VALUES ('part_multiply', (SELECT analysis_id FROM analysis WHERE logic_name='add_together'));
# dataflow-rule: each completed 'start' job flows its input into a new 'add_together' job:
INSERT INTO dataflow_rule (from_analysis_id, to_analysis_url) VALUES ((SELECT analysis_id FROM analysis WHERE logic_name='start'), 'add_together');
# create a table for holding intermediate results (written by 'part_multiply' and read by 'add_together')
# result of (40-digit number) * (single digit) is at most 41 digits, hence char(41):
CREATE TABLE intermediate_result (
a_multiplier char(40) NOT NULL,
digit tinyint NOT NULL,
result char(41) NOT NULL,
PRIMARY KEY (a_multiplier, digit)
);
# create a table for holding final results (written by 'add_together')
# product of two 40-digit numbers fits into 80 digits, hence char(80):
CREATE TABLE final_result (
a_multiplier char(40) NOT NULL,
b_multiplier char(40) NOT NULL,
result char(80) NOT NULL,
PRIMARY KEY (a_multiplier, b_multiplier)
);
# To multiply two long numbers using the long_mult pipeline
# we have to create the 'start' job and provide the two multipliers:
# (the input_id string carries the 'a_multiplier'/'b_multiplier' parameters
#  that the Runnables read via $self->param)
INSERT INTO analysis_job (analysis_id, input_id) VALUES (
(SELECT analysis_id FROM analysis WHERE logic_name='start'),
"{ 'a_multiplier' => '123456789', 'b_multiplier' => '90319' }");
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment