=pod

=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisJob

=head1 DESCRIPTION

    An AnalysisJob is the link between the input_id control data, the analysis and
    the rule system.  It also tracks the state of the job as it is processed.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2020] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut

package Bio::EnsEMBL::Hive::AnalysisJob;

use strict;
use warnings;

    # stringify()/destringify() convert between Perl structures and their string form:
use Bio::EnsEMBL::Hive::Utils ('stringify', 'destringify');
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;

use base (  'Bio::EnsEMBL::Hive::Storable', # inherit dbID(), adaptor() and new() methods
            'Bio::EnsEMBL::Hive::Params',   # inherit param management functionality
         );


=head1 AUTOLOADED

    prev_job_id / prev_job

    analysis_id / analysis

=cut


# Getter/setter for the job's input_id.
# When the setter receives a reference it is serialized with stringify();
# a plain scalar is stored unchanged.
# Returns the stored value (undef if it has never been set).
sub input_id {
    my $self = shift;

    if(@_) {
        my $input_id = shift @_;
        $self->{'_input_id'} = ref($input_id) ? stringify($input_id) : $input_id;
    }

    return $self->{'_input_id'};
}

# Getter/setter for the '_param_id_stack' attribute.
# An undefined value (never set, or explicitly set to undef) is normalised
# to the empty string, so the method always returns a defined value.
sub param_id_stack {
    my $self = shift;

    $self->{'_param_id_stack'} = shift if(@_);
    $self->{'_param_id_stack'} //= '';

    return $self->{'_param_id_stack'};
}

# Getter/setter for the '_accu_id_stack' attribute.
# An undefined value (never set, or explicitly set to undef) is normalised
# to the empty string, so the method always returns a defined value.
sub accu_id_stack {
    my $self = shift;

    $self->{'_accu_id_stack'} = shift if(@_);
    $self->{'_accu_id_stack'} //= '';

    return $self->{'_accu_id_stack'};
}

# Getter/setter for the '_role_id' attribute.
# Returns the stored value (undef if it has never been set).
sub role_id {
    my $self = shift;

    $self->{'_role_id'} = shift if(@_);

    return $self->{'_role_id'};
}

# Getter/setter for the job's status.
# When no status is defined yet, it is derived from the semaphore count:
# 'SEMAPHORED' if semaphore_count() is positive, 'READY' otherwise.
# The derived value is stored, so later calls return it unchanged.
sub status {
    my $self = shift;

    $self->{'_status'} = shift if(@_);
    $self->{'_status'} //= ($self->semaphore_count > 0) ? 'SEMAPHORED' : 'READY';

    return $self->{'_status'};
}

# Getter/setter for the '_retry_count' attribute.
# An undefined value is normalised to 0, so the method always returns a defined value.
sub retry_count {
    my $self = shift;

    $self->{'_retry_count'} = shift if(@_);
    $self->{'_retry_count'} //= 0;

    return $self->{'_retry_count'};
}

# Getter/setter for the '_when_completed' attribute.
# Returns the stored value (undef if it has never been set).
sub when_completed {
    my $self = shift;

    $self->{'_when_completed'} = shift if(@_);

    return $self->{'_when_completed'};
}

# Getter/setter for the '_runtime_msec' attribute.
# An undefined value is normalised to 0, so the method always returns a defined value.
sub runtime_msec {
    my $self = shift;

    $self->{'_runtime_msec'} = shift if(@_);
    $self->{'_runtime_msec'} //= 0;

    return $self->{'_runtime_msec'};
}

# Getter/setter for the '_query_count' attribute.
# An undefined value is normalised to 0, so the method always returns a defined value.
sub query_count {
    my $self = shift;

    $self->{'_query_count'} = shift if(@_);
    $self->{'_query_count'} //= 0;

    return $self->{'_query_count'};
}

