Commit 4ce3539f authored by Leo Gordon's avatar Leo Gordon
Browse files

An extension to the dataflow-rule-driven semaphores ('2->A', '3->A' and 'A->1' notation)

parent 8ff11761
......@@ -271,7 +271,9 @@ sub dataflow_output_id {
$self->autoflow(0) if($branch_code == 1);
my @output_job_ids = ();
foreach my $rule (@{ $self->dataflow_rules( $branch_name_or_code ) }) {
# sort rules to make sure the fan rules come before funnel rules for the same branch_code:
foreach my $rule (sort {($b->funnel_dataflow_rule_id||0) <=> ($a->funnel_dataflow_rule_id||0)} @{ $self->dataflow_rules( $branch_code ) }) {
# parameter substitution into input_id_template is rule-specific
my $output_ids_for_this_rule;
......@@ -289,21 +291,21 @@ sub dataflow_output_id {
} else {
if(my $funnel_branch_code = $rule->funnel_branch_code()) { # a semaphored fan: they will have to wait in cache until the funnel is created
if(my $funnel_dataflow_rule_id = $rule->funnel_dataflow_rule_id()) { # a semaphored fan: they will have to wait in cache until the funnel is created
my $fan_cache_this_branch = $self->fan_cache()->{$funnel_branch_code} ||= [];
my $fan_cache_this_branch = $self->fan_cache()->{$funnel_dataflow_rule_id} ||= [];
push @$fan_cache_this_branch, map { [$_, $target_analysis_or_table] } @$output_ids_for_this_rule;
} else {
my $fan_cache = $self->fan_cache()->{$branch_code};
my $fan_cache = delete $self->fan_cache()->{$rule->dbID}; # clear the cache at the same time
if($fan_cache && @$fan_cache) { # a semaphored funnel
my $funnel_job_id;
if( (my $funnel_job_number = scalar(@$output_ids_for_this_rule)) !=1 ) {
if( (my $funnel_job_count = scalar(@$output_ids_for_this_rule)) !=1 ) {
$self->transient_error(0);
die "Asked to dataflow into $funnel_job_number funnel jobs instead of 1";
die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1";
} elsif($funnel_job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( # if a semaphored funnel job creation succeeded,
-input_id => $output_ids_for_this_rule->[0],
......@@ -328,8 +330,6 @@ sub dataflow_output_id {
die "Could not create a funnel job";
}
delete $self->fan_cache()->{$branch_code}; # clear the cache
} else { # non-semaphored dataflow (but potentially propagating any existing semaphores)
foreach my $output_id ( @$output_ids_for_this_rule ) {
......
......@@ -112,7 +112,7 @@ sub fetch_all_by_from_analysis_id_and_branch_code {
=cut
sub create_rule {
my ($self, $from_analysis, $to_analysis_or_url, $branch_name_or_code, $input_id_template, $funnel_branch_name_or_code) = @_;
my ($self, $from_analysis, $to_analysis_or_url, $branch_name_or_code, $input_id_template, $funnel_dataflow_rule_id) = @_;
return unless($from_analysis and $to_analysis_or_url);
......@@ -125,7 +125,7 @@ sub create_rule {
-branch_code => $self->branch_name_2_code($branch_name_or_code),
-input_id_template => (ref($input_id_template) ? stringify($input_id_template) : $input_id_template),
-funnel_branch_code => $self->branch_name_2_code($funnel_branch_name_or_code, 1),
-funnel_dataflow_rule_id => $funnel_dataflow_rule_id,
);
return $self->store($rule, 1); # avoid redundancy
......
......@@ -23,7 +23,7 @@
dataflow_rule_id int(10) unsigned NOT NULL AUTO_INCREMENT,
from_analysis_id int(10) unsigned NOT NULL,
branch_code int(10) default 1 NOT NULL,
funnel_branch_code int(10) default NULL,
funnel_dataflow_rule_id int(10) unsigned default NULL,
to_analysis_url varchar(255) default '' NOT NULL,
input_id_template TEXT DEFAULT NULL,
......@@ -67,8 +67,8 @@ sub new {
my $class = shift @_;
my $self = bless {}, $class;
my ( $dbID, $adaptor, $fromAnalysis, $toAnalysis, $from_analysis_id, $branch_code, $funnel_branch_code, $to_analysis_url, $input_id_template ) =
rearrange( [ qw (DBID ADAPTOR FROM_ANALYSIS TO_ANALYSIS FROM_ANALYSIS_ID BRANCH_CODE FUNNEL_BRANCH_CODE TO_ANALYSIS_URL INPUT_ID_TEMPLATE) ], @_ );
my ( $dbID, $adaptor, $fromAnalysis, $toAnalysis, $from_analysis_id, $branch_code, $funnel_dataflow_rule_id, $to_analysis_url, $input_id_template ) =
rearrange( [ qw (DBID ADAPTOR FROM_ANALYSIS TO_ANALYSIS FROM_ANALYSIS_ID BRANCH_CODE FUNNEL_DATAFLOW_RULE_ID TO_ANALYSIS_URL INPUT_ID_TEMPLATE) ], @_ );
# database persistence:
$self->dbID( $dbID ) if(defined($dbID));
......@@ -82,7 +82,7 @@ sub new {
$self->from_analysis_id($from_analysis_id) if(defined($from_analysis_id));
$self->to_analysis_url($to_analysis_url) if(defined($to_analysis_url));
$self->branch_code($branch_code) if(defined($branch_code));
$self->funnel_branch_code($funnel_branch_code) if(defined($funnel_branch_code));
$self->funnel_dataflow_rule_id($funnel_dataflow_rule_id) if(defined($funnel_dataflow_rule_id));
$self->input_id_template($input_id_template) if(defined($input_id_template));
return $self;
......@@ -133,19 +133,19 @@ sub branch_code {
return $self->{'_branch_code'};
}
=head2 funnel_branch_code
=head2 funnel_dataflow_rule_id
Function: getter/setter method for the funnel_branch_code of the dataflow rule
Function: getter/setter method for the funnel_dataflow_rule_id of the dataflow rule
=cut
sub funnel_branch_code {
sub funnel_dataflow_rule_id {
my $self = shift @_;
if(@_) { # setter mode
$self->{'_funnel_branch_code'} = shift @_;
$self->{'_funnel_dataflow_rule_id'} = shift @_;
}
return $self->{'_funnel_branch_code'};
return $self->{'_funnel_dataflow_rule_id'};
}
=head2 input_id_template
......
......@@ -440,12 +440,37 @@ sub run {
$flow_into ||= {};
$flow_into = { 1 => $flow_into } unless(ref($flow_into) eq 'HASH'); # force non-hash into a hash
foreach my $branch_tag (keys %$flow_into) {
my ($branch_name_or_code, $funnel_branch_name_or_code) = split(/:/, $branch_tag);
my $heirs = $flow_into->{$branch_tag};
my %group_tag_to_funnel_dataflow_rule_id = ();
$heirs = [ $heirs ] unless(ref($heirs)); # force scalar into an arrayref first
my $semaphore_sign = '->';
my @all_branch_tags = keys %$flow_into;
foreach my $branch_tag ((grep {/^[A-Z]$semaphore_sign/} @all_branch_tags), (grep {/$semaphore_sign[A-Z]$/} @all_branch_tags), (grep {!/$semaphore_sign/} @all_branch_tags)) {
my ($branch_name_or_code, $group_role, $group_tag);
if($branch_tag=~/^([A-Z])$semaphore_sign(-?\w+)$/) {
($branch_name_or_code, $group_role, $group_tag) = ($2, 'funnel', $1);
} elsif($branch_tag=~/^(-?\w+)$semaphore_sign([A-Z])$/) {
($branch_name_or_code, $group_role, $group_tag) = ($1, 'fan', $2);
} elsif($branch_tag=~/^(-?\w+)$/) {
($branch_name_or_code, $group_role, $group_tag) = ($1, '');
} elsif($branch_tag=~/:/) {
die "Please use newer '2${semaphore_sign}A' and 'A${semaphore_sign}1' notation instead of '2:1' and '1'\n";
} else {
die "Error parsing the group tag '$branch_tag'\n";
}
my $funnel_dataflow_rule_id = undef; # NULL by default
if($group_role eq 'fan') {
unless($funnel_dataflow_rule_id = $group_tag_to_funnel_dataflow_rule_id{$group_tag}) {
die "No funnel dataflow_rule defined for group '$group_tag'\n";
}
}
my $heirs = $flow_into->{$branch_tag};
$heirs = [ $heirs ] unless(ref($heirs)); # force scalar into an arrayref first
$heirs = { map { ($_ => undef) } @$heirs } if(ref($heirs) eq 'ARRAY'); # now force it into a hash if it wasn't
while(my ($heir_url, $input_id_template_list) = each %$heirs) {
......@@ -456,7 +481,15 @@ sub run {
my $heir_analysis = $analysis_adaptor->fetch_by_logic_name_or_url($heir_url);
$dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis || $heir_url, $branch_name_or_code, $input_id_template, $funnel_branch_name_or_code);
my $rule = $dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis || $heir_url, $branch_name_or_code, $input_id_template, $funnel_dataflow_rule_id);
if($group_role eq 'funnel') {
if($group_tag_to_funnel_dataflow_rule_id{$group_tag}) {
die "More than one funnel dataflow_rule defined for group '$group_tag'\n";
} else {
$group_tag_to_funnel_dataflow_rule_id{$group_tag} = $rule->dbID();
}
}
warn "DataFlow rule: [$branch_tag] $logic_name -> $heir_url"
.($input_id_template ? ' WITH TEMPLATE: '.stringify($input_id_template) : '')."\n";
......
......@@ -112,8 +112,8 @@ sub pipeline_create_commands {
All 'add_together' jobs will wait for completion of 'part_multiply' jobs before their own execution (to ensure all data is available).
There are two control modes in this pipeline:
A. The default mode is to use the '2' dataflow rule from 'start' analysis and a -wait_for rule in 'add_together' analysis for analysis-wide synchronization.
B. The semaphored mode is to use '2:1' semaphored dataflow rule from 'start' instead, and comment out the analysis-wide -wait_for rule, relying on semaphores.
A. The default mode is to use the '2' and '1' dataflow rules from 'start' analysis and a -wait_for rule in 'add_together' analysis for analysis-wide synchronization.
B. The semaphored mode is to use '2->A' and 'A->1' semaphored dataflow rules from 'start' instead, and comment out the analysis-wide -wait_for rule, relying on semaphores.
=cut
......@@ -129,8 +129,9 @@ sub pipeline_analyses {
],
-flow_into => {
2 => [ 'part_multiply' ], # will create a fan of jobs
# '2:1' => [ 'part_multiply' ], # will create a semaphored fan of jobs (comment out the -wait_for rule from 'add_together')
1 => [ 'add_together' ], # will create a funnel job to wait for the fan to complete and add the results
# '2->A' => [ 'part_multiply' ], # will create a semaphored fan of jobs (comment out the -wait_for rule from 'add_together')
# 'A->1' => [ 'add_together' ], # will create a semaphored funnel job to wait for the fan to complete and add the results
},
},
......
......@@ -30,7 +30,7 @@ $Author: lg4 $
=head1 VERSION
$Revision: 1.7 $
$Revision: 1.8 $
=cut
......@@ -209,9 +209,9 @@ sub _control_rules {
}
sub _midpoint_name {
my $dfr = shift @_;
my $rule_id = shift @_;
return 'dfr_'.$dfr->dbID().'_mp';
return 'dfr_'.$rule_id.'_mp';
}
sub _dataflow_rules {
......@@ -220,38 +220,9 @@ sub _dataflow_rules {
my $config = $self->config()->{Colours}->{Flows};
my $dataflow_rules = $self->dba()->get_DataflowRuleAdaptor()->fetch_all();
my %funnel_to_fan_list = ();
# pre-create midpoint nodes
foreach my $rule (@{$dataflow_rules}) {
my $from_analysis_id = $rule->from_analysis_id();
my $funnel_branch_code = $rule->funnel_branch_code();
my $midpoint_name = _midpoint_name($rule);
$graph->add_node(
$midpoint_name,
label => '',
defined($funnel_branch_code)
? (
shape => 'circle',
fixedsize => 1,
width => 0.1,
height => 0.1,
) : (
shape => 'point',
fixedsize => 1,
width => 0.01,
height => 0.01,
),
color => $config->{data},
);
if($funnel_branch_code) {
push @{$funnel_to_fan_list{$from_analysis_id}{$funnel_branch_code}}, $midpoint_name;
}
}
foreach my $rule (@{$dataflow_rules}) {
my ($from_analysis_id, $branch_code, $funnel_branch_code, $to) = ($rule->from_analysis_id(), $rule->branch_code(), $rule->funnel_branch_code(), $rule->to_analysis());
my ($from_analysis_id, $branch_code, $funnel_dataflow_rule_id, $to) = ($rule->from_analysis_id(), $rule->branch_code(), $rule->funnel_dataflow_rule_id(), $rule->to_analysis());
my $to_node;
#If we've been told to flow from an analysis to a table or external source we need
......@@ -266,7 +237,7 @@ sub _dataflow_rules {
next;
}
my $midpoint_name = _midpoint_name($rule);
my $midpoint_name = _midpoint_name($rule->dbID);
$graph->add_edge($from_analysis_id => $midpoint_name,
color => $config->{data},
......@@ -274,18 +245,33 @@ sub _dataflow_rules {
label => '#'.$branch_code,
fontname => $self->config()->{Fonts}->{edge},
);
$graph->add_node(
$midpoint_name,
label => '',
defined($funnel_dataflow_rule_id)
? (
shape => 'circle',
fixedsize => 1,
width => 0.1,
height => 0.1,
) : (
shape => 'point',
fixedsize => 1,
width => 0.01,
height => 0.01,
),
color => $config->{data},
);
$graph->add_edge($midpoint_name => $to_node,
color => $config->{data},
);
if($funnel_to_fan_list{$from_analysis_id}{$branch_code}) {
foreach my $fan_midpoint (@{$funnel_to_fan_list{$from_analysis_id}{$branch_code}}) {
$graph->add_edge($fan_midpoint => $midpoint_name,
if($funnel_dataflow_rule_id) {
$graph->add_edge( $midpoint_name => _midpoint_name($funnel_dataflow_rule_id),
color => $config->{semablock},
fontname => $self->config()->{Fonts}->{edge},
style => 'dashed',
arrowhead => 'tee',
);
}
);
}
}
}
......
......@@ -685,10 +685,10 @@ sub run_module_with_job {
print("\n!!! *no* WRITE_OUTPUT and *no* AUTOFLOW\n") if($self->debug);
}
my @zombie_funnel_branches = keys %{$job->fan_cache};
if( scalar(@zombie_funnel_branches) ) {
my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
$job->transient_error(0);
die "There are cached semaphored fans for which a funnel job (branch ".join(',',@zombie_funnel_branches).") has never been dataflown";
die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") has never been dataflown";
}
$job->query_count($self->queen->dbc->query_count);
......
# Substitute funnel_branch_code column by funnel_dataflow_rule_id column
#
# Please note: this patch will *not* magically convert any data, just patch the schema.
# If you had any semaphored funnels done the old way, you'll have to convert them manually.
ALTER TABLE dataflow_rule DROP COLUMN funnel_branch_code;
ALTER TABLE dataflow_rule ADD COLUMN funnel_dataflow_rule_id int(10) unsigned default NULL;
......@@ -147,7 +147,7 @@ CREATE TABLE worker (
-- dataflow_rule_id - internal ID
-- from_analysis_id - foreign key to analysis table analysis_id
-- branch_code - branch_code of the fan
-- funnel_branch_code - branch_code of the semaphored funnel (is NULL by default, which means dataflow is not semaphored)
-- funnel_dataflow_fule_id - dataflow_rule_id of the semaphored funnel (is NULL by default, which means dataflow is not semaphored)
-- to_analysis_url - foreign key to net distributed analysis logic_name reference
-- input_id_template - a template for generating a new input_id (not necessarily a hashref) in this dataflow; if undefined is kept original
......@@ -155,12 +155,12 @@ CREATE TABLE dataflow_rule (
dataflow_rule_id int(10) unsigned NOT NULL AUTO_INCREMENT,
from_analysis_id int(10) unsigned NOT NULL,
branch_code int(10) default 1 NOT NULL,
funnel_branch_code int(10) default NULL,
funnel_dataflow_rule_id int(10) unsigned default NULL,
to_analysis_url varchar(255) default '' NOT NULL,
input_id_template TEXT DEFAULT NULL,
PRIMARY KEY (dataflow_rule_id),
UNIQUE KEY (from_analysis_id, to_analysis_url, branch_code, input_id_template(512))
UNIQUE KEY (from_analysis_id, branch_code, funnel_dataflow_rule_id, to_analysis_url, input_id_template(512))
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......
......@@ -134,7 +134,7 @@ CREATE INDEX IF NOT EXISTS analysis_id_status_idx ON worker (analysis_id,
-- dataflow_rule_id - internal ID
-- from_analysis_id - foreign key to analysis table analysis_id
-- branch_code - branch_code of the fan
-- funnel_branch_code - branch_code of the semaphored funnel (is NULL by default, which means dataflow is not semaphored)
-- funnel_dataflow_rule_id - dataflow_rule_id of the semaphored funnel (is NULL by default, which means dataflow is not semaphored)
-- to_analysis_url - foreign key to net distributed analysis logic_name reference
-- input_id_template - a template for generating a new input_id (not necessarily a hashref) in this dataflow; if undefined is kept original
......@@ -143,11 +143,11 @@ CREATE TABLE dataflow_rule (
dataflow_rule_id INTEGER PRIMARY KEY AUTOINCREMENT,
from_analysis_id INTEGER NOT NULL,
branch_code int(10) default 1 NOT NULL,
funnel_branch_code int(10) default NULL,
funnel_dataflow_rule_id INTEGER default NULL,
to_analysis_url varchar(255) default '' NOT NULL,
input_id_template TEXT DEFAULT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS from_to_branch_template_idx ON dataflow_rule (from_analysis_id, to_analysis_url, branch_code, input_id_template);
CREATE UNIQUE INDEX IF NOT EXISTS from_to_branch_template_idx ON dataflow_rule (from_analysis_id, branch_code, funnel_dataflow_rule_id, to_analysis_url, input_id_template);
-- ---------------------------------------------------------------------------------
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment