AnalysisJob.pm 15.7 KB
Newer Older
Jessica Severin's avatar
Jessica Severin committed
1 2 3 4
=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisJob

=head1 DESCRIPTION

    An AnalysisJob is the link between the input_id control data, the analysis and
    the rule system.  It also tracks the state of the job as it is processed.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2021] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates.

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually prefixed with an underscore (_).

=cut

37

Jessica Severin's avatar
Jessica Severin committed
38 39 40
package Bio::EnsEMBL::Hive::AnalysisJob;

use strict;
41
use warnings;
42

43
use Bio::EnsEMBL::Hive::Utils ('stringify', 'destringify', 'throw');
44
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
45
use Bio::EnsEMBL::Hive::TheApiary;
46

47
use base (  'Bio::EnsEMBL::Hive::Storable', # inherit dbID(), adaptor() and new() methods, but also hive_pipeline()
48 49 50
            'Bio::EnsEMBL::Hive::Params',   # inherit param management functionality
         );

Jessica Severin's avatar
Jessica Severin committed
51

52 53
=head1 AUTOLOADED

    The following accessor pairs are not defined in this file; they are
    generated via AUTOLOAD (presumably in a base class):

    prev_job_id / prev_job

    analysis_id / analysis

    controlled_semaphore_id / controlled_semaphore

=cut


63 64
sub input_id {
    # Getter/setter for the job's input_id.
    # A reference (e.g. a hashref) passed as the new value is stored in its
    # stringified form; a plain scalar (or undef) is stored as-is.
    my $self = shift @_;

    return $self->{'_input_id'} unless @_;

    my $new_input_id = shift @_;
    $new_input_id = stringify($new_input_id) if ref($new_input_id);

    return $self->{'_input_id'} = $new_input_id;
}

73 74 75
sub param_id_stack {
    # Getter/setter for the parameter id stack.
    # An unset (or explicitly undef) value is normalised to, and stored as, ''.
    my ($self, @maybe_value) = @_;

    if (@maybe_value) {
        $self->{'_param_id_stack'} = $maybe_value[0];
    }
    $self->{'_param_id_stack'} //= '';

    return $self->{'_param_id_stack'};
}

80 81 82
sub accu_id_stack {
    # Getter/setter for the accumulator id stack.
    # Like param_id_stack(), an unset/undef value becomes (and stays) ''.
    my $self = shift;

    ($self->{'_accu_id_stack'}) = @_ if @_;

    unless (defined $self->{'_accu_id_stack'}) {
        $self->{'_accu_id_stack'} = '';
    }
    return $self->{'_accu_id_stack'};
}

87
sub role_id {
    # Plain getter/setter for the role_id; no default is applied.
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_role_id'} = $args[0];
    }
    return $self->{'_role_id'};
}

93 94 95
sub status {
    # Getter/setter for the job status.
    # A false/unset status reads back as 'READY', but that default is not stored.
    my ($self, @args) = @_;

    $self->{'_status'} = $args[0] if @args;

    my $current = $self->{'_status'};
    return $current ? $current : 'READY';
}

Jessica Severin's avatar
Jessica Severin committed
99
sub retry_count {
    # Getter/setter for the number of retries; unset/undef is stored as 0.
    my ($self, @new) = @_;

    $self->{'_retry_count'} = $new[0] if @new;
    $self->{'_retry_count'} //= 0;

    return $self->{'_retry_count'};
}

106
sub when_completed {
    # Plain getter/setter for the completion timestamp; stored verbatim, no default.
    my $self = shift;

    ($self->{'_when_completed'}) = @_ if @_;

    return $self->{'_when_completed'};
}

112
sub runtime_msec {
    # Getter/setter for the job runtime in milliseconds; unset/undef is stored as 0.
    my $self = shift @_;

    if (@_) {
        $self->{'_runtime_msec'} = shift @_;
    }
    if (!defined $self->{'_runtime_msec'}) {
        $self->{'_runtime_msec'} = 0;
    }

    return $self->{'_runtime_msec'};
}

sub query_count {
    # Getter/setter for the number of queries issued; unset/undef is stored as 0.
    my ($self, @value) = @_;

    if (scalar @value) {
        $self->{'_query_count'} = $value[0];
    }
    $self->{'_query_count'} = 0 if !defined $self->{'_query_count'};

    return $self->{'_query_count'};
}

126

127
sub set_and_update_status {
    # Sets the in-memory status and, when the job is attached to an adaptor,
    # asks that adaptor to check the job in (persist it).
    my $self       = shift;
    my $new_status = shift;

    $self->status($new_status);

    my $adaptor = $self->adaptor;
    if ($adaptor) {
        $adaptor->check_in_job($self);
    }
}

Jessica Severin's avatar
Jessica Severin committed
137
sub stdout_file {
    # Plain getter/setter for the job's stdout_file (presumably a file path); no default.
    my ($self, @value) = @_;

    $self->{'_stdout_file'} = $value[0] if scalar @value;

    return $self->{'_stdout_file'};
}

sub stderr_file {
    # Plain getter/setter for the job's stderr_file (presumably a file path); no default.
    my $self = shift @_;

    if (@_) {
        ($self->{'_stderr_file'}) = @_;
    }

    return $self->{'_stderr_file'};
}

149 150 151 152 153 154 155 156
sub accu_hash {
    # Getter/setter for the job's accumulator hash.
    # An unset/undef value is replaced by (and stored as) an empty hashref,
    # so the result can always be dereferenced.
    my $self = shift @_;

    if (@_) {
        $self->{'_accu_hash'} = shift @_;
    }
    unless (defined $self->{'_accu_hash'}) {
        $self->{'_accu_hash'} = {};
    }

    return $self->{'_accu_hash'};
}


157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
=head2 autoflow

    Title   :  autoflow
    Function:  Gets/sets the flag that says whether the job should be automatically
               dataflowed on branch 1 when it completes.
               If the Runnable manually sends output along branch 1 with
               dataflow_output_id(), autoflow is switched off.
    Returns :  boolean (1=default|0)

=cut

sub autoflow {
    my ($self, @value) = @_;

    if (@value) {
        $self->{'_autoflow'} = $value[0];
    }
    # An unset (or explicitly undef) flag falls back to, and stores, the default of 1.
    $self->{'_autoflow'} = 1 if !defined $self->{'_autoflow'};

    return $self->{'_autoflow'};
}


178 179
##-----------------[indicators to the Worker]--------------------------------

180

Leo Gordon's avatar
Leo Gordon committed
181 182 183
sub lethal_for_worker {
    # A job sets this flag to 1 before dying (or before running code that might die,
    # such as compiling the RunnableDB) when it believes the state of things will not
    # allow the Worker to continue normally. The Worker checks the flag and commits
    # suicide if it is true. Plain getter/setter; unset reads as undef.
    my $self = shift;

    if (@_) {
        $self->{'_lethal_for_worker'} = $_[0];
    }

    return $self->{'_lethal_for_worker'};
}

189 190 191 192 193
sub transient_error {
    # A job sets this to 1 before dying (or before risky code) when retrying the same
    # job unchanged makes sense, or to 0 when there is no point in retrying (e.g. the
    # parameters are wrong). The Worker checks the flag and adjusts the database state.
    # Errors are considered transient by default: an unset/undef flag reads as 1,
    # without that default being stored.
    my ($self, @flag) = @_;

    $self->{'_transient_error'} = $flag[0] if @flag;

    my $stored = $self->{'_transient_error'};
    return defined($stored) ? $stored : 1;
}

sub incomplete {
    # A job sets this to 0 before throwing if the job is actually done but it wants
    # the thrown message recorded with is_error=0. Plain getter/setter; no default.
    my $self = shift @_;

    if (scalar @_) {
        ($self->{'_incomplete'}) = @_;
    }

    return $self->{'_incomplete'};
}

207 208 209 210 211 212 213 214

sub died_somewhere {
    # "Sticky" flag: once it holds a true value, later setter calls cannot clear it.
    # A false/unset flag reads back as (and is stored as) 0.
    my ($self, @args) = @_;

    if (@args && !$self->{'_died_somewhere'}) {
        $self->{'_died_somewhere'} = $args[0];
    }
    $self->{'_died_somewhere'} ||= 0;

    return $self->{'_died_somewhere'};
}

