JobFactory.pm 11.9 KB
Newer Older
1 2 3 4
=pod 

=head1 NAME

5
    Bio::EnsEMBL::Hive::RunnableDB::JobFactory
6

Leo Gordon's avatar
typo  
Leo Gordon committed
7 8
=head1 SYNOPSIS

9 10
    standaloneJob.pl Bio::EnsEMBL::Hive::RunnableDB::JobFactory \
                    --inputcmd 'cd ${ENSEMBL_CVS_ROOT_DIR}/ensembl-hive/modules/Bio/EnsEMBL/Hive/RunnableDB; ls -1 *.pm' \
11
                    --flow_into "{ 2 => { 'mysql://ensadmin:${ENSADMIN_PSW}@127.0.0.1:2914/lg4_compara_families_70/meta' => {'meta_key'=>'module_name','meta_value'=>'#_0#'} } }"
Leo Gordon's avatar
Leo Gordon committed
12

13 14
=head1 DESCRIPTION

15 16 17 18 19 20 21 22 23 24 25
    This is a generic RunnableDB module for creating batches of similar jobs using dataflow mechanism
    (a fan of jobs is created in one branch and the funnel in another).
    Make sure you wire this building block properly from outside.

    You can supply as parameter one of 4 sources of ids from which the batches will be generated:

        param('inputlist');  The list is explicitly given in the parameters, can be abbreviated: 'inputlist' => ['a'..'z']

        param('inputfile');  The list is contained in a file whose name is supplied as parameter: 'inputfile' => 'myfile.txt'

        param('inputquery'); The list is generated by an SQL query (against the production database by default) : 'inputquery' => 'SELECT object_id FROM object WHERE x=y'
26

27
        param('inputcmd');   The list is generated by running a system command: 'inputcmd' => 'find /tmp/big_directory -type f'
28

29
=head1 LICENSE
30

31
    Copyright [1999-2014] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
32

33 34
    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
Leo Gordon's avatar
Leo Gordon committed
35

36 37 38 39 40 41 42 43 44
         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
45 46 47

=cut

48

49 50 51
package Bio::EnsEMBL::Hive::RunnableDB::JobFactory;

use strict;
52

53
use base ('Bio::EnsEMBL::Hive::Process');
54

55

56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
sub param_defaults {

        # Default values for every parameter this runnable reads.
    my %default_for = (
            # how rows are parsed and turned into output hashes:
        'column_names'      => 0,       # 0 = no names, 1 = names from data, arrayref = names from this array
        'delimiter'         => undef,   # undef = keep each line of a file/cmd as a single column
        'randomize'         => 0,
        'step'              => 0,       # 0 = no minibatching (one output hash per row)
        'key_column'        => 0,
        'input_id'          => 0,       # no longer supported: run() dies if this is ever set to a true value

            # sources of rows (only one should be set):
        'inputlist'         => undef,
        'inputfile'         => undef,
        'inputquery'        => undef,
        'inputcmd'          => undef,

            # dataflow branch for the "fan" of output ids:
        'fan_branch_code'   => 2,
    );

    return \%default_for;
}


Leo Gordon's avatar
Leo Gordon committed
75
=head2 fetch_input
76

Leo Gordon's avatar
Leo Gordon committed
77 78
    Description : Implements fetch_input() interface method of Bio::EnsEMBL::Hive::Process that is used to read in parameters and load data.
                  Here we have nothing to do.
79 80 81 82 83 84
                  
                  NB: This method is intentionally missing from JobFactory.pm.

                  If JobFactory is subclassed (say, by a Compara RunnableDB), the child class should use fetch_input()
                  to set $self->param('inputlist') to the list of ids specific to that particular type of data (slices, members, etc.).
                  The rest of the functionality is taken care of by the parent class code.
Leo Gordon's avatar
Leo Gordon committed
85 86 87

=cut

88

89

Leo Gordon's avatar
Leo Gordon committed
90 91 92 93
=head2 run

    Description : Implements run() interface method of Bio::EnsEMBL::Hive::Process that is used to perform the main bulk of the job (minus input and output).

94
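    param('column_names'):  Controls the column names that come out of the parser: 0 = "no names", 1 = "parse names from data", arrayref = "take names from this array".
                            With 'inputquery' the column names reported by the query are always used unless an arrayref is given.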
Leo Gordon's avatar
Leo Gordon committed
95

96
    param('delimiter'): If set, lines read in file/cmd mode are split into columns (the value is used as a Perl regular expression), which you can use individually when constructing the input_id_template hash.
Leo Gordon's avatar
Leo Gordon committed
97

98 99 100
    param('randomize'): Shuffles the rows before creating jobs - can sometimes lead to better overall performance of the pipeline. Doesn't make any sense for minibatches (step>1).

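    param('step'):      The requested size of the minibatch (0 by default, meaning no minibatching: one job per row). The real size of a range may be smaller than the requested size, because a range stops as soon as the key_column values stop being consecutive.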
101 102 103 104

    param('key_column'): If every line of your input is a list (it happens, for example, when your SQL returns multiple columns or you have set the 'delimiter' in file/cmd mode)
                         this is the way to say which column is undergoing 'ranging'

105

Leo Gordon's avatar
Leo Gordon committed
106 107
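        # The following 4 parameters are mutually exclusive and define the source of ids for the jobs
        # (if several are set, only the first one in the order inputlist, inputquery, inputfile, inputcmd is used):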

108
    param('inputlist');  The list is explicitly given in the parameters, can be abbreviated: 'inputlist' => ['a'..'z']
Leo Gordon's avatar
Leo Gordon committed
109

110
    param('inputfile');  The list is contained in a file whose name is supplied as parameter: 'inputfile' => 'myfile.txt'
Leo Gordon's avatar
Leo Gordon committed
111

112
    param('inputquery'); The list is generated by an SQL query (against the production database by default) : 'inputquery' => 'SELECT object_id FROM object WHERE x=y'
Leo Gordon's avatar
Leo Gordon committed
113

114
    param('inputcmd');   The list is generated by running a system command: 'inputcmd' => 'find /tmp/big_directory -type f'
Leo Gordon's avatar
Leo Gordon committed
115 116 117

=cut

118 119 120
sub run {
    my $self = shift @_;

        # Fail fast on the obsolete 'input_id' parameter, before any (possibly expensive)
        # SQL query or system command below is executed only for its result to be thrown away:
    if( $self->param('input_id') ) {
        die "'input_id' is no longer supported, please reconfigure as the input_id_template of the dataflow_rule";
    }

    my $column_names    = $self->param('column_names');   # can be 0 (no names), 1 (names from data) or an arrayref (names from this array)
    my $delimiter       = $self->param('delimiter');
    my $randomize       = $self->param('randomize');

        # minibatching-related:
    my $step            = $self->param('step');
    my $key_column      = $self->param('key_column');

        # sources of rows; if several are set, the first true one in the chain below wins:
    my $inputlist       = $self->param('inputlist');
    my $inputfile       = $self->param('inputfile');
    my $inputquery      = $self->param('inputquery');
    my $inputcmd        = $self->param('inputcmd');

        # file/cmd readers take their first line as a header only when names must come from the data:
    my $parse_column_names = $column_names && (ref($column_names) ne 'ARRAY');

    my ($rows, $column_names_from_data) =
              $inputlist    ? $self->_get_rows_from_list(  $inputlist  )
            : $inputquery   ? $self->_get_rows_from_query( $inputquery )
            : $inputfile    ? $self->_get_rows_from_open(  $inputfile  , $delimiter, $parse_column_names )
            : $inputcmd     ? $self->_get_rows_from_open( "$inputcmd |", $delimiter, $parse_column_names )
            : die "range of values should be defined by setting 'inputlist', 'inputquery', 'inputfile' or 'inputcmd'";

        # Names found in the data replace anything except an explicit arrayref.
        # NB: a query always reports its column names, so with 'inputquery' they are used even when column_names is 0.
    if( $column_names_from_data                                             # column data is available
    and ( defined($column_names) ? (ref($column_names) ne 'ARRAY') : 1 )    # and is badly needed
    ) {
        $column_names = $column_names_from_data;
    }
    # after this point $column_names should either contain a list or be false

    if($randomize) {
        _fisher_yates_shuffle_in_place($rows);
    }

        # step>0 groups consecutive key_column values into ranges; otherwise one output hash per row:
    my $output_ids = $step
        ? $self->_substitute_minibatched_rows($rows, $column_names, $step, $key_column)
        : $self->_substitute_rows($rows, $column_names);

    $self->param('output_ids', $output_ids);
}

166

Leo Gordon's avatar
Leo Gordon committed
167 168 169 170 171 172 173 174 175
=head2 write_output

    Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution.
                  Here we rely on the dataflow mechanism to create jobs.

    param('fan_branch_code'): defines the branch where the fan of jobs is created (2 by default).

=cut

176
sub write_output {  # nothing to write out, but some dataflow to perform:
    my ($self) = @_;

        # the list of hashes that run() stored in 'output_ids' ...
    my $ids_to_flow = $self->param('output_ids');
        # ... is sent in a single call down the "fan" branch (2 unless overridden):
    my $fan_branch  = $self->param('fan_branch_code');

    return $self->dataflow_output_id($ids_to_flow, $fan_branch);
}

186

187 188
################################### main functionality starts here ###################

189 190

=head2 _get_rows_from_list
Leo Gordon's avatar
Leo Gordon committed
191
    
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
    Description: a private method that ensures the list is 2D

=cut

sub _get_rows_from_list {
    my ($self, $inputlist) = @_;

        # already a list of rows (its first element is a reference)? hand it back untouched:
    return $inputlist if ref $inputlist->[0];

        # otherwise wrap every scalar into its own single-column row:
    my @single_column_rows = map { [ $_ ] } @$inputlist;
    return \@single_column_rows;
}


=head2 _get_rows_from_query
    
    Description: a private method that loads ids from a given sql query
Leo Gordon's avatar
Leo Gordon committed
208 209 210 211 212

    param('db_conn'): An optional hash to pass in connection parameters to the database upon which the query will have to be run.

=cut

213
sub _get_rows_from_query {
    my ($self, $inputquery) = @_;

        # Runs $inputquery on the connection returned by data_dbc() and returns
        # (arrayref of row arrayrefs, arrayref of the column names reported by the statement handle).
    warn qq{inputquery = "$inputquery"\n} if $self->debug();

    my $sth = $self->data_dbc()->prepare($inputquery);
    $sth->execute();

        # copy the names into our own array, so the result does not share the handle's internal arrayref:
    my @column_names_from_data = @{ $sth->{NAME} };

    my @rows;
    while ( my @fields = $sth->fetchrow_array() ) {
        push @rows, [ @fields ];
    }
    $sth->finish();

    return (\@rows, \@column_names_from_data);
}

232 233

=head2 _get_rows_from_open
Leo Gordon's avatar
Leo Gordon committed
234
    
235
    Description: a private method that loads ids from a given file or command pipe
Leo Gordon's avatar
Leo Gordon committed
236 237 238

=cut

239 240
sub _get_rows_from_open {
    my ($self, $input_file_or_pipe, $delimiter, $parse_header) = @_;

        # Reads rows from a file, or from the STDOUT of a command given as "cmd |".
        # Returns (arrayref of row arrayrefs, header row arrayref when $parse_header is true, or 0 otherwise).
    if($self->debug()) {
        warn qq{input_file_or_pipe = "$input_file_or_pipe"\n};
    }

        # Callers mark a shell command by appending '|' (e.g. "ls -1 *.pm |"); anything else is a plain
        # file opened read-only. The 3-argument open keeps mode characters inside a file name
        # (such as a leading '>', which would have truncated the file) from being interpreted.
    my ($open_mode, $open_target) = ( $input_file_or_pipe =~ m{\A(.*?)\s*\|\s*\z}s )
        ? ( '-|', $1 )
        : ( '<',  $input_file_or_pipe );

    open(my $fh, $open_mode, $open_target) or die "Could not open '$input_file_or_pipe' because: $!";

    my @rows = ();
    while(my $line = <$fh>) {
        chomp $line;

            # NB: $delimiter is used as a regular expression, and split() drops trailing empty fields.
        push @rows, [ defined($delimiter) ? split(/$delimiter/, $line) : $line ];
    }

        # For a command, close() fails when it exits with a non-zero status. Report it, but keep what was read,
        # because some commands (e.g. grep without matches) exit non-zero while producing valid output.
    close($fh) or warn "Closing '$input_file_or_pipe' failed (\$!='$!', \$?=$?), keeping the ".scalar(@rows)." line(s) read\n";

    my $column_names_from_data = $parse_header ? shift @rows : 0;

    return (\@rows, $column_names_from_data);
}


=head2 _substitute_rows

    Description: a private method that goes through a list and transforms every row into a hash

=cut

sub _substitute_rows {
    my ($self, $rows, $column_names) = @_;

        # Turns every row into one parameter hash:
        #   with column names:    { name0 => v0, name1 => v1, ... }
        #   without column names: { '_' => [row], '_0' => v0, '_1' => v1, ... }
    my @job_params;
    foreach my $row (@$rows) {
        my @col_idx = (0 .. $#$row);
        my %params  = $column_names
            ? (               map { ( $column_names->[$_] => $row->[$_] ) } @col_idx )
            : ( '_' => $row,  map { ( "_$_"               => $row->[$_] ) } @col_idx );
        push @job_params, \%params;
    }

    return \@job_params;
}

281 282

=head2 _substitute_minibatched_rows
Leo Gordon's avatar
Leo Gordon committed
283
    
284
    Description: a private method that minibatches a list and transforms every minibatch using param-substitution
Leo Gordon's avatar
Leo Gordon committed
285 286 287

=cut

288
sub _substitute_minibatched_rows {
    my ($self, $rows, $column_names, $step, $key_column) = @_;

        # Groups runs of consecutive key_column values (at most $step rows each) into range hashes
        # carrying _range_start/_range_end/_range_count plus _start_*/_end_* copies of the first and last row.
        # Rows are walked by index rather than shift()ed, so the caller's list is left intact.
    my @ranges    = ();
    my $row_count = scalar(@$rows);
    my $next_idx  = 0;

    while($next_idx < $row_count) {
        my $start_row   = $rows->[$next_idx++];
        my $range_start = $start_row->[$key_column];

        my $end_row     = $start_row;       # last row that actually belongs to this range
        my $range_end   = $range_start;
        my $range_count = 1;

            # Grow the range while the next key is exactly the "increment" of the current end.
            # Perl's magic ++ makes this work for numbers and for strings like 'a' -> 'b' or 'az' -> 'ba'.
        while($range_count < $step && $next_idx < $row_count) {
            my $candidate_row  = $rows->[$next_idx];
            my $predicted_next = $range_end;
            last unless ++$predicted_next eq $candidate_row->[$key_column];

            $end_row     = $candidate_row;
            $range_end   = $candidate_row->[$key_column];
            $range_count++;
            $next_idx++;
        }

            # '_end_*' values must come from $end_row: the last row merely *examined* is, when a range
            # is cut short, the first row of the NEXT range and would disagree with '_range_end'.
        my $job_range = {
            '_range_start'  => $range_start,
            '_range_end'    => $range_end,
            '_range_count'  => $range_count,

            $column_names
                ?  map { ('_start_'.$column_names->[$_] => $start_row->[$_], '_end_'.$column_names->[$_] => $end_row->[$_]) } (0..scalar(@$start_row)-1)
                :  map { ("_start_$_"                   => $start_row->[$_], "_end_$_"                   => $end_row->[$_]) } (0..scalar(@$start_row)-1)
        };
        push @ranges, $job_range;
    }
    return \@ranges;
}

329

Leo Gordon's avatar
Leo Gordon committed
330 331
=head2 _fisher_yates_shuffle_in_place
    
332
    Description: a private function (not a method) that shuffles a list of ids
Leo Gordon's avatar
Leo Gordon committed
333 334 335 336

=cut

sub _fisher_yates_shuffle_in_place {
    my $array = shift @_;

        # Shuffles the referenced array in place (Fisher-Yates); returns nothing useful.
        # An empty list has nothing to shuffle. Without this guard the old countdown started at -1,
        # never reached 0, and died trying to assign to element -1 of an empty array.
    return unless @$array;

    for(my $upper = $#$array; $upper > 0; $upper--) {
        my $lower = int(rand($upper+1));
        next if $lower == $upper;
        @$array[$lower,$upper] = @$array[$upper,$lower];
    }
}

1;  # a Perl module must return a true value when loaded with 'use' or 'require'