AnalysisJob.pm 14.1 KB
Newer Older
Jessica Severin's avatar
Jessica Severin committed
1 2 3 4 5 6 7
#
# You may distribute this module under the same terms as perl itself

=pod 

=head1 NAME

Jessica Severin's avatar
Jessica Severin committed
8
  Bio::EnsEMBL::Hive::AnalysisJob
Jessica Severin's avatar
Jessica Severin committed
9 10

=head1 DESCRIPTION
Jessica Severin's avatar
Jessica Severin committed
11

12
  An AnalysisJob is the link between the input_id control data, the analysis and
  the rule system.  It also tracks the state of the job as it is processed.
Jessica Severin's avatar
Jessica Severin committed
14 15

=head1 CONTACT
Jessica Severin's avatar
Jessica Severin committed
16

17
  Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
Jessica Severin's avatar
Jessica Severin committed
18 19

=head1 APPENDIX
Jessica Severin's avatar
Jessica Severin committed
20

21 22
  The rest of the documentation details each of the object methods.
  Internal methods are usually prefixed with an underscore (_).
Jessica Severin's avatar
Jessica Severin committed
23

Jessica Severin's avatar
Jessica Severin committed
24 25
=cut

26

Jessica Severin's avatar
Jessica Severin committed
27 28 29
package Bio::EnsEMBL::Hive::AnalysisJob;

use strict;
30 31

use Bio::EnsEMBL::Utils::Argument ('rearrange');
32

33
use Bio::EnsEMBL::Hive::Utils ('destringify');
34
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
35
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
36

37 38 39 40
use base (  'Bio::EnsEMBL::Storable',       # inherit dbID(), adaptor() and new() methods
            'Bio::EnsEMBL::Hive::Params',   # inherit param management functionality
         );

Jessica Severin's avatar
Jessica Severin committed
41

42
sub new {
    my $class = shift @_;

        # let Bio::EnsEMBL::Storable process the arguments it knows about
    my $self = $class->SUPER::new( @_ );

        # attributes that may be initialized from the constructor's arguments (setter calls happen in this order):
    my @attribute_names = qw(analysis_id input_id worker_id status retry_count completed runtime_msec query_count semaphore_count semaphored_job_id);

    my %init_value;
    @init_value{ @attribute_names } = rearrange( \@attribute_names, @_ );

        # only call a setter when the caller actually supplied a defined value for it:
    foreach my $attribute (@attribute_names) {
        my $value = $init_value{$attribute};
        next unless defined($value);
        $self->$attribute( $value );
    }

    return $self;
}
Jessica Severin's avatar
Jessica Severin committed
63 64 65


sub input_id {
        # Getter/setter for the job's input_id (stored verbatim under '_input_id').
    my ($self, @new_value) = @_;

    $self->{'_input_id'} = $new_value[0] if(@new_value);
    return $self->{'_input_id'};
}

71

72 73 74 75 76 77 78 79 80 81 82 83 84
sub dataflow_rules {
        # Getter/setter for the dataflow rules of one branch.
        # Once any branch has been set here, rules are served from this in-object cache
        # (an unset branch then yields []); otherwise they are fetched via the adaptor.
    my $self                = shift @_;
    my $branch_name_or_code = shift @_;

    my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);

    if(@_) {
        $self->{'_dataflow_rules'}{$branch_code} = shift @_;
    }

    if( my $cached_rules = $self->{'_dataflow_rules'} ) {
        return $cached_rules->{$branch_code} || [];
    }

    return $self->adaptor->db->get_DataflowRuleAdaptor->fetch_all_by_from_analysis_id_and_branch_code($self->analysis_id, $branch_code);
}

85

Leo Gordon's avatar
Leo Gordon committed
86
sub worker_id {
        # Getter/setter for the id of the worker associated with this job ('_worker_id').
    my $self = $_[0];

    if(@_ > 1) {
        $self->{'_worker_id'} = $_[1];
    }
    return $self->{'_worker_id'};
}

92

Jessica Severin's avatar
Jessica Severin committed
93
sub analysis_id {
        # Getter/setter for the id of the analysis this job belongs to ('_analysis_id').
    my $self = shift @_;

    return scalar(@_)
        ? ( $self->{'_analysis_id'} = shift @_ )
        : $self->{'_analysis_id'};
}

