Commit 3230de85 authored by Leo Gordon's avatar Leo Gordon
Browse files

Hive::RunnableDB::JobFactory interface change. You may need to review your PipeConfig modules

parent 5545b6fa
......@@ -72,12 +72,13 @@ sub pipeline_analyses {
{ -logic_name => 'get_databases',
-module => 'Bio::EnsEMBL::Hive::RunnableDB::JobFactory',
-parameters => {
'inputquery' => q{SHOW DATABASES LIKE "}.$self->o('only_databases').q{"},
'inputquery' => q{SHOW DATABASES LIKE "}.$self->o('only_databases').q{"},
'column_names' => [ 'dbname' ],
},
-hive_capacity => 5, # allow several workers to perform identical tasks in parallel
-input_ids => [
{ 'db_conn' => $self->o('source_server1'), 'input_id' => { 'db_conn' => {'-host' => $self->o('source_server1', '-host'), '-port' => $self->o('source_server1', '-port'), '-user' => $self->o('source_server1', '-user'), '-pass' => $self->o('source_server1', '-pass'), '-dbname' => '#_range_start#'}, }, },
{ 'db_conn' => $self->o('source_server2'), 'input_id' => { 'db_conn' => {'-host' => $self->o('source_server2', '-host'), '-port' => $self->o('source_server2', '-port'), '-user' => $self->o('source_server2', '-user'), '-pass' => $self->o('source_server2', '-pass'), '-dbname' => '#_range_start#'}, }, },
{ 'db_conn' => $self->o('source_server1'), 'input_id' => { 'db_conn' => {'-host' => $self->o('source_server1', '-host'), '-port' => $self->o('source_server1', '-port'), '-user' => $self->o('source_server1', '-user'), '-pass' => $self->o('source_server1', '-pass'), '-dbname' => '#dbname#'}, }, },
{ 'db_conn' => $self->o('source_server2'), 'input_id' => { 'db_conn' => {'-host' => $self->o('source_server2', '-host'), '-port' => $self->o('source_server2', '-port'), '-user' => $self->o('source_server2', '-user'), '-pass' => $self->o('source_server2', '-pass'), '-dbname' => '#dbname#'}, }, },
],
-flow_into => {
2 => [ 'dummy' ], # will create a fan of jobs
......
......@@ -88,14 +88,17 @@ sub pipeline_analyses {
-module => 'Bio::EnsEMBL::Hive::RunnableDB::JobFactory',
-parameters => {
'inputlist' => '#expr([0..$job_count-1])expr#', # this expression will evaluate into a listref
'column_names' => [ 'value' ],
},
-input_ids => [
{
'job_count' => $self->o('job_count'), # turn this option into a passable parameter
'failure_rate' => $self->o('failure_rate'), # turn the other option into a passable parameter as well
'state' => $self->o('state'), # turn the third option into a passable parameter too
'lethal_after' => $self->o('lethal_after'),
'input_id' => { 'value' => '#_range_start#', 'divisor' => '#failure_rate#', 'state' => '#state#', 'lethal_after' => '#lethal_after#' },
'input_id' => {
'value' => '#value#',
'divisor' => $self->o('failure_rate'),
'state' => $self->o('state'),
'lethal_after' => $self->o('lethal_after'),
},
},
],
-flow_into => {
......
......@@ -101,10 +101,11 @@ sub pipeline_analyses {
{ -logic_name => 'get_files',
-module => 'Bio::EnsEMBL::Hive::RunnableDB::JobFactory',
-parameters => {
'inputcmd' => 'find '.$self->o('directory').' -type f -name "'.$self->o('only_files').'"',
'inputcmd' => 'find '.$self->o('directory').' -type f -name "'.$self->o('only_files').'"',
'column_names' => [ 'filename' ],
},
-input_ids => [
{ 'input_id' => { 'filename' => '#_range_start#' }, },
{ }, # no need to define the template in simple cases like this
],
-flow_into => {
2 => [ 'zipper_unzipper' ], # will create a fan of jobs
......
......@@ -116,12 +116,11 @@ sub pipeline_analyses {
-module => 'Bio::EnsEMBL::Hive::RunnableDB::JobFactory',
-parameters => {
'db_conn' => $self->o('source_db'),
# 'inputquery' => 'SHOW TABLES LIKE "'.$self->o('only_tables').'"', # to support negative patterns in MySQL 5.1 we need a trick
'inputquery' => 'SELECT table_name FROM information_schema.tables WHERE table_schema = "'.$self->o('source_dbname').'" AND table_name '
.($self->o('invert_selection')?'NOT LIKE':'LIKE').' "'.$self->o('only_tables').'"',
},
-input_ids => [
{ 'input_id' => { 'table_name' => '#_range_start#' }, },
{ }, # the template is now implicitly defined by column_names of the query
],
-flow_into => {
2 => [ 'dumper_zipper' ], # will create a fan of jobs
......
......@@ -39,10 +39,10 @@ whose count is automatically set to the number of fan jobs that it will be waiti
package Bio::EnsEMBL::Hive::RunnableDB::JobFactory;
use strict;
use Bio::EnsEMBL::Hive::Utils ('dir_revhash'); # import dir_revhash
use base ('Bio::EnsEMBL::Hive::Process');
=head2 fetch_input
Description : Implements fetch_input() interface method of Bio::EnsEMBL::Hive::Process that is used to read in parameters and load data.
......@@ -62,19 +62,20 @@ use base ('Bio::EnsEMBL::Hive::Process');
Description : Implements run() interface method of Bio::EnsEMBL::Hive::Process that is used to perform the main bulk of the job (minus input and output).
param('input_id'): The template that will become the input_id of newly created jobs (Note: this is something entirely different from $self->input_id of the current JobFactory job).
param('column_names'): Controls the column names that come out of the parser: 0 = "no names", 1 = "parse names from data", arrayref = "take names from this array"
param('step'): The requested size of the minibatch (1 by default). The real size may be smaller.
param('delimiter'): If you set it your lines in file/cmd mode will be split into columns that you can use individually when constructing the template input_id hash.
param('randomize'): Shuffles the ids before creating jobs - can sometimes lead to better overall performance of the pipeline. Doesn't make any sence for minibatches (step>1).
param('input_id'): The template that will become the input_id of newly created jobs (Note: this is something entirely different from $self->input_id of the current JobFactory job).
After introduction of param('column_names') its significance has dropped, but it may still become handy.
param('delimiter'): If you set it your lines in file/cmd mode will be split into columns that you can use individually when constructing the template input_id hash.
param('randomize'): Shuffles the rows before creating jobs - can sometimes lead to better overall performance of the pipeline. Doesn't make any sence for minibatches (step>1).
param('step'): The requested size of the minibatch (1 by default). The real size of a range may be smaller than the requested size.
param('key_column'): If every line of your input is a list (it happens, for example, when your SQL returns multiple columns or you have set the 'delimiter' in file/cmd mode)
this is the way to say which column is undergoing 'ranging'
param('hashed_column_number'): if defined, turns 'hashed_column_number' into a dir_revhash and appends it to the list of fields.
# The following 4 parameters are mutually exclusive and define the source of ids for the jobs:
......@@ -91,45 +92,56 @@ use base ('Bio::EnsEMBL::Hive::Process');
sub run {
my $self = shift @_;
my $template_hash = $self->param('input_id') || die "'input_id' is an obligatory parameter";
my $step = $self->param('step') || 1;
my $column_names = $self->param('column_names') || 0; # can be 0 (no names), 1 (names from data) or an arrayref (names from this array)
my $delimiter = $self->param('delimiter');
my $randomize = $self->param('randomize') || 0;
# minibatching-related:
my $step = $self->param('step') || 0;
my $key_column = $self->param('key_column') || 0;
my $delimiter = $self->param('delimiter');
my $hashed_column_number = $self->param('hashed_column_number'); # skip this step if undefined
my $inputlist = $self->param('inputlist');
my $inputfile = $self->param('inputfile');
my $inputquery = $self->param('inputquery');
my $inputcmd = $self->param('inputcmd');
my $list = $self->param_substitute( $inputlist )
|| ($inputquery && $self->_make_list_from_query( $self->param_substitute( $inputquery ) ))
|| ($inputfile && $self->_make_list_from_open( $self->param_substitute( $inputfile ), $delimiter ))
|| ($inputcmd && $self->_make_list_from_open( $self->param_substitute( $inputcmd ).' |', $delimiter ))
|| die "range of values should be defined by setting 'inputlist', 'inputfile' or 'inputquery'";
my $parse_column_names = $column_names && (ref($column_names) ne 'ARRAY');
if($randomize) {
_fisher_yates_shuffle_in_place($list);
}
my ($rows, $column_names_from_data) =
$inputlist ? $self->_get_rows_from_list( $self->param_substitute( $inputlist ) )
: $inputquery ? $self->_get_rows_from_query( $self->param_substitute( $inputquery ) )
: $inputfile ? $self->_get_rows_from_open( $self->param_substitute( $inputfile ), $delimiter, $parse_column_names )
: $inputcmd ? $self->_get_rows_from_open( $self->param_substitute( $inputcmd ).' |', $delimiter, $parse_column_names )
: die "range of values should be defined by setting 'inputlist', 'inputquery', 'inputfile' or 'inputcmd'";
if(defined($hashed_column_number) and scalar(@$list)) {
if( $column_names_from_data # column data is available
and ( defined($column_names) ? (ref($column_names) ne 'ARRAY') : 1 ) # and is badly needed
) {
$column_names = $column_names_from_data;
}
# after this point $column_names should either contain a list or be false
if(!ref($list->[0])) {
$list = [ map { [$_] } @$list ]; # create the second dimension if it was missing
}
my $template_hash = $self->param('input_id');
unless($template_hash or $column_names) {
die "At least one of 'input_id' or 'column_names' has to be defined";
}
unless($step ? $template_hash : 1) {
die "If 'step' is defined, 'input_id' also must be defined";
}
foreach my $row (@$list) {
push @$row, dir_revhash($row->[$hashed_column_number]);
}
if($randomize) {
_fisher_yates_shuffle_in_place($rows);
}
my $output_ids = $self->_split_list_into_ranges($template_hash, $list, $step, $key_column);
my $output_ids = $step
? $self->_substitute_minibatched_rows($rows, $column_names, $template_hash, $step, $key_column)
: $self->_substitute_rows($rows, $column_names, $template_hash);
$self->param('output_ids', $output_ids);
}
=head2 write_output
Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution.
......@@ -163,79 +175,134 @@ sub write_output { # nothing to write out, but some dataflow to perform:
}
}
################################### main functionality starts here ###################
=head2 _make_list_from_query
=head2 _get_rows_from_list
Description: this is a private method that loads ids from a given sql query
Description: a private method that ensures the list is 2D
=cut
sub _get_rows_from_list {
my ($self, $inputlist) = @_;
return ref($inputlist->[0])
? $inputlist
: [ map { [ $_ ] } @$inputlist ];
}
=head2 _get_rows_from_query
Description: a private method that loads ids from a given sql query
param('db_conn'): An optional hash to pass in connection parameters to the database upon which the query will have to be run.
=cut
sub _make_list_from_query {
sub _get_rows_from_query {
my ($self, $inputquery) = @_;
my @list = ();
my @rows = ();
my $sth = $self->dbh()->prepare($inputquery);
$sth->execute();
my @column_names_from_data = @{$sth->{NAME}}; # tear it off the original reference to gain some freedom
while (my @cols = $sth->fetchrow_array()) {
push @list, scalar(@cols)==1 ? $cols[0] : \@cols;
push @rows, \@cols;
}
$sth->finish();
return \@list;
return (\@rows, \@column_names_from_data);
}
=head2 _make_list_from_open
=head2 _get_rows_from_open
Description: this is a private method that loads ids from a given file or command pipe
Description: a private method that loads ids from a given file or command pipe
=cut
sub _make_list_from_open {
my ($self, $input_file_or_pipe, $delimiter) = @_;
sub _get_rows_from_open {
my ($self, $input_file_or_pipe, $delimiter, $parse_header) = @_;
my @list = ();
my @rows = ();
open(FILE, $input_file_or_pipe) or die "Could not open '$input_file_or_pipe' because: $!";
while(my $line = <FILE>) {
chomp $line;
push @list, defined($delimiter) ? [ split(/$delimiter/, $line) ] : $line;
push @rows, [ defined($delimiter) ? split(/$delimiter/, $line) : $line ];
}
close FILE;
return \@list;
my $column_names_from_data = $parse_header ? shift @rows : 0;
return (\@rows, $column_names_from_data);
}
=head2 _substitute_rows
Description: a private method that goes through a list and transforms every row into a hash
=cut
sub _substitute_rows {
my ($self, $rows, $column_names, $template_hash) = @_;
my @hashes = ();
foreach my $row (@$rows) {
if($template_hash) {
$self->param('_', $row); # the whole row as a list
foreach my $i (0..scalar(@$row)-1) {
$self->param("_$i", $row->[$i]);
if($column_names) {
$self->param($column_names->[$i], $row->[$i]);
}
}
push @hashes, $self->param_substitute($template_hash);
} else {
push @hashes, { map { ($column_names->[$_] => $row->[$_]) } (0..scalar(@$row)-1) };
}
}
return \@hashes;
}
=head2 _split_list_into_ranges
=head2 _substitute_minibatched_rows
Description: this is a private method that splits a list of ids into sub-ranges
Description: a private method that minibatches a list and transforms every minibatch using param-substitution
=cut
sub _split_list_into_ranges {
my ($self, $template_hash, $list, $step, $key_column) = @_;
sub _substitute_minibatched_rows {
my ($self, $rows, $column_names, $template_hash, $step, $key_column) = @_;
my @ranges = ();
while(@$list) {
my $start_line = shift @$list;
my $range_start = (ref($start_line) eq 'ARRAY') ? $start_line->[$key_column] : $start_line;
while(@$rows) {
my $start_row = shift @$rows;
my $range_start = $start_row->[$key_column];
my $range_end = $range_start;
my $range_count = 1;
my $next_line = $start_line; # safety, in case next while doesn't execute even once
while($range_count<$step && @$list) {
$next_line = shift @$list;
my $next_value = (ref($next_line) eq 'ARRAY') ? $next_line->[$key_column] : $next_line;
my $next_row = $start_row; # safety, in case the internal while doesn't execute even once
while($range_count<$step && @$rows) {
$next_row = shift @$rows;
my $next_value = $next_row->[$key_column];
my $predicted_next = $range_end;
if(++$predicted_next eq $next_value) {
$range_end = $next_value;
$range_count++;
} else {
unshift @$list, $next_line;
unshift @$rows, $next_row;
last;
}
}
......@@ -245,21 +312,24 @@ sub _split_list_into_ranges {
$self->param('_range_end', $range_end);
$self->param('_range_count', $range_count);
if(ref($start_line) eq 'ARRAY') {
foreach my $i (0..scalar(@$start_line)-1) {
$self->param("_start_$i", $start_line->[$i]);
$self->param("_end_$i", $next_line->[$i]);
foreach my $i (0..scalar(@$start_row)-1) {
$self->param("_start_$i", $start_row->[$i]);
$self->param("_end_$i", $next_row->[$i]);
if($column_names) {
$self->param('_start_'.$column_names->[$i], $start_row->[$i]);
$self->param('_end_'.$column_names->[$i], $next_row->[$i]);
}
}
push @ranges, $self->param_substitute($template_hash);
}
return \@ranges;
}
=head2 _fisher_yates_shuffle_in_place
Description: this is a private function (not a method) that shuffles a list of ids
Description: a private function (not a method) that shuffles a list of ids
=cut
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment