=pod

=head1 NAME

    Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor

=head1 SYNOPSIS

    $analysisJobAdaptor = $db_adaptor->get_AnalysisJobAdaptor;
    $analysisJobAdaptor = $analysisJob->adaptor;

=head1 DESCRIPTION

    Module to encapsulate all db access for persistent class AnalysisJob.
    There should be just one per application and database connection.

=head1 LICENSE

    Copyright [1999-2014] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are preceded with a _

=cut


package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
Jessica Severin's avatar
Jessica Severin committed
43 44

use strict;
45

46 47 48
use Bio::EnsEMBL::Utils::Argument ('rearrange');
use Bio::EnsEMBL::Utils::Exception ('throw');

49
use Bio::EnsEMBL::Hive::DBSQL::AnalysisDataAdaptor;
50
use Bio::EnsEMBL::Hive::AnalysisJob;
51
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
52
use Bio::EnsEMBL::Hive::Utils ('stringify');
53

54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');


sub default_table_name {
        # Name of the database table whose rows this adaptor reads and writes.
    my $table = 'job';
    return $table;
}


sub object_class {
        # Perl class of the objects handled by this adaptor.
    my $class_name = 'Bio::EnsEMBL::Hive::AnalysisJob';
    return $class_name;
}


sub default_overflow_limit {
        # Per-column length thresholds, keyed by column name.
        # NOTE(review): presumably consumed by the ObjectAdaptor base class to divert
        # over-long values into analysis_data -- confirm against the base class.
    my %limit_for = (
        input_id       => 255,
        param_id_stack => 64,
        accu_id_stack  => 64,
    );
    return { %limit_for };    # a fresh hashref on every call
}


###############################################################################
#
#  CLASS methods
#
###############################################################################

=head2 CreateNewJob

  Args       : -input_id => string of input_id which will be passed to run the job (or a Perl hash that will be automagically stringified)
               -analysis => Bio::EnsEMBL::Hive::Analysis object stored in the database
               -prev_job => (optional) Bio::EnsEMBL::Hive::AnalysisJob object of the job that is creating this job;
                            its dbID is recorded as prev_job_id (book keeping only).
               -param_id_stack     => (optional) string ('' when not given)
               -accu_id_stack      => (optional) string ('' when not given)
               -semaphore_count    => (optional) int, initial semaphore_count (0 by default);
                                      a positive value creates the job as 'SEMAPHORED' instead of 'READY'
               -semaphored_job_id  => (optional) job_id of the job whose semaphore this new job contributes to
               -push_new_semaphore => (optional) boolean; when false and -semaphored_job_id is given,
                                      that job's semaphore_count is increased by 1 (semaphore propagation)
               -prev_job_id        => obsolete: passing it throws, pass -prev_job instead
  Example    : $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
                                    -input_id => 'my input data',
                                    -analysis => $myAnalysis);
  Description: uses the analysis object to get the db connection from the adaptor to store a new
               job in a hive.  This is a class level method since it does not have any state.
               input_id, param_id_stack and accu_id_stack values whose length reaches the limit given by
               default_overflow_limit() are stored in analysis_data and replaced by '_extended_data_id <N>'.
               Unless the database uses triggers (hive_use_triggers), also updates corresponding analysis_stats
               by incrementing total_job_count and ready_job_count (semaphored_job_count for SEMAPHORED jobs),
               flagging the incremental update by changing the status to 'LOADING' (but only if the analysis is not blocked).
               NOTE: no AnalysisJob object is created in memory as the result of this call; it is simply a "fast store".
  Returntype : int job_id of the newly stored job,
               or undef if the INSERT was ignored because the same job had been created previously
  Exceptions : thrown if either -input_id or -analysis are not properly defined, or if -prev_job_id is given;
               dies if the INSERT statement cannot be executed
  Caller     : general

=cut

sub CreateNewJob {
    my ($class, @args) = @_;

    my ($input_id, $param_id_stack, $accu_id_stack, $analysis, $prev_job, $prev_job_id, $semaphore_count, $semaphored_job_id, $push_new_semaphore) =
       rearrange([qw(input_id param_id_stack accu_id_stack analysis prev_job prev_job_id semaphore_count semaphored_job_id push_new_semaphore)], @args);

    throw("must define input_id") unless($input_id);
    throw("must define analysis") unless($analysis);
    throw("analysis must be [Bio::EnsEMBL::Hive::Analysis] not a [$analysis]")
        unless($analysis->isa('Bio::EnsEMBL::Hive::Analysis'));
    throw("analysis must have adaptor connected to database")
        unless($analysis->adaptor and $analysis->adaptor->db);
    throw("Please specify prev_job object instead of prev_job_id if available") if ($prev_job_id);   # 'obsolete' message

    $prev_job_id = $prev_job && $prev_job->dbID();

    if(ref($input_id)) {  # let's do the Perl hash stringification centrally rather than in many places:
        $input_id = stringify($input_id);
    }

    my $dba = $analysis->adaptor->db;
    my $dbc = $dba->dbc;

        # Values that reach their column's overflow limit are moved into analysis_data and replaced by a reference.
        # The limits come from default_overflow_limit() so that the two places cannot drift apart.
    my $overflow_limit = $class->default_overflow_limit();
    foreach my $overflow_candidate ( [ 'input_id' => \$input_id ], [ 'param_id_stack' => \$param_id_stack ], [ 'accu_id_stack' => \$accu_id_stack ] ) {
        my ($column, $value_ref) = @$overflow_candidate;
        my $limit = $overflow_limit->{$column};

        next unless defined($$value_ref) and defined($limit);   # the stacks are optional

        if(length($$value_ref) >= $limit) {
            print "$column is '$$value_ref', length = ".length($$value_ref)."\n";
            my $extended_data_id = $dba->get_AnalysisDataAdaptor->store_if_needed($$value_ref);
            $$value_ref = "_extended_data_id $extended_data_id";
        }
    }

    $semaphore_count ||= 0;

    my $insertion_method  = { 'mysql' => 'INSERT IGNORE', 'sqlite' => 'INSERT OR IGNORE', 'pgsql' => 'INSERT' }->{ $dbc->driver };
    my $job_status        = ($semaphore_count>0) ? 'SEMAPHORED' : 'READY';
    my $analysis_id       = $analysis->dbID();

        # take a row lock on the funnel job before touching its semaphore (SQLite has no SELECT ... FOR UPDATE):
    $dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($semaphored_job_id and ($dbc->driver ne 'sqlite'));

    my $sql = qq{$insertion_method INTO job 
              (input_id, param_id_stack, accu_id_stack, prev_job_id,analysis_id,status,semaphore_count,semaphored_job_id)
              VALUES (?,?,?,?,?,?,?,?)};

    my $sth       = $dbc->prepare($sql);
    my @values    = ($input_id, $param_id_stack || '', $accu_id_stack || '', $prev_job_id, $analysis_id, $job_status, $semaphore_count, $semaphored_job_id);

        # using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
    my $return_code = $sth->execute(@values)
        or die "Could not run\n\t$sql\nwith data:\n\t(".join(',', @values).')';

    my $job_id;

    if($return_code > 0) {    # <--- for the same reason we have to be explicitly numeric here:
        $job_id = $dbc->db_handle->last_insert_id(undef, undef, 'job', 'job_id');
        $sth->finish;

        if($semaphored_job_id and !$push_new_semaphore) {   # if we are not creating a new semaphore (where dependent jobs have already been counted),
                                                            # but rather propagating an existing one (same or other level), we have to up-adjust the counter.
                                                            # Use the same database the funnel row was locked in (works even without -prev_job):
            $dba->get_AnalysisJobAdaptor->increase_semaphore_count_for_jobid( $semaphored_job_id );
        }

        unless($dba->hive_use_triggers()) {
            $dbc->do(qq{
            UPDATE analysis_stats
               SET total_job_count=total_job_count+1
          }
            .(($job_status eq 'READY')
                    ? " ,ready_job_count=ready_job_count+1 "
                    : " ,semaphored_job_count=semaphored_job_count+1 "
            ).(($dbc->driver eq 'pgsql')
            ? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
            : " ,status =      CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
            )." WHERE analysis_id=$analysis_id ");
        }
    } else {
            # '0E0' means "ignored insert collision" (job created previously): return undef and let the caller deal with it
        $sth->finish;
    }

    return $job_id;
}

###############################################################################
#
#  INSTANCE methods
#
###############################################################################

