From b59258556b7dcc8a10d89ba9f722f26b1ee4fde8 Mon Sep 17 00:00:00 2001 From: Leo Gordon <lg4@ebi.ac.uk> Date: Wed, 21 Aug 2013 14:53:11 +0100 Subject: [PATCH] "parameter stack" implementation using two extra fields in job table. Accu content intended for any job_id has preference over Input_id content for the same job. --- modules/Bio/EnsEMBL/Hive/AnalysisJob.pm | 160 +++++++++------ .../EnsEMBL/Hive/DBSQL/AccumulatorAdaptor.pm | 61 +++--- .../EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm | 76 +++++-- .../Hive/PipeConfig/LongMultSt_conf.pm | 194 ++++++++++++++++++ modules/Bio/EnsEMBL/Hive/Worker.pm | 29 ++- sql/patch_2013-08-21.mysql | 12 ++ sql/patch_2013-08-21.pgsql | 15 ++ sql/tables.mysql | 8 +- sql/tables.pgsql | 6 +- sql/tables.sqlite | 4 +- 10 files changed, 448 insertions(+), 117 deletions(-) create mode 100644 modules/Bio/EnsEMBL/Hive/PipeConfig/LongMultSt_conf.pm create mode 100644 sql/patch_2013-08-21.mysql create mode 100644 sql/patch_2013-08-21.pgsql diff --git a/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm b/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm index ae3905f28..fa5fc0eb5 100644 --- a/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm +++ b/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm @@ -44,11 +44,13 @@ sub new { my $self = $class->SUPER::new( @_ ); # deal with Storable stuff - my($analysis_id, $input_id, $worker_id, $status, $retry_count, $completed, $runtime_msec, $query_count, $semaphore_count, $semaphored_job_id) = - rearrange([qw(analysis_id input_id worker_id status retry_count completed runtime_msec query_count semaphore_count semaphored_job_id) ], @_); + my($analysis_id, $input_id, $param_id_stack, $accu_id_stack, $worker_id, $status, $retry_count, $completed, $runtime_msec, $query_count, $semaphore_count, $semaphored_job_id) = + rearrange([qw(analysis_id input_id param_id_stack accu_id_stack worker_id status retry_count completed runtime_msec query_count semaphore_count semaphored_job_id) ], @_); $self->analysis_id($analysis_id) if(defined($analysis_id)); $self->input_id($input_id) if(defined($input_id)); + $self->param_id_stack($param_id_stack) if(defined($param_id_stack)); + $self->accu_id_stack($accu_id_stack) if(defined($accu_id_stack)); $self->worker_id($worker_id) if(defined($worker_id)); $self->status($status) if(defined($status)); $self->retry_count($retry_count) if(defined($retry_count)); @@ -62,91 +64,101 @@ sub new { } -sub input_id { - my $self = shift; - $self->{'_input_id'} = shift if(@_); - return $self->{'_input_id'}; +sub analysis_id { + my $self = shift; + $self->{'_analysis_id'} = shift if(@_); + return $self->{'_analysis_id'}; } - -sub dataflow_rules { # if ever set will prevent the Job from fetching rules from the DB - my $self = shift @_; - my $branch_name_or_code = shift @_; - - my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code); - - $self->{'_dataflow_rules'}{$branch_code} = shift if(@_); - - return $self->{'_dataflow_rules'} - ? 
( $self->{'_dataflow_rules'}{$branch_code} || [] ) - : $self->adaptor->db->get_DataflowRuleAdaptor->fetch_all_by_from_analysis_id_and_branch_code($self->analysis_id, $branch_code); +sub input_id { + my $self = shift; + $self->{'_input_id'} = shift if(@_); + return $self->{'_input_id'}; } - -sub worker_id { - my $self = shift; - $self->{'_worker_id'} = shift if(@_); - return $self->{'_worker_id'}; +sub param_id_stack { + my $self = shift; + $self->{'_param_id_stack'} = shift if(@_); + return $self->{'_param_id_stack'}; } - -sub analysis_id { - my $self = shift; - $self->{'_analysis_id'} = shift if(@_); - return $self->{'_analysis_id'}; +sub accu_id_stack { + my $self = shift; + $self->{'_accu_id_stack'} = shift if(@_); + return $self->{'_accu_id_stack'}; } -sub status { - my $self = shift; - $self->{'_status'} = shift if(@_); - return $self->{'_status'}; +sub worker_id { + my $self = shift; + $self->{'_worker_id'} = shift if(@_); + return $self->{'_worker_id'}; } -sub update_status { - my ($self, $status ) = @_; - return unless($self->adaptor); - $self->status($status); - $self->adaptor->update_status($self); +sub status { + my $self = shift; + $self->{'_status'} = shift if(@_); + return $self->{'_status'}; } sub retry_count { - my $self = shift; - $self->{'_retry_count'} = shift if(@_); - $self->{'_retry_count'} = 0 unless(defined($self->{'_retry_count'})); - return $self->{'_retry_count'}; + my $self = shift; + $self->{'_retry_count'} = shift if(@_); + $self->{'_retry_count'} = 0 unless(defined($self->{'_retry_count'})); + return $self->{'_retry_count'}; } sub completed { - my $self = shift; - $self->{'_completed'} = shift if(@_); - return $self->{'_completed'}; + my $self = shift; + $self->{'_completed'} = shift if(@_); + return $self->{'_completed'}; } sub runtime_msec { - my $self = shift; - $self->{'_runtime_msec'} = shift if(@_); - $self->{'_runtime_msec'} = 0 unless(defined($self->{'_runtime_msec'})); - return $self->{'_runtime_msec'}; + my $self = shift; + $self->{'_runtime_msec'} = shift if(@_); + $self->{'_runtime_msec'} = 0 unless(defined($self->{'_runtime_msec'})); + return $self->{'_runtime_msec'}; } sub query_count { - my $self = shift; - $self->{'_query_count'} = shift if(@_); - $self->{'_query_count'} = 0 unless(defined($self->{'_query_count'})); - return $self->{'_query_count'}; + my $self = shift; + $self->{'_query_count'} = shift if(@_); + $self->{'_query_count'} = 0 unless(defined($self->{'_query_count'})); + return $self->{'_query_count'}; } sub semaphore_count { - my $self = shift; - $self->{'_semaphore_count'} = shift if(@_); - $self->{'_semaphore_count'} = 0 unless(defined($self->{'_semaphore_count'})); - return $self->{'_semaphore_count'}; + my $self = shift; + $self->{'_semaphore_count'} = shift if(@_); + $self->{'_semaphore_count'} = 0 unless(defined($self->{'_semaphore_count'})); + return $self->{'_semaphore_count'}; } sub semaphored_job_id { - my $self = shift; - $self->{'_semaphored_job_id'} = shift if(@_); - return $self->{'_semaphored_job_id'}; + my $self = shift; + $self->{'_semaphored_job_id'} = shift if(@_); + return $self->{'_semaphored_job_id'}; +} + + +sub update_status { + my ($self, $status ) = @_; + return unless($self->adaptor); + $self->status($status); + $self->adaptor->update_status($self); +} + +sub dataflow_rules { # if ever set will prevent the Job from fetching rules from the DB + my $self = shift @_; + my $branch_name_or_code = shift @_; + + my $branch_code = 
Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code); + + $self->{'_dataflow_rules'}{$branch_code} = shift if(@_); + + return $self->{'_dataflow_rules'} + ? ( $self->{'_dataflow_rules'}{$branch_code} || [] ) + : $self->adaptor->db->get_DataflowRuleAdaptor->fetch_all_by_from_analysis_id_and_branch_code($self->analysis_id, $branch_code); } sub stdout_file { @@ -161,6 +173,14 @@ sub stderr_file { return $self->{'_stderr_file'}; } +sub accu_hash { + my $self = shift; + $self->{'_accu_hash'} = shift if(@_); + $self->{'_accu_hash'} = {} unless(defined($self->{'_accu_hash'})); + return $self->{'_accu_hash'}; +} + + =head2 autoflow Title : autoflow @@ -253,7 +273,21 @@ sub fan_cache { # a self-initializing getter (no setting) sub dataflow_output_id { my ($self, $output_ids, $branch_name_or_code, $create_job_options) = @_; - $output_ids ||= [ $self->input_id() ]; # replicate the input_id in the branch_code's output by default + my $input_id = $self->input_id(); + my $param_id_stack = $self->param_id_stack(); + my $accu_id_stack = $self->accu_id_stack(); + my $hive_use_param_stack = $self->adaptor->db->hive_use_param_stack(); + + if($hive_use_param_stack) { + if($input_id and ($input_id ne '{}')) { # add the parent to the param_id_stack if it had non-trivial extra parameters + $param_id_stack = ($param_id_stack ? $param_id_stack.',' : '').$self->dbID(); + } + if(scalar(keys %{$self->accu_hash()})) { # add the parent to the accu_id_stack if it had "own" accumulator + $accu_id_stack = ($accu_id_stack ? $accu_id_stack.',' : '').$self->dbID(); + } + } + + $output_ids ||= [ $hive_use_param_stack ? {} : $input_id ]; # by default replicate the parameters of the parent in the child $output_ids = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY'); # force previously used single values into an arrayref if($create_job_options) { @@ -306,6 +340,8 @@ sub dataflow_output_id { } elsif($funnel_job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( # if a semaphored funnel job creation succeeded, ... 
-input_id => $output_ids_for_this_rule->[0], + -param_id_stack => $param_id_stack, + -accu_id_stack => $accu_id_stack, -analysis => $target_analysis_or_table, -prev_job => $self, -semaphore_count => scalar(@$fan_cache), # "pre-increase" the semaphore count before creating the dependent jobs @@ -319,6 +355,8 @@ sub dataflow_output_id { my ($output_id, $fan_analysis) = @$pair; if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( -input_id => $output_id, + -param_id_stack => $param_id_stack, + -accu_id_stack => $accu_id_stack, -analysis => $fan_analysis, -prev_job => $self, -semaphored_job_id => $funnel_job_id, # by passing this parameter we request not to propagate semaphores @@ -341,6 +379,8 @@ sub dataflow_output_id { if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( -input_id => $output_id, + -param_id_stack => $param_id_stack, + -accu_id_stack => $accu_id_stack, -analysis => $target_analysis_or_table, -prev_job => $self, -semaphored_job_id => $self->semaphored_job_id(), # propagate parent's semaphore if any diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AccumulatorAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AccumulatorAdaptor.pm index 06a9885c5..ce7c33a80 100644 --- a/modules/Bio/EnsEMBL/Hive/DBSQL/AccumulatorAdaptor.pm +++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AccumulatorAdaptor.pm @@ -33,41 +33,46 @@ sub default_table_name { } -sub fetch_structures_for_job_id { - my ($self, $receiving_job_id) = @_; - - my $sql = 'SELECT struct_name, key_signature, value FROM accu WHERE receiving_job_id=?'; - my $sth = $self->prepare( $sql ); - $sth->execute( $receiving_job_id ); +sub fetch_structures_for_job_ids { + my ($self, $job_ids_csv, $id_scale, $id_offset) = @_; + $id_scale ||= 1; + $id_offset ||= 0; my %structures = (); - ROW: while(my ($struct_name, $key_signature, $stringified_value) = $sth->fetchrow() ) { - - my $value = destringify($stringified_value); - - my $sptr = \$structures{$struct_name}; - - while( $key_signature=~/(?:(?:\[(\w*)\])|(?:\{(\w*)\}))/g) { - my ($array_index, $hash_key) = ($1, $2); - if(defined($array_index)) { - unless(length($array_index)) { - $array_index = scalar(@{$$sptr||[]}); - } - $sptr = \$$sptr->[$array_index]; - } elsif(defined($hash_key)) { - if(length($hash_key)) { - $sptr = \$$sptr->{$hash_key}; - } else { - $sptr = \$$sptr->{$value}; - $$sptr++; - next ROW; + if( $job_ids_csv ) { + + my $sql = "SELECT receiving_job_id, struct_name, key_signature, value FROM accu WHERE receiving_job_id in ($job_ids_csv)"; + my $sth = $self->prepare( $sql ); + $sth->execute(); + + ROW: while(my ($receiving_job_id, $struct_name, $key_signature, $stringified_value) = $sth->fetchrow_array() ) { + + my $value = destringify($stringified_value); + + my $sptr = \$structures{$receiving_job_id * $id_scale + $id_offset}{$struct_name}; + + while( $key_signature=~/(?:(?:\[(\w*)\])|(?:\{(\w*)\}))/g) { + my ($array_index, $hash_key) = ($1, $2); + if(defined($array_index)) { + unless(length($array_index)) { + $array_index = scalar(@{$$sptr||[]}); + } + $sptr = \$$sptr->[$array_index]; + } elsif(defined($hash_key)) { + if(length($hash_key)) { + $sptr = \$$sptr->{$hash_key}; + } else { + $sptr = \$$sptr->{$value}; + $$sptr++; + next ROW; + } } } + $$sptr = $value; } - $$sptr = $value; + $sth->finish; } - $sth->finish; return \%structures; } diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm index ace5687b8..31ecc44bf 100644 --- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm 
+++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm @@ -78,8 +78,8 @@ use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor'); sub CreateNewJob { my ($class, @args) = @_; - my ($input_id, $analysis, $prev_job, $prev_job_id, $semaphore_count, $semaphored_job_id, $push_new_semaphore) = - rearrange([qw(input_id analysis prev_job prev_job_id semaphore_count semaphored_job_id push_new_semaphore)], @args); + my ($input_id, $param_id_stack, $accu_id_stack, $analysis, $prev_job, $prev_job_id, $semaphore_count, $semaphored_job_id, $push_new_semaphore) = + rearrange([qw(input_id param_id_stack accu_id_stack analysis prev_job prev_job_id semaphore_count semaphored_job_id push_new_semaphore)], @args); throw("must define input_id") unless($input_id); throw("must define analysis") unless($analysis); @@ -89,16 +89,30 @@ sub CreateNewJob { unless($analysis->adaptor and $analysis->adaptor->db); throw("Please specify prev_job object instead of prev_job_id if available") if ($prev_job_id); # 'obsolete' message - $prev_job_id = $prev_job && $prev_job->dbID(); + $prev_job_id = $prev_job && $prev_job->dbID(); - if(ref($input_id)) { # let's do the Perl hash stringification centrally rather than in many places: - $input_id = stringify($input_id); - } + if(ref($input_id)) { # let's do the Perl hash stringification centrally rather than in many places: + $input_id = stringify($input_id); + } + + if(length($input_id) >= 255) { + print "input_id is '$input_id', length = ".length($input_id)."\n"; + my $extended_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($input_id); + $input_id = "_extended_data_id $extended_data_id"; + } + + if(length($param_id_stack) >= 64) { + print "param_id_stack is '$param_id_stack', length = ".length($param_id_stack)."\n"; + my $extended_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($param_id_stack); + $param_id_stack = "_extended_data_id $extended_data_id"; + } + + if(length($accu_id_stack) >= 64) { + print "accu_id_stack is '$accu_id_stack', length = ".length($accu_id_stack)."\n"; + my $extended_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($accu_id_stack); + $accu_id_stack = "_extended_data_id $extended_data_id"; + } - if(length($input_id) >= 255) { - my $input_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($input_id); - $input_id = "_ext_input_analysis_data_id $input_data_id"; - } $semaphore_count ||= 0; @@ -111,11 +125,11 @@ sub CreateNewJob { $dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($semaphored_job_id and ($dbc->driver ne 'sqlite')); my $sql = qq{$insertion_method INTO job - (input_id, prev_job_id,analysis_id,status,semaphore_count,semaphored_job_id) - VALUES (?,?,?,?,?,?)}; + (input_id, param_id_stack, accu_id_stack, prev_job_id,analysis_id,status,semaphore_count,semaphored_job_id) + VALUES (?,?,?,?,?,?,?,?)}; my $sth = $dbc->prepare($sql); - my @values = ($input_id, $prev_job_id, $analysis_id, $job_status, $semaphore_count, $semaphored_job_id); + my @values = ($input_id, $param_id_stack || '', $accu_id_stack || '', $prev_job_id, $analysis_id, $job_status, $semaphore_count, $semaphored_job_id); my $return_code = $sth->execute(@values) # using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true: @@ -312,6 +326,8 @@ sub _columns { j.prev_job_id j.analysis_id j.input_id + j.param_id_stack + j.accu_id_stack j.worker_id j.status j.retry_count @@ -334,14 +350,25 @@ sub 
_objs_from_sth {
 
     while ($sth->fetch()) {
 
-        my $input_id = ($column{'input_id'} =~ /_ext_input_analysis_data_id (\d+)/)
+        my $input_id = ($column{'input_id'} =~ /_ext(?:\w+)_data_id (\d+)/)
             ? $self->db->get_AnalysisDataAdaptor->fetch_by_dbID($1)
             : $column{'input_id'};
 
+        my $param_id_stack = ($column{'param_id_stack'} =~ /_ext(?:\w+)_data_id (\d+)/)
+            ? $self->db->get_AnalysisDataAdaptor->fetch_by_dbID($1)
+            : $column{'param_id_stack'};
+
+        my $accu_id_stack = ($column{'accu_id_stack'} =~ /_ext(?:\w+)_data_id (\d+)/)
+            ? $self->db->get_AnalysisDataAdaptor->fetch_by_dbID($1)
+            : $column{'accu_id_stack'};
+
+
         my $job = Bio::EnsEMBL::Hive::AnalysisJob->new(
                     -dbID               => $column{'job_id'},
                     -analysis_id        => $column{'analysis_id'},
                     -input_id           => $input_id,
+                    -param_id_stack     => $param_id_stack,
+                    -accu_id_stack      => $accu_id_stack,
                     -worker_id          => $column{'worker_id'},
                     -status             => $column{'status'},
                     -retry_count        => $column{'retry_count'},
@@ -733,5 +760,26 @@ sub balance_semaphores {
 }
 
 
+sub fetch_input_ids_for_job_ids {
+    my ($self, $job_ids_csv, $id_scale, $id_offset) = @_;
+    $id_scale   ||= 1;
+    $id_offset  ||= 0;
+
+    my %input_ids = ();
+
+    if( $job_ids_csv ) {
+
+        my $sql = "SELECT job_id, input_id FROM job WHERE job_id in ($job_ids_csv)";
+        my $sth = $self->prepare( $sql );
+        $sth->execute();
+
+        while(my ($job_id, $input_id) = $sth->fetchrow_array() ) {
+            $input_ids{$job_id * $id_scale + $id_offset} = $input_id;
+        }
+    }
+    return \%input_ids;
+}
+
+
 1;
diff --git a/modules/Bio/EnsEMBL/Hive/PipeConfig/LongMultSt_conf.pm b/modules/Bio/EnsEMBL/Hive/PipeConfig/LongMultSt_conf.pm
new file mode 100644
index 000000000..9a4ef830b
--- /dev/null
+++ b/modules/Bio/EnsEMBL/Hive/PipeConfig/LongMultSt_conf.pm
@@ -0,0 +1,194 @@
+
+=pod
+
+=head1 NAME
+
+    Bio::EnsEMBL::Hive::PipeConfig::LongMultSt_conf;
+
+=head1 SYNOPSIS
+
+    # Example 1: specifying only the mandatory option (numbers to be multiplied are taken from defaults)
+init_pipeline.pl Bio::EnsEMBL::Hive::PipeConfig::LongMultSt_conf -password <mypass>
+
+    # Example 2: specifying the mandatory options as well as overriding the default numbers to be multiplied:
+init_pipeline.pl Bio::EnsEMBL::Hive::PipeConfig::LongMultSt_conf -password <mypass> -first_mult 2344556 -second_mult 777666555
+
+    # Example 3: do not re-create the database, just load another multiplication task into an existing one:
+init_pipeline.pl Bio::EnsEMBL::Hive::PipeConfig::LongMultSt_conf -job_topup -password <mypass> -first_mult 1111222233334444 -second_mult 38578377835
+
+
+=head1 DESCRIPTION
+
+    This is an experimental version of LongMult_conf with hive_use_param_stack switched on.
+
+    This is the PipeConfig file for the long multiplication pipeline example.
+    The main point of this pipeline is to provide an example of how to write Hive Runnables and link them together into a pipeline.
+
+    Please refer to the Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf module to understand the interface implemented here.
+
+
+    The setting. Let's assume we are given two loooooong numbers to multiply. Reeeeally long.
+    So long that they do not fit into registers of the CPU and should be multiplied digit-by-digit.
+    For the purposes of this example we also assume this task is very computationally intensive and has to be done in parallel.
+
+    The long multiplication pipeline consists of three "analyses" (types of tasks): 'take_b_apart', 'part_multiply' and 'add_together'
+    that we will be using to exemplify various features of the Hive.
+
+        * A 'take_b_apart' job takes in two string parameters, 'a_multiplier' and 'b_multiplier',
+          takes the second one apart into digits, finds what _different_ digits are there,
+          creates several jobs of the 'part_multiply' analysis and one job of the 'add_together' analysis.
+
+        * A 'part_multiply' job takes in 'a_multiplier' and 'digit', multiplies them and accumulates the result in the 'partial_product' accumulator.
+
+        * An 'add_together' job waits for the first two analyses to complete,
+          takes in 'a_multiplier', 'b_multiplier' and the 'partial_product' hash and produces the final result in the 'final_result' table.
+
+    Please see the implementation details in the Runnable modules themselves.
+
+=head1 CONTACT
+
+    Please contact the ehive-users@ebi.ac.uk mailing list with questions/suggestions.
+
+=cut
+
+
+package Bio::EnsEMBL::Hive::PipeConfig::LongMultSt_conf;
+
+use strict;
+use warnings;
+
+use base ('Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf');  # All Hive databases configuration files should inherit from HiveGeneric, directly or indirectly
+
+
+=head2 default_options
+
+    Description : Implements default_options() interface method of Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf that is used to initialize default options.
+                  In addition to the standard things it defines two options, 'first_mult' and 'second_mult', that are supposed to contain the long numbers to be multiplied.
+
+=cut
+
+sub default_options {
+    my ($self) = @_;
+
+    return {
+        %{ $self->SUPER::default_options() },               # inherit other stuff from the base class
+
+        'pipeline_name' => 'long_mult',         # name used by the beekeeper to prefix job names on the farm
+
+        'first_mult'    => '9650156169',        # the actual numbers to be multiplied can also be specified from the command line
+        'second_mult'   => '327358788',
+
+        'take_time'     => 1,                   # how much time (in seconds) should each job take -- to slow things down
+    };
+}
+
+
+=head2 pipeline_create_commands
+
+    Description : Implements pipeline_create_commands() interface method of Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf that lists the commands that will create and set up the Hive database.
+                  In addition to the standard creation of the database and populating it with Hive tables and procedures it also creates a pipeline-specific table ('final_result') used by Runnables to communicate.
+
+=cut
+
+sub pipeline_create_commands {
+    my ($self) = @_;
+    return [
+        @{$self->SUPER::pipeline_create_commands},  # inheriting database and hive tables' creation
+
+            # additional table needed for the long multiplication pipeline's operation:
+        'db_conn.pl -url '.$self->dbconn_2_url('pipeline_db').' -sql '
+            ."'CREATE TABLE final_result (a_multiplier char(40) NOT NULL, b_multiplier char(40) NOT NULL, result char(80) NOT NULL, PRIMARY KEY (a_multiplier, b_multiplier))'",
+    ];
+}
+
+
+=head2 pipeline_wide_parameters
+
+    Description : Interface method that should return a hash of pipeline_wide_parameter_name->pipeline_wide_parameter_value pairs.
+                  The value doesn't have to be a scalar; it can be any Perl structure (it will be stringified and de-stringified automagically).
+                  Please see existing PipeConfig modules for examples.
+
+=cut
+
+sub pipeline_wide_parameters {
+    my ($self) = @_;
+    return {
+        %{$self->SUPER::pipeline_wide_parameters},          # here we inherit anything from the base class
+
+        'take_time' => $self->o('take_time'),
+    };
+}
+
+
+sub hive_meta_table {
+    my ($self) = @_;
+    return {
+        %{$self->SUPER::hive_meta_table},                   # here we inherit anything from the base class
+
+        'hive_use_param_stack'  => 1,           # switch on the new param_stack mechanism
+    };
+}
+
+
+=head2 pipeline_analyses
+
+    Description : Implements pipeline_analyses() interface method of Bio::EnsEMBL::Hive::PipeConfig::HiveGeneric_conf that defines the structure of the pipeline: analyses, jobs, rules, etc.
+                  Here it defines three analyses:
+
+                    * 'take_b_apart' with two jobs (multiply 'first_mult' by 'second_mult' and vice versa - to check the commutativity of multiplication).
+                      Each job will dataflow (create more jobs) via branch #2 into 'part_multiply' and via branch #1 into 'add_together'.
+
+                    * 'part_multiply' initially without jobs (they will flow from 'take_b_apart')
+
+                    * 'add_together' initially without jobs (they will flow from 'take_b_apart').
+                      All 'add_together' jobs will wait for completion of 'part_multiply' jobs before their own execution (to ensure all data is available).
+
+                  There are two control modes in this pipeline:
+                    A. The default mode is to use the '2' and '1' dataflow rules from the 'take_b_apart' analysis and a -wait_for rule in the 'add_together' analysis for analysis-wide synchronization.
+                    B. The semaphored mode is to use '2->A' and 'A->1' semaphored dataflow rules from 'take_b_apart' instead, and comment out the analysis-wide -wait_for rule, relying on semaphores.
+
+=cut
+
+sub pipeline_analyses {
+    my ($self) = @_;
+    return [
+        {   -logic_name => 'take_b_apart',
+            -module     => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::DigitFactory',
+            -meadow_type=> 'LOCAL',     # do not bother the farm with such a simple task (and get it done faster)
+            -analysis_capacity  =>  2,  # use per-analysis limiter
+            -input_ids => [
+                { 'a_multiplier' => $self->o('first_mult'),  'b_multiplier' => $self->o('second_mult') },
+                { 'a_multiplier' => $self->o('second_mult'), 'b_multiplier' => $self->o('first_mult')  },
+            ],
+            -flow_into => {
+                '2->A' => [ 'part_multiply' ],   # will create a semaphored fan of jobs; will use the param_stack mechanism to pass parameters around
+                'A->1' => [ 'add_together'  ],   # will create a semaphored funnel job to wait for the fan to complete and add the results
+            },
+        },
+
+        {   -logic_name => 'part_multiply',
+            -module     => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply',
+            -analysis_capacity  =>  4,  # use per-analysis limiter
+            -flow_into => {
+                1 => { ':////accu?partial_product={digit}' => { 'a_multiplier' => '#a_multiplier#', 'digit' => '#digit#', 'partial_product' => '#partial_product#' } },
+            },
+        },
+
+        {   -logic_name => 'add_together',
+            -module     => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::AddTogether',
+#           -analysis_capacity  =>  0,  # this is a way to temporarily block a given analysis
+            -flow_into => {
+                1 => { ':////final_result' => { 'a_multiplier' => '#a_multiplier#', 'b_multiplier' => '#b_multiplier#', 'result' => '#result#' },
+                       'last' => undef,
+                },
+            },
+        },
+
+        {   -logic_name => 'last',
+            -module     => 'Bio::EnsEMBL::Hive::RunnableDB::Dummy',
+        }
+    ];
+}
+
+1;
+
diff --git a/modules/Bio/EnsEMBL/Hive/Worker.pm b/modules/Bio/EnsEMBL/Hive/Worker.pm
index ddccb2b56..79f58fd16 100644
--- a/modules/Bio/EnsEMBL/Hive/Worker.pm
+++ b/modules/Bio/EnsEMBL/Hive/Worker.pm
@@ -652,8 +652,9 @@ sub run_one_batch {
 
     my $jobs_done_here = 0;
 
-
my $accu_adaptor = $self->adaptor->db->get_AccumulatorAdaptor; - my $max_retry_count = $self->analysis->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs + my $hive_use_param_stack = $self->adaptor->db->hive_use_param_stack(); + my $accu_adaptor = $self->adaptor->db->get_AccumulatorAdaptor; + my $max_retry_count = $self->analysis->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs $self->adaptor->check_in_worker( $self ); $self->adaptor->safe_synchronize_AnalysisStats($self->analysis->stats); @@ -666,6 +667,8 @@ sub run_one_batch { my $job_partial_timing; ONE_BATCH: while(my $job = shift @$jobs) { # to make sure jobs go out of scope without undue delay + + my $job_id = $job->dbID(); $self->worker_say( $job->toString ) if($self->debug); my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new(); @@ -675,21 +678,30 @@ sub run_one_batch { eval { # capture any throw/die $job->incomplete(1); + $job->accu_hash( $accu_adaptor->fetch_structures_for_job_ids( $job_id )->{ $job_id } ); + my $runnable_object = $self->runnable_object(); $self->adaptor->db->dbc->query_count(0); $job_stopwatch->restart(); - $job->param_init( - $runnable_object->strict_hash_format(), + my @params_precedence = ( $runnable_object->param_defaults(), $self->adaptor->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), - $job->input_id(), - $accu_adaptor->fetch_structures_for_job_id( $job->dbID ), # FIXME: or should we pass in the original hash to be extended by pushing? ); - $self->worker_say( 'Job '.$job->dbID." unsubstituted_params= ".stringify($job->{'_unsubstituted_param_hash'}) ) if($self->debug()); + if( $hive_use_param_stack ) { + my $input_ids_hash = $job->adaptor->fetch_input_ids_for_job_ids( $job->param_id_stack, 2, 0 ); # input_ids have lower precedence (FOR EACH ID) + my $accu_hash = $accu_adaptor->fetch_structures_for_job_ids( $job->accu_id_stack, 2, 1 ); # accus have higher precedence (FOR EACH ID) + my %input_id_accu_hash = ( %$input_ids_hash, %$accu_hash ); + push @params_precedence, @input_id_accu_hash{ sort keys %input_id_accu_hash }; # take a slice. Mmm... + } + push @params_precedence, $job->input_id(), $job->accu_hash(); + + $job->param_init( $runnable_object->strict_hash_format(), @params_precedence ); + + $self->worker_say( "Job $job_id unsubstituted_params= ".stringify($job->{'_unsubstituted_param_hash'}) ) if($self->debug()); $runnable_object->input_job( $job ); # "take" the job $job_partial_timing = $runnable_object->life_cycle(); @@ -702,7 +714,6 @@ sub run_one_batch { $job->runtime_msec( $job_stopwatch->get_elapsed ); # whether successful or not $job->query_count( $self->adaptor->db->dbc->query_count ); - my $job_id = $job->dbID(); my $job_completion_line = "Job $job_id : complete"; if($msg_thrown) { # record the message - whether it was a success or failure: @@ -722,7 +733,7 @@ sub run_one_batch { # my $may_retry = defined($job->transient_error) ? 
$job->transient_error : $self->retry_throwing_jobs; - $job->adaptor->release_and_age_job( $job->dbID, $max_retry_count, $may_retry, $job->runtime_msec ); + $job->adaptor->release_and_age_job( $job_id, $max_retry_count, $may_retry, $job->runtime_msec ); if( $self->prev_job_error # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination or $job->lethal_for_worker ) { # trust the job's expert knowledge diff --git a/sql/patch_2013-08-21.mysql b/sql/patch_2013-08-21.mysql new file mode 100644 index 000000000..878ed2bd9 --- /dev/null +++ b/sql/patch_2013-08-21.mysql @@ -0,0 +1,12 @@ + + -- Add two new fields to job table to support parameter/accu stacks: +ALTER TABLE job ADD COLUMN param_id_stack CHAR(64) NOT NULL DEFAULT '' AFTER input_id; +ALTER TABLE job ADD COLUMN accu_id_stack CHAR(64) NOT NULL DEFAULT '' AFTER param_id_stack; + + -- Extend the unique constraint to include both new fields: +ALTER TABLE job DROP KEY input_id_analysis; +ALTER TABLE job ADD UNIQUE KEY input_id_stacks_analysis (input_id, param_id_stack, accu_id_stack, analysis_id); + + -- UPDATE hive_sql_schema_version +UPDATE hive_meta SET meta_value=52 WHERE meta_key='hive_sql_schema_version' AND meta_value='51'; + diff --git a/sql/patch_2013-08-21.pgsql b/sql/patch_2013-08-21.pgsql new file mode 100644 index 000000000..468fd79a7 --- /dev/null +++ b/sql/patch_2013-08-21.pgsql @@ -0,0 +1,15 @@ + + -- Documentation claims there is no performance gain from using fixed-width CHAR() types, changing them to TEXT: +ALTER TABLE job ALTER COLUMN input_id SET DATA TYPE TEXT; + + -- Add two new fields to job table to support parameter/accu stacks: +ALTER TABLE job ADD COLUMN param_id_stack TEXT NOT NULL DEFAULT ''; +ALTER TABLE job ADD COLUMN accu_id_stack TEXT NOT NULL DEFAULT ''; + + -- Extend the unique constraint to include both new fields: +ALTER TABLE job DROP CONSTRAINT job_input_id_analysis_id_key; +ALTER TABLE job ADD UNIQUE (input_id, param_id_stack, accu_id_stack, analysis_id); + + -- UPDATE hive_sql_schema_version +UPDATE hive_meta SET meta_value=52 WHERE meta_key='hive_sql_schema_version' AND meta_value='51'; + diff --git a/sql/tables.mysql b/sql/tables.mysql index b2322abc9..ba5a948a9 100644 --- a/sql/tables.mysql +++ b/sql/tables.mysql @@ -263,6 +263,8 @@ CREATE TABLE job ( prev_job_id INTEGER DEFAULT NULL, -- the job that created this one using a dataflow rule analysis_id INTEGER NOT NULL, input_id CHAR(255) NOT NULL, + param_id_stack CHAR(64) NOT NULL DEFAULT '', + accu_id_stack CHAR(64) NOT NULL DEFAULT '', worker_id INTEGER DEFAULT NULL, status ENUM('SEMAPHORED','READY','CLAIMED','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL, retry_count INTEGER NOT NULL DEFAULT 0, @@ -273,9 +275,9 @@ CREATE TABLE job ( semaphore_count INTEGER NOT NULL DEFAULT 0, semaphored_job_id INTEGER DEFAULT NULL, - UNIQUE KEY input_id_analysis (input_id, analysis_id), -- to avoid repeating tasks - KEY analysis_status_retry (analysis_id, status, retry_count), -- for claiming jobs - KEY worker_id (worker_id, status) -- for fetching and releasing claimed jobs + UNIQUE KEY input_id_stacks_analysis (input_id, param_id_stack, accu_id_stack, analysis_id), -- to avoid repeating tasks + KEY analysis_status_retry (analysis_id, status, retry_count), -- for claiming jobs + KEY worker_id (worker_id, status) -- for fetching and releasing claimed jobs ) COLLATE=latin1_swedish_ci ENGINE=InnoDB; diff --git a/sql/tables.pgsql b/sql/tables.pgsql 
index 5e647922d..453dcdba0 100644 --- a/sql/tables.pgsql +++ b/sql/tables.pgsql @@ -267,7 +267,9 @@ CREATE TABLE job ( job_id SERIAL PRIMARY KEY, prev_job_id INTEGER DEFAULT NULL, -- the job that created this one using a dataflow rule analysis_id INTEGER NOT NULL, - input_id CHAR(255) NOT NULL, + input_id TEXT NOT NULL, + param_id_stack TEXT NOT NULL DEFAULT '', + accu_id_stack TEXT NOT NULL DEFAULT '', worker_id INTEGER DEFAULT NULL, status jw_status NOT NULL DEFAULT 'READY', retry_count INTEGER NOT NULL DEFAULT 0, @@ -278,7 +280,7 @@ CREATE TABLE job ( semaphore_count INTEGER NOT NULL DEFAULT 0, semaphored_job_id INTEGER DEFAULT NULL, - UNIQUE (input_id, analysis_id) -- to avoid repeating tasks + UNIQUE (input_id, param_id_stack, accu_id_stack, analysis_id) -- to avoid repeating tasks ); CREATE INDEX ON job (analysis_id, status, retry_count); -- for claiming jobs CREATE INDEX ON job (worker_id, status); -- for fetching and releasing claimed jobs diff --git a/sql/tables.sqlite b/sql/tables.sqlite index f18908746..7b2f66633 100644 --- a/sql/tables.sqlite +++ b/sql/tables.sqlite @@ -260,6 +260,8 @@ CREATE TABLE job ( prev_job_id INTEGER DEFAULT NULL, -- the job that created this one using a dataflow rule analysis_id INTEGER NOT NULL, input_id CHAR(255) NOT NULL, + param_id_stack CHAR(64) NOT NULL DEFAULT '', + accu_id_stack CHAR(64) NOT NULL DEFAULT '', worker_id INTEGER DEFAULT NULL, status TEXT NOT NULL DEFAULT 'READY', /* enum('SEMAPHORED','READY','CLAIMED','COMPILATION','FETCH_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL, */ retry_count INTEGER NOT NULL DEFAULT 0, @@ -270,7 +272,7 @@ CREATE TABLE job ( semaphore_count INTEGER NOT NULL DEFAULT 0, semaphored_job_id INTEGER DEFAULT NULL ); -CREATE UNIQUE INDEX job_input_id_analysis_id_idx ON job (input_id, analysis_id); +CREATE UNIQUE INDEX job_input_id_stacks_analysis_idx ON job (input_id, param_id_stack, accu_id_stack, analysis_id); CREATE INDEX job_analysis_status_retry_idx ON job (analysis_id, status, retry_count); CREATE INDEX job_worker_idx ON job (worker_id); -- GitLab
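
The hunks in AnalysisJob::dataflow_output_id() and Worker::run_one_batch() are easiest to read together. The following standalone sketch (not part of the commit; job id 42, the stacks and the input_id are invented for illustration) restates the stack-building rule that the patch adds to dataflow_output_id(): when hive_use_param_stack is on, a parent appends its own dbID to param_id_stack only if it carried a non-trivial input_id, and to accu_id_stack only if it had accumulated data of its own; the child is then created with an empty input_id by default and inherits everything through the two stacks.

    #!/usr/bin/env perl
    # Standalone illustration only: mimics the stack-building logic this patch adds to
    # AnalysisJob::dataflow_output_id() when hive_use_param_stack is switched on.
    use strict;
    use warnings;

    my $parent = {
        dbID            => 42,
        input_id        => "{ 'digit' => 7 }",      # non-trivial extra parameters
        param_id_stack  => '11,15',                 # stacks inherited from this job's own ancestors
        accu_id_stack   => '15',
        accu_hash       => {},                      # no accumulated data of its own
    };

    my $param_id_stack = $parent->{param_id_stack};
    my $accu_id_stack  = $parent->{accu_id_stack};

    if( $parent->{input_id} and ($parent->{input_id} ne '{}') ) {       # parent had non-trivial parameters
        $param_id_stack = ($param_id_stack ? $param_id_stack.',' : '') . $parent->{dbID};
    }
    if( scalar(keys %{ $parent->{accu_hash} }) ) {                      # parent had its "own" accumulator content
        $accu_id_stack  = ($accu_id_stack  ? $accu_id_stack.','  : '') . $parent->{dbID};
    }

    print "child param_id_stack = '$param_id_stack'\n";     # prints '11,15,42'
    print "child accu_id_stack  = '$accu_id_stack'\n";      # prints '15'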
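
The scale/offset arguments that run_one_batch() passes to fetch_input_ids_for_job_ids(..., 2, 0) and fetch_structures_for_job_ids(..., 2, 1) implement the preference stated in the commit subject: for the same ancestor job, accu content overrides input_id content, while later ancestors override earlier ones. The sketch below (again not part of the commit; all ids and values are invented, the stacked input_ids are shown already destringified for brevity, and the keys are sorted numerically here for clarity) shows how the interleaved even/odd keys produce that precedence order before param_init() is called.

    #!/usr/bin/env perl
    # Standalone illustration only: job_id*2+0 keys (input_ids) and job_id*2+1 keys (accus)
    # interleave, so each ancestor's accu takes precedence over its own input_id.
    use strict;
    use warnings;
    use Data::Dumper;

    # Conceptual adaptor results for param_id_stack = '11,15' and accu_id_stack = '15':
    my %stacked_input_ids = ( 11*2+0 => { a_multiplier    => '9650156169' },
                              15*2+0 => { partial_product => 'stale value from the input_id' } );
    my %stacked_accus     = ( 15*2+1 => { partial_product => { 5 => '48250780845' } } );

    my %stack = ( %stacked_input_ids, %stacked_accus );

    my @params_precedence = (
        { take_time => 1 },                             # defaults / pipeline-wide / analysis parameters come first (lowest precedence)
        @stack{ sort { $a <=> $b } keys %stack },       # ancestors' parameters; per ancestor, input_id first, then accu
        { digit => '5' },                               # the job's own input_id
        {},                                             # the job's own accu_hash() has the final say
    );

    my %final_params = map { %$_ } @params_precedence;  # later hashes override earlier ones (param_init() does this lazily, with substitution)

    print Dumper(\%final_params);                       # partial_product comes from job 15's accu, not from its input_id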