215 216
##-----------------[/indicators to the Worker]-------------------------------

217

218 219
sub load_stack_and_accu {
    # Loads the job's parameter/accumulator stack and its own accu_hash from the
    # database. Does nothing when the job has no adaptor.
    my $self = shift;

    my $job_adaptor = $self->adaptor;
    if ($job_adaptor) {
        my $own_job_id   = $self->dbID;
        my $accu_adaptor = $job_adaptor->db->get_AccumulatorAdaptor;

        my $param_ids = $self->param_id_stack;
        my $accu_ids  = $self->accu_id_stack;

        if ($param_ids or $accu_ids) {
                # Both fetches return hashrefs keyed by (presumably) the ids on the stack.
                # input_ids are merged first so that an accu structure for the same id
                # overrides it (accus have higher precedence than input_ids).
            my %stack_item_for = (
                %{ $job_adaptor->fetch_input_ids_for_job_ids( $param_ids, 2, 0 ) },
                %{ $accu_adaptor->fetch_structures_for_job_ids( $accu_ids, 2, 1 ) },
            );
                # The stack items are kept in ascending numeric order of their keys.
            $self->{'_unsubstituted_stack_items'} =
                [ map { $stack_item_for{$_} } sort { $a <=> $b } keys %stack_item_for ];
        }

        $self->accu_hash( $accu_adaptor->fetch_structures_for_job_ids( $own_job_id )->{ $own_job_id } );
    }
}
235

236 237 238 239 240 241 242 243 244 245 246 247 248 249

sub load_parameters {
    # Loads the stack/accu and initialises the job's parameters from every source.
    # The sources are collected in the order param_init() receives them, assumed to be
    # lowest to highest precedence: runnable defaults, pipeline-wide params, analysis
    # parameters, the param stack, the job's own input_id and finally its accu_hash.
    my $self            = shift;
    my $runnable_object = shift;

    $self->load_stack_and_accu();

    my @params_precedence;
    push @params_precedence, $runnable_object->param_defaults   if $runnable_object;
    push @params_precedence, $self->hive_pipeline->params_as_hash;
    push @params_precedence, $self->analysis->parameters        if $self->analysis;
    push @params_precedence, @{ $self->{'_unsubstituted_stack_items'} } if $self->{'_unsubstituted_stack_items'};
    push @params_precedence, $self->input_id, $self->accu_hash;

        # Errors raised while initialising parameters are flagged as non-transient
        # for the duration of param_init(); the previous setting is restored afterwards.
    my $saved_transience = $self->transient_error();
    $self->transient_error(0);
    $self->param_init( @params_precedence );
    $self->transient_error($saved_transience);
}


258 259 260 261 262 263 264 265 266
sub flattened_stack_and_accu {
    # Fuses the job's parameter sources into a single hash via fuse_param_hashes().
    #   $overriding_hash    - passed last to fuse_param_hashes()
    #   $extend_param_stack - when true, the unsubstituted stack items and the job's own
    #                         input_id are included ahead of the accu_hash
    # Assumes $self->load_stack_and_accu() has already run (normally via load_parameters()).
    my ( $self, $overriding_hash, $extend_param_stack ) = @_;

        # '_unsubstituted_stack_items' is only populated when the job has a non-empty
        # param/accu id stack; guard explicitly (as load_parameters() does) instead of
        # relying on autovivification of an undef slot.
    my @stack_items = $extend_param_stack
        ? ( @{ $self->{'_unsubstituted_stack_items'} || [] }, $self->input_id )
        : ();

    return $self->fuse_param_hashes( @stack_items,
                                     $self->accu_hash,
                                     $overriding_hash );
}


267 268 269 270 271 272 273
sub fan_cache {
    # Self-initialising getter (there is no setter): returns this job's fan cache,
    # creating and storing an empty hashref on first use.
    # In this file dataflow_output_id() looks it up by the stringified dataflow rule
    # ("$df_rule") and expects each value to be an arrayref of jobs waiting to be
    # funneled for that rule.
    my ($self) = @_;

    $self->{'_fan_cache'} = {} unless $self->{'_fan_cache'};

    return $self->{'_fan_cache'};
}