=head2 fetch_all_by_analysis_id_status

  Arg [1]    : (optional) int $analysis_id
  Arg [2]    : (optional) string $status
  Arg [3]    : (optional) int $retry_count_at_least
  Example    : $all_failed_jobs = $adaptor->fetch_all_by_analysis_id_status(undef, 'FAILED');
               $analysis_done_jobs = $adaptor->fetch_all_by_analysis_id_status($analysis->dbID, 'DONE');
  Description: Returns a list of all jobs filtered by given analysis_id (if specified), given status (if specified)
               and a minimum retry_count (if specified). A filter is skipped whenever its argument is false
               (undef, 0 or ''); with no filters at all every job is fetched.
               NOTE(review): the values are interpolated into the SQL text, so callers must pass trusted values.
  Returntype : reference to list of Bio::EnsEMBL::Hive::AnalysisJob objects

=cut

sub fetch_all_by_analysis_id_status {
    my ($self, $analysis_id, $status, $retry_count_at_least) = @_;

    my @where = ();
    if ($analysis_id)          { push @where, "analysis_id=$analysis_id" }
    if ($status)               { push @where, "status='$status'" }
    if ($retry_count_at_least) { push @where, "retry_count >= $retry_count_at_least" }

    my $constraint = join ' AND ', @where;
    return $self->fetch_all($constraint);
}


224 225
sub fetch_some_by_analysis_id_limit {
    my ($self, $analysis_id, $limit) = @_;

        # Fetch at most $limit jobs of one analysis.
        # The LIMIT clause rides along at the end of the constraint string handed to fetch_all().
    my $constraint = "analysis_id = '$analysis_id' LIMIT $limit";
    return $self->fetch_all( $constraint );
}
Jessica Severin's avatar
Jessica Severin committed
229 230


231 232 233
sub fetch_all_incomplete_jobs_by_worker_id {
    my ($self, $worker_id) = @_;

        # Jobs of this worker that are in any of the "in progress" states.
    my $in_progress = join(',', map { "'$_'" } qw(COMPILATION PRE_CLEANUP FETCH_INPUT RUN WRITE_OUTPUT POST_CLEANUP));

    return $self->fetch_all( "status IN ($in_progress) AND worker_id='$worker_id'" );
}


239 240 241 242
sub fetch_by_url_query {
    my ($self, $field_name, $field_value) = @_;

        # Only lookups by a true dbID value are supported; anything else yields nothing.
    return unless $field_name eq 'dbID' and $field_value;

    return $self->fetch_by_dbID($field_value);
}

########################
#
# STORE / UPDATE METHODS
#
########################

    # Decrements the semaphore_count of job $jobid by $dec (1 when not given or false),
    # but only while that job is still 'SEMAPHORED'. In the same UPDATE the job becomes 'READY'
    # when its count before this decrement is not greater than $dec (i.e. it drops to 0 or below),
    # and stays 'SEMAPHORED' otherwise.
    # Returns immediately (empty list / undef) when $jobid is false; otherwise returns whatever
    # the connection's protected_prepare_execute() returns.
sub decrease_semaphore_count_for_jobid {    # used in semaphore annihilation or unsuccessful creation
    my $self  = shift @_;
    my $jobid = shift @_ or return;
    my $dec   = shift @_ || 1;

        # NB: BOTH THE ORDER OF UPDATES AND EXACT WORDING IS ESSENTIAL FOR SYNCHRONOUS ATOMIC OPERATION,
        #       otherwise the same command tends to behave differently on MySQL and SQLite (at least)
        #
        # status is assigned BEFORE semaphore_count is changed, so the CASE compares against the
        # pre-decrement count both on MySQL (left-to-right SET) and on drivers that always read old values.
        # The pgsql branch wraps the CASE in a CAST to jw_status (presumably the enum type of job.status there).
        # NOTE(review): $dec is interpolated into the SQL text as well as bound; it is assumed to be numeric.
    my $sql = "UPDATE job "
        .( ($self->dbc->driver eq 'pgsql')
        ? "SET status = CAST(CASE WHEN semaphore_count>$dec THEN 'SEMAPHORED' ELSE 'READY' END AS jw_status), "
        : "SET status =      CASE WHEN semaphore_count>$dec THEN 'SEMAPHORED' ELSE 'READY' END, "
        ).qq{
            semaphore_count=semaphore_count-?
        WHERE job_id=? AND status='SEMAPHORED'
    };
    
    $self->dbc->protected_prepare_execute( $sql, $dec, $jobid );
}

280 281
sub increase_semaphore_count_for_jobid {    # used in semaphore propagation
    my $self  = shift @_;
282
    my $jobid = shift @_ or return;
283 284
    my $inc   = shift @_ || 1;

285
    my $sql = qq{
286
        UPDATE job
287
        SET semaphore_count=semaphore_count+?
288
        WHERE job_id=?
289
    };
290
    
291
    $self->dbc->protected_prepare_execute( $sql, $inc, $jobid );
292 293
}

294

Jessica Severin's avatar
Jessica Severin committed
295
=head2 update_status

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    : $job_adaptor->update_status( $job );
  Description: Writes $job->status into the job table row of $job->dbID.
               For 'DONE' jobs it also records the completion time, runtime_msec and query_count;
               for 'PASSED_ON' jobs it records the completion time only.
               All values are bound as placeholders, so undefined runtime_msec/query_count become NULL.
  Returntype : whatever the connection's protected_prepare_execute() returns
  Exceptions : none raised here directly
  Caller     : general

=cut

sub update_status {
    my ($self, $job) = @_;

    my $status = $job->status;
    my @set    = ( 'status=?' );
    my @values = ( $status );

    if($status eq 'DONE') {
        push @set, 'completed=CURRENT_TIMESTAMP', 'runtime_msec=?', 'query_count=?';
            # force scalar context so that an accessor returning an empty list cannot shift the bind values:
        push @values, scalar($job->runtime_msec), scalar($job->query_count);
    } elsif($status eq 'PASSED_ON') {
        push @set, 'completed=CURRENT_TIMESTAMP';
    }

    my $sql = 'UPDATE job SET '.join(', ', @set).' WHERE job_id=?';

        # This particular query is infamous for collisions and 'deadlock' situations; let's wait and retry:
    return $self->dbc->protected_prepare_execute( $sql, @values, scalar($job->dbID) );
}

326

Jessica Severin's avatar
Jessica Severin committed
327
=head2 store_out_files

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    : $job_adaptor->store_out_files( $job );
  Description: If the job has a stdout and/or stderr file, (re)writes its job_file row for
               (job_id, retry, worker_id); otherwise deletes the job_file rows of this job and worker.
               NOTE(review): REPLACE INTO is MySQL/SQLite syntax; PostgreSQL does not support it.
  Returntype : none
  Exceptions : none raised here directly
  Caller     : Bio::EnsEMBL::Hive::Worker

=cut

sub store_out_files {
    my ($self, $job) = @_;

    if($job->stdout_file or $job->stderr_file) {
        my $insert_sql = 'REPLACE INTO job_file (job_id, retry, worker_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)';
        my $sth = $self->dbc()->prepare($insert_sql);
        $sth->execute($job->dbID(), $job->retry_count(), $job->worker_id(), $job->stdout_file(), $job->stderr_file());
        $sth->finish();
    } else {
            # Bind the ids instead of interpolating them: an undefined worker_id used to produce
            # malformed SQL ("worker_id= AND ..."); now it simply matches no rows.
        my $delete_sql = 'DELETE FROM job_file WHERE worker_id=? AND job_id=?';
        my $sth = $self->dbc()->prepare($delete_sql);
        $sth->execute(scalar($job->worker_id()), scalar($job->dbID()));
        $sth->finish();
    }
}


353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
=head2 reset_or_grab_job_by_dbID

  Arg [1]    : int $job_id
  Arg [2]    : int $worker_id (optional)
  Description: Puts the job back into 'READY' (when no $worker_id is given) or straight into 'CLAIMED'
               by $worker_id, so that it can be run again, and then fetches it.
               Any previous status is overwritten, so a 'SEMAPHORED' job is reset as well.
               retry_count is left as it is for jobs that never *really* started ('COMPILATION', 'READY', 'CLAIMED')
               and set to 1 for all others, to trigger PRE_CLEANUP for them.
  Returntype : Bio::EnsEMBL::Hive::AnalysisJob or undef
  Exceptions : dies if the UPDATE cannot be executed

=cut