# Getter/setter for the '_semaphore_count' attribute.
# An undefined value is normalised to 0, so the method always returns a defined value.
# status() consults this count when it has to derive a default status.
sub semaphore_count {
    my $self = shift;

    $self->{'_semaphore_count'} = shift if(@_);
    $self->{'_semaphore_count'} //= 0;

    return $self->{'_semaphore_count'};
}

# Getter/setter for the '_semaphored_job_id' attribute.
# Returns the stored value (undef if it has never been set).
sub semaphored_job_id {
    my $self = shift;

    $self->{'_semaphored_job_id'} = shift if(@_);

    return $self->{'_semaphored_job_id'};
}

# Sets the job's status in memory and, if the job has an adaptor,
# passes the job to the adaptor's check_in_job().
# Without an adaptor only the in-memory status changes.
sub set_and_update_status {
    my ($self, $status ) = @_;

    $self->status($status);

    if(my $adaptor = $self->adaptor) {
        $adaptor->check_in_job($self);
    }
}

# Getter/setter for the '_stdout_file' attribute.
# Returns the stored value (undef if it has never been set).
sub stdout_file {
    my $self = shift;

    $self->{'_stdout_file'} = shift if(@_);

    return $self->{'_stdout_file'};
}

# Getter/setter for the '_stderr_file' attribute.
# Returns the stored value (undef if it has never been set).
sub stderr_file {
    my $self = shift;

    $self->{'_stderr_file'} = shift if(@_);

    return $self->{'_stderr_file'};
}

# Getter/setter for the '_accu_hash' attribute.
# An undefined value is replaced by a fresh empty hashref, which is stored,
# so repeated getter calls hand back the same reference.
sub accu_hash {
    my ($job, @new_value) = @_;

    if (@new_value) {
        $job->{'_accu_hash'} = $new_value[0];
    }
    $job->{'_accu_hash'} //= {};

    return $job->{'_accu_hash'};
}


=head2 autoflow

    Title   :  autoflow
    Arg[1]  :  (optional) new value of the flag
    Function:  Gets/sets the flag that says whether the job should be dataflowed
               automatically on branch 1 once it completes.
               dataflow_output_id() switches the flag off when it is asked to flow
               along branch 1, so a manual flow on that branch replaces the automatic one.
               An undefined flag is treated as "on" and stored as 1.
    Returns :  boolean (1=default|0)

=cut

sub autoflow {
    my ($job, @new_value) = @_;

    if (@new_value) {
        $job->{'_autoflow'} = $new_value[0];
    }
    $job->{'_autoflow'} //= 1;      # on by default

    return $job->{'_autoflow'};
}


##-----------------[indicators to the Worker]--------------------------------

# Getter/setter for the '_lethal_for_worker' flag.
# Job should set this to 1 prior to dying (or before running code that might cause death - such as RunnableDB's compilation)
# if it believes that the state of things will not allow the Worker to continue normally.
# The Worker will check the flag and commit suicide if it is set to true.
sub lethal_for_worker {
    my $self = shift;

    $self->{'_lethal_for_worker'} = shift if(@_);

    return $self->{'_lethal_for_worker'};
}

# Getter/setter for the '_transient_error' flag.
# Job should set this to 1 prior to dying (or before running code that might cause death)
# if it believes that it makes sense to retry the same job without any changes.
# It may also set it to 0 prior to dying (or before running code that might cause death)
# if it believes that there is no point in re-trying (say, if the parameters are wrong).
# The Worker will check the flag and make necessary adjustments to the database state.
sub transient_error {
    my $self = shift;

    $self->{'_transient_error'} = shift if(@_);

    return $self->{'_transient_error'};
}

# Getter/setter for the '_incomplete' flag.
# Job should set this to 0 prior to throwing if the job is done,
# but it wants the thrown message to be recorded with is_error=0.
sub incomplete {
    my $self = shift;

    $self->{'_incomplete'} = shift if(@_);

    return $self->{'_incomplete'};
}


# "Sticky" flag: once a true value has been recorded it cannot be overwritten,
# and a false/undefined flag is reported (and stored) as 0.
# NB: this stickiness is specific to died_somewhere - the other accessors
#     overwrite unconditionally, so do not copy the pattern around.
sub died_somewhere {
    my ($job, @flag) = @_;

    if (@flag and not $job->{'_died_somewhere'}) {
        $job->{'_died_somewhere'} = $flag[0];
    }

    $job->{'_died_somewhere'} ||= 0;

    return $job->{'_died_somewhere'};
}

##-----------------[/indicators to the Worker]-------------------------------

# Collects every parameter source that applies to this job and hands them,
# in order, to param_init().
#
# Arg[1] (optional): a runnable object; when given, its param_defaults() go first.
#
# Order in which sources are pushed:
#   1. $runnable_object->param_defaults        (only if a runnable object was given)
#   -- the following three only if the job has an adaptor --
#   2. pipeline-wide parameters
#   3. the analysis' parameters               (only if the job has an analysis)
#   4. input_ids/accus fetched for the ids in param_id_stack/accu_id_stack
#      (only if the database has hive_use_param_stack switched on)
#   -- always --
#   5. the job's own input_id
#   6. the job's own accu_hash
#
# Side effect: when the job has an adaptor, accu_hash is reloaded from the
# AccumulatorAdaptor before the list is built.
sub load_parameters {
    my ($self, $runnable_object) = @_;

    my @params_precedence = ();

    push @params_precedence, $runnable_object->param_defaults if($runnable_object);

    if(my $job_adaptor = $self->adaptor) {
        my $job_id          = $self->dbID;
        my $accu_adaptor    = $job_adaptor->db->get_AccumulatorAdaptor;

        $self->accu_hash( $accu_adaptor->fetch_structures_for_job_ids( $job_id )->{ $job_id } );

        push @params_precedence, $job_adaptor->db->get_PipelineWideParametersAdaptor->fetch_param_hash;

        push @params_precedence, $self->analysis->parameters if($self->analysis);

        if( $job_adaptor->db->hive_use_param_stack ) {
            my $input_ids_hash      = $job_adaptor->fetch_input_ids_for_job_ids( $self->param_id_stack, 2, 0 );     # input_ids have lower precedence (FOR EACH ID)
            my $accu_hash           = $accu_adaptor->fetch_structures_for_job_ids( $self->accu_id_stack, 2, 1 );     # accus have higher precedence (FOR EACH ID)
            my %input_id_accu_hash  = ( %$input_ids_hash, %$accu_hash );

                # take a slice of the merged hash, ordered by numerically sorted key:
            push @params_precedence, @input_id_accu_hash{ sort { $a <=> $b } keys %input_id_accu_hash };
        }
    }

    push @params_precedence, $self->input_id, $self->accu_hash;

    $self->param_init( @params_precedence );
}


# A self-initializing getter (there is no setter).
# Returns a hash-of-lists: { 2 => [list of jobs waiting to be funneled into 2], 3 => [list of jobs waiting to be funneled into 3], etc}
# The hash is created on first access and the same reference is returned afterwards.
sub fan_cache {
    my ($job) = @_;

    $job->{'_fan_cache'} = {} unless $job->{'_fan_cache'};

    return $job->{'_fan_cache'};
}

=head2 dataflow_output_id

    Title        :  dataflow_output_id
    Arg[1](opt)  :  $output_ids - a single output_id or an arrayref of output_ids.
                    A false value is replaced by a one-element list that holds the parent's input_id
                    (or an empty hash when the database has hive_use_param_stack switched on).
    Arg[2](opt)  :  <int> $branch_name_or_code (optional, defaults to 1)
    Arg[3](opt)  :  $create_job_options - no longer supported; any true value makes the method die.
    Usage        :  $self->dataflow_output_id($output_id, $branch_name_or_code);
    Returns      :  arrayref of the job ids returned by the job adaptor's store_jobs_and_adjust_counters()
                    (for a semaphored funnel: the funnel's id followed by the ids of its fan).
                    Targets that implement dataflow() do not contribute to this list.
    Exceptions   :  dies if $create_job_options is true, if a semaphored funnel is asked to produce
                    anything but exactly one job, or if a funnel job can neither be stored nor
                    re-used in SEMAPHORED state.
    Function:
      If a RunnableDB(Process) needs to create jobs, this allows it to have jobs
      created and flowed through the dataflow rules of the workflow graph.
      This 'output_id' becomes the 'input_id' of the newly created job at
      the ends of the dataflow pipes.  The optional 'branch_name_or_code' determines
      which dataflow pipe(s) to flow the job through.

=cut

sub dataflow_output_id {
    my ($self, $output_ids, $branch_name_or_code, $create_job_options) = @_;

    my $input_id                = $self->input_id();
    my $param_id_stack          = $self->param_id_stack();
    my $accu_id_stack           = $self->accu_id_stack();

        # without a stored adaptor fall back to the adaptor's class name, so class methods can still be called on it:
    my $job_adaptor             = $self->adaptor() || 'Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor';
    my $hive_use_param_stack    = ref($job_adaptor) && $job_adaptor->db->hive_use_param_stack();

    if($hive_use_param_stack) {
        if($input_id and ($input_id ne '{}')) {     # add the parent to the param_id_stack if it had non-trivial extra parameters
            $param_id_stack = ($param_id_stack ? $param_id_stack.',' : '').$self->dbID();
        }
        if(scalar(keys %{$self->accu_hash()})) {    # add the parent to the accu_id_stack if it had "own" accumulator
            $accu_id_stack = ($accu_id_stack ? $accu_id_stack.',' : '').$self->dbID();
        }
    }

    $output_ids  ||= [ $hive_use_param_stack ? {} : $input_id ];            # by default replicate the parameters of the parent in the child
    $output_ids    = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY');   # force previously used single values into an arrayref

    if($create_job_options) {
        die "Please consider configuring semaphored dataflow from PipeConfig rather than setting it up manually";
    }

        # map branch names to numbers:
    my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);

        # if branch_code is set to 1 (explicitly or implicitly), turn off automatic dataflow:
    $self->autoflow(0) if($branch_code == 1);

    my @output_job_ids = ();

        # sort rules to make sure the fan rules come before funnel rules for the same branch_code:
    foreach my $rule (sort {($b->funnel_dataflow_rule//0) cmp ($a->funnel_dataflow_rule//0)} @{ $self->analysis->dataflow_rules_by_branch->{$branch_code} || [] }) {

            # parameter substitution into input_id_template is rule-specific
        my $output_ids_for_this_rule;
        if(my $template_string = $rule->input_id_template()) {
            my $template_hash = destringify($template_string);
            $output_ids_for_this_rule = [ map { $self->param_substitute($template_hash, $_) } @$output_ids ];
        } else {
            $output_ids_for_this_rule = $output_ids;
        }

        my $target_analysis_or_table = $rule->to_analysis();

        if($target_analysis_or_table->can('dataflow')) {    # the target knows how to receive the data itself

            $target_analysis_or_table->dataflow( $output_ids_for_this_rule, $self );

        } else {

            my @common_params = (
                'prev_job'          => $self,
                'analysis'          => $target_analysis_or_table,   # expecting an Analysis
                'param_id_stack'    => $param_id_stack,
                'accu_id_stack'     => $accu_id_stack,
            );

            if( my $funnel_dataflow_rule = $rule->funnel_dataflow_rule ) {    # members of a semaphored fan will have to wait in cache until the funnel is created:

                    # the cache is keyed by the stringified funnel rule; the funnel branch below looks it up by "$rule"
                my $fan_cache_this_branch = $self->fan_cache->{"$funnel_dataflow_rule"} ||= [];
                push @$fan_cache_this_branch, map { Bio::EnsEMBL::Hive::AnalysisJob->new(
                                                        @common_params,
                                                        'input_id'          => $_,
                                                        # semaphored_job_id  => to be set when the $funnel_job has been stored
                                                    ) } @$output_ids_for_this_rule;

            } else {    # either a semaphored funnel or a non-semaphored dataflow:

                my $fan_jobs = delete $self->fan_cache->{"$rule"};   # clear the cache at the same time

                if( $fan_jobs && @$fan_jobs ) { # a semaphored funnel

                    if( (my $funnel_job_count = scalar(@$output_ids_for_this_rule)) !=1 ) {

                        $self->transient_error(0);  # retrying the job unchanged would hit the same problem
                        die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1";

                    } else {
                        my $funnel_job = Bio::EnsEMBL::Hive::AnalysisJob->new(
                                            @common_params,
                                            'input_id'          => $output_ids_for_this_rule->[0],
                                            'semaphore_count'   => scalar(@$fan_jobs),          # "pre-increase" the semaphore count before creating the dependent jobs
                                            'semaphored_job_id' => $self->semaphored_job_id(),  # propagate parent's semaphore if any
                        );

                        my ($funnel_job_id) = @{ $job_adaptor->store_jobs_and_adjust_counters( [ $funnel_job ], 0) };

                        unless($funnel_job_id) {    # apparently it has been created previously, trying to leech to it:

                            if( $funnel_job = $job_adaptor->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id) ) {
                                $funnel_job_id = $funnel_job->dbID;

                                if( $funnel_job->status eq 'SEMAPHORED' ) {
                                    $job_adaptor->increase_semaphore_count_for_jobid( $funnel_job_id, scalar(@$fan_jobs) );    # "pre-increase" the semaphore count before creating the dependent jobs

                                    $job_adaptor->db->get_LogMessageAdaptor->store_job_message($self->dbID, "Discovered and using an existing funnel ".$funnel_job->toString, 0);
                                } else {
                                    die "The funnel job (id=$funnel_job_id) fetched from the database was not in SEMAPHORED status";
                                }
                            } else {
                                die "The funnel job could neither be stored nor fetched";
                            }
                        }

                        foreach my $fan_job (@$fan_jobs) {  # set the funnel in every fan's job:
                            $fan_job->semaphored_job_id( $funnel_job_id );
                        }
                        push @output_job_ids, $funnel_job_id, @{ $job_adaptor->store_jobs_and_adjust_counters( $fan_jobs, 1) };

                    }
                } else {    # non-semaphored dataflow (but potentially propagating any existing semaphores)
                    my @non_semaphored_jobs = map { Bio::EnsEMBL::Hive::AnalysisJob->new(
                                                        @common_params,
                                                        'input_id'          => $_,
                                                        'semaphored_job_id' => $self->semaphored_job_id(),  # propagate parent's semaphore if any
                    ) } @$output_ids_for_this_rule;

                    push @output_job_ids, @{ $job_adaptor->store_jobs_and_adjust_counters( \@non_semaphored_jobs, 0) };
                }
            } # /if funnel

        } # /if (table or analysis)
    } # /foreach my $rule

    return \@output_job_ids;
}


# Returns a one-line human-readable description of the job:
# its dbID, analysis (logic_name and analysis_id), input_id, status and retry_count.
# A missing dbID or analysis is shown as '(NULL)'.
sub toString {
    my $self = shift @_;

    my $analysis_label = $self->analysis
        ? ( $self->analysis->logic_name.'('.$self->analysis_id.')' )
        : '(NULL)';

    return 'Job dbID='.($self->dbID || '(NULL)')." analysis=$analysis_label, input_id='".$self->input_id."', status=".$self->status.", retry_count=".$self->retry_count;
}

1;