sub status {
        # Getter/setter for the job's in-memory status; see update_status() for persisting it.
    my $self = shift @_;

    if(scalar(@_)) {
        my $new_status = shift @_;
        $self->{'_status'} = $new_status;
    }
    return $self->{'_status'};
}

105 106 107 108 109 110 111
sub update_status {
        # Sets the status and asks the adaptor to persist it.
        # Without an adaptor nothing happens at all (not even the in-memory status changes)
        # and an empty list / undef is returned; otherwise the adaptor's result is returned.
    my $self   = shift @_;
    my $status = shift @_;

    my $job_adaptor = $self->adaptor or return;

    $self->status($status);
    return $job_adaptor->update_status($self);
}

Jessica Severin's avatar
Jessica Severin committed
112
sub retry_count {
        # Getter/setter for the retry counter; an undefined value is normalized to (and stored as) 0.
    my $self = shift @_;

    if(scalar(@_)) {
        $self->{'_retry_count'} = $_[0];
    }
    if(!defined($self->{'_retry_count'})) {
        $self->{'_retry_count'} = 0;
    }
    return $self->{'_retry_count'};
}

sub completed {
        # Getter/setter for the '_completed' attribute (no default).
    my $self = shift @_;

    @_ and $self->{'_completed'} = $_[0];
    return $self->{'_completed'};
}

125
sub runtime_msec {
        # Getter/setter for the job's runtime (presumably in milliseconds, per the name);
        # an undefined value is normalized to (and stored as) 0.
    my ($self, @args) = @_;

    $self->{'_runtime_msec'} = $args[0] if(scalar(@args) > 0);
    defined($self->{'_runtime_msec'}) or $self->{'_runtime_msec'} = 0;

    return $self->{'_runtime_msec'};
}

sub query_count {
        # Getter/setter for the query counter; an undefined value is normalized to (and stored as) 0.
    my $self = shift @_;

    if(@_) { $self->{'_query_count'} = shift @_; }
    unless(defined $self->{'_query_count'}) { $self->{'_query_count'} = 0; }

    return $self->{'_query_count'};
}

139 140 141 142 143 144 145 146 147 148 149 150 151
sub semaphore_count {
        # Getter/setter for this job's semaphore counter; an undefined value is normalized to (and stored as) 0.
    my $self = $_[0];

    $self->{'_semaphore_count'} = $_[1] if(@_ > 1);

    return defined($self->{'_semaphore_count'})
        ? $self->{'_semaphore_count'}
        : ( $self->{'_semaphore_count'} = 0 );
}

sub semaphored_job_id {
        # Getter/setter for the id of the (funnel) job this job is semaphored to, if any;
        # dataflow_output_id() passes it on to the jobs it creates.
    my ($self, @args) = @_;

    if(@args) {
        $self->{'_semaphored_job_id'} = $args[0];
    }
    return $self->{'_semaphored_job_id'};
}

Jessica Severin's avatar
Jessica Severin committed
152
sub stdout_file {
        # Getter/setter for the '_stdout_file' attribute (no default).
    my ($self, @new_value) = @_;

    $self->{'_stdout_file'} = shift @new_value if(@new_value);
    return $self->{'_stdout_file'};
}

sub stderr_file {
        # Getter/setter for the '_stderr_file' attribute (no default).
    my $self = shift @_;

    return $self->{'_stderr_file'} unless(@_);
    return $self->{'_stderr_file'} = shift @_;
}

164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
=head2 autoflow

    Title   :  autoflow
    Function:  Gets/sets the flag that decides whether the job is automatically
               dataflowed on branch 1 when it completes.
               dataflow_output_id() clears this flag whenever it is asked to flow
               along branch 1, so an explicit branch-1 dataflow replaces the automatic one.
               Setting it to undef restores the default.
    Returns :  boolean (1=default|0)

=cut

sub autoflow {
    my ($self, @args) = @_;

    if(@args) {
        $self->{'_autoflow'} = $args[0];
    }
    $self->{'_autoflow'} = 1 if(!defined($self->{'_autoflow'}));

    return $self->{'_autoflow'};
}


185 186
##-----------------[indicators to the Worker]--------------------------------

187

Leo Gordon's avatar
Leo Gordon committed
188 189 190
sub lethal_for_worker {
        # A Job should set this to 1 before dying (or before running code that might kill it,
        # such as compiling the RunnableDB) if it believes that the state of things will not
        # allow the Worker to continue normally.
        # The Worker checks this flag and commits suicide if it is true.
    my ($self, @args) = @_;

    $self->{'_lethal_for_worker'} = $args[0] if(@args);
    return $self->{'_lethal_for_worker'};
}

196 197 198 199 200
sub transient_error {
        # A Job sets this to 1 before dying (or before running code that might kill it)
        # when retrying the very same job unchanged makes sense,
        # and to 0 when re-trying is pointless (say, because the parameters are wrong).
        # The Worker checks this flag and adjusts the database state accordingly.
    my $self = $_[0];

    if(@_ > 1) {
        $self->{'_transient_error'} = $_[1];
    }
    return $self->{'_transient_error'};
}

sub incomplete {
        # A Job sets this to 0 before throwing when its work is in fact done
        # but it wants the thrown message to be recorded with is_error=0.
    my $self = shift @_;

    return $self->{'_incomplete'} unless(@_);
    return $self->{'_incomplete'} = shift @_;
}

##-----------------[/indicators to the Worker]-------------------------------

215 216
=head2 warning

    Description:    records a non-error message in 'log_message' table linked to the current job.
                    When the job has no adaptor, the message is printed to STDERR instead,
                    prefixed with "Warning: ".

=cut

sub warning {
    my $self = shift @_;
    my $msg  = shift @_;

    my $job_adaptor = $self->adaptor;
    if( !$job_adaptor ) {
        print STDERR "Warning: $msg\n";
    } else {
            # the trailing 0 records the message as a non-error (is_error=0)
        $job_adaptor->db->get_LogMessageAdaptor()->store_job_message($self->dbID, $msg, 0);
    }
}
230

231 232 233 234 235 236 237
sub fan_cache {
        # Self-initializing getter (there is no setter).
        # The hash is keyed by funnel dataflow_rule_id; each value is a list of
        # [ $output_id, $target_analysis ] pairs for semaphored fan jobs that wait
        # here until dataflow_output_id() processes their funnel rule.
    my $self = shift;

    unless( $self->{'_fan_cache'} ) {
        $self->{'_fan_cache'} = {};
    }
    return $self->{'_fan_cache'};
}

238 239 240 241
=head2 dataflow_output_id

    Title        :  dataflow_output_id
    Arg[1](opt)  :  <string> $output_id, or an arrayref of them.
                    When false/undef it defaults to [ $self->input_id() ].
    Arg[2](opt)  :  <int|string> $branch_name_or_code (optional, defaults to 1);
                    branch names are mapped to codes by DataflowRuleAdaptor::branch_name_2_code
    Arg[3]       :  $create_job_options -- no longer supported; passing a true value dies
    Usage        :  my $job_ids = $self->dataflow_output_id($output_id, $branch_name_or_code);
    Function:  
      If a RunnableDB(Process) needs to create jobs, this allows it to have jobs 
      created and flowed through the dataflow rules of the workflow graph.
      This 'output_id' becomes the 'input_id' of the newly created job at
      the ends of the dataflow pipes.  The optional 'branch_name_or_code' determines
      which dataflow pipe(s) to flow the job through.
      Flowing along branch 1 switches off this job's autoflow.
      Jobs belonging to a semaphored fan are kept in fan_cache() until their funnel
      rule is processed; then the funnel job is created first and the cached fan
      jobs are created semaphored to it.
    Returns      :  arrayref of the ids of the jobs that were actually created
                    (targets that implement dataflow() contribute no ids)

=cut