sub reset_or_grab_job_by_dbID {
    my ($self, $job_id, $worker_id) = @_;

    my $target_status = $worker_id ? 'CLAIMED' : 'READY';

        # Note: the order of the fields being updated is critical!
    my $sql = qq{
        UPDATE job
           SET retry_count = CASE WHEN (status='COMPILATION' OR status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END
             , status=?
             , worker_id=?
         WHERE job_id=?
    };
    my @bind_values = ($target_status, $worker_id, $job_id);

    my $update_sth = $self->prepare( $sql );
    $update_sth->execute( @bind_values )
        or die "Could not run\n\t$sql\nwith data:\n\t(".join(',', @bind_values).')';
    $update_sth->finish;

        # fetch in scalar context: exactly one (possibly undef) job is returned
    return scalar( $self->fetch_by_job_id_AND_status($job_id, $target_status) );
}


Leo Gordon's avatar
Leo Gordon committed
393
=head2 grab_jobs_for_worker

  Arg [1]    : Bio::EnsEMBL::Hive::Worker object $worker
  Arg [2]    : int $how_many_this_batch - the maximum number of jobs to claim in one go
  Arg [3]    : int $workers_rank - multiplied by $how_many_this_batch to give the OFFSET used by the
               first two attempts (presumably so that concurrently grabbing workers aim at different READY jobs)
  Example: 
    my $jobs  = $job_adaptor->grab_jobs_for_worker( $worker, $how_many_this_batch, $workers_rank );
  Description: 
    For the specified worker, it will search available ('READY') jobs of the worker's analysis,
    mark up to $how_many_this_batch of them as 'CLAIMED' by this worker, and then return
    all the jobs that are currently 'CLAIMED' by this worker.
    Up to three UPDATE attempts are made, each one only if the previous one claimed no rows:
      1. never-tried jobs (retry_count=0), LIMIT $how_many_this_batch OFFSET $how_many_this_batch*$workers_rank
      2. any READY jobs, same LIMIT and OFFSET
      3. any READY jobs, same LIMIT, no OFFSET
    There is no ORDER BY, so which READY rows are picked is up to the database.
  Returntype : 
    reference to array of Bio::EnsEMBL::Hive::AnalysisJob objects
  Caller     : Bio::EnsEMBL::Hive::Worker::run

=cut

sub grab_jobs_for_worker {
    my ($self, $worker, $how_many_this_batch, $workers_rank) = @_;
  
    my $analysis_id = $worker->analysis_id();
    my $worker_id   = $worker->dbID();
    my $offset      = $how_many_this_batch*$workers_rank;

        # The statement is assembled from prefix + optional filters + suffix.
        # MySQL gets the "UPDATE ... JOIN (SELECT ... LIMIT ...) USING (job_id)" form, other drivers the
        # "UPDATE ... WHERE job_id IN (SELECT ... LIMIT ...)" form.
        # NOTE(review): presumably because MySQL rejects LIMIT inside an IN (...) subquery -- confirm.
    my $prefix_sql = ($self->dbc->driver eq 'mysql') ? qq{
         UPDATE job j
           JOIN (
                            SELECT job_id
                              FROM job
                             WHERE analysis_id='$analysis_id'
                               AND status='READY'
    } : qq{
         UPDATE job
           SET worker_id='$worker_id', status='CLAIMED'
         WHERE job_id in (
                            SELECT job_id
                              FROM job
                             WHERE analysis_id='$analysis_id'
                               AND status='READY'
    };
        # optional pieces spliced into the inner SELECT:
    my $virgin_sql = qq{       AND retry_count=0 };
    my $limit_sql  = qq{     LIMIT $how_many_this_batch };
    my $offset_sql = qq{    OFFSET $offset };
        # the outer status='READY' test re-checks every selected row at UPDATE time
    my $suffix_sql = ($self->dbc->driver eq 'mysql') ? qq{
                 ) as x
         USING (job_id)
           SET j.worker_id='$worker_id', j.status='CLAIMED'
         WHERE j.status='READY'
    } : qq{
                 )
           AND status='READY'
    };

        # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
        # (attempt 1: never-tried jobs with offset; attempt 2: any READY jobs with offset; attempt 3: any READY jobs, no offset)
    if(  (my $claim_count = $self->dbc->do( $prefix_sql . $virgin_sql . $limit_sql . $offset_sql . $suffix_sql)) == 0 ) {
        if( ($claim_count = $self->dbc->do( $prefix_sql .               $limit_sql . $offset_sql . $suffix_sql)) == 0 ) {
             $claim_count = $self->dbc->do( $prefix_sql .               $limit_sql .               $suffix_sql);
        }
    }

        # returns every job CLAIMED by this worker, not only the ones claimed by this call
    return $self->fetch_all_by_worker_id_AND_status($worker_id, 'CLAIMED') ;
}


Leo Gordon's avatar
Leo Gordon committed
455
=head2 release_undone_jobs_from_worker

  Arg [1]    : Bio::EnsEMBL::Hive::Worker object
  Arg [2]    : optional message to be recorded in 'job_message' table
  Example    : $job_adaptor->release_undone_jobs_from_worker( $dead_worker );
  Description: If a worker has died some of its jobs need to be reset back to 'READY'
               so they can be rerun.
               Jobs in state CLAIMED are simply reset back to READY (their retry_count is not touched).
               For jobs that were 'in progress' (COMPILATION, PRE_CLEANUP, FETCH_INPUT, RUN, WRITE_OUTPUT, POST_CLEANUP)
               a gc_dataflow is attempted first: on the branch named after the cause of death when the worker
               overused resources (MEMLIMIT, or RUNLIMIT without any work done), otherwise on 'ANYFAILURE'.
               A job_message is stored for each of these jobs.
               Jobs that were not passed on that way get their retry_count increased and are set back to READY,
               or to 'FAILED' once retry_count has reached the analysis' max_retry_count;
               jobs of a worker that overused resources are set to 'FAILED' straight away.
  Exceptions : $worker must be defined
  Caller     : Bio::EnsEMBL::Hive::Queen

=cut

sub release_undone_jobs_from_worker {
    my ($self, $worker, $msg) = @_;

    my $analysis        = $worker->analysis();
    my $max_retry_count = $analysis->max_retry_count();
    my $worker_id       = $worker->dbID();

        #first just reset the claimed jobs, these don't need a retry_count index increment:
        # (previous worker_id does not matter, because that worker has never had a chance to run the job)
    $self->dbc->do( qq{
        UPDATE job
           SET status='READY', worker_id=NULL
         WHERE worker_id='$worker_id'
           AND status='CLAIMED'
    } );

        # collect the ids of the jobs that were in progress before touching any of them,
        # so that no statement handle stays open on 'job' while we update it:
    my $sth = $self->prepare( qq{
        SELECT job_id
          FROM job
         WHERE worker_id='$worker_id'
           AND status in ('COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP')
    } );
    $sth->execute();
    my @job_ids = ();
    while(my ($job_id) = $sth->fetchrow_array()) {
        push @job_ids, $job_id;
    }
    $sth->finish();

    my $cod      = $worker->cause_of_death();
    my $base_msg = $msg || "GarbageCollector: The worker died because of $cod";

    my $resource_overusage = ($cod eq 'MEMLIMIT') || ($cod eq 'RUNLIMIT' and $worker->work_done()==0);

    foreach my $job_id (@job_ids) {

        my $job_msg   = $base_msg;  # start afresh for every job, so that per-job remarks do not pile up
        my $passed_on = 0;          # the flag indicating that the garbage_collection was attempted and was successful

        if( $resource_overusage ) {
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, $cod )) {
                $job_msg .= ', performing gc_dataflow';
            }
        }
        unless($passed_on) {
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, 'ANYFAILURE' )) {
                $job_msg .= ", performing 'ANYFAILURE' gc_dataflow";
            }
        }

        $self->db()->get_LogMessageAdaptor()->store_job_message($job_id, $job_msg, not $passed_on );

        unless($passed_on) {
            $self->release_and_age_job( $job_id, $max_retry_count, not $resource_overusage );
        }
    }
}

525

Leo Gordon's avatar
Leo Gordon committed
526
sub release_and_age_job {
527
    my ($self, $job_id, $max_retry_count, $may_retry, $runtime_msec) = @_;
Leo Gordon's avatar
bugfix?  
Leo Gordon committed
528
    $may_retry ||= 0;
529
    $runtime_msec = "NULL" unless(defined $runtime_msec);
Leo Gordon's avatar
Leo Gordon committed
530
        # NB: The order of updated fields IS important. Here we first find out the new status and then increment the retry_count:
531 532 533
        #
        # FIXME: would it be possible to retain worker_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
        #
534 535 536 537 538 539
    $self->dbc->do( 
        "UPDATE job "
        .( ($self->dbc->driver eq 'pgsql')
            ? "SET status = CAST(CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END AS jw_status), "
            : "SET status =      CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END, "
         ).qq{
540 541
               retry_count=retry_count+1,
               runtime_msec=$runtime_msec
542
         WHERE job_id=$job_id
543
           AND status in ('COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP')
Leo Gordon's avatar
Leo Gordon committed
544
    } );
545 546
}

Leo Gordon's avatar
Leo Gordon committed
547
=head2 gc_dataflow

    Description:    perform automatic dataflow from a dead job if a dataflow rule exists for the given branch
                    (e.g. the resource-overusage branch or 'ANYFAILURE') from the job's analysis.
                    On success the job is flowed into that branch with its own input_id, marked 'PASSED_ON',
                    and its funnel's semaphore (if any) is decremented by one.
                    Should only be called once during garbage collection phase, when the job is definitely 'abandoned' and not being worked on.
    Returntype:     1 if the dataflow was performed, 0 if no matching rule was defined

=cut

sub gc_dataflow {
    my ($self, $analysis, $job_id, $branch_name) = @_;

    my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name);
    my $rule_count  = $self->db->get_DataflowRuleAdaptor
                           ->count_all_by_from_analysis_id_AND_branch_code( $analysis->dbID, $branch_code );

    return 0 if !$rule_count;     # no gc_dataflow rule defined for this branch: nothing to do

    my $job = $self->fetch_by_dbID($job_id);

        # input_id_templates still supported, however to a limited extent
    $job->param_init( 0, $analysis->parameters(), $job->input_id() );
    $job->dataflow_output_id( $job->input_id() , $branch_name );
    $job->update_status('PASSED_ON');

    my $funnel_job_id = $job->semaphored_job_id;
    $self->decrease_semaphore_count_for_jobid( $funnel_job_id ) if $funnel_job_id;    # step-unblock the semaphore

    return 1;
}

578

579
=head2 reset_jobs_for_analysis_id

  Arg [1]    : int $analysis_id
  Arg [2]    : (optional) $input_statuses - either a reference to a list of job statuses to reset,
               or a false value (the default) meaning only the 'FAILED' jobs,
               or any other true value meaning ALL jobs of the analysis
  Description: Resets the selected jobs of an analysis to 'READY' (or 'SEMAPHORED' when their semaphore_count is positive).
               retry_count becomes 0 for jobs that never really started ('COMPILATION', 'READY', 'CLAIMED') and 1 otherwise.
               The analysis_stats status is then set to 'LOADING'.
               An empty list of statuses selects no jobs, and nothing is done at all.
  Caller     : beekeeper.pl

=cut

sub reset_jobs_for_analysis_id {
    my ($self, $analysis_id, $input_statuses) = @_;

    my $status_filter = '';
    my @status_values = ();

    if(ref($input_statuses) eq 'ARRAY') {
        @status_values = @$input_statuses;
        return unless @status_values;   # "IN ()" is not valid SQL; an empty list simply selects no jobs
            # bind the statuses instead of interpolating them into the SQL text:
        $status_filter = 'AND status IN ('.join(', ', ('?') x @status_values).')';
    } elsif(!$input_statuses) {
        $status_filter = "AND status='FAILED'"; # temporarily keep it here for compatibility
    }

    my $sql = qq{
            UPDATE job
           SET retry_count = CASE WHEN (status='COMPILATION' OR status='READY' OR status='CLAIMED') THEN 0 ELSE 1 END,
        }. ( ($self->dbc->driver eq 'pgsql')
        ? "status = CAST(CASE WHEN semaphore_count>0 THEN 'SEMAPHORED' ELSE 'READY' END AS jw_status) "
        : "status =      CASE WHEN semaphore_count>0 THEN 'SEMAPHORED' ELSE 'READY' END "
        ).qq{
            WHERE analysis_id=?
        } . $status_filter;

    my $sth = $self->prepare($sql);
    $sth->execute($analysis_id, @status_values);
    $sth->finish;

    $self->db->get_AnalysisStatsAdaptor->update_status($analysis_id, 'LOADING');
}