274 275 276 277
=head2 dataflow_output_id

    Title        :  dataflow_output_id
    Arg[1](req)  :  $output_ids - a hashref, an undef, an arrayref of hashrefs/undefs,
                    or a string that destringifies into one of those.
                    Each undef output_id is replaced by this job's own input_id,
                    or by an empty hash when the target extends the param stack.
    Arg[2](opt)  :  $branch_name_or_code (optional, defaults to 1)
    Usage        :  $self->dataflow_output_id($output_ids, $branch_name_or_code);
    Function     :
      If a RunnableDB (Process) needs to create jobs, this allows it to have jobs
      created and flowed through the dataflow rules of the workflow graph.
      Each 'output_id' becomes the 'input_id' of a newly created job at
      the ends of the dataflow pipes.  The optional 'branch_name_or_code' determines
      which dataflow pipe(s) to flow the job through.
      Flowing on branch 1 (explicitly or implicitly) switches off autoflow().
      Within each dataflow rule an output_id goes to every conditional target group
      whose condition it satisfies, and to the DEFAULT (unconditional) group only
      when none of the other conditions matched.
      The caller's data structures are not modified.
    Returns      :  arrayref collecting everything the targets' dataflow() calls stored
                    (presumably the dbIDs of the created jobs).
    Dies         :  if an output_id is neither undef nor a hashref, or if a rule with
                    pending fan jobs in fan_cache() would not create exactly one job.

=cut

