AnalysisJobAdaptor.pm 22.7 KB
Newer Older
1 2 3 4 5 6 7 8 9
# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
#
# Date of creation: 22.03.2004
# Original Creator : Jessica Severin <jessica@ebi.ac.uk>
#
# Copyright EMBL-EBI 2004
#
# You may distribute this module under the same terms as perl itself

10
=pod
11 12 13

=head1 NAME

Jessica Severin's avatar
Jessica Severin committed
14
  Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
15 16 17 18 19 20 21 22 23 24 25 26 27

=head1 SYNOPSIS

  $analysisJobAdaptor = $db_adaptor->get_AnalysisJobAdaptor;
  $analysisJobAdaptor = $analysisJob->adaptor;

=head1 DESCRIPTION

  Module to encapsulate all db access for persistent class AnalysisJob.
  There should be just one per application and database connection.

=head1 CONTACT

28
  Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
29 30 31

=head1 APPENDIX

Jessica Severin's avatar
Jessica Severin committed
32 33
  The rest of the documentation details each of the object methods.
  Internal methods are preceded with a _
34 35 36 37 38 39

=cut



package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
Jessica Severin's avatar
Jessica Severin committed
40 41

use strict;
42

43 44 45 46
use Bio::EnsEMBL::Utils::Argument ('rearrange');
use Bio::EnsEMBL::Utils::Exception ('throw');

use Bio::EnsEMBL::Hive::Utils ('stringify');
47
use Bio::EnsEMBL::Hive::AnalysisJob;
48

49
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
Jessica Severin's avatar
Jessica Severin committed
50

51 52 53 54 55
###############################################################################
#
#  CLASS methods
#
###############################################################################
Jessica Severin's avatar
Jessica Severin committed
56

Jessica Severin's avatar
Jessica Severin committed
57 58
=head2 CreateNewJob

59
  Args       : -input_id => string of input_id which will be passed to run the job (or a Perl hash that will be automagically stringified)
60 61 62
               -analysis => Bio::EnsEMBL::Hive::Analysis object stored in the database
               -prev_job_id => (optional) job_id of job that is creating this job.
                               Used purely for book keeping.
63
  Example    : $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
Jessica Severin's avatar
Jessica Severin committed
64 65
                                    -input_id => 'my input data',
                                    -analysis => $myAnalysis);
Jessica Severin's avatar
Jessica Severin committed
66 67 68
  Description: uses the analysis object to get the db connection from the adaptor to store a new
               job in a hive.  This is a class level method since it does not have any state.
               Also updates corresponding analysis_stats by incrementing total_job_count,
69
               ready_job_count and flagging the incremental update by changing the status
Jessica Severin's avatar
Jessica Severin committed
70
               to 'LOADING' (but only if the analysis is not blocked).
71
               NOTE: no AnalysisJob object is created in memory as the result of this call; it is simply a "fast store".
72
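  Returntype : int job_id of the newly stored job in the database the analysis comes from (undef if the insert was ignored because the job already existed).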
Jessica Severin's avatar
Jessica Severin committed
73 74 75 76 77
  Exceptions : thrown if either -input_id or -analysis are not properly defined
  Caller     : general

=cut

78 79 80
sub CreateNewJob {
  my ($class, @args) = @_;

  my ($input_id, $analysis, $prev_job, $prev_job_id, $semaphore_count, $semaphored_job_id, $push_new_semaphore) =
     rearrange([qw(input_id analysis prev_job prev_job_id semaphore_count semaphored_job_id push_new_semaphore)], @args);

  throw("must define input_id") unless($input_id);
  throw("must define analysis") unless($analysis);
  throw("analysis must be [Bio::EnsEMBL::Hive::Analysis] not a [$analysis]")
    unless($analysis->isa('Bio::EnsEMBL::Hive::Analysis'));
  throw("analysis must have adaptor connected to database")
    unless($analysis->adaptor and $analysis->adaptor->db);
  throw("Please specify prev_job object instead of prev_job_id if available") if ($prev_job_id);   # 'obsolete' message

      # Propagating an existing semaphore goes through the adaptor of the emitting job,
      # so refuse the call up front (before anything is inserted) when that job is missing,
      # instead of failing on an undefined value after the job row has already been stored:
  my $propagating_semaphore = ($semaphored_job_id and !$push_new_semaphore);
  throw("prev_job must be defined in order to propagate the semaphore of job '$semaphored_job_id'")
    if($propagating_semaphore and !$prev_job);

  $prev_job_id = $prev_job && $prev_job->dbID();

  if(ref($input_id)) {  # let's do the Perl hash stringification centrally rather than in many places:
    $input_id = stringify($input_id);
  }

      # long input_ids are stored via the AnalysisDataAdaptor and replaced by a short marker pointing at them:
  if(length($input_id) >= 255) {
    my $input_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($input_id);
    $input_id = "_ext_input_analysis_data_id $input_data_id";
  }

  $semaphore_count ||= 0;

  my $dba = $analysis->adaptor->db;
  my $dbc = $dba->dbc;
  my $insertion_method  = ($dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
  my $job_status        = ($semaphore_count>0) ? 'SEMAPHORED' : 'READY';
  my $analysis_id       = $analysis->dbID();

  my $sql = qq{$insertion_method INTO job 
              (input_id, prev_job_id,analysis_id,status,semaphore_count,semaphored_job_id)
              VALUES (?,?,?,?,?,?)};

  my $sth       = $dbc->prepare($sql);
  my @values    = ($input_id, $prev_job_id, $analysis_id, $job_status, $semaphore_count, $semaphored_job_id);

  my $return_code = $sth->execute(@values)
            # using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
        or die "Could not run\n\t$sql\nwith data:\n\t(".join(',', @values).')';

      # for the same reason we have to be explicitly numeric here:
  my $inserted = ($return_code > 0);

  my $job_id;
  if($inserted) {
      $job_id = $dbc->db_handle->last_insert_id(undef, undef, 'job', 'job_id');
  }
  $sth->finish;     # release the statement handle on both the 'inserted' and the 'ignored' path

  if($inserted) {
      if($propagating_semaphore) {  # if we are not creating a new semaphore (where dependent jobs have already been counted),
                                    # but rather propagating an existing one (same or other level), we have to up-adjust the counter
            $prev_job->adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id );
      }

      unless($dba->hive_use_triggers()) {
          $dbc->do(qq{
            UPDATE analysis_stats
               SET total_job_count=total_job_count+1
          }
          .(($job_status eq 'READY')
                  ? " ,ready_job_count=ready_job_count+1 "
                  : " ,semaphored_job_count=semaphored_job_count+1 "
          ).qq{
                  ,status = (CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END)
             WHERE analysis_id=$analysis_id
          });
      }
  }
      # otherwise we got 0E0, which means "ignored insert collision" (job created previously),
      # so we simply return an undef and let the caller deal with it

  return $job_id;
}

###############################################################################
#
#  INSTANCE methods
#
###############################################################################
Jessica Severin's avatar
Jessica Severin committed
156

157
=head2 fetch_by_dbID
Jessica Severin's avatar
Jessica Severin committed
158

Jessica Severin's avatar
Jessica Severin committed
159 160 161
  Arg [1]    : int $id
               the unique database identifier for the feature to be obtained
  Example    : $feat = $adaptor->fetch_by_dbID(1234);
162
  Description: Returns the AnalysisJob defined by the job_id $id.
Jessica Severin's avatar
Jessica Severin committed
163 164 165
  Returntype : Bio::EnsEMBL::Hive::AnalysisJob
  Exceptions : thrown if $id is not defined
  Caller     : general
Jessica Severin's avatar
Jessica Severin committed
166

Jessica Severin's avatar
Jessica Severin committed
167
=cut
Jessica Severin's avatar
Jessica Severin committed
168

Jessica Severin's avatar
Jessica Severin committed
169 170 171 172
sub fetch_by_dbID {
  my ($self, $id) = @_;

  unless(defined $id) {
    throw("fetch_by_dbID must have an id");
  }

  my @tabs = $self->_tables;

  my ($name, $syn) = @{$tabs[0]};

  # The id may originate from outside the program (see fetch_by_url_query),
  # so let the database handle quote it rather than pasting it into the SQL verbatim:
  my $quoted_id = $self->dbc->db_handle->quote($id);

  #construct a constraint like "t1.table1_id = '1'"
  my $constraint = "${syn}.${name}_id = $quoted_id";

  #return first element of _generic_fetch list
  my ($obj) = @{$self->_generic_fetch($constraint)};
  return $obj;
}

188

189
=head2 fetch_all_by_analysis_id_status
190 191

  Arg [1]    : (optional) int $analysis_id
192 193 194 195 196
  Arg [2]    : (optional) string $status
  Arg [3]    : (optional) int $retry_count_at_least
  Example    : $all_failed_jobs = $adaptor->fetch_all_by_analysis_id_status(undef, 'FAILED');
               $analysis_done_jobs = $adaptor->fetch_all_by_analysis_id_status($analysis->dbID, 'DONE');
  Description: Returns a list of all jobs filtered by given analysis_id (if specified) and given status (if specified).
197 198 199 200
  Returntype : reference to list of Bio::EnsEMBL::Hive::AnalysisJob objects

=cut

201 202
sub fetch_all_by_analysis_id_status {
    my $self                 = shift;
    my $analysis_id          = shift;
    my $status               = shift;
    my $retry_count_at_least = shift;

        # every filter is optional; only the ones given (and true) contribute a clause
    my @clauses;
    if($analysis_id) {
        push @clauses, "j.analysis_id=$analysis_id";
    }
    if($status) {
        push @clauses, "j.status='$status'";
    }
    if($retry_count_at_least) {
        push @clauses, "j.retry_count >= $retry_count_at_least";
    }

        # an empty constraint makes _generic_fetch omit the WHERE clause (unless a default one applies)
    my $where = join(" AND ", @clauses);

    return $self->_generic_fetch( $where );
}


212 213
# Fetches at most $limit jobs that belong to the analysis with the given analysis_id.
# Both values are interpolated into the SQL as they are; the result is whatever
# _generic_fetch() returns for that query.
sub fetch_some_by_analysis_id_limit {
    my $self        = shift;
    my $analysis_id = shift;
    my $limit       = shift;

    my $constraint   = "j.analysis_id = '$analysis_id'";
    my $final_clause = "LIMIT $limit";

        # no joins are needed, hence the undef in the middle
    return $self->_generic_fetch( $constraint, undef, $final_clause );
}
Jessica Severin's avatar
Jessica Severin committed
217 218


219 220 221
# Fetches the jobs attached to the given worker_id whose status is one of the
# 'in progress' states listed below.
sub fetch_all_incomplete_jobs_by_worker_id {
    my $self      = shift;
    my $worker_id = shift;

    my @in_progress_states = qw(COMPILATION PRE_CLEANUP FETCH_INPUT RUN WRITE_OUTPUT POST_CLEANUP);
    my $quoted_states      = join(',', map { "'$_'" } @in_progress_states);

    return $self->_generic_fetch( "j.status IN ($quoted_states) AND j.worker_id='$worker_id'" );
}


227 228 229 230
# Resolves a (field_name, field_value) pair into a job.
# Only the 'dbID' field with a true value is supported; anything else yields nothing.
sub fetch_by_url_query {
    my $self        = shift;
    my $field_name  = shift;
    my $field_value = shift;

        # guard: unsupported field or missing value
    return unless($field_name eq 'dbID' and $field_value);

    return $self->fetch_by_dbID($field_value);
}

Jessica Severin's avatar
Jessica Severin committed
241 242 243 244 245 246
#
# INTERNAL METHODS
#
###################

# Builds and runs the SELECT for this adaptor and turns the rows into objects.
#   $constraint   : (optional) SQL condition for the WHERE clause
#   $join         : (optional) reference to a list of [$table_spec, $condition, $extra_columns] triples;
#                   $table_spec is itself a list reference such as ['table', 'alias']
#   $final_clause : (optional) SQL appended after the WHERE clause (e.g. a LIMIT)
# Returns whatever _objs_from_sth() makes of the executed statement.
sub _generic_fetch {
  my ($self, $constraint, $join, $final_clause) = @_;

  my @table_specs = $self->_tables;
  my $select_list = join(', ', $self->_columns());

  foreach my $join_spec (@{ $join || [] }) {
    my ($table_spec, $join_condition, $extra_columns) = @{$join_spec};

    if ($table_spec && $join_condition) {
      push @table_specs, $table_spec;

      # the join condition is ANDed onto an existing constraint, or becomes the constraint
      $constraint = $constraint
          ? $constraint . " AND $join_condition"
          : " $join_condition";
    }

    if ($extra_columns) {
      $select_list .= ", " . join(', ', @{$extra_columns});
    }
  }

  # turn [name, alias] pairs into a string like 'table1 t1, table2 t2'
  my $from_list = join(', ', map { join(' ', @$_) } @table_specs);

  my $sql = "SELECT $select_list FROM $from_list";

  my $default_where = $self->_default_where_clause;

  # the explicit constraint and the adaptor's default one are combined with AND
  my $where_sql = '';
  if ($constraint) {
    $where_sql  = " WHERE $constraint ";
    $where_sql .= " AND $default_where " if ($default_where);
  }
  elsif ($default_where) {
    $where_sql = " WHERE $default_where ";
  }
  $sql .= $where_sql;

  # anything that has to follow the WHERE clause
  if ($final_clause) {
    $sql .= " $final_clause";
  }

  my $sth = $self->prepare($sql);
  $sth->execute;

  #print STDOUT $sql,"\n";

  return $self->_objs_from_sth($sth);
}

298

Jessica Severin's avatar
Jessica Severin committed
299 300 301
# Lists the tables this adaptor selects from, each as a [table_name, alias] pair.
sub _tables {
  my ($self) = @_;

  my $job_table = [ 'job', 'j' ];

  return ($job_table);
}

305

Jessica Severin's avatar
Jessica Severin committed
306 307 308
# Lists the columns of the 'job' table (alias 'j') that are selected for every fetch.
sub _columns {
  my ($self) = @_;

  return (
      'j.job_id',
      'j.prev_job_id',
      'j.analysis_id',
      'j.input_id',
      'j.worker_id',
      'j.status',
      'j.retry_count',
      'j.completed',
      'j.runtime_msec',
      'j.query_count',
      'j.semaphore_count',
      'j.semaphored_job_id',
  );
}

324

Jessica Severin's avatar
Jessica Severin committed
325 326 327 328 329 330 331
# Converts the rows of an executed statement handle into AnalysisJob objects.
#   $sth : executed statement handle whose columns are named as in _columns()
# Returns a reference to the list of created Bio::EnsEMBL::Hive::AnalysisJob objects.
sub _objs_from_sth {
  my ($self, $sth) = @_;

  # bind every column to the hash slot named after it (lower-cased column name)
  my %column;
  $sth->bind_columns( \( @column{ @{$sth->{NAME_lc} } } ));

  my @jobs = ();

  while ($sth->fetch()) {

    # An input_id that consists of the external-data marker (as written by CreateNewJob)
    # is replaced by the data it points to. The pattern is anchored so that an ordinary
    # input_id which merely contains the marker text somewhere is left untouched;
    # trailing whitespace is tolerated in case the column value comes back padded.
    my $input_id = $column{'input_id'};
    if(defined($input_id) and $input_id =~ /\A_ext_input_analysis_data_id (\d+)\s*\z/) {
      $input_id = $self->db->get_AnalysisDataAdaptor->fetch_by_dbID($1);
    }

    my $job = Bio::EnsEMBL::Hive::AnalysisJob->new(
        -dbID               => $column{'job_id'},
        -analysis_id        => $column{'analysis_id'},
        -input_id           => $input_id,
        -worker_id          => $column{'worker_id'},
        -status             => $column{'status'},
        -retry_count        => $column{'retry_count'},
        -completed          => $column{'completed'},
        -runtime_msec       => $column{'runtime_msec'},
        -query_count        => $column{'query_count'},
        -semaphore_count    => $column{'semaphore_count'},
        -semaphored_job_id  => $column{'semaphored_job_id'},
        -adaptor            => $self,
    );

    push @jobs, $job;
  }
  $sth->finish;

  return \@jobs
}

361

Jessica Severin's avatar
Jessica Severin committed
362 363 364 365 366
#
# STORE / UPDATE METHODS
#
################

367

368
# Decreases the semaphore_count of a SEMAPHORED job and flips it to READY once the
# counter is used up.
#   $jobid : job_id of the semaphored job (nothing is done if it is false)
#   $dec   : (optional) amount to subtract, 1 by default
sub decrease_semaphore_count_for_jobid {    # used in semaphore annihilation or unsuccessful creation
    my $self  = shift @_;
    my $jobid = shift @_ or return;
    my $dec   = shift @_ || 1;

        # NB: BOTH THE ORDER OF UPDATES AND EXACT WORDING IS ESSENTIAL FOR SYNCHRONOUS ATOMIC OPERATION,
        #       otherwise the same command tends to behave differently on MySQL and SQLite (at least)
        #
        # The job stays SEMAPHORED only if the counter is still positive AFTER subtracting $dec,
        # hence the comparison against $dec itself (a fixed '1' would leave a job blocked
        # with a counter of zero or less whenever $dec is greater than one).
    my $sql = qq{
        UPDATE job
        SET status=(CASE WHEN semaphore_count>? THEN 'SEMAPHORED' ELSE 'READY' END),
            semaphore_count=semaphore_count-?
        WHERE job_id=? AND status='SEMAPHORED'
    };
    
    my $sth = $self->prepare($sql);
    $sth->execute($dec, $dec, $jobid);
    $sth->finish;
}

388 389
# Increases the semaphore_count of a job that is still SEMAPHORED.
#   $jobid : job_id of the semaphored job (nothing is done if it is false)
#   $inc   : (optional) amount to add, 1 by default
sub increase_semaphore_count_for_jobid {    # used in semaphore propagation
    my ($self, $jobid, $inc) = @_;

    return unless $jobid;
    $inc ||= 1;

    my $sql = qq{
        UPDATE job
        SET semaphore_count=semaphore_count+?
        WHERE job_id=? AND status='SEMAPHORED'
    };

        # This particular query is infamous for collisions and 'deadlock' situations; let's wait and retry:
    my $dbc = $self->dbc;
    $dbc->protected_prepare_execute( $sql, $inc, $jobid );
}

403

Jessica Severin's avatar
Jessica Severin committed
404
=head2 update_status
Jessica Severin's avatar
Jessica Severin committed
405

Jessica Severin's avatar
Jessica Severin committed
406 407
  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    :
408
  Description: updates the job.status in the database
Jessica Severin's avatar
Jessica Severin committed
409
  Returntype : 
Jessica Severin's avatar
Jessica Severin committed
410
  Exceptions :
Jessica Severin's avatar
Jessica Severin committed
411 412
  Caller     : general

Jessica Severin's avatar
Jessica Severin committed
413
=cut
Jessica Severin's avatar
Jessica Severin committed
414

Jessica Severin's avatar
Jessica Severin committed
415
sub update_status {
    my ($self, $job) = @_;

    my $status = $job->status;

        # The statement is assembled from placeholders only, so that undefined values
        # (e.g. a DONE job without a recorded runtime) are stored as NULL
        # instead of producing malformed SQL:
    my @assignments = ('status=?');
    my @values      = ($status);

    if($status eq 'DONE') {
        push @assignments, 'completed=CURRENT_TIMESTAMP', 'runtime_msec=?', 'query_count=?';
        push @values, scalar($job->runtime_msec), scalar($job->query_count);
    } elsif($status eq 'PASSED_ON') {
        push @assignments, 'completed=CURRENT_TIMESTAMP';
    }

    my $sql = 'UPDATE job SET '.join(', ', @assignments).' WHERE job_id=?';
    push @values, scalar($job->dbID);

        # This particular query is infamous for collisions and 'deadlock' situations; let's wait and retry:
    $self->dbc->protected_prepare_execute( $sql, @values );
}

435

Jessica Severin's avatar
Jessica Severin committed
436
=head2 store_out_files
Jessica Severin's avatar
Jessica Severin committed
437

438
  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
Jessica Severin's avatar
Jessica Severin committed
439
  Example    :
440
  Description: update locations of log files, if present
Jessica Severin's avatar
Jessica Severin committed
441 442
  Returntype : 
  Exceptions :
Jessica Severin's avatar
Jessica Severin committed
443 444
  Caller     : Bio::EnsEMBL::Hive::Worker

Jessica Severin's avatar
Jessica Severin committed
445 446 447
=cut

sub store_out_files {
    my ($self, $job) = @_;

    if($job->stdout_file or $job->stderr_file) {
            # at least one log file exists: (re)register the locations for this job/retry/worker
        my $insert_sql = 'REPLACE INTO job_file (job_id, retry, worker_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)';
        my $sth = $self->dbc()->prepare($insert_sql);
        $sth->execute($job->dbID(), $job->retry_count(), $job->worker_id(), $job->stdout_file(), $job->stderr_file());
        $sth->finish();
    } else {
            # no log files: remove a registration that may be left for this worker/job.
            # Placeholders (as in the branch above) keep the statement valid
            # even when one of the ids happens to be undefined.
        my $delete_sql = 'DELETE from job_file WHERE worker_id=? AND job_id=?';
        my $sth = $self->dbc()->prepare($delete_sql);
        my $rows_affected = $sth->execute(scalar($job->worker_id), scalar($job->dbID));
        $sth->finish();
        return $rows_affected;
    }
}


462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502
=head2 reset_or_grab_job_by_dbID

  Arg [1]    : int $job_id
  Arg [2]    : int $worker_id (optional)
  Description: resets a job to 'READY' (if no $worker_id given) or directly to 'CLAIMED' so it can be run again, and fetches it.
               NB: Will also reset a previously 'SEMAPHORED' job to READY.
               The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
               but will not change retry_count if a job has never *really* started.
  Returntype : Bio::EnsEMBL::Hive::AnalysisJob or undef

=cut

sub reset_or_grab_job_by_dbID {
    my ($self, $job_id, $worker_id) = @_;

        # with a worker the job is claimed straight away, without one it is merely made available
    my $new_status = 'READY';
    $new_status    = 'CLAIMED' if($worker_id);

        # Note: the order of the fields being updated is critical!
    my $sql = qq{
        UPDATE job
           SET retry_count = (CASE WHEN (status='COMPILATION' OR status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END)
             , status=?
             , worker_id=?
         WHERE job_id=?
    };
    my @bind_values = ($new_status, $worker_id, $job_id);

    my $sth = $self->prepare( $sql );
    unless( $sth->execute( @bind_values ) ) {
        die "Could not run\n\t$sql\nwith data:\n\t(".join(',', @bind_values).')';
    }
    $sth->finish;

        # re-read the job, but only if it has really ended up in the requested state
    my $matching_jobs = $self->_generic_fetch( "j.job_id='$job_id' AND j.status='$new_status'" );

    return $matching_jobs->[0];
}


Leo Gordon's avatar
Leo Gordon committed
503
=head2 grab_jobs_for_worker
Jessica Severin's avatar
Jessica Severin committed
504

Leo Gordon's avatar
Leo Gordon committed
505 506 507 508 509
  Arg [1]           : Bio::EnsEMBL::Hive::Worker object $worker
  Example: 
    my $jobs  = $job_adaptor->grab_jobs_for_worker( $worker );
  Description: 
    For the specified worker, it will search available jobs, 
510
    and using the how_many_this_batch parameter, claim/fetch that
Leo Gordon's avatar
Leo Gordon committed
511 512 513 514 515 516
    number of jobs, and then return them.
  Returntype : 
    reference to array of Bio::EnsEMBL::Hive::AnalysisJob objects
  Caller     : Bio::EnsEMBL::Hive::Worker::run

=cut
Jessica Severin's avatar
Jessica Severin committed
517

Leo Gordon's avatar
Leo Gordon committed
518
sub grab_jobs_for_worker {
    my ($self, $worker, $how_many_this_batch) = @_;

    my $analysis_id = $worker->analysis_id();
    my $worker_id   = $worker->dbID();

    my $update_sql            = "UPDATE job SET worker_id='$worker_id', status='CLAIMED'";
    my $selection_start_sql   = " WHERE analysis_id='$analysis_id' AND status='READY'";

        # jobs that have never been retried are preferred over the rest
    my $virgin_selection_sql  = $selection_start_sql . " AND retry_count=0 LIMIT $how_many_this_batch";
    my $any_selection_sql     = $selection_start_sql . " LIMIT $how_many_this_batch";

        # on SQLite the limited selection is wrapped into a sub-select; elsewhere it is appended to the UPDATE directly
    my $driver_is_sqlite = ($self->dbc->driver eq 'sqlite');
    my ($virgin_claim_sql, $any_claim_sql) = map {
        $driver_is_sqlite
            ? $update_sql . " WHERE job_id IN (SELECT job_id FROM job $_) AND status='READY'"
            : $update_sql . $_
    } ($virgin_selection_sql, $any_selection_sql);

        # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
    if( $self->dbc->do( $virgin_claim_sql ) == 0 ) {
        $self->dbc->do( $any_claim_sql );
    }

        # everything this worker currently holds as CLAIMED is returned
    my $claimed_constraint = "j.worker_id='$worker_id' AND j.status='CLAIMED'";

    return $self->_generic_fetch( $claimed_constraint );
}


Leo Gordon's avatar
Leo Gordon committed
548
=head2 release_undone_jobs_from_worker
549 550

  Arg [1]    : Bio::EnsEMBL::Hive::Worker object
551
  Arg [2]    : optional message to be recorded in 'job_message' table
552 553 554 555
  Example    :
  Description: If a worker has died some of its jobs need to be reset back to 'READY'
               so they can be rerun.
               Jobs in state CLAIMED are simply reset back to READY.
556
               If a job was 'in progress' (COMPILATION, PRE_CLEANUP, FETCH_INPUT, RUN, WRITE_OUTPUT, POST_CLEANUP)
557
               the retry_count is increased and the status set back to READY.
558 559
               If the retry_count >= $max_retry_count (3 by default) the job is set
               to 'FAILED' and not rerun again.
560 561 562 563
  Exceptions : $worker must be defined
  Caller     : Bio::EnsEMBL::Hive::Queen

=cut
564

Leo Gordon's avatar
Leo Gordon committed
565
sub release_undone_jobs_from_worker {
    my ($self, $worker, $msg) = @_;

    my $analysis        = $worker->analysis();
    my $max_retry_count = $analysis->max_retry_count();
    my $worker_id       = $worker->dbID();

        #first just reset the claimed jobs, these don't need a retry_count index increment:
        # (previous worker_id does not matter, because that worker has never had a chance to run the job)
    $self->dbc->do( qq{
        UPDATE job
           SET status='READY', worker_id=NULL
         WHERE worker_id='$worker_id'
           AND status='CLAIMED'
    } );

        # then find the jobs the worker was in the middle of:
    my $sth = $self->prepare( qq{
        SELECT job_id
          FROM job
         WHERE worker_id='$worker_id'
           AND status in ('COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP')
    } );
    $sth->execute();

    my $cod = $worker->cause_of_death();
    $msg ||= "GarbageCollector: The worker died because of $cod";

    my $resource_overusage = ($cod eq 'MEMLIMIT') || ($cod eq 'RUNLIMIT' and $worker->work_done()==0);

    while(my ($job_id) = $sth->fetchrow_array()) {

            # Each job gets its own copy of the message: appending to the shared $msg would make
            # the notes about gc_dataflow pile up and leak into the messages of the following jobs.
        my $job_msg   = $msg;

        my $passed_on = 0;  # the flag indicating that the garbage_collection was attempted and was successful

        if( $resource_overusage ) {     # first try the dataflow branch named after the cause of death
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, $cod )) {
                $job_msg .= ', performing gc_dataflow';
            }
        }
        unless($passed_on) {            # then fall back to the generic 'ANYFAILURE' branch
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, 'ANYFAILURE' )) {
                $job_msg .= ", performing 'ANYFAILURE' gc_dataflow";
            }
        }

            # the third argument is true when the job could not be passed on
        $self->db()->get_LogMessageAdaptor()->store_job_message($job_id, $job_msg, not $passed_on );

        unless($passed_on) {            # a retry is only allowed if the death was not caused by resource overusage
            $self->release_and_age_job( $job_id, $max_retry_count, not $resource_overusage );
        }
    }
    $sth->finish();
}

618

Leo Gordon's avatar
Leo Gordon committed
619
# Takes an 'in progress' job away from its worker, increasing its retry_count.
#   $job_id          : job_id of the job to release
#   $max_retry_count : the job may only become READY again while its retry_count is below this value
#   $may_retry       : (optional) false forces the job to FAILED regardless of its retry_count
#   $runtime_msec    : (optional) runtime to record; NULL is stored when it is not defined
sub release_and_age_job {
    my ($self, $job_id, $max_retry_count, $may_retry, $runtime_msec) = @_;

        # both values are pasted into the SQL below, so they have to be valid SQL expressions:
    $may_retry    = 0       unless($may_retry);
    $runtime_msec = "NULL"  unless(defined($runtime_msec));

        # NB: The order of updated fields IS important. Here we first find out the new status and then increment the retry_count:
        #
        # FIXME: would it be possible to retain worker_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
        #
    my $sql = qq{
        UPDATE job
           SET status=(CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END),
               retry_count=retry_count+1,
               runtime_msec=$runtime_msec
         WHERE job_id=$job_id
           AND status in ('COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP')
    };

    $self->dbc->do( $sql );
}

Leo Gordon's avatar
Leo Gordon committed
637
=head2 gc_dataflow
Leo Gordon's avatar
Leo Gordon committed
638

Leo Gordon's avatar
Leo Gordon committed
639 640
    Description:    perform automatic dataflow from a dead job that overused resources if a corresponding dataflow rule was provided
                    Should only be called once during garbage collection phase, when the job is definitely 'abandoned' and not being worked on.
Leo Gordon's avatar
Leo Gordon committed
641 642 643 644

=cut

sub gc_dataflow {
    my ($self, $analysis, $job_id, $branch_name) = @_;

    my $matching_rules = $self->db->get_DataflowRuleAdaptor->fetch_all_by_from_analysis_id_and_branch_code($analysis->dbID, $branch_name);

        # no corresponding gc_dataflow rule has been defined
    return 0 unless(@$matching_rules);

    my $job = $self->fetch_by_dbID($job_id);

        # input_id_templates still supported, however to a limited extent
    $job->param_init( 0, $analysis->parameters(), $job->input_id() );

        # the job's own input_id is what flows down the requested branch
    $job->dataflow_output_id( $job->input_id() , $branch_name );

    $job->update_status('PASSED_ON');

        # step-unblock the semaphore this job was holding, if any
    my $semaphored_job_id = $job->semaphored_job_id;
    $self->decrease_semaphore_count_for_jobid( $semaphored_job_id ) if($semaphored_job_id);

    return 1;
}

666

667
=head2 reset_jobs_for_analysis_id
668 669

  Arg [1]    : int $analysis_id
670 671 672 673
  Arg [2]    : bool $all (false by default)
  Description: Resets either all FAILED jobs of an analysis (default)
                or ALL jobs of an analysis to 'READY' and their retry_count to 0.
  Caller     : beekeeper.pl
674 675 676

=cut

677 678
sub reset_jobs_for_analysis_id {
    my $self        = shift;
    my $analysis_id = shift;
    my $all         = shift;

        # by default only the FAILED jobs of the analysis are touched
    my $sql = "UPDATE job SET status='READY', retry_count=0 WHERE analysis_id=?";
    unless($all) {
        $sql .= " AND status='FAILED'";
    }

    my $sth = $self->prepare($sql);
    $sth->execute($analysis_id);
    $sth->finish;

        # flag the analysis as 'LOADING' now that its jobs have been reset
    my $stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
    $stats_adaptor->update_status($analysis_id, 'LOADING');
}

688

Jessica Severin's avatar
Jessica Severin committed
689 690
1;