=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::RunnableDB::JobFactory

=head1 SYNOPSIS

    standaloneJob.pl Bio::EnsEMBL::Hive::RunnableDB::JobFactory \
                    --inputcmd 'cd ${ENSEMBL_CVS_ROOT_DIR}/ensembl-hive/modules/Bio/EnsEMBL/Hive/RunnableDB; ls -1 *.pm' \
                    --flow_into "{ 2 => { 'mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2914/lg4_compara_families_70/meta' => {'meta_key'=>'module_name','meta_value'=>'#_0#'} } }"

=head1 DESCRIPTION

    This is a generic RunnableDB module for creating batches of similar jobs using the dataflow mechanism
    (a fan of jobs is created in one branch and the funnel in another).
    Make sure you wire this building block properly from outside.

    You can supply as parameter one of 4 sources of ids from which the batches will be generated:

        param('inputlist');  The list is explicitly given in the parameters, can be abbreviated: 'inputlist' => ['a'..'z']

        param('inputfile');  The list is contained in a file whose name is supplied as parameter: 'inputfile' => 'myfile.txt'

        param('inputquery'); The list is generated by an SQL query (against the production database by default) : 'inputquery' => 'SELECT object_id FROM object WHERE x=y'

        param('inputcmd');   The list is generated by running a system command: 'inputcmd' => 'find /tmp/big_directory -type f'
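
    The four sources above might be wired into an analysis roughly as follows. This is only a sketch:
    the hash layout follows the usual eHive PipeConfig keys (-logic_name, -module, -parameters, -flow_into),
    while the analysis names 'create_jobs' and 'process_one_id' and the column name 'letter' are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical analysis definition; exactly one of the four input sources is set.
my $factory_analysis = {
    -logic_name => 'create_jobs',                                   # assumed name
    -module     => 'Bio::EnsEMBL::Hive::RunnableDB::JobFactory',
    -parameters => {
        'inputlist'    => [ 'a'..'e' ],     # explicit list of ids
        'column_names' => [ 'letter' ],     # each job would receive { 'letter' => ... }
    },
    -flow_into  => {
        2 => [ 'process_one_id' ],          # branch 2 is the default fan_branch_code
    },
};

printf "%d ids will be fanned out\n", scalar @{ $factory_analysis->{-parameters}{'inputlist'} };
```

    Swapping 'inputlist' for 'inputfile', 'inputquery' or 'inputcmd' changes only where the rows come from.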

    NB for developers: the fetch_input() method is intentionally missing from JobFactory.pm .
    If JobFactory is subclassed (say, by a Compara RunnableDB) the child class should use fetch_input()
    to set $self->param('inputlist') to whatever list of ids is specific to that particular type of data (slices, members, etc).
    The rest of the functionality will be taken care of by the parent class code.
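
    A minimal sketch of the subclassing pattern described above. To keep it runnable without eHive installed,
    a stand-in parent class (My::MockJobFactory, hypothetical) mimics only the param() getter/setter;
    a real child class would inherit from Bio::EnsEMBL::Hive::RunnableDB::JobFactory instead.
    The class name My::SliceFactory and the ids are made up for illustration.

```perl
use strict;
use warnings;

# Stand-in for the real parent class; it only mimics param().
package My::MockJobFactory;
sub new {
    my $class = shift;
    return bless { params => {} }, $class;
}
sub param {
    my ($self, $name, @value) = @_;
    $self->{params}{$name} = $value[0] if @value;
    return $self->{params}{$name};
}

# Hypothetical child class: fetch_input() only has to fill in 'inputlist'.
package My::SliceFactory;
our @ISA = ('My::MockJobFactory');

sub fetch_input {
    my $self = shift;
    my @slice_names = map { "chromosome:$_" } (1..3);   # made-up ids
    $self->param('inputlist', \@slice_names);
}

package main;

my $factory = My::SliceFactory->new();
$factory->fetch_input();
print join(', ', @{ $factory->param('inputlist') }), "\n";
```

    In the real module the inherited run() and write_output() would then turn that list into jobs.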

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2018] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=cut


package Bio::EnsEMBL::Hive::RunnableDB::JobFactory;

use strict;

use base ('Bio::EnsEMBL::Hive::Process');


sub param_defaults {
    return {
        'column_names'      => 0,
        'delimiter'         => undef,
        'randomize'         => 0,
        'step'              => 0,
        'key_column'        => 0,
        'input_id'          => 0,   # this parameter is no longer supported and should stay at 0

        'inputlist'         => undef,
        'inputfile'         => undef,
        'inputquery'        => undef,
        'inputcmd'          => undef,

        'fan_branch_code'   => 2,
    };
}



=head2 run

    Description : Implements run() interface method of Bio::EnsEMBL::Hive::Process that is used to perform the main bulk of the job (minus input and output).

    param('column_names'):  Controls the column names that come out of the parser: 0 = "no names", 1 = "parse names from data", arrayref = "take names from this array"

    param('delimiter'): If you set it, your lines in file/cmd mode will be split into columns that you can use individually when constructing the input_id_template hash.

    param('randomize'): Shuffles the rows before creating jobs - can sometimes lead to better overall performance of the pipeline. Doesn't make any sense for minibatches (step>1).

    param('step'):      The requested size of the minibatch (0 by default, which disables minibatching). The real size of a range may be smaller than the requested size.

    param('key_column'): If every line of your input is a list (it happens, for example, when your SQL returns multiple columns or you have set the 'delimiter' in file/cmd mode)
                         this is the way to say which column is undergoing 'ranging'


        # The following 4 parameters are mutually exclusive and define the source of ids for the jobs:

    param('inputlist');  The list is explicitly given in the parameters, can be abbreviated: 'inputlist' => ['a'..'z']

    param('inputfile');  The list is contained in a file whose name is supplied as parameter: 'inputfile' => 'myfile.txt'

    param('inputquery'); The list is generated by an SQL query (against the production database by default) : 'inputquery' => 'SELECT object_id FROM object WHERE x=y'

    param('inputcmd');   The list is generated by running a system command: 'inputcmd' => 'find /tmp/big_directory -type f'

=cut

sub run {
    my $self = shift @_;

    my $column_names    = $self->param('column_names');   # can be 0 (no names), 1 (names from data) or an arrayref (names from this array)
    my $delimiter       = $self->param('delimiter');

    my $randomize       = $self->param('randomize');

        # minibatching-related:
    my $step            = $self->param('step');
    my $key_column      = $self->param('key_column');

    my $inputlist       = $self->param('inputlist');
    my $inputfile       = $self->param('inputfile');
    my $inputquery      = $self->param('inputquery');
    my $inputcmd        = $self->param('inputcmd');

    my $parse_column_names = $column_names && (ref($column_names) ne 'ARRAY');

    my ($rows, $column_names_from_data) =
              $inputlist    ? $self->_get_rows_from_list(  $inputlist  )
            : $inputquery   ? $self->_get_rows_from_query( $inputquery )
            : $inputfile    ? $self->_get_rows_from_open(  $inputfile  , $delimiter, $parse_column_names )
            : $inputcmd     ? $self->_get_rows_from_open( "$inputcmd |", $delimiter, $parse_column_names )
            : die "range of values should be defined by setting 'inputlist', 'inputquery', 'inputfile' or 'inputcmd'";

    if( $column_names_from_data                                             # column data is available
    and ( defined($column_names) ? (ref($column_names) ne 'ARRAY') : 1 )    # and is badly needed
    ) {
        $column_names = $column_names_from_data;
    }
    # after this point $column_names should either contain a list or be false

    if( $self->param('input_id') ) {
        die "'input_id' is no longer supported, please reconfigure as the input_id_template of the dataflow_rule";
    }

    if($randomize) {
        _fisher_yates_shuffle_in_place($rows);
    }

    my $output_ids = $step
        ? $self->_substitute_minibatched_rows($rows, $column_names, $step, $key_column)
        : $self->_substitute_rows($rows, $column_names);

    $self->param('output_ids', $output_ids);
}


=head2 write_output

    Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution.
                  Here we rely on the dataflow mechanism to create jobs.

    param('fan_branch_code'): defines the branch where the fan of jobs is created (2 by default).

=cut

sub write_output {  # nothing to write out, but some dataflow to perform:
    my $self = shift @_;

    my $output_ids              = $self->param('output_ids');
    my $fan_branch_code         = $self->param('fan_branch_code');

        # "fan out" into fan_branch_code:
    $self->dataflow_output_id($output_ids, $fan_branch_code);
}


################################### main functionality starts here ###################


=head2 _get_rows_from_list
    
    Description: a private method that ensures the list is 2D
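
    The normalisation can be tried in isolation with a standalone function (rows_from_list is a hypothetical
    name used only for this sketch; it mirrors the idea rather than calling the module):

```perl
use strict;
use warnings;

# Wrap scalars so that every row is an arrayref; lists that already look 2D pass through.
sub rows_from_list {
    my ($inputlist) = @_;
    return ref($inputlist->[0])
        ? $inputlist
        : [ map { [ $_ ] } @$inputlist ];
}

my $flat   = rows_from_list( [ 'a', 'b', 'c' ] );           # becomes [ ['a'], ['b'], ['c'] ]
my $nested = rows_from_list( [ [ 1, 'x' ], [ 2, 'y' ] ] );  # already 2D, returned as-is

print scalar(@$flat),   " rows, ", scalar(@{ $flat->[0] }),   " column(s) in the first row\n";
print scalar(@$nested), " rows, ", scalar(@{ $nested->[0] }), " column(s) in the first row\n";
```

    Note that only the first element is inspected, so a list mixing scalars and arrayrefs is not normalised.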

=cut

sub _get_rows_from_list {
    my ($self, $inputlist) = @_;

    return ref($inputlist->[0])
        ? $inputlist
        : [ map { [ $_ ] } @$inputlist ];
}


=head2 _get_rows_from_query
    
    Description: a private method that loads ids from a given sql query

    param('db_conn'): An optional hash to pass in connection parameters to the database upon which the query will have to be run.

=cut

sub _get_rows_from_query {
    my ($self, $inputquery) = @_;

    if($self->debug()) {
        warn qq{inputquery = "$inputquery"\n};
    }
    my @rows = ();
    my $sth = $self->data_dbc()->prepare($inputquery);
    $sth->execute();
    my @column_names_from_data = @{$sth->{NAME}};   # tear it off the original reference to gain some freedom

    while (my @cols = $sth->fetchrow_array()) {
        push @rows, \@cols;
    }
    $sth->finish();

    return (\@rows, \@column_names_from_data);
}


=head2 _get_rows_from_open
    
    Description: a private method that loads ids from a given file or command pipe
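
    A standalone sketch of the same read-split-header sequence, using an in-memory filehandle so that no real
    file or command is needed. The tab-separated sample data is made up, and the header handling corresponds
    to setting 'column_names' => 1.

```perl
use strict;
use warnings;

# In-memory "file" with a header line followed by two data lines.
my $text = "id\tname\n1\talpha\n2\tbeta\n";
open(my $fh, '<', \$text) or die "Could not open in-memory file: $!";

my $delimiter = "\t";
my @rows;
while (my $line = <$fh>) {
    chomp $line;
    # Without a delimiter each line would become a single-column row.
    push @rows, [ defined($delimiter) ? split(/$delimiter/, $line) : $line ];
}
close $fh;

# When names are parsed from the data, the first row is taken off as the header.
my $column_names = shift @rows;

print "columns: @$column_names\n";
print "rows:    ", scalar(@rows), "\n";
```

    For 'inputcmd' the module opens "$inputcmd |" instead of a file name, and the rest of the loop is the same.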

=cut

sub _get_rows_from_open {
    my ($self, $input_file_or_pipe, $delimiter, $parse_header) = @_;

    if($self->debug()) {
        warn qq{input_file_or_pipe = "$input_file_or_pipe"\n};
    }
    my @rows = ();
    open(FILE, $input_file_or_pipe) or die "Could not open '$input_file_or_pipe' because: $!";
    while(my $line = <FILE>) {
        chomp $line;

        push @rows, [ defined($delimiter) ? split(/$delimiter/, $line) : $line ];
    }
    close FILE
        or die "Could not read from '$input_file_or_pipe'. Received the error ".($! || $?);

    my $column_names_from_data = $parse_header ? shift @rows : 0;

    return (\@rows, $column_names_from_data);
}


=head2 _substitute_rows

    Description: a private method that goes through a list and transforms every row into a hash
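
    To make the resulting keys concrete, here is a standalone re-statement of the mapping (row_to_hash is a
    hypothetical helper, and the column names and values are made up). With names, each column is stored under
    its name; without names, the whole row goes under '_' and each column under '_0', '_1', and so on.

```perl
use strict;
use warnings;

# Turn one row into a parameter hash, with or without column names.
sub row_to_hash {
    my ($row, $column_names) = @_;
    return $column_names
        ? {              map { ($column_names->[$_] => $row->[$_]) } (0..$#$row) }
        : { '_' => $row, map { ("_$_"               => $row->[$_]) } (0..$#$row) };
}

my $named   = row_to_hash( [ 42, 'human' ], [ 'genome_db_id', 'species' ] );
my $unnamed = row_to_hash( [ 42, 'human' ], 0 );

print join(', ', map { sprintf('%s=%s', $_, $named->{$_}) }   sort keys %$named), "\n";
print join(', ', map { sprintf('%s=%s', $_, $unnamed->{$_}) } grep { $_ ne '_' } sort keys %$unnamed), "\n";
```

    The '#_0#' placeholder in the SYNOPSIS refers to the first of these positional keys.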

=cut

sub _substitute_rows {
    my ($self, $rows, $column_names) = @_;

    my @hashes = ();

    foreach my $row (@$rows) {
        my $job_param_hash =  $column_names
            ?  {              map { ($column_names->[$_] => $row->[$_]) } (0..scalar(@$row)-1) }
            :  { '_' => $row, map { ("_$_"               => $row->[$_]) } (0..scalar(@$row)-1) };

        push @hashes, $job_param_hash;
    }
    return \@hashes;
}


=head2 _substitute_minibatched_rows
    
    Description: a private method that minibatches a list and transforms every minibatch using param-substitution
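
    A simplified, standalone sketch of the range-building idea over a single key column. The function name
    minibatch and the sample values are assumptions for illustration; the module itself works on whole rows
    and also emits per-column '_start_...' and '_end_...' keys. A range is closed either when it reaches
    'step' elements or when the next key is not the increment of the current one.

```perl
use strict;
use warnings;

# Group consecutive key values into ranges of at most $step elements.
sub minibatch {
    my ($values, $step) = @_;
    my @queue = @$values;       # work on a copy so the caller's list is untouched
    my @ranges;
    while (@queue) {
        my $range_start = shift @queue;
        my ($range_end, $range_count) = ($range_start, 1);
        while ($range_count < $step && @queue) {
            my $predicted_next = $range_end;
            last unless ++$predicted_next eq $queue[0];   # stop at the first gap
            $range_end = shift @queue;
            $range_count++;
        }
        push @ranges, { '_range_start' => $range_start, '_range_end' => $range_end, '_range_count' => $range_count };
    }
    return \@ranges;
}

my $ranges = minibatch( [ 1, 2, 3, 5, 6 ], 2 );
print join(' ', map { sprintf('[%s..%s]', $_->{'_range_start'}, $_->{'_range_end'}) } @$ranges), "\n";
```

    With a step of 2 the value 3 ends up in a range of its own, because the following key (5) is not its successor.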

=cut

sub _substitute_minibatched_rows {
    my ($self, $rows, $column_names, $step, $key_column) = @_;

    my @ranges = ();

    while(@$rows) {
        my $start_row  = shift @$rows;
        my $range_start = $start_row->[$key_column];

        my $range_end   = $range_start;
        my $range_count = 1;
        my $next_row    = $start_row; # the last row accepted into this range (the start row if nothing else is accepted)

        while($range_count<$step && @$rows) {
            my $candidate_row = shift @$rows;
            my $next_value    = $candidate_row->[$key_column];

            my $predicted_next = $range_end;
            if(++$predicted_next eq $next_value) {
                $range_end = $next_value;
                $next_row  = $candidate_row;        # only accepted rows may become the end row of the range
                $range_count++;
            } else {                                # a gap in the key column: put the row back and close this range
                unshift @$rows, $candidate_row;
                last;
            }
        }

        my $job_range = {
            '_range_start'  => $range_start,
            '_range_end'    => $range_end,
            '_range_count'  => $range_count,

            $column_names
                ?  map { ('_start_'.$column_names->[$_] => $start_row->[$_], '_end_'.$column_names->[$_] => $next_row->[$_]) } (0..scalar(@$start_row)-1)
                :  map { ("_start_$_"                   => $start_row->[$_], "_end_$_"                   => $next_row->[$_]) } (0..scalar(@$start_row)-1)
        };
        push @ranges, $job_range;
    }
    return \@ranges;
}


=head2 _fisher_yates_shuffle_in_place
    
    Description: a private function (not a method) that shuffles a list of ids
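
    A standalone sketch of a Fisher-Yates shuffle, for trying out the technique outside the module.
    The name shuffle_in_place and the sample ids are assumptions; the loop is written so that empty
    and single-element lists are simply left alone.

```perl
use strict;
use warnings;

# Walk down from the last index, swapping each element with a randomly
# chosen element at or below it.
sub shuffle_in_place {
    my $array = shift;
    for (my $upper = $#$array; $upper > 0; $upper--) {
        my $lower = int(rand($upper + 1));   # 0 <= $lower <= $upper
        @$array[$lower, $upper] = @$array[$upper, $lower];
    }
}

my @ids = (1..10);
shuffle_in_place(\@ids);
print "@ids\n";     # the same ten ids, in random order
```

    Because the list is shuffled in place through a reference, nothing needs to be returned.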

=cut

sub _fisher_yates_shuffle_in_place {
    my $array = shift @_;

    return unless @$array;  # an empty list would otherwise send the countdown below past zero

    for(my $upper=scalar(@$array);--$upper;) {
        my $lower=int(rand($upper+1));
        next if $lower == $upper;
        @$array[$lower,$upper] = @$array[$upper,$lower];
    }
}

1;