Commit 19dd6b12 authored by Leo Gordon's avatar Leo Gordon
Browse files

mnemonic branch_names (slightly tested), 'ANYFAILURE' dataflow (not yet tested)

parent 932a8c80
......@@ -201,30 +201,40 @@ sub incomplete { # Job should set this to 0 prior to throwing if the
##-----------------[/indicators to the Worker]-------------------------------
=head2 warning
Description: records a non-error message in 'job_message' table linked to the current job
=cut
sub warning {
my ($self, $msg) = @_;
$self->adaptor->db->get_JobMessageAdaptor()->register_message($self->dbID, $msg, 0);
}
=head2 dataflow_output_id
Title : dataflow_output_id
Arg[1](req) : <string> $output_id
Arg[2](opt) : <int> $branch_code (optional, defaults to 1)
Arg[2](opt) : <int> $branch_name_or_code (optional, defaults to 1)
Arg[3](opt) : <hashref> $create_job_options (optional, defaults to {}, options added to the CreateNewJob method)
Usage : $self->dataflow_output_id($output_id, $branch_code);
Usage : $self->dataflow_output_id($output_id, $branch_name_or_code);
Function:
If a RunnableDB(Process) needs to create jobs, this allows it to have jobs
created and flowed through the dataflow rules of the workflow graph.
This 'output_id' becomes the 'input_id' of the newly created job at
the ends of the dataflow pipes. The optional 'branch_code' determines
the ends of the dataflow pipes. The optional 'branch_name_or_code' determines
which dataflow pipe(s) to flow the job through.
=cut
sub dataflow_output_id {
my ($self, $output_ids, $branch_code, $create_job_options) = @_;
my ($self, $output_ids, $branch_name_or_code, $create_job_options) = @_;
$output_ids ||= [ $self->input_id() ]; # replicate the input_id in the branch_code's output by default
$output_ids = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY'); # force previously used single values into an arrayref
$branch_code ||= 1; # default branch_code is 1
$create_job_options ||= {}; # { -block => 1 } or { -semaphore_count => scalar(@fan_job_ids) } or { -semaphored_job_id => $funnel_job_id }
# this tricky code is responsible for correct propagation of semaphores down the dataflow pipes:
......@@ -233,11 +243,13 @@ sub dataflow_output_id {
# However if nothing is supplied, semaphored_job_id will be propagated from the parent job:
my $semaphored_job_id = $create_job_options->{'-semaphored_job_id'} ||= $self->semaphored_job_id();
my $dataflow_rule_adaptor = $self->adaptor->db->get_DataflowRuleAdaptor;
# if branch_code is set to 1 (explicitly or impliticly), turn off automatic dataflow:
$self->autoflow(0) if($branch_code==1);
$self->autoflow(0) if($dataflow_rule_adaptor->branch_name_2_code($branch_name_or_code)==1);
my @output_job_ids = ();
my $rules = $self->adaptor->db->get_DataflowRuleAdaptor->fetch_from_analysis_id_branch_code($self->analysis_id, $branch_code);
my $rules = $dataflow_rule_adaptor->fetch_from_analysis_id_branch_code($self->analysis_id, $branch_name_or_code);
foreach my $rule (@$rules) {
my $output_ids_for_this_rule;
......@@ -275,6 +287,7 @@ sub dataflow_output_id {
}
}
}
return \@output_job_ids;
}
......
......@@ -577,18 +577,16 @@ sub release_undone_jobs_from_worker {
my $passed_on = 0; # the flag indicating that the garbage_collection was attempted and was successful
if( $resource_overusage ) {
my $branch_code = {
'MEMLIMIT' => '-1',
'RUNLIMIT' => '-2',
}->{$cod};
$passed_on = $self->gc_dataflow( $worker->analysis->dbID(), $job_id, $branch_code );
if($passed_on = $self->gc_dataflow( $worker->analysis->dbID(), $job_id, $cod )) {
$msg .= ', performing gc_dataflow';
}
}
if($passed_on) {
$msg .= ', performing gc_dataflow';
unless($passed_on) {
if($passed_on = $self->gc_dataflow( $worker->analysis->dbID(), $job_id, 'ANYFAILURE' )) {
$msg .= ", performing 'ANYFAILURE' gc_dataflow";
}
}
$self->db()->get_JobMessageAdaptor()->register_message($job_id, $msg, not $passed_on );
unless($passed_on) {
......@@ -623,9 +621,9 @@ sub release_and_age_job {
=cut
sub gc_dataflow {
my ($self, $analysis_id, $job_id, $branch_code) = @_;
my ($self, $analysis_id, $job_id, $branch_name) = @_;
unless(@{ $self->db->get_DataflowRuleAdaptor->fetch_from_analysis_id_branch_code($analysis_id, $branch_code) }) {
unless(@{ $self->db->get_DataflowRuleAdaptor->fetch_from_analysis_id_branch_code($analysis_id, $branch_name) }) {
return 0; # no corresponding gc_dataflow rule has been defined
}
......@@ -633,7 +631,7 @@ sub gc_dataflow {
$job->param_init( 0, $job->input_id() ); # input_id_templates still supported, however to a limited extent
$job->dataflow_output_id( $job->input_id() , $branch_code );
$job->dataflow_output_id( $job->input_id() , $branch_name );
$job->update_status('PASSED_ON');
......
......@@ -46,6 +46,27 @@ use Bio::EnsEMBL::Hive::Utils ('stringify'); # import 'stringify()'
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
=head2 branch_name_2_code
Description: encodes a branch mnemonic name into numeric code
=cut
sub branch_name_2_code {
my $branch_name_or_code = pop @_; # NB: we take the *last* arg, so it works both as a method and a subroutine
$branch_name_or_code=1 unless(defined($branch_name_or_code));
return ($branch_name_or_code=~/^\-?\d+$/)
? $branch_name_or_code
: {
'MAIN' => 1,
'ANYFAILURE' => 0,
'MEMLIMIT' => -1,
'RUNLIMIT' => -2,
}->{$branch_name_or_code} || die "Could not map the branch_name '$branch_name_or_code' to the internal code";
}
=head2 fetch_from_analysis_id_branch_code
......@@ -60,10 +81,11 @@ use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
=cut
sub fetch_from_analysis_id_branch_code {
my ($self, $analysis_id, $branch_code) = @_;
my ($self, $analysis_id, $branch_name_or_code) = @_;
return [] unless($analysis_id);
$branch_code ||= 1;
my $branch_code = $self->branch_name_2_code($branch_name_or_code);
my $constraint = "r.from_analysis_id=${analysis_id} AND r.branch_code=${branch_code}";
......@@ -181,7 +203,7 @@ sub remove {
=cut
sub create_rule {
my ($self, $from_analysis, $to_analysis_or_url, $branch_code, $input_id_template) = @_;
my ($self, $from_analysis, $to_analysis_or_url, $branch_name_or_code, $input_id_template) = @_;
return unless($from_analysis and $to_analysis_or_url);
......@@ -192,7 +214,7 @@ sub create_rule {
? ( -to_analysis => $to_analysis_or_url )
: ( -to_analysis_url => $to_analysis_or_url ),
-branch_code => $branch_code,
-branch_code => $self->branch_name_2_code($branch_name_or_code),
-input_id_template => $input_id_template,
);
......
......@@ -128,7 +128,7 @@ sub branch_code {
if(@_) { # setter mode
$self->{'_branch_code'} = shift @_;
}
return ($self->{'_branch_code'} ||= 1);
return $self->{'_branch_code'};
}
=head2 input_id_template
......
......@@ -431,8 +431,8 @@ sub run {
$flow_into ||= {};
$flow_into = { 1 => $flow_into } unless(ref($flow_into) eq 'HASH'); # force non-hash into a hash
foreach my $branch_code (sort {$a <=> $b} keys %$flow_into) {
my $heirs = $flow_into->{$branch_code};
foreach my $branch_name_or_code (keys %$flow_into) {
my $heirs = $flow_into->{$branch_name_or_code};
$heirs = [ $heirs ] unless(ref($heirs)); # force scalar into an arrayref first
......@@ -442,10 +442,10 @@ sub run {
my $heir_analysis = $analysis_adaptor->fetch_by_logic_name_or_url($heir_url);
my $new_dfr = $dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis || $heir_url, $branch_code, $input_id_template);
my $new_dfr = $dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis || $heir_url, $branch_name_or_code, $input_id_template);
my $dfr_action = $new_dfr ? 'Created a new' : 'Found an existing';
warn "$dfr_action DataFlow rule: [$branch_code] $logic_name -> $heir_url"
warn "$dfr_action DataFlow rule: [$branch_name_or_code] $logic_name -> $heir_url"
.($input_id_template ? ' WITH TEMPLATE: '.stringify($input_id_template) : '')."\n";
}
}
......
......@@ -131,7 +131,7 @@ sub pipeline_analyses {
],
-flow_into => {
2 => [ 'part_multiply' ], # will create a fan of jobs
1 => [ 'add_together' ], # will create a funnel job to wait for the fan to complete and add the results
'MAIN' => [ 'add_together' ], # will create a funnel job to wait for the fan to complete and add the results
},
},
......@@ -143,7 +143,7 @@ sub pipeline_analyses {
# (jobs for this analysis will be flown_into via branch-2 from 'start' jobs above)
],
-flow_into => {
1 => [ 'mysql:////intermediate_result' ],
'MAIN' => [ 'mysql:////intermediate_result' ],
},
},
......@@ -155,7 +155,7 @@ sub pipeline_analyses {
],
-wait_for => [ 'part_multiply' ], # we can only start adding when all partial products have been computed
-flow_into => {
1 => [ 'mysql:////final_result' ],
'MAIN' => [ 'mysql:////final_result' ],
},
},
];
......
......@@ -319,6 +319,12 @@ sub param_substitute {
return $self->input_job->param_substitute(@_);
}
sub warning {
my $self = shift @_;
return $self->input_job->warning(@_);
}
sub dataflow_output_id {
my $self = shift @_;
......
......@@ -84,7 +84,7 @@ sub write_output { # store and dataflow
'a_multiplier' => $self->param('a_multiplier'),
'b_multiplier' => $self->param('b_multiplier'),
'result' => $self->param('result'),
}, 1);
}, 'MAIN');
}
=head2 _add_together
......
......@@ -67,7 +67,7 @@ sub write_output { # but this time we have something to store
'a_multiplier' => $self->param('a_multiplier'),
'digit' => $self->param('digit'),
'result' => $self->param('result')
}, 1);
}, 'MAIN');
}
=head2 _rec_multiply
......
......@@ -82,10 +82,10 @@ sub write_output { # nothing to write out, but some dataflow to perform:
my $output_ids = $self->param('output_ids');
# first we flow the branch-1 into the (semaphored) funnel job:
my ($funnel_job_id) = @{ $self->dataflow_output_id($self->input_id, 1, { -semaphore_count => scalar(@$output_ids) }) };
# first we flow the MAIN branch into the (semaphored) funnel job:
my ($funnel_job_id) = @{ $self->dataflow_output_id($self->input_id, 'MAIN', { -semaphore_count => scalar(@$output_ids) }) };
# then we fan out into branch-2, and pass the $funnel_job_id to all of them
# then we fan out into branch#2, and pass the $funnel_job_id to all of them
my $fan_job_ids = $self->dataflow_output_id($output_ids, 2, { -semaphored_job_id => $funnel_job_id } );
}
......
......@@ -81,11 +81,11 @@ sub write_output { # nothing to write out, but some dataflow to perform:
my $output_ids = $self->param('output_ids');
# "fan out" into branch-2 first
# "fan out" into branch#2 first
$self->dataflow_output_id($output_ids, 2);
# then flow into the branch-1 funnel; input_id would flow into branch_1 by default anyway, but we request it here explicitly:
$self->dataflow_output_id($self->input_id, 1);
# then flow into the MAIN branch funnel; input_id would flow into MAIN branch by default anyway, but we request it here explicitly:
$self->dataflow_output_id($self->input_id, 'MAIN');
}
1;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment