diff --git a/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm b/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm index bb89c235df88812a4f49a9384c77314aa3453920..04b6b95c9af990618767f64256b94366396f935e 100755 --- a/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm +++ b/modules/Bio/EnsEMBL/Hive/AnalysisJob.pm @@ -271,7 +271,9 @@ sub dataflow_output_id { $self->autoflow(0) if($branch_code == 1); my @output_job_ids = (); - foreach my $rule (@{ $self->dataflow_rules( $branch_name_or_code ) }) { + + # sort rules to make sure the fan rules come before funnel rules for the same branch_code: + foreach my $rule (sort {($b->funnel_dataflow_rule_id||0) <=> ($a->funnel_dataflow_rule_id||0)} @{ $self->dataflow_rules( $branch_code ) }) { # parameter substitution into input_id_template is rule-specific my $output_ids_for_this_rule; @@ -289,21 +291,21 @@ sub dataflow_output_id { } else { - if(my $funnel_branch_code = $rule->funnel_branch_code()) { # a semaphored fan: they will have to wait in cache until the funnel is created + if(my $funnel_dataflow_rule_id = $rule->funnel_dataflow_rule_id()) { # a semaphored fan: they will have to wait in cache until the funnel is created - my $fan_cache_this_branch = $self->fan_cache()->{$funnel_branch_code} ||= []; + my $fan_cache_this_branch = $self->fan_cache()->{$funnel_dataflow_rule_id} ||= []; push @$fan_cache_this_branch, map { [$_, $target_analysis_or_table] } @$output_ids_for_this_rule; } else { - my $fan_cache = $self->fan_cache()->{$branch_code}; + my $fan_cache = delete $self->fan_cache()->{$rule->dbID}; # clear the cache at the same time if($fan_cache && @$fan_cache) { # a semaphored funnel my $funnel_job_id; - if( (my $funnel_job_number = scalar(@$output_ids_for_this_rule)) !=1 ) { + if( (my $funnel_job_count = scalar(@$output_ids_for_this_rule)) !=1 ) { $self->transient_error(0); - die "Asked to dataflow into $funnel_job_number funnel jobs instead of 1"; + die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1"; } elsif($funnel_job_id = 
Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( # if a semaphored funnel job creation succeeded, -input_id => $output_ids_for_this_rule->[0], @@ -328,8 +330,6 @@ sub dataflow_output_id { die "Could not create a funnel job"; } - delete $self->fan_cache()->{$branch_code}; # clear the cache - } else { # non-semaphored dataflow (but potentially propagating any existing semaphores) foreach my $output_id ( @$output_ids_for_this_rule ) { diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/DataflowRuleAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/DataflowRuleAdaptor.pm index 76efe6b24a3e1130851c32cb13b5105049bb0006..e5fcffa0cfdfc5bb81c5efe14c7a39bf45e403cb 100755 --- a/modules/Bio/EnsEMBL/Hive/DBSQL/DataflowRuleAdaptor.pm +++ b/modules/Bio/EnsEMBL/Hive/DBSQL/DataflowRuleAdaptor.pm @@ -112,7 +112,7 @@ sub fetch_all_by_from_analysis_id_and_branch_code { =cut sub create_rule { - my ($self, $from_analysis, $to_analysis_or_url, $branch_name_or_code, $input_id_template, $funnel_branch_name_or_code) = @_; + my ($self, $from_analysis, $to_analysis_or_url, $branch_name_or_code, $input_id_template, $funnel_dataflow_rule_id) = @_; return unless($from_analysis and $to_analysis_or_url); @@ -125,7 +125,7 @@ sub create_rule { -branch_code => $self->branch_name_2_code($branch_name_or_code), -input_id_template => (ref($input_id_template) ? 
stringify($input_id_template) : $input_id_template), - -funnel_branch_code => $self->branch_name_2_code($funnel_branch_name_or_code, 1), + -funnel_dataflow_rule_id => $funnel_dataflow_rule_id, ); return $self->store($rule, 1); # avoid redundancy diff --git a/modules/Bio/EnsEMBL/Hive/DataflowRule.pm b/modules/Bio/EnsEMBL/Hive/DataflowRule.pm index 027fa7255cd555b1cf4cf4d5615a159e6bf32602..c52edd1b3c9b28cbb43047d9f735f13acd479407 100755 --- a/modules/Bio/EnsEMBL/Hive/DataflowRule.pm +++ b/modules/Bio/EnsEMBL/Hive/DataflowRule.pm @@ -23,7 +23,7 @@ dataflow_rule_id int(10) unsigned NOT NULL AUTO_INCREMENT, from_analysis_id int(10) unsigned NOT NULL, branch_code int(10) default 1 NOT NULL, - funnel_branch_code int(10) default NULL, + funnel_dataflow_rule_id int(10) unsigned default NULL, to_analysis_url varchar(255) default '' NOT NULL, input_id_template TEXT DEFAULT NULL, @@ -67,8 +67,8 @@ sub new { my $class = shift @_; my $self = bless {}, $class; - my ( $dbID, $adaptor, $fromAnalysis, $toAnalysis, $from_analysis_id, $branch_code, $funnel_branch_code, $to_analysis_url, $input_id_template ) = - rearrange( [ qw (DBID ADAPTOR FROM_ANALYSIS TO_ANALYSIS FROM_ANALYSIS_ID BRANCH_CODE FUNNEL_BRANCH_CODE TO_ANALYSIS_URL INPUT_ID_TEMPLATE) ], @_ ); + my ( $dbID, $adaptor, $fromAnalysis, $toAnalysis, $from_analysis_id, $branch_code, $funnel_dataflow_rule_id, $to_analysis_url, $input_id_template ) = + rearrange( [ qw (DBID ADAPTOR FROM_ANALYSIS TO_ANALYSIS FROM_ANALYSIS_ID BRANCH_CODE FUNNEL_DATAFLOW_RULE_ID TO_ANALYSIS_URL INPUT_ID_TEMPLATE) ], @_ ); # database persistence: $self->dbID( $dbID ) if(defined($dbID)); @@ -82,7 +82,7 @@ sub new { $self->from_analysis_id($from_analysis_id) if(defined($from_analysis_id)); $self->to_analysis_url($to_analysis_url) if(defined($to_analysis_url)); $self->branch_code($branch_code) if(defined($branch_code)); - $self->funnel_branch_code($funnel_branch_code) if(defined($funnel_branch_code)); + 
$self->funnel_dataflow_rule_id($funnel_dataflow_rule_id) if(defined($funnel_dataflow_rule_id)); $self->input_id_template($input_id_template) if(defined($input_id_template)); return $self; @@ -133,19 +133,19 @@ sub branch_code { return $self->{'_branch_code'}; } -=head2 funnel_branch_code +=head2 funnel_dataflow_rule_id - Function: getter/setter method for the funnel_branch_code of the dataflow rule + Function: getter/setter method for the funnel_dataflow_rule_id of the dataflow rule =cut -sub funnel_branch_code { +sub funnel_dataflow_rule_id { my $self = shift @_; if(@_) { # setter mode - $self->{'_funnel_branch_code'} = shift @_; + $self->{'_funnel_dataflow_rule_id'} = shift @_; } - return $self->{'_funnel_branch_code'}; + return $self->{'_funnel_dataflow_rule_id'}; } =head2 input_id_template diff --git a/modules/Bio/EnsEMBL/Hive/PipeConfig/HiveGeneric_conf.pm b/modules/Bio/EnsEMBL/Hive/PipeConfig/HiveGeneric_conf.pm index b155aa5aec53b094749dbc1012e6d59194086545..e06c717caed8754334dabce1f355c0cc06e0a471 100644 --- a/modules/Bio/EnsEMBL/Hive/PipeConfig/HiveGeneric_conf.pm +++ b/modules/Bio/EnsEMBL/Hive/PipeConfig/HiveGeneric_conf.pm @@ -440,12 +440,37 @@ sub run { $flow_into ||= {}; $flow_into = { 1 => $flow_into } unless(ref($flow_into) eq 'HASH'); # force non-hash into a hash - foreach my $branch_tag (keys %$flow_into) { - my ($branch_name_or_code, $funnel_branch_name_or_code) = split(/:/, $branch_tag); - my $heirs = $flow_into->{$branch_tag}; + my %group_tag_to_funnel_dataflow_rule_id = (); - $heirs = [ $heirs ] unless(ref($heirs)); # force scalar into an arrayref first + my $semaphore_sign = '->'; + + my @all_branch_tags = keys %$flow_into; + foreach my $branch_tag ((grep {/^[A-Z]$semaphore_sign/} @all_branch_tags), (grep {/$semaphore_sign[A-Z]$/} @all_branch_tags), (grep {!/$semaphore_sign/} @all_branch_tags)) { + + my ($branch_name_or_code, $group_role, $group_tag); + + if($branch_tag=~/^([A-Z])$semaphore_sign(-?\w+)$/) { + ($branch_name_or_code, 
$group_role, $group_tag) = ($2, 'funnel', $1); + } elsif($branch_tag=~/^(-?\w+)$semaphore_sign([A-Z])$/) { + ($branch_name_or_code, $group_role, $group_tag) = ($1, 'fan', $2); + } elsif($branch_tag=~/^(-?\w+)$/) { + ($branch_name_or_code, $group_role, $group_tag) = ($1, ''); + } elsif($branch_tag=~/:/) { + die "Please use newer '2${semaphore_sign}A' and 'A${semaphore_sign}1' notation instead of '2:1' and '1'\n"; + } else { + die "Error parsing the group tag '$branch_tag'\n"; + } + my $funnel_dataflow_rule_id = undef; # NULL by default + + if($group_role eq 'fan') { + unless($funnel_dataflow_rule_id = $group_tag_to_funnel_dataflow_rule_id{$group_tag}) { + die "No funnel dataflow_rule defined for group '$group_tag'\n"; + } + } + + my $heirs = $flow_into->{$branch_tag}; + $heirs = [ $heirs ] unless(ref($heirs)); # force scalar into an arrayref first $heirs = { map { ($_ => undef) } @$heirs } if(ref($heirs) eq 'ARRAY'); # now force it into a hash if it wasn't while(my ($heir_url, $input_id_template_list) = each %$heirs) { @@ -456,7 +481,15 @@ sub run { my $heir_analysis = $analysis_adaptor->fetch_by_logic_name_or_url($heir_url); - $dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis || $heir_url, $branch_name_or_code, $input_id_template, $funnel_branch_name_or_code); + my $rule = $dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis || $heir_url, $branch_name_or_code, $input_id_template, $funnel_dataflow_rule_id); + + if($group_role eq 'funnel') { + if($group_tag_to_funnel_dataflow_rule_id{$group_tag}) { + die "More than one funnel dataflow_rule defined for group '$group_tag'\n"; + } else { + $group_tag_to_funnel_dataflow_rule_id{$group_tag} = $rule->dbID(); + } + } warn "DataFlow rule: [$branch_tag] $logic_name -> $heir_url" .($input_id_template ? 
' WITH TEMPLATE: '.stringify($input_id_template) : '')."\n"; diff --git a/modules/Bio/EnsEMBL/Hive/PipeConfig/LongMult_conf.pm b/modules/Bio/EnsEMBL/Hive/PipeConfig/LongMult_conf.pm index db0017603e000d8f5cdca767c06c9c5f5b1acdf1..0bc226f1eeb4e71db83d4efa3be7aaf86699f5b8 100644 --- a/modules/Bio/EnsEMBL/Hive/PipeConfig/LongMult_conf.pm +++ b/modules/Bio/EnsEMBL/Hive/PipeConfig/LongMult_conf.pm @@ -112,8 +112,8 @@ sub pipeline_create_commands { All 'add_together' jobs will wait for completion of 'part_multiply' jobs before their own execution (to ensure all data is available). There are two control modes in this pipeline: - A. The default mode is to use the '2' dataflow rule from 'start' analysis and a -wait_for rule in 'add_together' analysis for analysis-wide synchronization. - B. The semaphored mode is to use '2:1' semaphored dataflow rule from 'start' instead, and comment out the analysis-wide -wait_for rule, relying on semaphores. + A. The default mode is to use the '2' and '1' dataflow rules from 'start' analysis and a -wait_for rule in 'add_together' analysis for analysis-wide synchronization. + B. The semaphored mode is to use '2->A' and 'A->1' semaphored dataflow rules from 'start' instead, and comment out the analysis-wide -wait_for rule, relying on semaphores. 
=cut @@ -129,8 +129,9 @@ sub pipeline_analyses { ], -flow_into => { 2 => [ 'part_multiply' ], # will create a fan of jobs - # '2:1' => [ 'part_multiply' ], # will create a semaphored fan of jobs (comment out the -wait_for rule from 'add_together') 1 => [ 'add_together' ], # will create a funnel job to wait for the fan to complete and add the results +# '2->A' => [ 'part_multiply' ], # will create a semaphored fan of jobs (comment out the -wait_for rule from 'add_together') +# 'A->1' => [ 'add_together' ], # will create a semaphored funnel job to wait for the fan to complete and add the results }, }, diff --git a/modules/Bio/EnsEMBL/Hive/Utils/Graph.pm b/modules/Bio/EnsEMBL/Hive/Utils/Graph.pm index cff0301fda9fc14ea35317e29f9e0ce85344ca90..105b0c0a6335ea6bac5a7284dad1e4519e99ab35 100644 --- a/modules/Bio/EnsEMBL/Hive/Utils/Graph.pm +++ b/modules/Bio/EnsEMBL/Hive/Utils/Graph.pm @@ -30,7 +30,7 @@ $Author: lg4 $ =head1 VERSION -$Revision: 1.7 $ +$Revision: 1.8 $ =cut @@ -209,9 +209,9 @@ sub _control_rules { } sub _midpoint_name { - my $dfr = shift @_; + my $rule_id = shift @_; - return 'dfr_'.$dfr->dbID().'_mp'; + return 'dfr_'.$rule_id.'_mp'; } sub _dataflow_rules { @@ -220,38 +220,9 @@ sub _dataflow_rules { my $config = $self->config()->{Colours}->{Flows}; my $dataflow_rules = $self->dba()->get_DataflowRuleAdaptor()->fetch_all(); - my %funnel_to_fan_list = (); - - # pre-create midpoint nodes - foreach my $rule (@{$dataflow_rules}) { - my $from_analysis_id = $rule->from_analysis_id(); - my $funnel_branch_code = $rule->funnel_branch_code(); - my $midpoint_name = _midpoint_name($rule); - $graph->add_node( - $midpoint_name, - label => '', - defined($funnel_branch_code) - ? 
( - shape => 'circle', - fixedsize => 1, - width => 0.1, - height => 0.1, - ) : ( - shape => 'point', - fixedsize => 1, - width => 0.01, - height => 0.01, - ), - color => $config->{data}, - ); - if($funnel_branch_code) { - push @{$funnel_to_fan_list{$from_analysis_id}{$funnel_branch_code}}, $midpoint_name; - } - } - foreach my $rule (@{$dataflow_rules}) { - my ($from_analysis_id, $branch_code, $funnel_branch_code, $to) = ($rule->from_analysis_id(), $rule->branch_code(), $rule->funnel_branch_code(), $rule->to_analysis()); + my ($from_analysis_id, $branch_code, $funnel_dataflow_rule_id, $to) = ($rule->from_analysis_id(), $rule->branch_code(), $rule->funnel_dataflow_rule_id(), $rule->to_analysis()); my $to_node; #If we've been told to flow from an analysis to a table or external source we need @@ -266,7 +237,7 @@ sub _dataflow_rules { next; } - my $midpoint_name = _midpoint_name($rule); + my $midpoint_name = _midpoint_name($rule->dbID); $graph->add_edge($from_analysis_id => $midpoint_name, color => $config->{data}, @@ -274,18 +245,33 @@ sub _dataflow_rules { label => '#'.$branch_code, fontname => $self->config()->{Fonts}->{edge}, ); + $graph->add_node( + $midpoint_name, + label => '', + defined($funnel_dataflow_rule_id) + ? 
( + shape => 'circle', + fixedsize => 1, + width => 0.1, + height => 0.1, + ) : ( + shape => 'point', + fixedsize => 1, + width => 0.01, + height => 0.01, + ), + color => $config->{data}, + ); $graph->add_edge($midpoint_name => $to_node, color => $config->{data}, ); - if($funnel_to_fan_list{$from_analysis_id}{$branch_code}) { - foreach my $fan_midpoint (@{$funnel_to_fan_list{$from_analysis_id}{$branch_code}}) { - $graph->add_edge($fan_midpoint => $midpoint_name, + if($funnel_dataflow_rule_id) { + $graph->add_edge( $midpoint_name => _midpoint_name($funnel_dataflow_rule_id), color => $config->{semablock}, fontname => $self->config()->{Fonts}->{edge}, style => 'dashed', arrowhead => 'tee', - ); - } + ); } } } diff --git a/modules/Bio/EnsEMBL/Hive/Worker.pm b/modules/Bio/EnsEMBL/Hive/Worker.pm index 095d28f20fa0c849e1da80dae3051adf5c484947..cf73748160a6d7055f9c29df0d431fb55767514a 100755 --- a/modules/Bio/EnsEMBL/Hive/Worker.pm +++ b/modules/Bio/EnsEMBL/Hive/Worker.pm @@ -685,10 +685,10 @@ sub run_module_with_job { print("\n!!! 
*no* WRITE_OUTPUT and *no* AUTOFLOW\n") if($self->debug); } - my @zombie_funnel_branches = keys %{$job->fan_cache}; - if( scalar(@zombie_funnel_branches) ) { + my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache}; + if( scalar(@zombie_funnel_dataflow_rule_ids) ) { $job->transient_error(0); - die "There are cached semaphored fans for which a funnel job (branch ".join(',',@zombie_funnel_branches).") has never been dataflown"; + die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") has never been dataflown"; } $job->query_count($self->queen->dbc->query_count); diff --git a/sql/patch_2011-11-29.sql b/sql/patch_2011-11-29.sql new file mode 100644 index 0000000000000000000000000000000000000000..17336d715348a0cb50102dc590d5ed679a16b0bc --- /dev/null +++ b/sql/patch_2011-11-29.sql @@ -0,0 +1,8 @@ +# Substitute funnel_branch_code column by funnel_dataflow_rule_id column +# +# Please note: this patch will *not* magically convert any data, just patch the schema. +# If you had any semaphored funnels done the old way, you'll have to convert them manually. 
+ +ALTER TABLE dataflow_rule DROP COLUMN funnel_branch_code; + +ALTER TABLE dataflow_rule ADD COLUMN funnel_dataflow_rule_id int(10) unsigned default NULL; diff --git a/sql/tables.sql b/sql/tables.sql index 83b43e2adb271cc953cbff5be090a83ae7186d47..3c3c8e1479f270fa505d32aec567ee6fe0332264 100644 --- a/sql/tables.sql +++ b/sql/tables.sql @@ -147,7 +147,7 @@ CREATE TABLE worker ( -- dataflow_rule_id - internal ID -- from_analysis_id - foreign key to analysis table analysis_id -- branch_code - branch_code of the fan --- funnel_branch_code - branch_code of the semaphored funnel (is NULL by default, which means dataflow is not semaphored) +-- funnel_dataflow_rule_id - dataflow_rule_id of the semaphored funnel (is NULL by default, which means dataflow is not semaphored) -- to_analysis_url - foreign key to net distributed analysis logic_name reference -- input_id_template - a template for generating a new input_id (not necessarily a hashref) in this dataflow; if undefined is kept original @@ -155,12 +155,12 @@ CREATE TABLE dataflow_rule ( dataflow_rule_id int(10) unsigned NOT NULL AUTO_INCREMENT, from_analysis_id int(10) unsigned NOT NULL, branch_code int(10) default 1 NOT NULL, - funnel_branch_code int(10) default NULL, + funnel_dataflow_rule_id int(10) unsigned default NULL, to_analysis_url varchar(255) default '' NOT NULL, input_id_template TEXT DEFAULT NULL, PRIMARY KEY (dataflow_rule_id), - UNIQUE KEY (from_analysis_id, to_analysis_url, branch_code, input_id_template(512)) + UNIQUE KEY (from_analysis_id, branch_code, funnel_dataflow_rule_id, to_analysis_url, input_id_template(512)) ) COLLATE=latin1_swedish_ci ENGINE=InnoDB; diff --git a/sql/tables.sqlite b/sql/tables.sqlite index 65227489a8241a51bb41e42f3fabd823e524d856..c5fbb735f1906d98e64929f95839d0f7433d5e05 100644 --- a/sql/tables.sqlite +++ b/sql/tables.sqlite @@ -134,7 +134,7 @@ CREATE INDEX IF NOT EXISTS analysis_id_status_idx ON worker (analysis_id, -- dataflow_rule_id - internal ID -- from_analysis_id -
foreign key to analysis table analysis_id -- branch_code - branch_code of the fan --- funnel_branch_code - branch_code of the semaphored funnel (is NULL by default, which means dataflow is not semaphored) +-- funnel_dataflow_rule_id - dataflow_rule_id of the semaphored funnel (is NULL by default, which means dataflow is not semaphored) -- to_analysis_url - foreign key to net distributed analysis logic_name reference -- input_id_template - a template for generating a new input_id (not necessarily a hashref) in this dataflow; if undefined is kept original @@ -143,11 +143,11 @@ CREATE TABLE dataflow_rule ( dataflow_rule_id INTEGER PRIMARY KEY AUTOINCREMENT, from_analysis_id INTEGER NOT NULL, branch_code int(10) default 1 NOT NULL, - funnel_branch_code int(10) default NULL, + funnel_dataflow_rule_id INTEGER default NULL, to_analysis_url varchar(255) default '' NOT NULL, input_id_template TEXT DEFAULT NULL ); -CREATE UNIQUE INDEX IF NOT EXISTS from_to_branch_template_idx ON dataflow_rule (from_analysis_id, to_analysis_url, branch_code, input_id_template); +CREATE UNIQUE INDEX IF NOT EXISTS from_to_branch_template_idx ON dataflow_rule (from_analysis_id, branch_code, funnel_dataflow_rule_id, to_analysis_url, input_id_template); -- ---------------------------------------------------------------------------------