Commit 4c1bd8c6 authored by Leo Gordon

added support for dataflow-into-tables

parent 26f9a7da
......@@ -16,6 +16,10 @@ Summary:
Bio::EnsEMBL::Analysis::RunnableDB perl wrapper objects as nodes/blocks in
the graphs but could be adapted more generally.
13 June, 2010 : Leo Gordon
* Added support for dataflow-into-tables, see LongMult example.
10 June, 2010 : Leo Gordon
* A bug preventing users from setting hive_output_dir via pipeline_wide_parameters has been fixed.
......
......@@ -234,23 +234,18 @@ sub fetch_all_failed_jobs {
}
-sub fetch_by_url_query
-{
-  my $self = shift;
-  my $query = shift;
-
-  return undef unless($query);
-  #print("Bio::EnsEMBL::DBSQL::AnalysisAdaptor::fetch_by_url_query : $query\n");
-
-  if((my $p=index($query, "=")) != -1) {
-    my $type = substr($query,0, $p);
-    my $value = substr($query,$p+1,length($query));
-
-    if($type eq 'dbID') {
-      return $self->fetch_by_dbID($value);
-    }
-  }
-  return undef;
-}
+sub fetch_by_url_query {
+    my ($self, $field_name, $field_value) = @_;
+
+    if($field_name eq 'dbID' and $field_value) {
+        return $self->fetch_by_dbID($field_value);
+    } else {
+        return;
+    }
+}
......
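A caller-side sketch of the reworked method (the $analysis_adaptor variable and the example dbID are assumptions for illustration, not part of the commit):

    # the URL parser now splits the query string itself, so the adaptor
    # receives a ready-made field/value pair instead of a raw 'dbID=12':
    my $analysis = $analysis_adaptor->fetch_by_url_query('dbID', 12);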
......@@ -62,6 +62,7 @@ sub get_available_adaptors {
'AnalysisCtrlRule' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor',
'DataflowRule' => 'Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor',
'ResourceDescription' => 'Bio::EnsEMBL::Hive::DBSQL::ResourceDescriptionAdaptor',
'NakedTable' => 'Bio::EnsEMBL::Hive::DBSQL::NakedTableAdaptor',
);
return (\%pairs);
}
......
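Once 'NakedTable' is registered above, the hive DBAdaptor can hand out the new adaptor through the usual get_X mechanism; a one-line sketch (assuming a connected $hive_dba):

    my $naked_table_adaptor = $hive_dba->get_NakedTableAdaptor;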
......@@ -143,20 +143,24 @@ sub remove {
Function: Creates and stores a new rule in the DB.
Returns : Bio::EnsEMBL::Hive::DataflowRule
Args[1] : Bio::EnsEMBL::Analysis $from_analysis
- Args[2] : Bio::EnsEMBL::Analysis $to_analysis
+ Args[2] : Bio::EnsEMBL::Analysis OR a hive-style URL $to_analysis_or_url
  Args[3] : (optional) int $branch_code
  Args[4] : (optional) (Perl structure or string) $input_id_template

=cut

sub create_rule {
-    my ($self, $from_analysis, $to_analysis, $branch_code, $input_id_template) = @_;
+    my ($self, $from_analysis, $to_analysis_or_url, $branch_code, $input_id_template) = @_;

-    return unless($from_analysis and $to_analysis);
+    return unless($from_analysis and $to_analysis_or_url);

    my $rule = Bio::EnsEMBL::Hive::DataflowRule->new(
        -from_analysis     => $from_analysis,
-        -to_analysis       => $to_analysis,
+        ref($to_analysis_or_url)
+            ? ( -to_analysis     => $to_analysis_or_url )
+            : ( -to_analysis_url => $to_analysis_or_url ),
        -branch_code       => $branch_code,
        -input_id_template => $input_id_template,
    );
......
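A sketch of the two call styles create_rule() now accepts (all variables are assumed): an object keeps the old behaviour, while a string is stored as a URL to be resolved later:

    $dataflow_rule_adaptor->create_rule( $from_analysis, $to_analysis, 1 );              # an Analysis object, as before
    $dataflow_rule_adaptor->create_rule( $from_analysis, 'mysql:////final_result', 1 );  # a hive-style URL string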
=pod
=head1 NAME
Bio::EnsEMBL::Hive::DBSQL::NakedTableAdaptor
=head1 SYNOPSIS
$naked_table_adaptor = $dba->get_NakedTableAdaptor;
$naked_table_adaptor = $naked_table->adaptor;
=head1 DESCRIPTION
This module together with its data container are used to enable dataflow into arbitrary tables (rather than just analysis_job).
NakedTable objects know *where* to dataflow, and NakedTableAdaptor knows *how* to dataflow.
=head1 CONTACT
Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
=cut
package Bio::EnsEMBL::Hive::DBSQL::NakedTableAdaptor;
use strict;
use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::NakedTable;
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
sub create_new {
my $self = shift @_;
return Bio::EnsEMBL::Hive::NakedTable->new(@_, -ADAPTOR => $self);
}
sub dataflow {
my ( $self, $naked_table, $data_hash ) = @_;
if(not ref($data_hash)) { # assume it was passed as a string
$data_hash = eval $data_hash;
}
my $table_name = $naked_table->table_name();
my $insertion_method = uc( $naked_table->insertion_method() ); # INSERT, INSERT_IGNORE or REPLACE
$insertion_method =~ s/_/ /g; # turn 'INSERT_IGNORE' into the valid SQL keyword pair 'INSERT IGNORE'
# By using question marks you can insert true NULLs by setting corresponding values to undefs:
my $sql = "$insertion_method INTO $table_name (".join(', ', keys %$data_hash).') VALUES ('.join(',', (('?') x scalar(keys %$data_hash))).')';
my $sth = $self->prepare( $sql );
$sth->execute( values %$data_hash ); # Perl manual promises that the order of "keys" will be the same as the order of "values", so no need to sort.
my $insert_id = $sth->{'mysql_insertid'}; # capture it just in case
$sth->finish();
return $insert_id;
}
1;
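A usage sketch for the new adaptor (assuming a connected $hive_dba; the numbers are made up):

    my $naked_table = $hive_dba->get_NakedTableAdaptor->create_new(
        -TABLE_NAME       => 'final_result',
        -INSERTION_METHOD => 'REPLACE',
    );
    # one row is inserted per dataflow() call; undef values become true NULLs:
    $naked_table->dataflow( { 'a_multiplier' => 12, 'b_multiplier' => 34, 'result' => 408 } );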
......@@ -35,9 +35,9 @@
    and whatever rules match these conditions will generate new jobs with input_ids specified in the dataflow_output_id() call.
    If input_id_template happens to contain a non-NULL value, it will be used to generate the corresponding input_id instead.

-   Jessica's original remark on the structure of to_analysis_url:
+   Jessica's remark on the structure of to_analysis_url:
        Extended from design of SimpleRule concept to allow the 'to' analysis to be specified with a network savvy URL like
-       mysql://ensadmin:<pass>@ecs2:3361/compara_hive_test?analysis.logic_name='blast_NCBI34'
+       mysql://ensadmin:<pass>@ecs2:3361/compara_hive_test/analysis?logic_name='blast_NCBI34'
=head1 CONTACT
......@@ -51,7 +51,7 @@ package Bio::EnsEMBL::Hive::DataflowRule;
use strict;
use Bio::EnsEMBL::Utils::Argument; # import 'rearrange()'
use Bio::EnsEMBL::Utils::Exception;
-use Bio::EnsEMBL::Hive::URLFactory;
+#use Bio::EnsEMBL::Hive::URLFactory;
=head2 new
......@@ -229,33 +229,30 @@ sub from_analysis {
=cut
sub to_analysis {
-  my ($self,$analysis) = @_;
+  my ($self, $analysis_or_nt) = @_;

-  if( defined $analysis ) {
-    unless ($analysis->isa('Bio::EnsEMBL::Analysis')) {
-      throw(
-        "to_analysis arg must be a [Bio::EnsEMBL::Analysis]".
-        "not a [$analysis]");
+  if( defined $analysis_or_nt ) {
+    unless ($analysis_or_nt->can('url')) {
+      throw( "to_analysis arg must support 'url' method, '$analysis_or_nt' does not know how to do it");
    }
-    $self->{'_to_analysis'} = $analysis;
+    $self->{'_to_analysis'} = $analysis_or_nt;

      #if the 'from' and 'to' share the same adaptor, then use a simple logic_name
      #for the URL rather than a full network distributed URL
-    if($self->from_analysis and ($self->from_analysis->adaptor == $analysis->adaptor)) {
-      $self->{'_to_analysis_url'} = $analysis->logic_name;
+    my $ref_rule_adaptor = $self->from_analysis->adaptor;
+    if($analysis_or_nt->can('logic_name') and $self->from_analysis and ($ref_rule_adaptor == $analysis_or_nt->adaptor)) {
+      $self->{'_to_analysis_url'} = $analysis_or_nt->logic_name;
    } else {
-      $self->{'_to_analysis_url'} = $analysis->url;
+      $self->{'_to_analysis_url'} = $analysis_or_nt->url($ref_rule_adaptor->db);
    }
  }
    # lazy load the analysis object if I can
  if(!defined($self->{'_to_analysis'}) and defined($self->to_analysis_url)) {
-    my $analyis = Bio::EnsEMBL::Hive::URLFactory->fetch($self->to_analysis_url);
-    unless($analysis) {
-      $analysis =
-        $self->adaptor->db->get_AnalysisAdaptor->fetch_by_logic_name($self->to_analysis_url);
-    }
-    $self->{'_to_analysis'} = $analysis;
+    $self->{'_to_analysis'} = $self->adaptor->db->get_AnalysisAdaptor->fetch_by_logic_name_or_url($self->to_analysis_url);
  }
  return $self->{'_to_analysis'};
}
......
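Because the type check is now duck-typed on can('url'), both kinds of targets pass; a sketch (both variables assumed):

    $rule->to_analysis( $analysis );     # a Bio::EnsEMBL::Analysis, as before
    $rule->to_analysis( $naked_table );  # a Bio::EnsEMBL::Hive::NakedTable also qualifies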
......@@ -33,6 +33,7 @@ use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Analysis;
use Bio::EnsEMBL::DBSQL::DBConnection;
use Bio::EnsEMBL::DBSQL::AnalysisAdaptor;
use Bio::EnsEMBL::Hive::URLFactory;
#use Bio::EnsEMBL::Pipeline::RunnableDB;
#use Bio::EnsEMBL::Analysis::RunnableDB;
......@@ -136,27 +137,35 @@ sub Bio::EnsEMBL::Analysis::url
return $url;
}
-sub Bio::EnsEMBL::DBSQL::AnalysisAdaptor::fetch_by_url_query
-{
-  my $self = shift;
-  my $query = shift;
-
-  return undef unless($query);
-  #print("Bio::EnsEMBL::DBSQL::AnalysisAdaptor::fetch_by_url_query : $query\n");
-
-  if((my $p=index($query, "=")) != -1) {
-    my $type = substr($query,0, $p);
-    my $value = substr($query,$p+1,length($query));
-
-    if($type eq 'logic_name') {
-      return $self->fetch_by_logic_name($value);
-    }
-    if($type eq 'dbID') {
-      return $self->fetch_by_dbID($value);
-    }
-  }
-  return undef;
-}
+sub Bio::EnsEMBL::DBSQL::AnalysisAdaptor::fetch_by_logic_name_or_url {
+    my $self              = shift @_;
+    my $logic_name_or_url = shift @_;
+
+    if($logic_name_or_url =~ /^\w+$/) {
+        return $self->fetch_by_logic_name($logic_name_or_url);
+    } elsif($logic_name_or_url =~ /^mysql:/) {
+        return Bio::EnsEMBL::Hive::URLFactory->fetch($logic_name_or_url, $self->db);
+    } else {
+        return 0;
+    }
+}
+
+sub Bio::EnsEMBL::DBSQL::AnalysisAdaptor::fetch_by_url_query {
+    my ($self, $field_name, $field_value) = @_;
+
+    if(!$field_name or !$field_value) {
+        return;
+    } elsif($field_name eq 'logic_name') {
+        return $self->fetch_by_logic_name($field_value);
+    } elsif($field_name eq 'dbID') {
+        return $self->fetch_by_dbID($field_value);
+    }
+}
......
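A dispatch sketch for fetch_by_logic_name_or_url(): a bare word goes through fetch_by_logic_name(), anything starting with mysql: is delegated to URLFactory (the connection details below are made up):

    my $local  = $analysis_adaptor->fetch_by_logic_name_or_url( 'part_multiply' );
    my $remote = $analysis_adaptor->fetch_by_logic_name_or_url(
        q{mysql://ensadmin:<pass>@compara1:3306/some_hive_db/analysis?logic_name='part_multiply'} );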
=pod
=head1 NAME
Bio::EnsEMBL::Hive::NakedTable
=head1 SYNOPSIS
=head1 DESCRIPTION
A data container object that links together an adaptor, a table and a preferred insertion method (insert/insert-ignore/replace).
This object is generated from specially designed dataflow URLs.
=head1 CONTACT
Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
=cut
package Bio::EnsEMBL::Hive::NakedTable;
use strict;
use Bio::EnsEMBL::Utils::Argument; # import 'rearrange()'
sub new {
my $class = shift @_;
my $self = bless {}, $class;
my ($adaptor, $table_name, $insertion_method) =
rearrange([qw(adaptor table_name insertion_method) ], @_);
$self->adaptor($adaptor) if(defined($adaptor));
$self->table_name($table_name) if(defined($table_name));
$self->insertion_method($insertion_method) if(defined($insertion_method));
return $self;
}
sub adaptor {
my $self = shift @_;
if(@_) {
$self->{'_adaptor'} = shift @_;
}
return $self->{'_adaptor'};
}
sub table_name {
my $self = shift @_;
if(@_) {
$self->{'_table_name'} = shift @_;
}
return $self->{'_table_name'};
}
sub insertion_method {
my $self = shift @_;
if(@_) {
$self->{'_insertion_method'} = shift @_;
}
return $self->{'_insertion_method'} || 'INSERT';
}
sub url {
my $self = shift @_;
my $ref_dba = shift @_; # if reference dba is the same as 'our' dba, a shorter url can be generated
if(my $adaptor = $self->adaptor) {
my $conn_prefix = ($adaptor->db == $ref_dba) ? 'mysql:///' : $adaptor->db->dbc->url();
return $conn_prefix .'/'. $self->table_name() . '?insertion_method=' . $self->insertion_method();
} else {
return;
}
}
sub dataflow {
my ( $self, $data_hash ) = @_;
return $self->adaptor->dataflow($self, $data_hash);
}
1;
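A sketch of the url() logic: passing a 'reference' DBAdaptor that matches the table's own yields the short relative form, while omitting it falls back to the fully-qualified URL (the $naked_table is the one from the adaptor sketch above):

    print $naked_table->url( $naked_table->adaptor->db ), "\n"; # relative: mysql:////final_result?insertion_method=REPLACE
    print $naked_table->url(), "\n";                            # fully-qualified, built from the adaptor's own dbc->url()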
......@@ -407,12 +407,12 @@ sub run {
    $wait_for = [ $wait_for ] unless(ref($wait_for) eq 'ARRAY'); # force scalar into an arrayref

        # create control rules:
-    foreach my $condition_logic_name (@$wait_for) {
-        if(my $condition_analysis = $analysis_adaptor->fetch_by_logic_name($condition_logic_name)) {
+    foreach my $condition_url (@$wait_for) {
+        if(my $condition_analysis = $analysis_adaptor->fetch_by_logic_name_or_url($condition_url)) {
            $ctrl_rule_adaptor->create_rule( $condition_analysis, $analysis);
-            warn "Created Control rule: $condition_logic_name -| $logic_name\n";
+            warn "Created Control rule: $condition_url -| $logic_name\n";
        } else {
-            die "Could not fetch analysis '$condition_logic_name' to create a control rule";
+            die "Could not fetch analysis '$condition_url' to create a control rule";
        }
    }
......@@ -426,14 +426,14 @@ sub run {
    $heirs = { map { ($_ => undef) } @$heirs } if(ref($heirs) eq 'ARRAY'); # now force it into a hash if it wasn't

-    while(my ($heir_logic_name, $input_id_template) = each %$heirs) {
-        if(my $heir_analysis = $analysis_adaptor->fetch_by_logic_name($heir_logic_name)) {
-            $dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis, $branch_code, $input_id_template);
-            warn "Created DataFlow rule: [$branch_code] $logic_name -> $heir_logic_name"
-                .($input_id_template ? ' WITH TEMPLATE: '.stringify($input_id_template) : '')."\n";
-        } else {
-            die "Could not fetch analysis '$heir_logic_name' to create a dataflow rule";
-        }
+    while(my ($heir_url, $input_id_template) = each %$heirs) {
+        my $heir_analysis = $analysis_adaptor->fetch_by_logic_name_or_url($heir_url);
+        $dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis || $heir_url, $branch_code, $input_id_template);
+        warn "Created DataFlow rule: [$branch_code] $logic_name -> $heir_url (". ($heir_analysis ? 'checked' : 'UNCHECKED') .')'
+            .($input_id_template ? ' WITH TEMPLATE: '.stringify($input_id_template) : '')."\n";
    }
}
}
......
......@@ -72,7 +72,7 @@ sub default_options {
'pipeline_name' => 'long_mult', # name used by the beekeeper to prefix job names on the farm
'pipeline_db' => { # connection parameters
-            -host   => 'compara3',
+            -host   => 'compara1',
-port => 3306,
-user => 'ensadmin',
-pass => $self->o('password'), # a rule where a previously undefined parameter is used (which makes either of them obligatory)
......@@ -139,6 +139,9 @@ sub pipeline_analyses {
-input_ids => [
# (jobs for this analysis will be flown_into via branch-2 from 'start' jobs above)
],
-flow_into => {
1 => [ 'mysql:////intermediate_result' ],
},
},
{ -logic_name => 'add_together',
......@@ -148,6 +151,9 @@ sub pipeline_analyses {
# (jobs for this analysis will be flown_into via branch-1 from 'start' jobs above)
],
-wait_for => [ 'part_multiply' ], # we can only start adding when all partial products have been computed
-flow_into => {
1 => [ 'mysql:////final_result' ],
},
},
];
}
......
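The four slashes in 'mysql:////intermediate_result' are not a typo: user, host, port and database name are all left empty, which makes the URL relative to the current hive database. A fully-qualified target (credentials made up for illustration) should presumably look like:

    -flow_into => {
        1 => [ 'mysql://ensadmin:<pass>@compara1:3306/my_long_mult_db/intermediate_result' ],
    },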
......@@ -119,6 +119,10 @@ sub pipeline_analyses {
-input_ids => [
# (jobs for this analysis will be flown_into via branch-2 from 'sema_start' jobs above)
],
-flow_into => {
1 => [ 'mysql:////intermediate_result' ],
},
},
{ -logic_name => 'add_together',
......@@ -128,6 +132,9 @@ sub pipeline_analyses {
# (jobs for this analysis will be flown_into via branch-1 from 'sema_start' jobs above)
],
# jobs in this analysis are semaphored, so no need to '-wait_for'
-flow_into => {
1 => [ 'mysql:////final_result' ],
},
},
];
}
......
......@@ -322,35 +322,40 @@ sub dataflow_output_id {
my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
my $rules = $self->db->get_DataflowRuleAdaptor->fetch_from_analysis_id_branch_code($self->analysis->dbID, $branch_code);
    foreach my $rule (@{$rules}) {
-        foreach my $output_id (@$output_ids) {
-
-            my $this_output_id;
-
-            my $template = $rule->input_id_template();
-            if(defined($template)) {
-                if($self->can('param_substitute')) {
-                    $this_output_id = $self->param_substitute($template);
-                } else {
-                    die "In order to use input_id_template your RunnableDB has to be derived from Bio::EnsEMBL::Hive::ProcessWithParams\n";
-                }
-            } else {
-                $this_output_id = $output_id;
-            }
-
-            if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-                -input_id       => $this_output_id,
-                -analysis       => $rule->to_analysis,
-                -input_job_id   => $self->input_job->dbID, # creator_job's id
-                %$create_job_options
-            )) {
-                if($semaphored_job_id and $propagate_semaphore) {
-                    $job_adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id ); # propagate the semaphore
-                }
-                    # only add the ones that were indeed created:
-                push @output_job_ids, $job_id;
-
-            } elsif($semaphored_job_id and !$propagate_semaphore) {
-                $job_adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # if we didn't succeed in creating the job, fix the semaphore
-            }
-        }
+
+        my $substituted_template;
+        if(my $template = $rule->input_id_template()) {
+            if($self->can('param_substitute')) {
+                $substituted_template = $self->param_substitute($template);
+            } else {
+                die "In order to use input_id_template your RunnableDB has to be derived from Bio::EnsEMBL::Hive::ProcessWithParams\n";
+            }
+        }
+
+        my $target_analysis_or_table = $rule->to_analysis();
+
+        foreach my $output_id ($substituted_template ? ($substituted_template) : @$output_ids) {
+
+            if($target_analysis_or_table->can('dataflow')) {
+
+                my $insert_id = $target_analysis_or_table->dataflow( $output_id );
+
+            } else {
+
+                if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
+                    -input_id       => $output_id,
+                    -analysis       => $rule->to_analysis,
+                    -input_job_id   => $self->input_job->dbID, # creator_job's id
+                    %$create_job_options
+                )) {
+                    if($semaphored_job_id and $propagate_semaphore) {
+                        $job_adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id ); # propagate the semaphore
+                    }
+                        # only add the ones that were indeed created:
+                    push @output_job_ids, $job_id;
+
+                } elsif($semaphored_job_id and !$propagate_semaphore) {
+                    $job_adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # if we didn't succeed in creating the job, fix the semaphore
+                }
+            }
+        }
    }
......
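Nothing changes at the RunnableDB call site; the branching above picks the route per rule, depending on what the target object can do. A sketch of the two outcomes, assuming rules wired as in the LongMult example:

    $self->dataflow_output_id( { 'a_multiplier' => 12, 'digit' => 3, 'result' => 36 }, 1 );
    # rule target 'mysql:////intermediate_result' => can('dataflow'), the row goes straight into the table
    # rule target 'add_together' (an Analysis)    => a new job is created, as before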
......@@ -72,8 +72,7 @@ sub run { # call the function that will compute the stuff
=head2 write_output
    Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution.
-                  Here we first store the result in the 'final_result' table,
-                  and then also dataflow both original multipliers and the result down the branch-1 (in case further analyses will have to deal with the result).
+                  Dataflows both original multipliers and the final result down branch-1, which will be routed into the 'final_result' table.

=cut

......@@ -81,22 +80,10 @@ sub run {   # call the function that will compute the stuff

sub write_output {  # store and dataflow
    my $self = shift @_;

-    my $a_multiplier = $self->param('a_multiplier');
-    my $b_multiplier = $self->param('b_multiplier');
-    my $result       = $self->param('result');
-
-        # store the result:
-    my $sql = "REPLACE INTO final_result (a_multiplier, b_multiplier, result) VALUES (?, ?, ?) ";
-    my $sth = $self->db->dbc->prepare($sql);
-    $sth->execute( $a_multiplier, $b_multiplier, $result );
-    $sth->finish();
-
-        # In order to make it possible to extend the pipeline,
-        #   dataflow the multipliers together with the result:
    $self->dataflow_output_id({
-        'a_multiplier' => $a_multiplier,
-        'b_multiplier' => $b_multiplier,
-        'result'       => $result,
+        'a_multiplier' => $self->param('a_multiplier'),
+        'b_multiplier' => $self->param('b_multiplier'),
+        'result'       => $self->param('result'),
    }, 1);
}
......
......@@ -56,20 +56,18 @@ sub run { # call the recursive function that will compute the stuff
=head2 write_output
    Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution.
-                  Here we store the product in 'intermediate_result' table.
+                  Dataflows the intermediate results down branch-1, which will be routed into the 'intermediate_result' table.

=cut

sub write_output {  # but this time we have something to store
    my $self = shift @_;

-    my $sql = "REPLACE INTO intermediate_result (a_multiplier, digit, result) VALUES (?, ?, ?) ";
-    my $sth = $self->db->dbc->prepare($sql);
-    $sth->execute(
-        $self->param('a_multiplier'),
-        $self->param('digit'),
-        $self->param('result')
-    );
+    $self->dataflow_output_id( {
+        'a_multiplier' => $self->param('a_multiplier'),
+        'digit'        => $self->param('digit'),
+        'result'       => $self->param('result')
+    }, 1);
}
=head2 _rec_multiply
......