617

618 619 620 621 622 623 624
=head2 balance_semaphores

  Arg [1]    : (optional) int $filter_analysis_id - only consider funnel jobs of this analysis
  Description: Reset all semaphore_counts to the numbers of unDONE semaphoring jobs:
               for every 'SEMAPHORED' (funnel) job whose semaphore_count differs from the number of its fan jobs
               that are neither 'DONE' nor 'PASSED_ON' (or where that number is 0), the count is corrected and
               the job is released to 'READY' when the new count is 0.
               Every change is reported with warn() and stored as a job_message.

=cut

sub balance_semaphores {
    my ($self, $filter_analysis_id) = @_;

    my @find_values = $filter_analysis_id ? ($filter_analysis_id) : ();
    my $find_sql    = qq{
                        SELECT * FROM (
                            SELECT funnel.job_id, funnel.semaphore_count AS was, COALESCE(COUNT(CASE WHEN fan.status!='DONE' AND fan.status!='PASSED_ON' THEN 1 ELSE NULL END),0) AS should
                            FROM job funnel
                            LEFT JOIN job fan ON (funnel.job_id=fan.semaphored_job_id)
                            WHERE }
                        .($filter_analysis_id ? "funnel.analysis_id=? AND " : '')
                        .qq{
                            funnel.status='SEMAPHORED'
                            GROUP BY funnel.job_id
                         ) AS internal WHERE was<>should OR should=0
                     };

        # The new status is decided in Perl from the NEW count and written BEFORE semaphore_count.
        # The old form "SET semaphore_count=?, status=CASE WHEN semaphore_count>0 ..." relied on MySQL evaluating
        # assignments left to right; SQLite and PostgreSQL use the old value in the CASE, which left funnels
        # 'SEMAPHORED' with semaphore_count=0 on those drivers.
    my $status_placeholder = ($self->dbc->driver eq 'pgsql') ? 'CAST(? AS jw_status)' : '?';
    my $update_sql  = "UPDATE job SET status=$status_placeholder, semaphore_count=? WHERE job_id=? AND status='SEMAPHORED'";

        # read all imbalanced funnels first, so that no read cursor on 'job' stays open while we update it:
    my $find_sth    = $self->prepare($find_sql);
    $find_sth->execute(@find_values);
    my @imbalanced  = ();
    while(my ($job_id, $was, $should) = $find_sth->fetchrow_array()) {
        push @imbalanced, [ $job_id, $was, $should ];
    }
    $find_sth->finish;

    my $update_sth  = $self->prepare($update_sql);
    foreach my $funnel (@imbalanced) {
        my ($job_id, $was, $should) = @$funnel;

        warn "Balancing semaphore: job_id=$job_id ($was -> $should)\n";
        $update_sth->execute( ($should > 0 ? 'SEMAPHORED' : 'READY'), $should, $job_id );
        $self->db->get_LogMessageAdaptor->store_job_message( $job_id, "Re-balancing the semaphore_count: $was -> $should", 1 );
    }
    $update_sth->finish;
}


661 662 663 664 665 666 667 668 669 670 671 672 673 674
sub fetch_input_ids_for_job_ids {
    my ($self, $job_ids_csv, $id_scale, $id_offset) = @_;
    $id_scale   ||= 1;
    $id_offset  ||= 0;

    my %input_ids = ();

    if( $job_ids_csv ) {

        my $sql = "SELECT job_id, input_id FROM job WHERE job_id in ($job_ids_csv)";
        my $sth = $self->prepare( $sql );
        $sth->execute();

        while(my ($job_id, $input_id) = $sth->fetchrow_array() ) {
675
            if($input_id =~ /^_ext(?:\w+)_data_id (\d+)$/) {
676
                $input_id = $self->db->get_AnalysisDataAdaptor->fetch_by_analysis_data_id_TO_data($1);
677
            }
678 679 680 681 682 683 684
            $input_ids{$job_id * $id_scale + $id_offset} = $input_id;
        }
    }
    return \%input_ids;
}


Jessica Severin's avatar
Jessica Severin committed
685 686
1;