sub dataflow_output_id {
    my ($self, $output_ids, $branch_name_or_code) = @_;

    my $input_id                = $self->input_id();
    my $hive_use_param_stack    = $self->hive_pipeline->hive_use_param_stack;

    $output_ids    =  destringify($output_ids) unless ref($output_ids);     # destringify the string
    $output_ids    = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY');   # force previously used single values into an arrayref

        # Build a fresh list of parsed output_ids. The loop variable aliases the caller's
        # array elements, so it is only read and never assigned to; otherwise any string
        # element would be silently replaced by its destringified form in the caller's data.
    my @destringified_output_ids;
    foreach my $raw_output_id (@$output_ids) {
        my $output_id = ref($raw_output_id) ? $raw_output_id : destringify($raw_output_id);
        if ((defined $output_id) and (ref($output_id) ne 'HASH')) {         # Only undefs and hashrefs work as input_ids
            die stringify($output_id)." is not a hashref ! Cannot dataflow";
        }
        push @destringified_output_ids, $output_id;
    }

        # map branch names to numbers:
    my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);

        # if branch_code is set to 1 (explicitly or implicitly), turn off automatic dataflow:
    $self->autoflow(0) if($branch_code == 1);

    my @output_job_ids = ();

        # fan rules come sorted before funnel rules for the same branch_code:
    foreach my $df_rule ( @{ $self->analysis->dataflow_rules_by_branch->{$branch_code} || [] } ) {

            # a list of [ $condition, \@df_targets, \@matching_output_ids ] triples;
            # the pairs are deliberately ordered to put the DEFAULT (undef) condition last
        my $targets_grouped_by_condition    = $df_rule->get_my_targets_grouped_by_condition;
        my @conditions                      = map { $_->[0] } @$targets_grouped_by_condition;

            # number of jobs this rule is about to create (one per matching output_id per target)
        my $total_output_ids_for_the_rule = 0;

        foreach my $output_id (@destringified_output_ids) {  # filter the output_ids and place them into the [2] part of $targets_grouped_by_condition
            my $condition_match_count = 0;
            foreach my $condition_idx (0..@conditions-1) {
                my $unsubstituted_condition = $conditions[$condition_idx];

                if(defined($unsubstituted_condition)) {
                    if($self->param_substitute('#expr('.$unsubstituted_condition.')expr#', $output_id)) {
                        $condition_match_count++;
                    } else {
                        next;   # non-DEFAULT condition branch failed
                    }
                } elsif($condition_match_count) {
                    next;   # DEFAULT condition branch failed, because one of the conditions fired
                } else {
                    # DEFAULT condition branch succeeded => follow to the push
                }

                push @{$targets_grouped_by_condition->[$condition_idx][2]}, $output_id;
                $total_output_ids_for_the_rule += scalar( @{ $targets_grouped_by_condition->[$condition_idx][1] } );
            }
        }

        my $fan_cache_for_this_rule = exists($self->fan_cache->{"$df_rule"}) && $self->fan_cache->{"$df_rule"};
        if($fan_cache_for_this_rule && @$fan_cache_for_this_rule && $total_output_ids_for_the_rule!=1) {
            die "The total number of funnel output_ids (considering ".scalar(@conditions)." conditions) was $total_output_ids_for_the_rule, but expected to be 1. Please investigate";
        }

        foreach my $triple (@$targets_grouped_by_condition) {
            my ($unsubstituted_condition, $df_targets, $filtered_output_ids) = @$triple;

            if($filtered_output_ids && @$filtered_output_ids) {

                foreach my $df_target (@$df_targets) {

                    my $extend_param_stack  = $hive_use_param_stack || $df_target->extend_param_stack;                      # this boolean is df_target-specific
                    my $default_param_hash  = $extend_param_stack ? {} : $input_id;                                         # this is what undefs will turn into

                    my @pre_substituted_output_ids = map { $_ // $default_param_hash } @$filtered_output_ids;

                        # parameter substitution into input_id_template is also df_target-specific:
                    my $output_ids_for_this_rule;
                    if(my $template_string = $df_target->input_id_template()) {
                        my $template_hash = destringify($template_string);
                        $output_ids_for_this_rule = [ map { $self->param_substitute($template_hash, $_) } @pre_substituted_output_ids ];
                    } else {
                        $output_ids_for_this_rule = \@pre_substituted_output_ids;
                    }

                    my $target_object       = $df_target->to_analysis;
                    my $same_db_dataflow    = $self->analysis->hive_pipeline == $target_object->hive_pipeline;

                    unless($same_db_dataflow) {
                            # for a target in another pipeline, flatten the stack and accu into each
                            # output_id; errors raised while doing so are flagged as non-transient
                        my $prev_transient_error = $self->transient_error(); # make a note of previously set transience status
                        $self->transient_error(0);
                        @$output_ids_for_this_rule = map { $self->flattened_stack_and_accu( $_, $extend_param_stack ); } @$output_ids_for_this_rule;
                        $self->transient_error($prev_transient_error);
                    }

                    my ($stored_listref) = $target_object->dataflow( $output_ids_for_this_rule, $self, $same_db_dataflow, $extend_param_stack, $df_rule );

                    push @output_job_ids, @$stored_listref;

                } # /foreach my $df_target
            } # /if(filtered_output_ids are workable)
        } # /foreach my $triple
    } # /foreach my $df_rule

    return \@output_job_ids;
}


394 395
sub url_query_params {
    # Returns a hashref of URL query parameters identifying this job.
    my $self = shift;

    my %query_params = (
        'job_id' => $self->dbID,
    );

    return \%query_params;
}


403 404 405
sub toString {
    # Returns a one-line, human-readable description of the job.
    # Missing values are shown as '(NULL)' (dbID, analysis) or as an empty string (input_id).
    my $self = shift @_;

    my $analysis_label = $self->analysis
        ? ( $self->analysis->logic_name.'('.$self->analysis_id.')' )
        : '(NULL)';

        # input_id has no default and may be unset; render it as empty instead of
        # triggering an "uninitialized value in concatenation" warning.
    my $input_id = $self->input_id // '';

    return 'Job dbID='.($self->dbID || '(NULL)')." analysis=$analysis_label, input_id='$input_id', status=".$self->status.", retry_count=".$self->retry_count;
}

413

414 415 416 417 418 419
sub fetch_local_blocking_semaphore {    # ToDo: we may want to perform smart caching in future
    # Asks this job's database for the semaphore whose dependent job is this job.
    # Requires the job to have an adaptor.
    my ($self) = @_;

    my $semaphore_adaptor = $self->adaptor->db->get_SemaphoreAdaptor;

    return $semaphore_adaptor->fetch_by_dependent_job_id( $self->dbID );
}

Jessica Severin's avatar
Jessica Severin committed
420
1;
421