sub dataflow_output_id {
    my ($self, $output_ids, $branch_name_or_code, $create_job_options) = @_;

    $output_ids  ||= [ $self->input_id() ];                                 # replicate the input_id in the branch_code's output by default
    $output_ids    = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY');   # force previously used single values into an arrayref

    if($create_job_options) {
        die "Please consider configuring semaphored dataflow from PipeConfig rather than setting it up manually";
    }

        # map branch names to numbers:
    my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name_or_code);

        # if branch_code is set to 1 (explicitly or implicitly), turn off automatic dataflow:
    $self->autoflow(0) if($branch_code == 1);

    my @output_job_ids = ();

        # sort rules to make sure the fan rules come before funnel rules for the same branch_code:
    foreach my $rule (sort {($b->funnel_dataflow_rule_id||0) <=> ($a->funnel_dataflow_rule_id||0)} @{ $self->dataflow_rules( $branch_code ) }) {

            # parameter substitution into input_id_template is rule-specific
        my $output_ids_for_this_rule;
        if(my $template_string = $rule->input_id_template()) {
            my $template_hash = destringify($template_string);
            $output_ids_for_this_rule = [ map { $self->param_substitute($template_hash, $_) } @$output_ids ];
        } else {
            $output_ids_for_this_rule = $output_ids;
        }

        my $target_analysis_or_table = $rule->to_analysis();

        if($target_analysis_or_table->can('dataflow')) {
                # targets that implement dataflow() (presumably tables) consume the output_ids themselves;
                # no job ids are collected for them
            $target_analysis_or_table->dataflow( $output_ids_for_this_rule, $self );

        } elsif(my $funnel_dataflow_rule_id = $rule->funnel_dataflow_rule_id()) {
                # members of a semaphored fan will have to wait in cache until the funnel is created:
            my $fan_cache_this_branch = $self->fan_cache()->{$funnel_dataflow_rule_id} ||= [];
            push @$fan_cache_this_branch, map { [$_, $target_analysis_or_table] } @$output_ids_for_this_rule;

        } else {    # either a semaphored funnel or a non-semaphored dataflow:

            my $fan_cache = delete $self->fan_cache()->{$rule->dbID};   # clear the cache at the same time

            if($fan_cache && @$fan_cache) { # a semaphored funnel
                my $funnel_job_id;
                if( (my $funnel_job_count = scalar(@$output_ids_for_this_rule)) !=1 ) {

                    $self->transient_error(0);     # retrying will not change the number of funnel output_ids
                    die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1";

                } elsif($funnel_job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(   # if a semaphored funnel job creation succeeded, ...
                                        -input_id           => $output_ids_for_this_rule->[0],
                                        -analysis           => $target_analysis_or_table,
                                        -prev_job           => $self,
                                        -semaphore_count    => scalar(@$fan_cache),         # "pre-increase" the semaphore count before creating the dependent jobs
                                        -semaphored_job_id  => $self->semaphored_job_id(),  # propagate parent's semaphore if any
                )) {                                                                                    # ... then create the fan out of the cache:
                    push @output_job_ids, $funnel_job_id;

                    my $failed_to_create = 0;

                    foreach my $pair ( @$fan_cache ) {
                        my ($output_id, $fan_analysis) = @$pair;
                        if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
                            -input_id           => $output_id,
                            -analysis           => $fan_analysis,
                            -prev_job           => $self,
                            -semaphored_job_id  => $funnel_job_id,      # by passing this parameter we request not to propagate semaphores
                            -push_new_semaphore => 1,                   # inform the adaptor that semaphore count doesn't need up-adjustment
                        )) {
                            push @output_job_ids, $job_id;
                        } else {                                        # count all dependent jobs that failed to create
                            $failed_to_create++;
                        }
                    }
                    if($failed_to_create) { # adjust semaphore_count for jobs that failed to create:
                        $self->adaptor->decrease_semaphore_count_for_jobid( $funnel_job_id, $failed_to_create );
                    }
                } else {    # assume the whole semaphored group of jobs has been created already
                }

            } else {    # non-semaphored dataflow (but potentially propagating any existing semaphores)

                foreach my $output_id ( @$output_ids_for_this_rule ) {

                    if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
                        -input_id           => $output_id,
                        -analysis           => $target_analysis_or_table,
                        -prev_job           => $self,
                        -semaphored_job_id  => $self->semaphored_job_id(),  # propagate parent's semaphore if any
                    )) {
                            # only add the ones that were indeed created:
                        push @output_job_ids, $job_id;
                    }
                }
            }
        }
    }

    return \@output_job_ids;
}


Jessica Severin's avatar
Jessica Severin committed
362 363
sub print_job {
        # Prints a one-line summary of the job to the currently selected output handle:
        # job_id, analysis logic_name, analysis_id, retry count and input_id.
        # The logic_name is looked up through the adaptor; it is '' when the job has no
        # adaptor or when the analysis lookup returns nothing (this printer must not die).
    my $self = shift;

    my $logic_name = '';
    if( my $job_adaptor = $self->adaptor() ) {
        my $analysis = $job_adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($self->analysis_id);
        $logic_name = $analysis->logic_name() if($analysis);
    }

    printf("job_id=%d %35s(%5d) retry=%d input_id='%s'\n",
        $self->dbID,
        $logic_name,
        $self->analysis_id,
        $self->retry_count,
        $self->input_id);
}

1;