Commit 486e481c authored by Leo Gordon's avatar Leo Gordon
Browse files

integrated semaphored fans/funnels into dataflow rules

parent 19897cc0
......@@ -232,12 +232,18 @@ sub warning {
}
}
sub fan_cache {
    # Lazy getter for the per-job fan cache; there is deliberately no setter.
    # The cache is a hash-of-lists, e.g.
    #   { 2 => [jobs waiting to be funneled into 2], 3 => [jobs waiting to be funneled into 3], ... }
    my ($self) = @_;

    # Create the empty cache on first access so callers always get a hashref back.
    unless ($self->{'_fan_cache'}) {
        $self->{'_fan_cache'} = {};
    }

    return $self->{'_fan_cache'};
}
=head2 dataflow_output_id
Title : dataflow_output_id
Arg[1](req) : <string> $output_id
Arg[2](opt) : <int|string> $branch_name_or_code (optional, defaults to 1; either a numeric branch code or a branch mnemonic name)
Arg[3](opt) : <hashref> $create_job_options (optional, defaults to {}, options added to the CreateNewJob method)
Usage : $self->dataflow_output_id($output_id, $branch_name_or_code);
Function:
If a RunnableDB(Process) needs to create jobs, this allows it to have jobs
......@@ -254,13 +260,9 @@ sub dataflow_output_id {
$output_ids ||= [ $self->input_id() ]; # replicate the input_id in the branch_code's output by default
$output_ids = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY'); # force previously used single values into an arrayref
$create_job_options ||= {}; # { -block => 1 } or { -semaphore_count => scalar(@fan_job_ids) } or { -semaphored_job_id => $funnel_job_id }
# this tricky code is responsible for correct propagation of semaphores down the dataflow pipes:
my $propagate_semaphore = not exists ($create_job_options->{'-semaphored_job_id'}); # CONVENTION: if zero is explicitly supplied, it is a request not to propagate
# However if nothing is supplied, semaphored_job_id will be propagated from the parent job:
my $semaphored_job_id = $create_job_options->{'-semaphored_job_id'} ||= $self->semaphored_job_id();
if($create_job_options) {
die "Please consider configuring semaphored dataflow from PipeConfig rather than setting it up manually";
}
# map branch names to numbers:
my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);
......@@ -271,6 +273,7 @@ sub dataflow_output_id {
my @output_job_ids = ();
foreach my $rule (@{ $self->dataflow_rules( $branch_name_or_code ) }) {
# parameter substitution into input_id_template is rule-specific
my $output_ids_for_this_rule;
if(my $template = $rule->input_id_template()) {
$output_ids_for_this_rule = [ eval $self->param_substitute($template) ];
......@@ -286,23 +289,62 @@ sub dataflow_output_id {
} else {
foreach my $output_id ( @$output_ids_for_this_rule ) {
if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => $output_id,
-analysis => $target_analysis_or_table,
-input_job_id => $self->dbID, # creator_job's id
%$create_job_options
)) {
if($semaphored_job_id and $propagate_semaphore) {
$self->adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id ); # propagate the semaphore
if(my $funnel_branch_code = $rule->funnel_branch_code()) { # a semaphored fan: they will have to wait in cache until the funnel is created
my $fan_cache_this_branch = $self->fan_cache()->{$funnel_branch_code} ||= [];
push @$fan_cache_this_branch, map { [$_, $target_analysis_or_table] } @$output_ids_for_this_rule;
} else {
my $fan_cache = $self->fan_cache()->{$branch_code};
if($fan_cache && @$fan_cache) { # a semaphored funnel
my $funnel_job_id;
if( (my $funnel_job_number = scalar(@$output_ids_for_this_rule)) !=1 ) {
$self->transient_error(0);
die "Asked to dataflow into $funnel_job_number funnel jobs instead of 1";
} elsif($funnel_job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob( # if a semaphored funnel job creation succeeded,
-input_id => $output_ids_for_this_rule->[0],
-analysis => $target_analysis_or_table,
-prev_job => $self,
-semaphore_count => scalar(@$fan_cache),
)) { # then create the fan out of the cache:
push @output_job_ids, $funnel_job_id;
foreach my $pair ( @$fan_cache ) {
my ($output_id, $fan_analysis) = @$pair;
if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => $output_id,
-analysis => $fan_analysis,
-prev_job => $self,
-semaphored_job_id => $funnel_job_id, # by passing this parameter we request not to propagate semaphores
)) {
push @output_job_ids, $job_id;
}
}
} else {
die "Could not create a funnel job";
}
# only add the ones that were indeed created:
push @output_job_ids, $job_id;
} elsif($semaphored_job_id and !$propagate_semaphore) {
$self->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # if we didn't succeed in creating the job, fix the semaphore
delete $self->fan_cache()->{$branch_code}; # clear the cache
} else { # non-semaphored dataflow (but potentially propagating any existing semaphores)
foreach my $output_id ( @$output_ids_for_this_rule ) {
if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => $output_id,
-analysis => $target_analysis_or_table,
-prev_job => $self,
)) {
# only add the ones that were indeed created:
push @output_job_ids, $job_id;
}
}
}
}
}
}
......
......@@ -80,8 +80,8 @@ sub CreateNewJob {
return undef unless(scalar @args);
my ($input_id, $analysis, $prev_job_id, $blocked, $semaphore_count, $semaphored_job_id) =
rearrange([qw(INPUT_ID ANALYSIS INPUT_JOB_ID BLOCK SEMAPHORE_COUNT SEMAPHORED_JOB_ID)], @args);
my ($input_id, $analysis, $prev_job, $prev_job_id, $blocked, $semaphore_count, $semaphored_job_id) =
rearrange([qw(INPUT_ID ANALYSIS PREV_JOB INPUT_JOB_ID BLOCK SEMAPHORE_COUNT SEMAPHORED_JOB_ID)], @args);
throw("must define input_id") unless($input_id);
throw("must define analysis") unless($analysis);
......@@ -89,6 +89,15 @@ sub CreateNewJob {
unless($analysis->isa('Bio::EnsEMBL::Analysis'));
throw("analysis must have adaptor connected to database")
unless($analysis->adaptor and $analysis->adaptor->db);
throw("Please specify prev_job object instead of input_job_id if available") if ($prev_job_id); # 'obsolete' message
$prev_job_id = $prev_job && $prev_job->dbID();
# if the user did not specifically ask for a new fan, consider propagation:
my $propagate_semaphore = !defined($semaphored_job_id);
# if nothing is supplied, semaphored_job_id will be propagated from the parent job:
$semaphored_job_id ||= $prev_job && $prev_job->semaphored_job_id();
if(ref($input_id)) { # let's do the Perl hash stringification centrally rather than in many places:
$input_id = stringify($input_id);
......@@ -109,20 +118,33 @@ sub CreateNewJob {
(input_id, prev_job_id,analysis_id,status,semaphore_count,semaphored_job_id)
VALUES (?,?,?,?,?,?)};
my $sth = $dbc->prepare($sql);
my $sth = $dbc->prepare($sql);
my @values = ($input_id, $prev_job_id, $analysis_id, $status, $semaphore_count || 0, $semaphored_job_id);
$sth->execute($input_id, $prev_job_id, $analysis_id, $status, $semaphore_count || 0, $semaphored_job_id);
my $job_id = $dbc->db_handle->last_insert_id(undef, undef, 'job', 'job_id');
$sth->finish;
my $return_code = $sth->execute(@values)
# testing $return_code in boolean context lets the value '0E0' ('no rows affected') pass, because Perl evaluates it as numeric zero yet boolean true:
or die "Coule not run\n\t$sql\nwith data:\n\t(".join(',', @values).')';
unless($dba->hive_use_triggers()) {
$dbc->do(qq{
UPDATE analysis_stats
SET total_job_count=total_job_count+1
,unclaimed_job_count=unclaimed_job_count+1
,status = (CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END)
WHERE analysis_id=$analysis_id
});
my $job_id;
if($return_code > 0) { # <--- for the same reason we have to be explicitly numeric here:
$job_id = $dbc->db_handle->last_insert_id(undef, undef, 'job', 'job_id');
$sth->finish;
if($semaphored_job_id and $propagate_semaphore) { # ready to propagate and something to propagate
$prev_job->adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id ); # propagate the semaphore
}
unless($dba->hive_use_triggers()) {
$dbc->do(qq{
UPDATE analysis_stats
SET total_job_count=total_job_count+1
,unclaimed_job_count=unclaimed_job_count+1
,status = (CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END)
WHERE analysis_id=$analysis_id
});
}
} elsif($semaphored_job_id and !$propagate_semaphore) { # if we didn't succeed in creating the job, fix the semaphore
$prev_job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );
}
return $job_id;
......
......@@ -56,7 +56,7 @@ Description: encodes a branch mnemonic name into numeric code
sub branch_name_2_code {
my $branch_name_or_code = pop @_; # NB: we take the *last* arg, so it works both as a method and a subroutine
$branch_name_or_code=1 unless(defined($branch_name_or_code));
return unless(defined($branch_name_or_code));
my $branch_code = ($branch_name_or_code=~/^\-?\d+$/)
? $branch_name_or_code
......@@ -109,7 +109,7 @@ sub fetch_all_by_from_analysis_id_and_branch_code {
=cut
sub create_rule {
my ($self, $from_analysis, $to_analysis_or_url, $branch_name_or_code, $input_id_template) = @_;
my ($self, $from_analysis, $to_analysis_or_url, $branch_name_or_code, $input_id_template, $funnel_branch_name_or_code) = @_;
return unless($from_analysis and $to_analysis_or_url);
......@@ -120,8 +120,9 @@ sub create_rule {
? ( -to_analysis => $to_analysis_or_url )
: ( -to_analysis_url => $to_analysis_or_url ),
-branch_code => $self->branch_name_2_code($branch_name_or_code),
-branch_code => defined($branch_name_or_code) ? $self->branch_name_2_code($branch_name_or_code) : 1,
-input_id_template => (ref($input_id_template) ? stringify($input_id_template) : $input_id_template),
-funnel_branch_code => $self->branch_name_2_code($funnel_branch_name_or_code),
);
return $self->store($rule, 1); # avoid redundancy
......
......@@ -20,10 +20,11 @@
A data container object (methods are intelligent getters/setters) that corresponds to a row stored in the 'dataflow_rule' table:
CREATE TABLE dataflow_rule (
dataflow_rule_id int(10) unsigned not null auto_increment,
dataflow_rule_id int(10) unsigned NOT NULL AUTO_INCREMENT,
from_analysis_id int(10) unsigned NOT NULL,
to_analysis_url varchar(255) default '' NOT NULL,
branch_code int(10) default 1 NOT NULL,
funnel_branch_code int(10) default NULL,
to_analysis_url varchar(255) default '' NOT NULL,
input_id_template TEXT DEFAULT NULL,
PRIMARY KEY (dataflow_rule_id),
......@@ -66,8 +67,8 @@ sub new {
my $class = shift @_;
my $self = bless {}, $class;
my ( $dbID, $adaptor, $fromAnalysis, $toAnalysis, $from_analysis_id, $to_analysis_url, $branch_code, $input_id_template ) =
rearrange( [ qw (DBID ADAPTOR FROM_ANALYSIS TO_ANALYSIS FROM_ANALYSIS_ID TO_ANALYSIS_URL BRANCH_CODE INPUT_ID_TEMPLATE) ], @_ );
my ( $dbID, $adaptor, $fromAnalysis, $toAnalysis, $from_analysis_id, $branch_code, $funnel_branch_code, $to_analysis_url, $input_id_template ) =
rearrange( [ qw (DBID ADAPTOR FROM_ANALYSIS TO_ANALYSIS FROM_ANALYSIS_ID BRANCH_CODE FUNNEL_BRANCH_CODE TO_ANALYSIS_URL INPUT_ID_TEMPLATE) ], @_ );
# database persistence:
$self->dbID( $dbID ) if(defined($dbID));
......@@ -78,9 +79,10 @@ sub new {
$self->to_analysis( $toAnalysis ) if(defined($toAnalysis));
# simple scalars:
$self->from_analysis_id( $from_analysis_id ) if(defined($from_analysis_id));
$self->to_analysis_url( $to_analysis_url ) if(defined($to_analysis_url));
$self->branch_code( $branch_code ) if(defined($branch_code));
$self->from_analysis_id($from_analysis_id) if(defined($from_analysis_id));
$self->to_analysis_url($to_analysis_url) if(defined($to_analysis_url));
$self->branch_code($branch_code) if(defined($branch_code));
$self->funnel_branch_code($funnel_branch_code) if(defined($funnel_branch_code));
$self->input_id_template($input_id_template) if(defined($input_id_template));
return $self;
......@@ -131,6 +133,21 @@ sub branch_code {
return $self->{'_branch_code'};
}
=head2 funnel_branch_code
Function: getter/setter method for the funnel_branch_code of the dataflow rule
=cut
sub funnel_branch_code {
    # Combined getter/setter for the rule's funnel_branch_code.
    my ($self, @new_value) = @_;

    # Presence of an argument (even an undefined one) selects setter mode;
    # only the first extra argument is used.
    $self->{'_funnel_branch_code'} = $new_value[0] if @new_value;

    return $self->{'_funnel_branch_code'};
}
=head2 input_id_template
Function: getter/setter method for the input_id_template of the dataflow rule
......
......@@ -440,8 +440,9 @@ sub run {
$flow_into ||= {};
$flow_into = { 1 => $flow_into } unless(ref($flow_into) eq 'HASH'); # force non-hash into a hash
foreach my $branch_name_or_code (keys %$flow_into) {
my $heirs = $flow_into->{$branch_name_or_code};
foreach my $branch_tag (keys %$flow_into) {
my ($branch_name_or_code, $funnel_branch_name_or_code) = split(/:/, $branch_tag);
my $heirs = $flow_into->{$branch_tag};
$heirs = [ $heirs ] unless(ref($heirs)); # force scalar into an arrayref first
......@@ -451,9 +452,9 @@ sub run {
my $heir_analysis = $analysis_adaptor->fetch_by_logic_name_or_url($heir_url);
$dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis || $heir_url, $branch_name_or_code, $input_id_template);
$dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis || $heir_url, $branch_name_or_code, $input_id_template, $funnel_branch_name_or_code);
warn "DataFlow rule: [$branch_name_or_code] $logic_name -> $heir_url"
warn "DataFlow rule: [$branch_tag] $logic_name -> $heir_url"
.($input_id_template ? ' WITH TEMPLATE: '.stringify($input_id_template) : '')."\n";
}
}
......
......@@ -124,7 +124,7 @@ sub pipeline_analyses {
{ 'a_multiplier' => $self->o('second_mult'), 'b_multiplier' => $self->o('first_mult') },
],
-flow_into => {
2 => [ 'part_multiply' ], # will create a fan of jobs
'2:1' => [ 'part_multiply' ], # will create a fan of jobs
1 => [ 'add_together' ], # will create a funnel job to wait for the fan to complete and add the results
},
},
......@@ -147,7 +147,7 @@ sub pipeline_analyses {
-input_ids => [
# (jobs for this analysis will be flown_into via branch-1 from 'start' jobs above)
],
-wait_for => [ 'part_multiply' ], # we can only start adding when all partial products have been computed
# -wait_for => [ 'part_multiply' ], # we can only start adding when all partial products have been computed
-flow_into => {
1 => [ ':////final_result' ],
},
......
......@@ -685,6 +685,12 @@ sub run_module_with_job {
print("\n!!! *no* WRITE_OUTPUT and *no* AUTOFLOW\n") if($self->debug);
}
my @zombie_funnel_branches = keys %{$job->fan_cache};
if( scalar(@zombie_funnel_branches) ) {
$job->transient_error(0);
die "There are cached semaphored fans for which a funnel job (branch ".join(',',@zombie_funnel_branches).") has never been dataflown";
}
$job->query_count($self->queen->dbc->query_count);
$job->runtime_msec( $job_stopwatch->get_elapsed );
......
# Schema patch: extend the dataflow_rule table with an optional semaphored funnel branch.
# The new funnel_branch_code column is nullable and defaults to NULL
# (NULL means the dataflow described by the rule is not semaphored).
ALTER TABLE dataflow_rule ADD COLUMN funnel_branch_code int(10) default NULL;
......@@ -146,15 +146,17 @@ CREATE TABLE worker (
-- semantics:
-- dataflow_rule_id - internal ID
-- from_analysis_id - foreign key to analysis table analysis_id
-- branch_code - branch_code of the fan
-- funnel_branch_code - branch_code of the semaphored funnel (is NULL by default, which means dataflow is not semaphored)
-- to_analysis_url - foreign key to net distributed analysis logic_name reference
-- branch_code - joined to job.branch_code to allow branching
-- input_id_template - a template for generating a new input_id (not necessarily a hashref) in this dataflow; if undefined is kept original
CREATE TABLE dataflow_rule (
dataflow_rule_id int(10) unsigned NOT NULL AUTO_INCREMENT,
from_analysis_id int(10) unsigned NOT NULL,
to_analysis_url varchar(255) default '' NOT NULL,
branch_code int(10) default 1 NOT NULL,
funnel_branch_code int(10) default NULL,
to_analysis_url varchar(255) default '' NOT NULL,
input_id_template TEXT DEFAULT NULL,
PRIMARY KEY (dataflow_rule_id),
......
......@@ -133,15 +133,18 @@ CREATE INDEX IF NOT EXISTS analysis_id_status_idx ON worker (analysis_id,
-- semantics:
-- dataflow_rule_id - internal ID
-- from_analysis_id - foreign key to analysis table analysis_id
-- branch_code - branch_code of the fan
-- funnel_branch_code - branch_code of the semaphored funnel (is NULL by default, which means dataflow is not semaphored)
-- to_analysis_url - foreign key to net distributed analysis logic_name reference
-- branch_code - joined to job.branch_code to allow branching
-- input_id_template - a template for generating a new input_id (not necessarily a hashref) in this dataflow; if undefined is kept original
CREATE TABLE dataflow_rule (
dataflow_rule_id INTEGER PRIMARY KEY AUTOINCREMENT,
from_analysis_id INTEGER NOT NULL,
to_analysis_url varchar(255) default '' NOT NULL,
branch_code int(10) default 1 NOT NULL,
funnel_branch_code int(10) default NULL,
to_analysis_url varchar(255) default '' NOT NULL,
input_id_template TEXT DEFAULT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS from_to_branch_template_idx ON dataflow_rule (from_analysis_id, to_analysis_url, branch_code, input_id_template);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment