Skip to content
Snippets Groups Projects
Commit e2ac50d8 authored by Leo Gordon's avatar Leo Gordon
Browse files

JobFactory uses $overriding_hash to create jobs/rows from input_id_template;...

JobFactory uses $overriding_hash to create jobs/rows from input_id_template; 'input_id' parameter deprecated; standaloneJob supports templates.
parent ee740713
No related branches found
No related tags found
No related merge requests found
...@@ -9,8 +9,7 @@ Bio::EnsEMBL::Hive::RunnableDB::JobFactory ...@@ -9,8 +9,7 @@ Bio::EnsEMBL::Hive::RunnableDB::JobFactory
standaloneJob.pl Bio::EnsEMBL::Hive::RunnableDB::JobFactory \ standaloneJob.pl Bio::EnsEMBL::Hive::RunnableDB::JobFactory \
--inputcmd 'cd ${ENSEMBL_CVS_ROOT_DIR}/ensembl-hive/modules/Bio/EnsEMBL/Hive/RunnableDB; ls -1 *.pm' \ --inputcmd 'cd ${ENSEMBL_CVS_ROOT_DIR}/ensembl-hive/modules/Bio/EnsEMBL/Hive/RunnableDB; ls -1 *.pm' \
--input_id "{'meta_key'=>'module_name','meta_value'=>'#_0#'}" \ --flow_into "{ 2 => { 'mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2914/lg4_compara_families_70/meta' => {'meta_key'=>'module_name','meta_value'=>'#_0#'} } }""
--flow_into "{ 2 => ['mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2912/lg4_compara_families_64/meta']}"
=head1 DESCRIPTION =head1 DESCRIPTION
...@@ -37,6 +36,25 @@ use strict; ...@@ -37,6 +36,25 @@ use strict;
use base ('Bio::EnsEMBL::Hive::Process'); use base ('Bio::EnsEMBL::Hive::Process');
sub param_defaults {
return {
'column_names' => 0,
'delimiter' => undef,
'randomize' => 0,
'step' => 0,
'key_column' => 0,
'input_id' => 0, # this parameter is no longer supported and should stay at 0
'inputlist' => undef,
'inputfile' => undef,
'inputquery' => undef,
'inputcmd' => undef,
'fan_branch_code' => 2,
};
}
=head2 fetch_input =head2 fetch_input
Description : Implements fetch_input() interface method of Bio::EnsEMBL::Hive::Process that is used to read in parameters and load data. Description : Implements fetch_input() interface method of Bio::EnsEMBL::Hive::Process that is used to read in parameters and load data.
...@@ -58,10 +76,7 @@ use base ('Bio::EnsEMBL::Hive::Process'); ...@@ -58,10 +76,7 @@ use base ('Bio::EnsEMBL::Hive::Process');
param('column_names'): Controls the column names that come out of the parser: 0 = "no names", 1 = "parse names from data", arrayref = "take names from this array" param('column_names'): Controls the column names that come out of the parser: 0 = "no names", 1 = "parse names from data", arrayref = "take names from this array"
param('delimiter'): If you set it your lines in file/cmd mode will be split into columns that you can use individually when constructing the template input_id hash. param('delimiter'): If you set it your lines in file/cmd mode will be split into columns that you can use individually when constructing the input_id_template hash.
param('input_id'): The template that will become the input_id of newly created jobs (Note: this is something entirely different from $self->input_id of the current JobFactory job).
After introduction of param('column_names') its significance has dropped, but it may still become handy.
param('randomize'): Shuffles the rows before creating jobs - can sometimes lead to better overall performance of the pipeline. Doesn't make any sence for minibatches (step>1). param('randomize'): Shuffles the rows before creating jobs - can sometimes lead to better overall performance of the pipeline. Doesn't make any sence for minibatches (step>1).
...@@ -86,14 +101,14 @@ use base ('Bio::EnsEMBL::Hive::Process'); ...@@ -86,14 +101,14 @@ use base ('Bio::EnsEMBL::Hive::Process');
sub run { sub run {
my $self = shift @_; my $self = shift @_;
my $column_names = $self->param('column_names') || 0; # can be 0 (no names), 1 (names from data) or an arrayref (names from this array) my $column_names = $self->param('column_names'); # can be 0 (no names), 1 (names from data) or an arrayref (names from this array)
my $delimiter = $self->param('delimiter'); my $delimiter = $self->param('delimiter');
my $randomize = $self->param('randomize') || 0; my $randomize = $self->param('randomize');
# minibatching-related: # minibatching-related:
my $step = $self->param('step') || 0; my $step = $self->param('step');
my $key_column = $self->param('key_column') || 0; my $key_column = $self->param('key_column');
my $inputlist = $self->param('inputlist'); my $inputlist = $self->param('inputlist');
my $inputfile = $self->param('inputfile'); my $inputfile = $self->param('inputfile');
...@@ -116,12 +131,8 @@ sub run { ...@@ -116,12 +131,8 @@ sub run {
} }
# after this point $column_names should either contain a list or be false # after this point $column_names should either contain a list or be false
my $template_hash = $self->param('input_id'); if( $self->param('input_id') ) {
unless($template_hash or $column_names) { die "'input_id' is no longer supported, please reconfigure as the input_id_template of the dataflow_rule";
die "At least one of 'input_id' or 'column_names' has to be defined";
}
unless($step ? $template_hash : 1) {
die "If 'step' is defined, 'input_id' also must be defined";
} }
if($randomize) { if($randomize) {
...@@ -129,8 +140,8 @@ sub run { ...@@ -129,8 +140,8 @@ sub run {
} }
my $output_ids = $step my $output_ids = $step
? $self->_substitute_minibatched_rows($rows, $column_names, $template_hash, $step, $key_column) ? $self->_substitute_minibatched_rows($rows, $column_names, $step, $key_column)
: $self->_substitute_rows($rows, $column_names, $template_hash); : $self->_substitute_rows($rows, $column_names);
$self->param('output_ids', $output_ids); $self->param('output_ids', $output_ids);
} }
...@@ -149,7 +160,7 @@ sub write_output { # nothing to write out, but some dataflow to perform: ...@@ -149,7 +160,7 @@ sub write_output { # nothing to write out, but some dataflow to perform:
my $self = shift @_; my $self = shift @_;
my $output_ids = $self->param('output_ids'); my $output_ids = $self->param('output_ids');
my $fan_branch_code = $self->param('fan_branch_code') || 2; my $fan_branch_code = $self->param('fan_branch_code');
# "fan out" into fan_branch_code: # "fan out" into fan_branch_code:
$self->dataflow_output_id($output_ids, $fan_branch_code); $self->dataflow_output_id($output_ids, $fan_branch_code);
...@@ -236,25 +247,16 @@ sub _get_rows_from_open { ...@@ -236,25 +247,16 @@ sub _get_rows_from_open {
=cut =cut
sub _substitute_rows { sub _substitute_rows {
my ($self, $rows, $column_names, $template_hash) = @_; my ($self, $rows, $column_names) = @_;
my @hashes = (); my @hashes = ();
foreach my $row (@$rows) { foreach my $row (@$rows) {
if($template_hash) { my $job_param_hash = $column_names
$self->param('_', $row); # the whole row as a list ? { map { ($column_names->[$_] => $row->[$_]) } (0..scalar(@$row)-1) }
: { '_' => $row, map { ("_$_" => $row->[$_]) } (0..scalar(@$row)-1) };
foreach my $i (0..scalar(@$row)-1) { push @hashes, $job_param_hash;
$self->param("_$i", $row->[$i]);
if($column_names) {
$self->param($column_names->[$i], $row->[$i]);
}
}
push @hashes, $self->param_substitute($template_hash);
} else {
push @hashes, { map { ($column_names->[$_] => $row->[$_]) } (0..scalar(@$row)-1) };
}
} }
return \@hashes; return \@hashes;
} }
...@@ -267,7 +269,7 @@ sub _substitute_rows { ...@@ -267,7 +269,7 @@ sub _substitute_rows {
=cut =cut
sub _substitute_minibatched_rows { sub _substitute_minibatched_rows {
my ($self, $rows, $column_names, $template_hash, $step, $key_column) = @_; my ($self, $rows, $column_names, $step, $key_column) = @_;
my @ranges = (); my @ranges = ();
...@@ -293,21 +295,16 @@ sub _substitute_minibatched_rows { ...@@ -293,21 +295,16 @@ sub _substitute_minibatched_rows {
} }
} }
# pseudo-parameters that will be substituted in the template hash: my $job_range = {
$self->param('_range_start', $range_start); '_range_start' => $range_start,
$self->param('_range_end', $range_end); '_range_end' => $range_end,
$self->param('_range_count', $range_count); '_range_count' => $range_count,
foreach my $i (0..scalar(@$start_row)-1) { $column_names
$self->param("_start_$i", $start_row->[$i]); ? map { ('_start_'.$column_names->[$_] => $start_row->[$_], '_end_'.$column_names->[$_] => $next_row->[$_]) } (0..scalar(@$start_row)-1)
$self->param("_end_$i", $next_row->[$i]); : map { ("_start_$_" => $start_row->[$_], "_end_$_" => $next_row->[$_]) } (0..scalar(@$start_row)-1)
};
if($column_names) { push @ranges, $job_range;
$self->param('_start_'.$column_names->[$i], $start_row->[$i]);
$self->param('_end_'.$column_names->[$i], $next_row->[$i]);
}
}
push @ranges, $self->param_substitute($template_hash);
} }
return \@ranges; return \@ranges;
} }
......
...@@ -51,8 +51,22 @@ foreach my $branch_code (keys %$flow_into) { ...@@ -51,8 +51,22 @@ foreach my $branch_code (keys %$flow_into) {
my $heirs = $flow_into->{$branch_code}; my $heirs = $flow_into->{$branch_code};
$heirs = [ $heirs ] unless(ref($heirs)); # force scalar into an arrayref first $heirs = [ $heirs ] unless(ref($heirs)); # force scalar into an arrayref first
$heirs = { map { ($_ => undef) } @$heirs } if(ref($heirs) eq 'ARRAY'); # now force it into a hash if it wasn't
my @dataflow_rules = map { Bio::EnsEMBL::Hive::DataflowRule->new( -to_analysis_url => $_ ) } @$heirs; my @dataflow_rules = ();
while(my ($heir_url, $input_id_template_list) = each %$heirs) {
$input_id_template_list = [ $input_id_template_list ] unless(ref($input_id_template_list) eq 'ARRAY'); # allow for more than one template per analysis
foreach my $input_id_template (@$input_id_template_list) {
push @dataflow_rules, Bio::EnsEMBL::Hive::DataflowRule->new(
-to_analysis_url => $heir_url,
-input_id_template => $input_id_template,
);
}
}
$job->dataflow_rules( $branch_code, \@dataflow_rules ); $job->dataflow_rules( $branch_code, \@dataflow_rules );
} }
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment