=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisJob

=head1 DESCRIPTION

    An AnalysisJob is the link between the input_id control data, the analysis and
    the rule system.  It also tracks the state of the job as it is processed.

=head1 LICENSE

    Copyright [1999-2014] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut

36

Jessica Severin's avatar
Jessica Severin committed
37 38 39
package Bio::EnsEMBL::Hive::AnalysisJob;

use strict;
40 41

use Bio::EnsEMBL::Utils::Argument ('rearrange');
42

43
use Bio::EnsEMBL::Hive::Utils ('destringify');
44
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
45
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
46

47 48 49 50
use base (  'Bio::EnsEMBL::Storable',       # inherit dbID(), adaptor() and new() methods
            'Bio::EnsEMBL::Hive::Params',   # inherit param management functionality
         );

Jessica Severin's avatar
Jessica Severin committed
51

52
sub new {   # Constructor: delegates object creation to the parent class, then fills in the job-specific fields.
            # Accepts the same argument list as the parent constructor plus any of the named fields listed below
            # (rearrange() is used to pick them out of the argument list).
            # Returns the newly created object.
    my ($class, @args) = @_;

    my $self = $class->SUPER::new( @args );     # deal with Storable stuff

        # Each field name doubles as the name of the accessor used to store it;
        # the order of this list is the order in which the accessors are called.
    my @field_names = qw(analysis_id input_id param_id_stack accu_id_stack worker_id status retry_count completed runtime_msec query_count semaphore_count semaphored_job_id);

    my @field_values = rearrange( [ @field_names ], @args );

    foreach my $idx (0..$#field_names) {
        my $accessor = $field_names[$idx];
        my $value    = $field_values[$idx];

        next unless(defined($value));           # fields that were not supplied are left untouched

        $self->$accessor($value);
    }

    return $self;
}
Jessica Severin's avatar
Jessica Severin committed
75 76


77 78 79 80
sub analysis_id {   # combined getter/setter for the '_analysis_id' field of the job
    my ($self, @new_value) = @_;

    if(@new_value) {    # any extra argument (even an explicit undef) overwrites the stored value
        $self->{'_analysis_id'} = $new_value[0];
    }

    return $self->{'_analysis_id'};
}

83 84 85 86
sub input_id {  # combined getter/setter for the '_input_id' field of the job
    my $self = $_[0];

        # a second argument, even an explicit undef, replaces the stored value
    $self->{'_input_id'} = $_[1] if(scalar(@_) > 1);

    return $self->{'_input_id'};
}

89 90 91 92
sub param_id_stack {    # combined getter/setter for the '_param_id_stack' field of the job
    my ($self, @new_value) = @_;

    if(@new_value) {    # any extra argument (even an explicit undef) overwrites the stored value
        $self->{'_param_id_stack'} = $new_value[0];
    }

    return $self->{'_param_id_stack'};
}

95 96 97 98
sub accu_id_stack {     # combined getter/setter for the '_accu_id_stack' field of the job
    my $self = $_[0];

        # a second argument, even an explicit undef, replaces the stored value
    $self->{'_accu_id_stack'} = $_[1] if(scalar(@_) > 1);

    return $self->{'_accu_id_stack'};
}

101 102 103 104
sub worker_id {     # combined getter/setter for the '_worker_id' field of the job
    my ($self, @new_value) = @_;

    if(@new_value) {    # any extra argument (even an explicit undef) overwrites the stored value
        $self->{'_worker_id'} = $new_value[0];
    }

    return $self->{'_worker_id'};
}

107 108 109 110
sub status {    # combined getter/setter for the '_status' field of the job (in-memory only, nothing is written to the database here)
    my $self = $_[0];

        # a second argument, even an explicit undef, replaces the stored value
    $self->{'_status'} = $_[1] if(scalar(@_) > 1);

    return $self->{'_status'};
}

Jessica Severin's avatar
Jessica Severin committed
113
sub retry_count {   # getter/setter for the '_retry_count' field; an undefined value is replaced by (and stored as) 0
    my ($self, @new_value) = @_;

    if(@new_value) {
        $self->{'_retry_count'} = $new_value[0];
    }
    unless(defined($self->{'_retry_count'})) {  # covers both "never set" and "explicitly set to undef"
        $self->{'_retry_count'} = 0;
    }

    return $self->{'_retry_count'};
}

sub completed {     # combined getter/setter for the '_completed' field of the job
    my ($self, @new_value) = @_;

    if(@new_value) {    # any extra argument (even an explicit undef) overwrites the stored value
        $self->{'_completed'} = $new_value[0];
    }

    return $self->{'_completed'};
}

126
sub runtime_msec {  # getter/setter for the '_runtime_msec' field; an undefined value is replaced by (and stored as) 0
    my $self = $_[0];

    $self->{'_runtime_msec'} = $_[1] if(scalar(@_) > 1);

        # covers both "never set" and "explicitly set to undef"
    $self->{'_runtime_msec'} = 0 if(!defined($self->{'_runtime_msec'}));

    return $self->{'_runtime_msec'};
}

sub query_count {   # getter/setter for the '_query_count' field; an undefined value is replaced by (and stored as) 0
    my ($self, @new_value) = @_;

    if(@new_value) {
        $self->{'_query_count'} = $new_value[0];
    }
    unless(defined($self->{'_query_count'})) {  # covers both "never set" and "explicitly set to undef"
        $self->{'_query_count'} = 0;
    }

    return $self->{'_query_count'};
}

140
sub semaphore_count {   # getter/setter for the '_semaphore_count' field; an undefined value is replaced by (and stored as) 0
    my $self = $_[0];

    $self->{'_semaphore_count'} = $_[1] if(scalar(@_) > 1);

        # covers both "never set" and "explicitly set to undef"
    $self->{'_semaphore_count'} = 0 if(!defined($self->{'_semaphore_count'}));

    return $self->{'_semaphore_count'};
}

sub semaphored_job_id {     # combined getter/setter for the '_semaphored_job_id' field of the job
    my ($self, @new_value) = @_;

    if(@new_value) {    # any extra argument (even an explicit undef) overwrites the stored value
        $self->{'_semaphored_job_id'} = $new_value[0];
    }

    return $self->{'_semaphored_job_id'};
}


sub update_status {     # Sets the status of the job in memory and, if the job is attached to an adaptor, asks the adaptor to persist it.
                        #   Arg[1]  : $status - the new status value
                        #   Returns : whatever the adaptor's update_status() returns, or nothing if the job has no adaptor
    my ($self, $status) = @_;

        # Always record the new status on the object itself, so that status()
        # does not go stale for jobs that are not attached to an adaptor:
    $self->status($status);

    my $job_adaptor = $self->adaptor
        or return;      # nothing to persist without an adaptor

    return $job_adaptor->update_status($self);
}

sub dataflow_rules {    # Getter/setter for the list of dataflow rules of one branch.
                        #   Arg[1]  : $branch_name_or_code - translated into a branch code by branch_name_2_code()
                        #   Arg[2]  : (optional) value to store as the rules of that branch
                        #   Returns : the rules stored for that branch, or an empty arrayref if there are none
                        # Note: if ever set, this will prevent the Job from fetching rules from the DB,
                        # because the DB is only consulted while the per-branch cache does not exist at all.
    my ($self, $branch_name_or_code, @new_rules) = @_;

    my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);

    if(@new_rules) {
        $self->{'_dataflow_rules'}{$branch_code} = $new_rules[0];
    }

    unless($self->{'_dataflow_rules'}) {    # first access with nothing set manually: load the rules of all branches at once
        my $rule_adaptor = $self->adaptor->db->get_DataflowRuleAdaptor;

        $self->{'_dataflow_rules'} = $rule_adaptor->fetch_all_by_from_analysis_id_HASHED_FROM_branch_code( $self->analysis_id );
    }

    my $rules_of_this_branch = $self->{'_dataflow_rules'}{$branch_code};

    return $rules_of_this_branch ? $rules_of_this_branch : [];
}

Jessica Severin's avatar
Jessica Severin committed
176
sub stdout_file {   # combined getter/setter for the '_stdout_file' field of the job
  my ($self, @new_value) = @_;

  if(@new_value) {  # any extra argument (even an explicit undef) overwrites the stored value
    $self->{'_stdout_file'} = $new_value[0];
  }

  return $self->{'_stdout_file'};
}

sub stderr_file {   # combined getter/setter for the '_stderr_file' field of the job
  my ($self, @new_value) = @_;

  if(@new_value) {  # any extra argument (even an explicit undef) overwrites the stored value
    $self->{'_stderr_file'} = $new_value[0];
  }

  return $self->{'_stderr_file'};
}

188 189 190 191 192 193 194 195
sub accu_hash {     # getter/setter for the '_accu_hash' field; an undefined value is replaced by (and stored as) a fresh empty hashref
    my ($self, @new_value) = @_;

    if(@new_value) {
        $self->{'_accu_hash'} = $new_value[0];
    }
    if(!defined($self->{'_accu_hash'})) {   # covers both "never set" and "explicitly set to undef"
        $self->{'_accu_hash'} = {};
    }

    return $self->{'_accu_hash'};
}


196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
=head2 autoflow

    Title   :  autoflow
    Arg[1]  :  (optional) new value of the flag
    Function:  Gets/sets the flag that tells whether the job should
               be automatically dataflowed on branch 1 when the job completes.
               If the subclass manually sends a job along branch 1 with dataflow_output_id,
               the autoflow will turn itself off.
    Returns :  boolean (1=default|0); an undefined value is replaced by (and stored as) 1

=cut

sub autoflow {
  my ($self, @new_value) = @_;

  if(@new_value) {
    $self->{'_autoflow'} = $new_value[0];
  }
  if(!defined($self->{'_autoflow'})) {  # covers both "never set" and "explicitly set to undef"
    $self->{'_autoflow'} = 1;
  }

  return $self->{'_autoflow'};
}


217 218
##-----------------[indicators to the Worker]--------------------------------

219

Leo Gordon's avatar
Leo Gordon committed
220 221 222
sub lethal_for_worker {     # Getter/setter for the '_lethal_for_worker' flag.
                            # Intended protocol (the Worker side of it is not visible in this file):
                            # the Job sets this to 1 prior to dying (or before running code that might cause death -
                            # such as RunnableDB's compilation) if it believes that the state of things will not allow
                            # the Worker to continue normally; the Worker checks the flag and exits if it is true.
    my ($self, @new_value) = @_;

    if(@new_value) {    # any extra argument (even an explicit undef) overwrites the stored value
        $self->{'_lethal_for_worker'} = $new_value[0];
    }

    return $self->{'_lethal_for_worker'};
}

228 229 230 231 232
sub transient_error {       # Getter/setter for the '_transient_error' flag.
                            # Intended protocol (the Worker side of it is not visible in this file):
                            # the Job sets this to 1 prior to dying (or before running code that might cause death)
                            # if it believes that it makes sense to retry the same job without any changes,
                            # and to 0 if it believes that there is no point in re-trying (say, if the parameters are wrong).
                            # The Worker checks the flag and makes the necessary adjustments to the database state.
    my ($self, @new_value) = @_;

    if(@new_value) {    # any extra argument (even an explicit undef) overwrites the stored value
        $self->{'_transient_error'} = $new_value[0];
    }

    return $self->{'_transient_error'};
}

sub incomplete {            # Getter/setter for the '_incomplete' flag.
                            # Intended protocol (the consumer of the flag is not visible in this file):
                            # the Job sets this to 0 prior to throwing if the job is done,
                            # but it wants the thrown message to be recorded with is_error=0.
    my ($self, @new_value) = @_;

    if(@new_value) {    # any extra argument (even an explicit undef) overwrites the stored value
        $self->{'_incomplete'} = $new_value[0];
    }

    return $self->{'_incomplete'};
}

##-----------------[/indicators to the Worker]-------------------------------

247 248
=head2 warning

    Arg[1]      :   $msg - the text of the message
    Description :   records a non-error message in 'log_message' table linked to the current job;
                    if the job has no adaptor, the message is printed to STDERR instead, prefixed with 'Warning: '

=cut

sub warning {
    my ($self, $msg) = @_;

    my $job_adaptor = $self->adaptor;

        # not attached to a database: the best we can do is to tell the user directly
    return print STDERR "Warning: $msg\n" unless($job_adaptor);

        # the trailing 0 is passed as the third argument of store_job_message()
        # (presumably the is_error flag - confirm against LogMessageAdaptor)
    return $job_adaptor->db->get_LogMessageAdaptor()->store_job_message($self->dbID, $msg, 0);
}
262

263 264 265 266 267 268 269
sub fan_cache {     # a self-initializing getter (no setting)
                    # Returns a hash-of-lists keyed by funnel dataflow rule id:
                    # { 2 => [list of jobs waiting to be funneled into 2], 3 => [list of jobs waiting to be funneled into 3], etc}
    my ($self) = @_;

    unless($self->{'_fan_cache'}) {     # create the cache on first access
        $self->{'_fan_cache'} = {};
    }

    return $self->{'_fan_cache'};
}

270 271 272 273
=head2 dataflow_output_id

    Title        :  dataflow_output_id
    Arg[1](opt)  :  $output_ids - a single output_id or an arrayref of output_ids.
                    If false, one output_id is used: an empty hash when the parameter stack is in use,
                    otherwise the input_id of the current job.
    Arg[2](opt)  :  $branch_name_or_code - name or code of the branch (optional, defaults to 1)
    Arg[3](opt)  :  $create_job_options - no longer supported: any true value makes the method die
    Usage        :  $self->dataflow_output_id($output_id, $branch_name_or_code);
    Function     :
      If a RunnableDB(Process) needs to create jobs, this allows it to have jobs
      created and flowed through the dataflow rules of the workflow graph.
      This 'output_id' becomes the 'input_id' of the newly created job at
      the ends of the dataflow pipes.  The optional 'branch_name_or_code' determines
      which dataflow pipe(s) to flow the job through.
    Returns      :  arrayref of the ids of the jobs that were created by this call via CreateNewJob
                    (output sent to targets that implement their own dataflow() is not included)

=cut

sub dataflow_output_id {
    my ($self, $output_ids, $branch_name_or_code, $create_job_options) = @_;

    my $parent_input_id         = $self->input_id();
    my $param_id_stack          = $self->param_id_stack();
    my $accu_id_stack           = $self->accu_id_stack();

    my $hive_use_param_stack    = $self->adaptor() && $self->adaptor->db->hive_use_param_stack();

    if($hive_use_param_stack) {
        if($parent_input_id and ($parent_input_id ne '{}')) {   # add the parent to the param_id_stack if it had non-trivial extra parameters
            my $stack_prefix = $param_id_stack ? $param_id_stack.',' : '';
            $param_id_stack  = $stack_prefix.$self->dbID();
        }

        my $own_accu_key_count = keys %{$self->accu_hash()};
        if($own_accu_key_count) {                               # add the parent to the accu_id_stack if it had "own" accumulator
            my $stack_prefix = $accu_id_stack ? $accu_id_stack.',' : '';
            $accu_id_stack   = $stack_prefix.$self->dbID();
        }
    }

    unless($output_ids) {                   # by default replicate the parameters of the parent in the child
        $output_ids = [ $hive_use_param_stack ? {} : $parent_input_id ];
    }
    if(ref($output_ids) ne 'ARRAY') {       # force previously used single values into an arrayref
        $output_ids = [ $output_ids ];
    }

    die "Please consider configuring semaphored dataflow from PipeConfig rather than setting it up manually" if($create_job_options);

        # map branch names to numbers:
    my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);

        # if branch_code is set to 1 (explicitly or implicitly), turn off automatic dataflow:
    if($branch_code == 1) {
        $self->autoflow(0);
    }

    my $job_adaptor_class   = 'Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor';
    my @output_job_ids      = ();

        # sort rules to make sure the fan rules come before funnel rules for the same branch_code:
    my @rules_fans_first = sort {($b->funnel_dataflow_rule_id||0) <=> ($a->funnel_dataflow_rule_id||0)} @{ $self->dataflow_rules( $branch_code ) };

    RULE: foreach my $rule (@rules_fans_first) {

            # parameter substitution into input_id_template is rule-specific
        my $output_ids_for_this_rule = $output_ids;
        if(my $template_string = $rule->input_id_template()) {
            my $template_hash = destringify($template_string);
            $output_ids_for_this_rule = [ map { $self->param_substitute($template_hash, $_) } @$output_ids ];
        }

        my $target_analysis_or_table = $rule->to_analysis();

            # targets that know how to receive dataflow themselves take care of it on their own:
        if($target_analysis_or_table->can('dataflow')) {
            $target_analysis_or_table->dataflow( $output_ids_for_this_rule, $self );
            next RULE;
        }

            # members of a semaphored fan will have to wait in cache until the funnel is created:
        if(my $funnel_dataflow_rule_id = $rule->funnel_dataflow_rule_id()) {
            my $waiting_for_this_funnel = $self->fan_cache()->{$funnel_dataflow_rule_id} ||= [];
            push @$waiting_for_this_funnel, map { [$_, $target_analysis_or_table] } @$output_ids_for_this_rule;
            next RULE;
        }

            # from here on it is either a semaphored funnel or a non-semaphored dataflow:
        my $fan_cache = delete $self->fan_cache()->{$rule->dbID};   # clear the cache at the same time

        unless($fan_cache && @$fan_cache) {     # non-semaphored dataflow (but potentially propagating any existing semaphores)

            foreach my $output_id ( @$output_ids_for_this_rule ) {
                my $job_id = $job_adaptor_class->CreateNewJob(
                    -input_id           => $output_id,
                    -param_id_stack     => $param_id_stack,
                    -accu_id_stack      => $accu_id_stack,
                    -analysis           => $target_analysis_or_table,
                    -prev_job           => $self,
                    -semaphored_job_id  => $self->semaphored_job_id(),  # propagate parent's semaphore if any
                );

                    # only add the ones that were indeed created:
                push @output_job_ids, $job_id if($job_id);
            }
            next RULE;
        }

            # a semaphored funnel: exactly one funnel job is allowed per group of fan jobs
        my $funnel_job_count = scalar(@$output_ids_for_this_rule);
        if($funnel_job_count != 1) {
            $self->transient_error(0);      # re-running the same job would hit the same problem
            die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1";
        }

        my $funnel_job_id = $job_adaptor_class->CreateNewJob(
            -input_id           => $output_ids_for_this_rule->[0],
            -param_id_stack     => $param_id_stack,
            -accu_id_stack      => $accu_id_stack,
            -analysis           => $target_analysis_or_table,
            -prev_job           => $self,
            -semaphore_count    => scalar(@$fan_cache),         # "pre-increase" the semaphore count before creating the dependent jobs
            -semaphored_job_id  => $self->semaphored_job_id(),  # propagate parent's semaphore if any
        ) or next RULE;     # no funnel job created: assume the whole semaphored group of jobs has been created already

            # the semaphored funnel job creation succeeded, so create the fan out of the cache:
        push @output_job_ids, $funnel_job_id;

        my $failed_to_create = 0;

        foreach my $pair ( @$fan_cache ) {
            my ($output_id, $fan_analysis) = @$pair;

            my $job_id = $job_adaptor_class->CreateNewJob(
                -input_id           => $output_id,
                -param_id_stack     => $param_id_stack,
                -accu_id_stack      => $accu_id_stack,
                -analysis           => $fan_analysis,
                -prev_job           => $self,
                -semaphored_job_id  => $funnel_job_id,      # by passing this parameter we request not to propagate semaphores
                -push_new_semaphore => 1,                   # inform the adaptor that semaphore count doesn't need up-adjustment
            );

            if($job_id) {
                push @output_job_ids, $job_id;
            } else {                                        # count all dependent jobs that failed to create
                $failed_to_create++;
            }
        }

        if($failed_to_create) {     # adjust semaphore_count for jobs that failed to create:
            $self->adaptor->decrease_semaphore_count_for_jobid( $funnel_job_id, $failed_to_create );
        }
    }

    return \@output_job_ids;
}


415 416 417 418 419 420 421
sub toString {  # Returns a one-line description of the job made of its dbID, input_id and retry_count.
    my ($self) = @_;

    return join('',
        'Job ',         $self->dbID,
        " input_id='",  $self->input_id,
        "', retry=",    $self->retry_count,
    );
}


Jessica Severin's avatar
Jessica Severin committed
422 423
sub print_job {     # Prints a one-line summary of the job (dbID, logic_name and dbID of its analysis, retry_count, input_id)
                    # to the currently selected output filehandle.
                    # The logic_name is left empty if the job has no adaptor or if its analysis cannot be fetched.
  my $self = shift;

  my $logic_name = '';
  if(my $job_adaptor = $self->adaptor()) {
    my $analysis = $job_adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($self->analysis_id);

      # a diagnostic printer should not die just because the analysis could not be found:
    $logic_name = $analysis->logic_name() if($analysis);
  }

  printf("job_id=%d %35s(%5d) retry=%d input_id='%s'\n",
       $self->dbID,
       $logic_name,
       $self->analysis_id,
       $self->retry_count,
       $self->input_id);
}

1;