Commit b5925855 authored by Leo Gordon

"parameter stack" implementation using two extra fields in job table. Accu content intended for any job_id has preference over Input_id content for the same job.
parent b9a49d69
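
In practice this means that when a descendant job assembles its parameters, the two stacks are (presumably) applied from the oldest ancestor to the newest, with each ancestor's accu content overriding its own input_id content. A minimal sketch of that precedence; the job_ids and lookup hashes below are hypothetical illustrations, not code from this commit:

    my @stack_job_ids = (101, 102);                        # hypothetical ancestor job_ids, oldest first
    my %input_id_of   = ( 101 => { 'a' => 1 }, 102 => { 'b' => 2 } );
    my %accu_of       = ( 101 => { 'a' => 10 } );          # accu content addressed to job 101

    my %params;
    foreach my $job_id (@stack_job_ids) {
        %params = ( %params, %{ $input_id_of{$job_id} || {} } );   # input_id content first...
        %params = ( %params, %{ $accu_of{$job_id}     || {} } );   # ...then accu content wins for the same job
    }
    # %params is now ( a => 10, b => 2 ): job 101's accu content beat its own input_id content.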
@@ -44,11 +44,13 @@ sub new {
my $self = $class->SUPER::new( @_ ); # deal with Storable stuff
my($analysis_id, $input_id, $worker_id, $status, $retry_count, $completed, $runtime_msec, $query_count, $semaphore_count, $semaphored_job_id) =
rearrange([qw(analysis_id input_id worker_id status retry_count completed runtime_msec query_count semaphore_count semaphored_job_id) ], @_);
my($analysis_id, $input_id, $param_id_stack, $accu_id_stack, $worker_id, $status, $retry_count, $completed, $runtime_msec, $query_count, $semaphore_count, $semaphored_job_id) =
rearrange([qw(analysis_id input_id param_id_stack accu_id_stack worker_id status retry_count completed runtime_msec query_count semaphore_count semaphored_job_id) ], @_);
$self->analysis_id($analysis_id) if(defined($analysis_id));
$self->input_id($input_id) if(defined($input_id));
$self->param_id_stack($param_id_stack) if(defined($param_id_stack));
$self->accu_id_stack($accu_id_stack) if(defined($accu_id_stack));
$self->worker_id($worker_id) if(defined($worker_id));
$self->status($status) if(defined($status));
$self->retry_count($retry_count) if(defined($retry_count));
@@ -62,91 +64,101 @@ sub new {
}
sub input_id {
my $self = shift;
$self->{'_input_id'} = shift if(@_);
return $self->{'_input_id'};
sub analysis_id {
my $self = shift;
$self->{'_analysis_id'} = shift if(@_);
return $self->{'_analysis_id'};
}
sub dataflow_rules { # if ever set will prevent the Job from fetching rules from the DB
my $self = shift @_;
my $branch_name_or_code = shift @_;
my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);
$self->{'_dataflow_rules'}{$branch_code} = shift if(@_);
return $self->{'_dataflow_rules'}
? ( $self->{'_dataflow_rules'}{$branch_code} || [] )
: $self->adaptor->db->get_DataflowRuleAdaptor->fetch_all_by_from_analysis_id_and_branch_code($self->analysis_id, $branch_code);
sub input_id {
my $self = shift;
$self->{'_input_id'} = shift if(@_);
return $self->{'_input_id'};
}
sub worker_id {
my $self = shift;
$self->{'_worker_id'} = shift if(@_);
return $self->{'_worker_id'};
sub param_id_stack {
my $self = shift;
$self->{'_param_id_stack'} = shift if(@_);
return $self->{'_param_id_stack'};
}
sub analysis_id {
my $self = shift;
$self->{'_analysis_id'} = shift if(@_);
return $self->{'_analysis_id'};
sub accu_id_stack {
my $self = shift;
$self->{'_accu_id_stack'} = shift if(@_);
return $self->{'_accu_id_stack'};
}
sub status {
my $self = shift;
$self->{'_status'} = shift if(@_);
return $self->{'_status'};
sub worker_id {
my $self = shift;
$self->{'_worker_id'} = shift if(@_);
return $self->{'_worker_id'};
}
sub update_status {
my ($self, $status ) = @_;
return unless($self->adaptor);
$self->status($status);
$self->adaptor->update_status($self);
sub status {
my $self = shift;
$self->{'_status'} = shift if(@_);
return $self->{'_status'};
}
sub retry_count {
my $self = shift;
$self->{'_retry_count'} = shift if(@_);
$self->{'_retry_count'} = 0 unless(defined($self->{'_retry_count'}));
return $self->{'_retry_count'};
my $self = shift;
$self->{'_retry_count'} = shift if(@_);
$self->{'_retry_count'} = 0 unless(defined($self->{'_retry_count'}));
return $self->{'_retry_count'};
}
sub completed {
my $self = shift;
$self->{'_completed'} = shift if(@_);
return $self->{'_completed'};
my $self = shift;
$self->{'_completed'} = shift if(@_);
return $self->{'_completed'};
}
sub runtime_msec {
my $self = shift;
$self->{'_runtime_msec'} = shift if(@_);
$self->{'_runtime_msec'} = 0 unless(defined($self->{'_runtime_msec'}));
return $self->{'_runtime_msec'};
my $self = shift;
$self->{'_runtime_msec'} = shift if(@_);
$self->{'_runtime_msec'} = 0 unless(defined($self->{'_runtime_msec'}));
return $self->{'_runtime_msec'};
}
sub query_count {
my $self = shift;
$self->{'_query_count'} = shift if(@_);
$self->{'_query_count'} = 0 unless(defined($self->{'_query_count'}));
return $self->{'_query_count'};
my $self = shift;
$self->{'_query_count'} = shift if(@_);
$self->{'_query_count'} = 0 unless(defined($self->{'_query_count'}));
return $self->{'_query_count'};
}
sub semaphore_count {
my $self = shift;
$self->{'_semaphore_count'} = shift if(@_);
$self->{'_semaphore_count'} = 0 unless(defined($self->{'_semaphore_count'}));
return $self->{'_semaphore_count'};
my $self = shift;
$self->{'_semaphore_count'} = shift if(@_);
$self->{'_semaphore_count'} = 0 unless(defined($self->{'_semaphore_count'}));
return $self->{'_semaphore_count'};
}
sub semaphored_job_id {
my $self = shift;
$self->{'_semaphored_job_id'} = shift if(@_);
return $self->{'_semaphored_job_id'};
my $self = shift;
$self->{'_semaphored_job_id'} = shift if(@_);
return $self->{'_semaphored_job_id'};
}
sub update_status {
my ($self, $status ) = @_;
return unless($self->adaptor);
$self->status($status);
$self->adaptor->update_status($self);
}
sub dataflow_rules { # if ever set will prevent the Job from fetching rules from the DB
my $self = shift @_;
my $branch_name_or_code = shift @_;
my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);
$self->{'_dataflow_rules'}{$branch_code} = shift if(@_);
return $self->{'_dataflow_rules'}
? ( $self->{'_dataflow_rules'}{$branch_code} || [] )
: $self->adaptor->db->get_DataflowRuleAdaptor->fetch_all_by_from_analysis_id_and_branch_code($self->analysis_id, $branch_code);
}
sub stdout_file {
@@ -161,6 +173,14 @@ sub stderr_file {
return $self->{'_stderr_file'};
}
sub accu_hash {
my $self = shift;
$self->{'_accu_hash'} = shift if(@_);
$self->{'_accu_hash'} = {} unless(defined($self->{'_accu_hash'}));
return $self->{'_accu_hash'};
}
=head2 autoflow
Title : autoflow
@@ -253,7 +273,21 @@ sub fan_cache { # a self-initializing getter (no setting)
sub dataflow_output_id {
my ($self, $output_ids, $branch_name_or_code, $create_job_options) = @_;
$output_ids ||= [ $self->input_id() ]; # replicate the input_id in the branch_code's output by default
my $input_id = $self->input_id();
my $param_id_stack = $self->param_id_stack();
my $accu_id_stack = $self->accu_id_stack();
my $hive_use_param_stack = $self->adaptor->db->hive_use_param_stack();
if($hive_use_param_stack) {
if($input_id and ($input_id ne '{}')) { # add the parent to the param_id_stack if it had non-trivial extra parameters
$param_id_stack = ($param_id_stack ? $param_id_stack.',' : '').$self->dbID();
}
if(scalar(keys %{$self->accu_hash()})) { # add the parent to the accu_id_stack if it had "own" accumulator
$accu_id_stack = ($accu_id_stack ? $accu_id_stack.',' : '').$self->dbID();
}
}
$output_ids ||= [ $hive_use_param_stack ? {} : $input_id ]; # by default replicate the parameters of the parent in the child
$output_ids = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY'); # force previously used single values into an arrayref
if($create_job_options) {
@@ -306,6 +340,8 @@ sub dataflow_output_id {
} elsif($funnel_job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( # if a semaphored funnel job creation succeeded, ...
-input_id => $output_ids_for_this_rule->[0],
-param_id_stack => $param_id_stack,
-accu_id_stack => $accu_id_stack,
-analysis => $target_analysis_or_table,
-prev_job => $self,
-semaphore_count => scalar(@$fan_cache), # "pre-increase" the semaphore count before creating the dependent jobs
@@ -319,6 +355,8 @@ sub dataflow_output_id {
my ($output_id, $fan_analysis) = @$pair;
if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => $output_id,
-param_id_stack => $param_id_stack,
-accu_id_stack => $accu_id_stack,
-analysis => $fan_analysis,
-prev_job => $self,
-semaphored_job_id => $funnel_job_id, # by passing this parameter we request not to propagate semaphores
@@ -341,6 +379,8 @@ sub dataflow_output_id {
if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => $output_id,
-param_id_stack => $param_id_stack,
-accu_id_stack => $accu_id_stack,
-analysis => $target_analysis_or_table,
-prev_job => $self,
-semaphored_job_id => $self->semaphored_job_id(), # propagate parent's semaphore if any
@@ -33,41 +33,46 @@ sub default_table_name {
}
sub fetch_structures_for_job_id {
my ($self, $receiving_job_id) = @_;
my $sql = 'SELECT struct_name, key_signature, value FROM accu WHERE receiving_job_id=?';
my $sth = $self->prepare( $sql );
$sth->execute( $receiving_job_id );
sub fetch_structures_for_job_ids {
my ($self, $job_ids_csv, $id_scale, $id_offset) = @_;
$id_scale ||= 1;    # optional linear remapping of the job_ids used as keys of the returned hash
$id_offset ||= 0;
my %structures = ();
ROW: while(my ($struct_name, $key_signature, $stringified_value) = $sth->fetchrow() ) {
my $value = destringify($stringified_value);
my $sptr = \$structures{$struct_name};
while( $key_signature=~/(?:(?:\[(\w*)\])|(?:\{(\w*)\}))/g) {
my ($array_index, $hash_key) = ($1, $2);
if(defined($array_index)) {
unless(length($array_index)) {
$array_index = scalar(@{$$sptr||[]});
}
$sptr = \$$sptr->[$array_index];
} elsif(defined($hash_key)) {
if(length($hash_key)) {
$sptr = \$$sptr->{$hash_key};
} else {
$sptr = \$$sptr->{$value};
$$sptr++;
next ROW;
if( $job_ids_csv ) {
my $sql = "SELECT receiving_job_id, struct_name, key_signature, value FROM accu WHERE receiving_job_id in ($job_ids_csv)";
my $sth = $self->prepare( $sql );
$sth->execute();
ROW: while(my ($receiving_job_id, $struct_name, $key_signature, $stringified_value) = $sth->fetchrow_array() ) {
my $value = destringify($stringified_value);
my $sptr = \$structures{$receiving_job_id * $id_scale + $id_offset}{$struct_name};
# walk the key_signature (a path of tokens like '{digit}' or '[0]'), auto-vivifying the nested structure:
while( $key_signature=~/(?:(?:\[(\w*)\])|(?:\{(\w*)\}))/g) {
my ($array_index, $hash_key) = ($1, $2);
if(defined($array_index)) {
unless(length($array_index)) {  # an empty [] means "append at the end of the array"
$array_index = scalar(@{$$sptr||[]});
}
$sptr = \$$sptr->[$array_index];
} elsif(defined($hash_key)) {
if(length($hash_key)) {
$sptr = \$$sptr->{$hash_key};
} else {    # an empty {} means: use the value itself as the key and count its occurrences
$sptr = \$$sptr->{$value};
$$sptr++;
next ROW;
}
}
}
$$sptr = $value;
}
$$sptr = $value;
$sth->finish;
}
$sth->finish;
return \%structures;
}
@@ -78,8 +78,8 @@ use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
sub CreateNewJob {
my ($class, @args) = @_;
my ($input_id, $analysis, $prev_job, $prev_job_id, $semaphore_count, $semaphored_job_id, $push_new_semaphore) =
rearrange([qw(input_id analysis prev_job prev_job_id semaphore_count semaphored_job_id push_new_semaphore)], @args);
my ($input_id, $param_id_stack, $accu_id_stack, $analysis, $prev_job, $prev_job_id, $semaphore_count, $semaphored_job_id, $push_new_semaphore) =
rearrange([qw(input_id param_id_stack accu_id_stack analysis prev_job prev_job_id semaphore_count semaphored_job_id push_new_semaphore)], @args);
throw("must define input_id") unless($input_id);
throw("must define analysis") unless($analysis);
@@ -89,16 +89,30 @@ sub CreateNewJob {
unless($analysis->adaptor and $analysis->adaptor->db);
throw("Please specify prev_job object instead of prev_job_id if available") if ($prev_job_id); # 'obsolete' message
$prev_job_id = $prev_job && $prev_job->dbID();
$prev_job_id = $prev_job && $prev_job->dbID();
if(ref($input_id)) { # let's do the Perl hash stringification centrally rather than in many places:
$input_id = stringify($input_id);
}
if(ref($input_id)) { # let's do the Perl hash stringification centrally rather than in many places:
$input_id = stringify($input_id);
}
if(length($input_id) >= 255) {
print "input_id is '$input_id', length = ".length($input_id)."\n";
my $extended_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($input_id);
$input_id = "_extended_data_id $extended_data_id";
}
if(length($param_id_stack) >= 64) {
print "param_id_stack is '$param_id_stack', length = ".length($param_id_stack)."\n";
my $extended_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($param_id_stack);
$param_id_stack = "_extended_data_id $extended_data_id";
}
if(length($accu_id_stack) >= 64) {
print "accu_id_stack is '$accu_id_stack', length = ".length($accu_id_stack)."\n";
my $extended_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($accu_id_stack);
$accu_id_stack = "_extended_data_id $extended_data_id";
}
if(length($input_id) >= 255) {
my $input_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($input_id);
$input_id = "_ext_input_analysis_data_id $input_data_id";
}
$semaphore_count ||= 0;
@@ -111,11 +125,11 @@ sub CreateNewJob {
$dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($semaphored_job_id and ($dbc->driver ne 'sqlite'));
my $sql = qq{$insertion_method INTO job
(input_id, prev_job_id,analysis_id,status,semaphore_count,semaphored_job_id)
VALUES (?,?,?,?,?,?)};
(input_id, param_id_stack, accu_id_stack, prev_job_id,analysis_id,status,semaphore_count,semaphored_job_id)
VALUES (?,?,?,?,?,?,?,?)};
my $sth = $dbc->prepare($sql);
my @values = ($input_id, $prev_job_id, $analysis_id, $job_status, $semaphore_count, $semaphored_job_id);
my @values = ($input_id, $param_id_stack || '', $accu_id_stack || '', $prev_job_id, $analysis_id, $job_status, $semaphore_count, $semaphored_job_id);
my $return_code = $sth->execute(@values)
# using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
@@ -312,6 +326,8 @@ sub _columns {
j.prev_job_id
j.analysis_id
j.input_id
j.param_id_stack
j.accu_id_stack
j.worker_id
j.status
j.retry_count
@@ -334,14 +350,25 @@ sub _objs_from_sth {
while ($sth->fetch()) {
my $input_id = ($column{'input_id'} =~ /_ext_input_analysis_data_id (\d+)/)
my $input_id = ($column{'input_id'} =~ /_ext(?:\w+)_data_id (\d+)/)
? $self->db->get_AnalysisDataAdaptor->fetch_by_dbID($1)
: $column{'input_id'};
my $param_id_stack = ($column{'param_id_stack'} =~ /_ext(?:\w+)_data_id (\d+)/)
? $self->db->get_AnalysisDataAdaptor->fetch_by_dbID($1)
: $column{'param_id_stack'};
my $accu_id_stack = ($column{'accu_id_stack'} =~ /_ext(?:\w+)_data_id (\d+)/)
? $self->db->get_AnalysisDataAdaptor->fetch_by_dbID($1)
: $column{'accu_id_stack'};
my $job = Bio::EnsEMBL::Hive::AnalysisJob->new(
-dbID => $column{'job_id'},
-analysis_id => $column{'analysis_id'},
-input_id => $input_id,
-param_id_stack => $param_id_stack,
-accu_id_stack => $accu_id_stack,
-worker_id => $column{'worker_id'},
-status => $column{'status'},
-retry_count => $column{'retry_count'},
@@ -733,5 +760,26 @@ sub balance_semaphores {
}
sub fetch_input_ids_for_job_ids {
my ($self, $job_ids_csv, $id_scale, $id_offset) = @_;
$id_scale ||= 1;    # optional linear remapping of the job_ids used as keys of the returned hash
$id_offset ||= 0;
my %input_ids = ();
if( $job_ids_csv ) {
my $sql = "SELECT job_id, input_id FROM job WHERE job_id in ($job_ids_csv)";
my $sth = $self->prepare( $sql );
$sth->execute();
while(my ($job_id, $input_id) = $sth->fetchrow_array() ) {
$input_ids{$job_id * $id_scale + $id_offset} = $input_id;
}
}
return \%input_ids;
}
1;
=pod
=head1 NAME
Bio::EnsEMBL::Hive::PipeConfig::LongMultSt_conf;
=head1 SYNOPSIS
# Example 1: specifying only the mandatory option (numbers to be multiplied are taken from defaults)
init_pipeline.pl Bio::EnsEMBL::Hive::PipeConfig::LongMultSt_conf -password <mypass>
# Example 2: specifying the mandatory options as well as overriding the default numbers to be multiplied:
init_pipeline.pl Bio::EnsEMBL::Hive::PipeConfig::LongMultSt_conf -password <mypass> -first_mult 2344556 -second_mult 777666555
# Example 3: do not re-create the database, just load another multiplication task into an existing one:
init_pipeline.pl Bio::EnsEMBL::Hive::PipeConfig::LongMultSt_conf -job_topup -password <mypass> -first_mult 1111222233334444 -second_mult 38578377835
=head1 DESCRIPTION
This is an experimental version of LongMult_conf with hive_use_param_stack switched on.
This is the PipeConfig file for the long multiplication pipeline example.
The main point of this pipeline is to provide an example of how to write Hive Runnables and link them together into a pipeline.
Please refer to Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf module to understand the interface implemented here.
The setting. Let's assume we are given two loooooong numbers to multiply. Reeeeally long.
So long that they do not fit into registers of the CPU and should be multiplied digit-by-digit.
For the purposes of this example we also assume this task is very computationally intensive and has to be done in parallel.
The long multiplication pipeline consists of three "analyses" (types of tasks): 'take_b_apart', 'part_multiply' and 'add_together'
that we will be using to exemplify various features of the Hive.
* A 'take_b_apart' job takes in two string parameters, 'a_multiplier' and 'b_multiplier',
takes the second one apart into digits, finds what _different_ digits are there,
creates several jobs of the 'part_multiply' analysis and one job of 'add_together' analysis.
* A 'part_multiply' job takes in 'a_multiplier' and 'digit', multiplies them and accumulates the result in the 'partial_product' accumulator.
* An 'add_together' job waits for the first two analyses to complete,
takes in 'a_multiplier', 'b_multiplier' and the 'partial_product' hash and produces the final result in the 'final_result' table.
Please see the implementation details in Runnable modules themselves.
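
As a toy illustration of the digit-by-digit strategy (deliberately tiny numbers here, and not the Runnable code itself; the real Runnables operate on strings precisely because the real inputs do not fit into native integers):

    my ($a_multiplier, $b_multiplier) = (123456, 788);

    # 'take_b_apart': split b_multiplier into digits and find the distinct ones
    my @b_digits = split //, $b_multiplier;
    my %distinct = map { ($_ => 1) } @b_digits;

    # 'part_multiply': one (parallelizable) product per distinct digit
    my %partial_product = map { ($_ => $a_multiplier * $_) } keys %distinct;

    # 'add_together': shift-and-add the partial products
    my ($result, $power) = (0, 1);
    foreach my $digit (reverse @b_digits) {
        $result += $partial_product{$digit} * $power;
        $power  *= 10;
    }
    print "$a_multiplier * $b_multiplier = $result\n";    # prints 97283328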
=head1 CONTACT
Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
=cut
package Bio::EnsEMBL::Hive::PipeConfig::LongMultSt_conf;
use strict;
use warnings;
use base ('Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf'); # All Hive databases configuration files should inherit from HiveGeneric, directly or indirectly
=head2 default_options
Description : Implements default_options() interface method of Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf that is used to initialize default options.
In addition to the standard things it defines two options, 'first_mult' and 'second_mult' that are supposed to contain the long numbers to be multiplied.
=cut
sub default_options {
my ($self) = @_;
return {
%{ $self->SUPER::default_options() }, # inherit other stuff from the base class
'pipeline_name' => 'long_mult', # name used by the beekeeper to prefix job names on the farm
'first_mult' => '9650156169', # the actual numbers to be multiplied can also be specified from the command line
'second_mult' => '327358788',
'take_time' => 1, # how much time (in seconds) should each job take -- to slow things down
};
}
=head2 pipeline_create_commands
Description : Implements pipeline_create_commands() interface method of Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf that lists the commands that will create and set up the Hive database.
In addition to the standard creation of the database and populating it with Hive tables and procedures it also creates two pipeline-specific tables used by Runnables to communicate.
=cut
sub pipeline_create_commands {
my ($self) = @_;
return [
@{$self->SUPER::pipeline_create_commands}, # inheriting database and hive tables' creation
# additional tables needed for long multiplication pipeline's operation:
'db_conn.pl -url '.$self->dbconn_2_url('pipeline_db').' -sql '
."'CREATE TABLE final_result (a_multiplier char(40) NOT NULL, b_multiplier char(40) NOT NULL, result char(80) NOT NULL, PRIMARY KEY (a_multiplier, b_multiplier))'",
];
}
=head2 pipeline_wide_parameters
Description : Interface method that should return a hash of pipeline_wide_parameter_name->pipeline_wide_parameter_value pairs.
The value doesn't have to be a scalar; it can be any Perl structure (it will be stringified and de-stringified automagically).
Please see existing PipeConfig modules for examples.
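
For instance, a quick round-trip sketch using the same helpers the adaptors in this commit rely on (stringify/destringify from Bio::EnsEMBL::Hive::Utils):

    use Bio::EnsEMBL::Hive::Utils ('stringify', 'destringify');

    my $value     = { 'take_time' => 1, 'digits' => [ 0..9 ] };   # any Perl structure
    my $stored    = stringify($value);       # flattened into a string for storage
    my $roundtrip = destringify($stored);    # an equivalent structure comes back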
=cut
sub pipeline_wide_parameters {
my ($self) = @_;
return {
%{$self->SUPER::pipeline_wide_parameters}, # here we inherit anything from the base class
'take_time' => $self->o('take_time'),
};
}
sub hive_meta_table {
my ($self) = @_;
return {
%{$self->SUPER::hive_meta_table}, # here we inherit anything from the base class
'hive_use_param_stack' => 1, # switch on the new param_stack mechanism
};
}
=head2 pipeline_analyses
Description : Implements pipeline_analyses() interface method of Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf that defines the structure of the pipeline: analyses, jobs, rules, etc.
Here it defines three analyses:
* 'take_b_apart' with two jobs (multiply 'first_mult' by 'second_mult' and vice versa - to check the commutativity of multiplication).
Each job will dataflow (create more jobs) via branch #2 into 'part_multiply' and via branch #1 into 'add_together'.
* 'part_multiply' initially without jobs (they will flow from 'take_b_apart')
* 'add_together' initially without jobs (they will flow from 'take_b_apart').
All 'add_together' jobs will wait for completion of 'part_multiply' jobs before their own execution (to ensure all data is available).
There are two control modes in this pipeline:
A. The default mode is to use the '2' and '1' dataflow rules from 'take_b_apart' analysis and a -wait_for rule in 'add_together' analysis for analysis-wide synchronization.
B. The semaphored mode is to use '2->A' and 'A->1' semaphored dataflow rules from 'take_b_apart' instead, and comment out the analysis-wide -wait_for rule, relying on semaphores.
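
As a sketch, the mode B wiring could look like the following fragment (illustrative only; module names and other analysis attributes are omitted):

    {   -logic_name => 'take_b_apart',
        -flow_into  => {
            '2->A' => [ 'part_multiply' ],   # the fan: these jobs are created inside semaphore-group A
            'A->1' => [ 'add_together' ],    # the funnel: stays blocked until group A is all DONE
        },
    },
    {   -logic_name => 'part_multiply' },
    {   -logic_name => 'add_together' },     # no analysis-wide -wait_for needed in this mode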