Commit 30c092f5 authored by Leo Gordon's avatar Leo Gordon
Browse files

added schema & API support for accumulated dataflow

parent 9784eb90
=pod
=head1 NAME
Bio::EnsEMBL::Hive::Accumulator
=head1 SYNOPSIS
=head1 DESCRIPTION
A data container object that defines parameters for accumulated dataflow.
This object is generated from specially designed datalow URLs.
=head1 CONTACT
Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
=cut
package Bio::EnsEMBL::Hive::Accumulator;
use strict;
use Scalar::Util ('weaken');
use Bio::EnsEMBL::Utils::Argument; # import 'rearrange()'
sub new {
my $class = shift @_;
my $self = bless {}, $class;
my ($adaptor, $struct_name, $signature_template) =
rearrange([qw(adaptor struct_name signature_template) ], @_);
$self->adaptor($adaptor) if(defined($adaptor));
$self->struct_name($struct_name) if(defined($struct_name));
$self->signature_template($signature_template) if(defined($signature_template));
return $self;
}
sub adaptor {
my $self = shift @_;
if(@_) {
$self->{'_adaptor'} = shift @_;
weaken $self->{'_adaptor'};
}
return $self->{'_adaptor'};
}
sub struct_name {
my $self = shift @_;
if(@_) {
$self->{'_struct_name'} = shift @_;
}
return $self->{'_struct_name'};
}
sub signature_template {
my $self = shift @_;
if(@_) {
$self->{'_signature_template'} = shift @_;
}
return $self->{'_signature_template'};
}
sub url {
my $self = shift @_;
my $ref_dba = shift @_; # if reference dba is the same as 'our' dba, a shorter url can be generated
if(my $adaptor = $self->adaptor) {
my $dbc_prefix = ($adaptor->db == $ref_dba) ? ':///' : $adaptor->db->dbc->url();
return $dbc_prefix .'/accu?'.$self->struct_name(). '=' . $self->signature_template();
} else {
return;
}
}
sub dataflow {
my ( $self, $output_ids, $emitting_job ) = @_;
my $sending_job_id = $emitting_job->dbID();
my $receiving_job_id = $emitting_job->semaphored_job_id() || die "No semaphored job, cannot perform accumulated dataflow";
my $struct_name = $self->struct_name();
my $signature_template = $self->signature_template();
my @rows = ();
foreach my $output_id (@$output_ids) {
my $key_signature = $signature_template;
$key_signature=~s/(\w+)/$output_id->{$1}/eg; # FIXME: could be possibly extended in future to also use $self->param() ?
push @rows, {
'sending_job_id' => $sending_job_id,
'receiving_job_id' => $receiving_job_id,
'struct_name' => $struct_name,
'key_signature' => $key_signature,
'value' => $output_id->{$struct_name},
};
}
$self->adaptor->store( \@rows );
}
1;
......@@ -283,7 +283,7 @@ sub dataflow_output_id {
if($target_analysis_or_table->can('dataflow')) {
$target_analysis_or_table->dataflow( $output_ids_for_this_rule );
$target_analysis_or_table->dataflow( $output_ids_for_this_rule, $self );
} else {
......
=pod
=head1 NAME
Bio::EnsEMBL::Hive::DBSQL::AccumulatorAdaptor
=head1 SYNOPSIS
$dba->get_AccumulatorAdaptor->store( \@rows );
=head1 DESCRIPTION
This is currently an "objectless" adaptor for building accumulated structures.
=head1 CONTACT
Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
=cut
package Bio::EnsEMBL::Hive::DBSQL::AccumulatorAdaptor;
use strict;
use base ('Bio::EnsEMBL::Hive::DBSQL::NakedTableAdaptor');
sub default_table_name {
return 'accu';
}
sub fetch_structures_for_job_id {
my ($self, $receiving_job_id) = @_;
my $sql = 'SELECT struct_name, key_signature, value FROM accu WHERE receiving_job_id=?';
my $sth = $self->prepare( $sql );
$sth->execute( $receiving_job_id );
my %structures = ();
ROW: while(my ($struct_name, $key_signature, $value) = $sth->fetchrow() ) {
my $sptr = \$structures{$struct_name};
while( $key_signature=~/(?:(?:\[(\w*)\])|(?:\{(\w*)\}))/g) {
my ($array_index, $hash_key) = ($1, $2);
if(defined($array_index)) {
unless(length($array_index)) {
$array_index = scalar(@{$$sptr||[]});
}
$sptr = \$$sptr->[$array_index];
} elsif(defined($hash_key)) {
if(length($hash_key)) {
$sptr = \$$sptr->{$hash_key};
} else {
$sptr = \$$sptr->{$value};
$$sptr++;
next ROW;
}
}
}
$$sptr = $value;
}
$sth->finish;
return \%structures;
}
1;
......@@ -70,7 +70,6 @@ sub get_available_adaptors {
my %pairs = (
# Core adaptors extended with Hive stuff:
'MetaContainer' => 'Bio::EnsEMBL::Hive::DBSQL::MetaContainer',
'Analysis' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisAdaptor',
# "new" Hive adaptors (sharing the same fetching/storing code inherited from the BaseAdaptor class) :
'AnalysisCtrlRule' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor',
......@@ -79,12 +78,14 @@ sub get_available_adaptors {
'ResourceClass' => 'Bio::EnsEMBL::Hive::DBSQL::ResourceClassAdaptor',
'LogMessage' => 'Bio::EnsEMBL::Hive::DBSQL::LogMessageAdaptor',
'NakedTable' => 'Bio::EnsEMBL::Hive::DBSQL::NakedTableAdaptor',
'Analysis' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisAdaptor',
'Queen' => 'Bio::EnsEMBL::Hive::Queen',
'AnalysisData' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisDataAdaptor',
# "old" Hive adaptors (having their own fetching/storing code) :
'Queen' => 'Bio::EnsEMBL::Hive::Queen',
'AnalysisJob' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor',
'AnalysisStats' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor',
'AnalysisData' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisDataAdaptor',
'Accumulator' => 'Bio::EnsEMBL::Hive::DBSQL::AccumulatorAdaptor',
);
return \%pairs;
}
......
......@@ -44,10 +44,9 @@ use strict;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::Extensions;
use Bio::EnsEMBL::DBSQL::AnalysisAdaptor;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::Extensions;
use Bio::EnsEMBL::Hive::Accumulator;
use Bio::EnsEMBL::Hive::NakedTable;
#use Data::Dumper;
......@@ -90,7 +89,7 @@ sub fetch {
Bio::EnsEMBL::Hive::URLFactory->new(); # make sure global instance is created
if( my ($conn, $driver, $user, $pass, $host, $port, $dbname, $table_name, $tparam_name, $tparam_value, $conn_param_string) =
$url =~ m{^((\w*)://(?:(\w+)(?:\:([^/\@]*))?\@)?(?:([\w\-\.]+)(?:\:(\d+))?)?/([\w\-]*))(?:/(\w+)(?:\?(\w+)=(\w+))?)?((?:;(\w+)=(\w+))*)$} ) {
$url =~ m{^((\w*)://(?:(\w+)(?:\:([^/\@]*))?\@)?(?:([\w\-\.]+)(?:\:(\d+))?)?/([\w\-]*))(?:/(\w+)(?:\?(\w+)=([\w\[\]\{\}]+))?)?((?:;(\w+)=(\w+))*)$} ) {
my %conn_param = split(/[;=]/, 'type=hive;discon=0'.$conn_param_string );
......@@ -111,6 +110,14 @@ sub fetch {
return $dba->get_AnalysisJobAdaptor->fetch_by_url_query($tparam_name, $tparam_value);
} elsif($table_name eq 'accu') {
return Bio::EnsEMBL::Hive::Accumulator->new(
-adaptor => $dba->get_AccumulatorAdaptor,
-struct_name => $tparam_name,
-signature_template => $tparam_value,
);
} else {
return Bio::EnsEMBL::Hive::NakedTable->new(
......
......@@ -74,11 +74,13 @@ use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Extensions;
use Bio::EnsEMBL::Hive::Limiter;
use Bio::EnsEMBL::Hive::Process;
use Bio::EnsEMBL::Hive::DBSQL::AccumulatorAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Utils::RedirectStack;
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
use Bio::EnsEMBL::Hive::Utils ('stringify');
use base ( 'Bio::EnsEMBL::Storable', # inherit dbID(), adaptor() and new() methods
);
......@@ -648,6 +650,7 @@ sub run_one_batch {
my $jobs_done_here = 0;
my $accu_adaptor = $self->adaptor->db->get_AccumulatorAdaptor;
my $max_retry_count = $self->analysis->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
$self->adaptor->check_in_worker( $self );
......@@ -675,7 +678,18 @@ sub run_one_batch {
$self->adaptor->db->dbc->query_count(0);
$job_stopwatch->restart();
$job->param_init( $runnable_object->strict_hash_format(), $runnable_object->param_defaults(), $self->adaptor->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), $job->input_id() );
$job->param_init(
$runnable_object->strict_hash_format(),
$runnable_object->param_defaults(),
$self->adaptor->db->get_MetaContainer->get_param_hash(),
$self->analysis->parameters(),
$job->input_id(),
$accu_adaptor->fetch_structures_for_job_id( $job->dbID ), # FIXME: or should we pass in the original hash to be extended by pushing?
);
if($self->debug()) {
print "\nunsubstituted_param_hash = ".stringify($job->{'_unsubstituted_param_hash'})."\n";
}
$runnable_object->input_job( $job ); # "take" the job
$job_partial_timing = $runnable_object->life_cycle();
......
......@@ -15,6 +15,8 @@ ALTER TABLE job ADD FOREIGN KEY (prev_job_id)
ALTER TABLE job ADD FOREIGN KEY (semaphored_job_id) REFERENCES job(job_id);
ALTER TABLE log_message ADD FOREIGN KEY (job_id) REFERENCES job(job_id);
ALTER TABLE job_file ADD FOREIGN KEY (job_id) REFERENCES job(job_id);
ALTER TABLE accu ADD FOREIGN KEY (sending_job_id) REFERENCES job(job_id);
ALTER TABLE accu ADD FOREIGN KEY (receiving_job_id) REFERENCES job(job_id);
ALTER TABLE resource_description ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
ALTER TABLE analysis_base ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
......
......@@ -351,6 +351,30 @@ CREATE TABLE job_file (
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
/**
@table accu
@colour #1D73DA
@desc Accumulator for funneled dataflow.
@column sending_job_id semaphoring job in the "box"
@column receiving_job_id semaphored job outside the "box"
@column struct_name name of the structured parameter
@column key_signature locates the part of the structured parameter
@column value value of the part
*/
CREATE TABLE accu (
sending_job_id int(10),
receiving_job_id int(10) NOT NULL,
struct_name varchar(255) NOT NULL,
key_signature varchar(255) NOT NULL,
value varchar(255)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
/**
@table analysis_data
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment