=pod

=head1 NAME

    Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor

=head1 SYNOPSIS

    $analysisJobAdaptor = $db_adaptor->get_AnalysisJobAdaptor;
    $analysisJobAdaptor = $analysisJob->adaptor;

=head1 DESCRIPTION

    Module to encapsulate all db access for persistent class AnalysisJob.
    There should be just one per application and database connection.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2020] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are preceded with a _

=cut


package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
Jessica Severin's avatar
Jessica Severin committed
44 45

use strict;
46
use warnings;
47

48
use Bio::EnsEMBL::Hive::AnalysisJob;
49
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
50
use Bio::EnsEMBL::Hive::Utils ('stringify');
51

52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');


sub default_table_name {
        # the SQL table this adaptor reads from and writes to
    my $table_name = 'job';

    return $table_name;
}


sub object_class {
        # the Perl class of the objects handled by this adaptor
    my $class_name = 'Bio::EnsEMBL::Hive::AnalysisJob';

    return $class_name;
}


sub default_overflow_limit {
        # Per-column length limits, keyed by column name.
        # fetch_by_analysis_id_and_input_id() compares length($input_id) against the 'input_id' entry.
        # A new hash is built on every call, so callers never share (or mutate) a common structure.
    my %limit_for = (
        'input_id'          => 255,
        'param_id_stack'    =>  64,
        'accu_id_stack'     =>  64,
    );

    return \%limit_for;
}


74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
=head2 fetch_by_analysis_id_and_input_id

  Arg [1]    : Integer $analysis_id
  Arg [2]    : String $input_id
  Example    : $funnel_job = $job_adaptor->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id);
  Description: Looks the job up by (analysis_id, input_id) as given. If that finds nothing and the input_id is longer
               than the 'input_id' overflow limit, asks the AnalysisDataAdaptor for the analysis_data_id of that content
               and retries the lookup with "_extended_data_id <id>" in place of the input_id.
  Returntype : AnalysisJob object, or the (false) result of the first lookup if no job could be found

=cut

sub fetch_by_analysis_id_and_input_id {     # It is a special case not covered by AUTOLOAD; note the lowercase _and_
    my ($self, $analysis_id, $input_id) = @_;

    my $direct_hit = $self->fetch_by_analysis_id_AND_input_id( $analysis_id, $input_id);
    return $direct_hit if($direct_hit);

        # only an input_id longer than the limit can have been moved out of the job table:
    my $overflow_limit = $self->default_overflow_limit->{input_id};
    return $direct_hit unless(length($input_id)>$overflow_limit);

    my $ext_data_id = $self->db->get_AnalysisDataAdaptor->fetch_by_data_TO_analysis_data_id( $input_id );
    return $direct_hit unless($ext_data_id);

        # assigned to a scalar first, so that the second lookup runs in scalar context like the first one:
    my $overflown_hit = $self->fetch_by_analysis_id_AND_input_id( $analysis_id, "_extended_data_id $ext_data_id");

    return $overflown_hit;
}


98 99 100 101 102 103 104 105 106 107
=head2 store_jobs_and_adjust_counters

  Arg [1]    : arrayref of Bio::EnsEMBL::Hive::AnalysisJob $jobs_to_store
  Arg [2]    : (optional) boolean $push_new_semaphore
  Example    : my @output_job_ids = @{ $job_adaptor->store_jobs_and_adjust_counters( \@jobs_to_store ) };
  Description: Attempts to store a list of jobs and returns an arrayref of the job_ids that were stored by this call.
               For every stored job the analysis_stats counters are adjusted (unless the database uses triggers),
               and the semaphore counter of the funnel job is increased when an existing semaphore is being propagated.
               Local jobs that could not be stored are logged; if a new semaphore is being pushed,
               its pre-counted semaphore_count is decreased by the number of such failures.
  Returntype : Reference to list of job_dbIDs

=cut

sub store_jobs_and_adjust_counters {
    my ($self, $jobs, $push_new_semaphore) = @_;

        # NB: our use patterns assume all jobs from the same storing batch share the same semaphored_job_id:
    my $semaphored_job_id       = scalar(@$jobs) && $jobs->[0]->semaphored_job_id();
    my $propagating_semaphore   = ($semaphored_job_id && !$push_new_semaphore);

    my @stored_job_ids  = ();
    my $local_failures  = 0;

    foreach my $candidate_job (@$jobs) {

        my $analysis        = $candidate_job->analysis;
        my $job_adaptor     = $analysis ? $analysis->adaptor->db->get_AnalysisJobAdaptor : $self;   # if analysis object is undefined, consider the job local
        my $is_local        = $job_adaptor eq $self;
        my $bump_semaphore  = ($propagating_semaphore and $is_local);

            # avoid deadlocks when dataflowing under transactional mode (used in Ortheus Runnable for example):
        if($bump_semaphore and ($job_adaptor->dbc->driver ne 'sqlite')) {
            $job_adaptor->dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" );
        }

            # break the link with the previous job if dataflowing across databases (current schema doesn't support URLs for job_ids)
        if(!$is_local) {
            $candidate_job->prev_job( undef );
        }

            # from here on we work with whatever job object store() hands back:
        my ($stored_job, $stored_this_time) = $job_adaptor->store( $candidate_job );

        unless($stored_this_time) {
            if($is_local) {
                $self->db->get_LogMessageAdaptor->store_hive_message( "JobAdaptor failed to store the local Job( analysis_id=".$stored_job->analysis_id.', '.$stored_job->input_id." ), possibly due to a collision", 0 );

                $local_failures++;
            }
            next;
        }

            # if we are not creating a new semaphore (where dependent jobs have already been counted),
            # but rather propagating an existing one (same or other level), we have to up-adjust the counter
        if($bump_semaphore) {
            $self->increase_semaphore_count_for_jobid( $semaphored_job_id );
        }

        unless($job_adaptor->db->hive_use_triggers()) {
            my $counter_sql = ($stored_job->status eq 'READY')
                        ? " ,ready_job_count=ready_job_count+1 "
                        : " ,semaphored_job_count=semaphored_job_count+1 ";
            my $status_sql  = ($job_adaptor->dbc->driver eq 'pgsql')
                        ? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
                        : " ,status =      CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END ";

            $job_adaptor->dbc->do(qq{
                        UPDATE analysis_stats
                        SET total_job_count=total_job_count+1
                    }
                    .$counter_sql
                    .$status_sql
                    ." WHERE analysis_id=".$stored_job->analysis_id
            );
        }

        push @stored_job_ids, $stored_job->dbID();
    }

        # adjust semaphore_count for jobs that failed to be stored (but have been pre-counted during funnel's creation):
    if($push_new_semaphore and $local_failures) {
        $self->decrease_semaphore_count_for_jobid( $semaphored_job_id, $local_failures );
    }

    return \@stored_job_ids;
}


172
=head2 fetch_all_by_analysis_id_status

  Arg [1]    : (optional) listref $list_of_analyses (objects answering to dbID),
               or a single analysis_id (old interface)
  Arg [2]    : (optional) string $status
  Arg [3]    : (optional) int $retry_count_at_least
  Example    : $all_failed_jobs = $adaptor->fetch_all_by_analysis_id_status(undef, 'FAILED');
               $analysis_done_jobs = $adaptor->fetch_all_by_analysis_id_status( $list_of_analyses, 'DONE');
  Description: Returns a list of all jobs filtered by given analyses (if specified), given status (if specified)
               and a minimum retry_count (if specified). An empty $list_of_analyses matches no jobs.
  Returntype : reference to list of Bio::EnsEMBL::Hive::AnalysisJob objects

=cut

sub fetch_all_by_analysis_id_status {
    my ($self, $list_of_analyses, $status, $retry_count_at_least) = @_;

    my @constraints = ();

    if($list_of_analyses) {
        if(ref($list_of_analyses) eq 'ARRAY') {
                # an empty list cannot match any job, and "analysis_id IN ()" is not valid SQL:
            return [] unless(@$list_of_analyses);

            push @constraints, "analysis_id IN (".(join(',', map {$_->dbID} @$list_of_analyses)).")";
        } else {
            push @constraints, "analysis_id=$list_of_analyses"; # for compatibility with old interface
        }
    }

    push @constraints, "status='$status'"                     if ($status);
    push @constraints, "retry_count >= $retry_count_at_least" if ($retry_count_at_least);

    return $self->fetch_all( join(" AND ", @constraints) );
}


204 205
sub fetch_some_by_analysis_id_limit {
    my ($self, $analysis_id, $limit) = @_;

        # the LIMIT clause simply rides at the end of the constraint string handed to fetch_all()
    my $constraint = sprintf("analysis_id = '%s' LIMIT %s", $analysis_id, $limit);

    return $self->fetch_all( $constraint );
}
Jessica Severin's avatar
Jessica Severin committed
209 210


211 212
sub fetch_all_incomplete_jobs_by_role_id {
    my ($self, $role_id) = @_;

        # statuses of a job that has been taken by a Role but has not reached a final state
    my @incomplete_statuses = qw(CLAIMED PRE_CLEANUP FETCH_INPUT RUN WRITE_OUTPUT POST_CLEANUP);
    my $quoted_statuses     = join(',', map { "'$_'" } @incomplete_statuses);

    return $self->fetch_all( "status IN ($quoted_statuses) AND role_id='$role_id'" );
}


219 220 221 222
sub fetch_by_url_query {
    my ($self, $field_name, $field_value) = @_;

        # only a lookup by a (true) dbID is supported; anything else yields nothing
    my $is_dbid_lookup = ($field_name eq 'dbID') && $field_value;
    return unless($is_dbid_lookup);

    return $self->fetch_by_dbID($field_value);
}

233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253

sub fetch_job_counts_hashed_by_status {
    my ($self, $requested_analysis_id) = @_;

        # Note: analysis_id is selected and grouped on, although its value is discarded below;
        #       it is there to force MySQL use existing index on (analysis_id, status)
    my $count_sth = $self->prepare( "SELECT analysis_id, status, count(*) FROM job WHERE analysis_id=? GROUP BY analysis_id, status" );
    $count_sth->execute( $requested_analysis_id );

    my %count_for_status;
    while (my @row = $count_sth->fetchrow_array()) {
        my (undef, $status, $job_count) = @row;     # the first column is the unused analysis_id
        $count_for_status{ $status } = $job_count;
    }

    $count_sth->finish;

    return \%count_for_status;
}


254
########################
#
# STORE / UPDATE METHODS
#
########################
Jessica Severin's avatar
Jessica Severin committed
259

260

261
sub decrease_semaphore_count_for_jobid {    # used in semaphore annihilation or unsuccessful creation
    my ($self, $jobid, $dec) = @_;

    return unless($jobid);      # nothing to do without a job_id
    $dec ||= 1;                 # step down by one unless told otherwise

        # NB: BOTH THE ORDER OF UPDATES AND EXACT WORDING IS ESSENTIAL FOR SYNCHRONOUS ATOMIC OPERATION,
        #       otherwise the same command tends to behave differently on MySQL and SQLite (at least)
        #
        # The status is decided first (from the semaphore_count before the decrement),
        # and only a job that is still 'SEMAPHORED' is touched.
    my $status_assignment = ($self->dbc->driver eq 'pgsql')
        ? "SET status = CAST(CASE WHEN semaphore_count>$dec THEN 'SEMAPHORED' ELSE 'READY' END AS jw_status), "
        : "SET status =      CASE WHEN semaphore_count>$dec THEN 'SEMAPHORED' ELSE 'READY' END, ";

    my $sql = "UPDATE job "
        .$status_assignment
        .qq{
            semaphore_count=semaphore_count-?
        WHERE job_id=? AND status='SEMAPHORED'
    };

        # callback handed to protected_prepare_execute(); it logs a hive message built from the suffix it is given
    my $log_callback = sub {
        my ($after) = @_;
        $self->db->get_LogMessageAdaptor->store_hive_message( 'decreasing semaphore_count'.$after, 0 );
    };

    $self->dbc->protected_prepare_execute( [ $sql, $dec, $jobid ], $log_callback );
}

283 284
sub increase_semaphore_count_for_jobid {    # used in semaphore propagation
    my ($self, $jobid, $inc) = @_;

    return unless($jobid);      # nothing to do without a job_id
    $inc ||= 1;                 # step up by one unless told otherwise

    my $sql = qq{
        UPDATE job
        SET semaphore_count=semaphore_count+?
        WHERE job_id=?
    };

        # callback handed to protected_prepare_execute(); it logs a hive message built from the suffix it is given
    my $log_callback = sub {
        my ($after) = @_;
        $self->db->get_LogMessageAdaptor->store_hive_message( 'increasing semaphore_count'.$after, 0 );
    };

    $self->dbc->protected_prepare_execute( [ $sql, $inc, $jobid ], $log_callback );
}

299

300
=head2 check_in_job

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    : $job_adaptor->check_in_job( $job );
  Description: Writes the job's current status into the database.
               A 'DONE' job additionally gets when_completed, runtime_msec and query_count recorded;
               a 'PASSED_ON' job additionally gets when_completed recorded.
               All values are passed as bind values, so an undefined runtime_msec or query_count is stored as NULL.
  Returntype : whatever DBConnection::protected_prepare_execute returns
  Exceptions : none thrown by this method itself
  Caller     : general

=cut

sub check_in_job {
    my ($self, $job) = @_;

    my $job_id  = $job->dbID;
    my $status  = $job->status;

    my @assignments = ( 'status=?' );
    my @bind_values = ( $status );

    if($status eq 'DONE') {
        push @assignments, 'when_completed=CURRENT_TIMESTAMP', 'runtime_msec=?', 'query_count=?';
            # scalar() keeps the number of bind values fixed even if an accessor returns an empty list:
        push @bind_values, scalar($job->runtime_msec), scalar($job->query_count);
    } elsif($status eq 'PASSED_ON') {
        push @assignments, 'when_completed=CURRENT_TIMESTAMP';
    }

    my $sql = 'UPDATE job SET '.join(', ', @assignments).' WHERE job_id=?';
    push @bind_values, $job_id;

        # This particular query is infamous for collisions and 'deadlock' situations; let's wait and retry:
    $self->dbc->protected_prepare_execute( [ $sql, @bind_values ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_job_message( $job_id, "checking the job in".$after, 0 ); }
    );
}

335

Jessica Severin's avatar
Jessica Severin committed
336
=head2 store_out_files

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    : $job_adaptor->store_out_files( $job );
  Description: update locations of log files, if present:
               removes the job_file row of this (job_id, retry_count) pair
               and inserts a new one if the job has a stdout_file or a stderr_file.
  Returntype : not meaningful
  Exceptions : none thrown by this method itself
  Caller     : Bio::EnsEMBL::Hive::Worker

=cut

sub store_out_files {
    my ($self, $job) = @_;

    my $job_id  = $job->dbID;
    my $retry   = $job->retry_count;

    # FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet.

        # same placeholder style as the INSERT below, rather than concatenating values into the statement:
    my $delete_sth = $self->dbc->prepare( 'DELETE from job_file WHERE job_id=? AND retry=?' );
    $delete_sth->execute( $job_id, $retry );
    $delete_sth->finish();

    if($job->stdout_file or $job->stderr_file) {
        my $insert_sql = 'INSERT INTO job_file (job_id, retry, role_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)';
        my $insert_sth = $self->dbc->prepare($insert_sql);
            # scalar() keeps the five bind values aligned even if one of the file names is not set:
        $insert_sth->execute( $job_id, $retry, scalar($job->role_id), scalar($job->stdout_file), scalar($job->stderr_file) );
        $insert_sth->finish();
    }
}


364 365 366
=head2 reset_or_grab_job_by_dbID

  Arg [1]    : int $job_id
  Arg [2]    : int $role_id (optional)
  Description: resets a job to 'READY' (if no $role_id given) or directly to 'CLAIMED' so it can be run again, and fetches it.
               NB: Will also reset a previously 'SEMAPHORED' job to READY.
               The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
               but will not change retry_count if a job has never *really* started.
  Returntype : Bio::EnsEMBL::Hive::AnalysisJob or undef
  Exceptions : dies if the UPDATE statement could not be executed

=cut

sub reset_or_grab_job_by_dbID {
    my ($self, $job_id, $role_id) = @_;

    my $new_status  = $role_id ? 'CLAIMED' : 'READY';

        # Note: the order of the fields being updated is critical!
        #       (retry_count is computed from the status the job had before this statement)
    my $sql = qq{
        UPDATE job
           SET retry_count = CASE WHEN (status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END
             , status=?
             , role_id=?
         WHERE job_id=?
    };
    my @values = ($new_status, $role_id, $job_id);

    my $sth = $self->prepare( $sql );
    unless( $sth->execute( @values ) ) {
            # $role_id is legitimately undefined when resetting to 'READY', so spell it out as NULL in the message:
        my $shown_values = join(',', map { defined($_) ? $_ : 'NULL' } @values);
        die "Could not run\n\t$sql\nwith data:\n\t($shown_values)";
    }
    $sth->finish;

    my $job = $self->fetch_by_job_id_AND_status($job_id, $new_status) ;

    return $job;
}


402
=head2 grab_jobs_for_role

  Arg [1]           : Bio::EnsEMBL::Hive::Role object $role
  Arg [2]           : int $how_many_this_batch
  Example:
    my $jobs  = $job_adaptor->grab_jobs_for_role( $role, $how_many );
  Description:
    For the specified Role, it will search available ('READY') jobs of the Role's analysis
    and, using the how_many_this_batch parameter, claim that number of jobs, and then return
    the jobs that are 'CLAIMED' by this Role.
    Three claiming attempts are made in turn, stopping at the first one that claims anything:
    never-retried jobs at this Role's offset, any jobs at this Role's offset, any jobs without offset.
  Returntype :
    reference to array of Bio::EnsEMBL::Hive::AnalysisJob objects
  Caller     : Bio::EnsEMBL::Hive::Worker::run

=cut

sub grab_jobs_for_role {
    my ($self, $role, $how_many_this_batch) = @_;

    return [] unless( $how_many_this_batch );

    my $analysis_id     = $role->analysis_id;
    my $role_id         = $role->dbID;
    my $role_rank       = $self->db->get_RoleAdaptor->get_role_rank( $role );
    my $offset          = $how_many_this_batch * $role_rank;
    my $is_mysql        = ($self->dbc->driver eq 'mysql');

        # MySQL gets an UPDATE..JOIN on the subquery, other drivers an UPDATE..WHERE job_id IN (subquery):
    my $prefix_sql = $is_mysql ? qq{
         UPDATE job j
           JOIN (
                            SELECT job_id
                              FROM job
                             WHERE analysis_id='$analysis_id'
                               AND status='READY'
    } : qq{
         UPDATE job
           SET role_id='$role_id', status='CLAIMED'
         WHERE job_id in (
                            SELECT job_id
                              FROM job
                             WHERE analysis_id='$analysis_id'
                               AND status='READY'
    };
    my $virgin_sql = qq{       AND retry_count=0 };
    my $limit_sql  = qq{     LIMIT $how_many_this_batch };
    my $offset_sql = qq{    OFFSET $offset };
    my $suffix_sql = $is_mysql ? qq{
                 ) as x
         USING (job_id)
           SET j.role_id='$role_id', j.status='CLAIMED'
         WHERE j.status='READY'
    } : qq{
                 )
           AND status='READY'
    };

        # the claiming statements, from the most to the least selective, each with the activity name used for logging:
    my @attempts = (
        [ $prefix_sql . $virgin_sql . $limit_sql . $offset_sql . $suffix_sql,   "grabbing a virgin batch of offset jobs" ],
        [ $prefix_sql .               $limit_sql . $offset_sql . $suffix_sql,   "grabbing a non-virgin batch of offset jobs" ],
        [ $prefix_sql .               $limit_sql .               $suffix_sql,   "grabbing a non-virgin batch of non-offset jobs" ],
    );

    my $claim_count;

    while(my $attempt = shift @attempts) {
        my ($claim_sql, $activity) = @$attempt;

        $claim_count = $self->dbc->protected_prepare_execute( [ $claim_sql ],
            sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker, $activity.$after, 0 ); }
        );

            # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected";
            # the outcome of the last attempt is taken as it is, without being compared:
        last if( !@attempts or 0 != $claim_count );
    }

    $self->db->get_AnalysisStatsAdaptor->increment_a_counter( 'ready_job_count', -$claim_count, $analysis_id );

    return $claim_count ? $self->fetch_all_by_role_id_AND_status($role_id, 'CLAIMED') : [];
}


478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494
sub release_claimed_jobs_from_role {
    my ($self, $role) = @_;

    my $release_sql = "UPDATE job SET status='READY', role_id=NULL WHERE role_id=? AND status='CLAIMED'";

        # previous value of role_id is not important, because that Role never had a chance to run the jobs
    my $num_released_jobs = $self->dbc->protected_prepare_execute( [ $release_sql, $role->dbID ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker, "releasing claimed jobs from role".$after, 0 ); }
    );

        # the released jobs are 'READY' again, so account for them in the analysis' ready_job_count
        # (the analysis status itself is not touched here; an update_status(..., 'LOADING') call used to be considered)
    $self->db->get_AnalysisStatsAdaptor->increment_a_counter( 'ready_job_count', $num_released_jobs, $role->analysis_id );
}


495
=head2 release_undone_jobs_from_role

  Arg [1]    : Bio::EnsEMBL::Hive::Role object
  Arg [2]    : optional message to be recorded in 'job_message' table
  Example    : $job_adaptor->release_undone_jobs_from_role( $role );
  Description: If a Worker has died some of its jobs need to be reset back to 'READY'
               so they can be rerun.
               Jobs in state CLAIMED are simply reset back to READY.
               For each job that was 'in progress' (PRE_CLEANUP, FETCH_INPUT, RUN, WRITE_OUTPUT, POST_CLEANUP)
               a gc_dataflow is attempted first (on the branch named after the cause of death if the Worker
               overused resources, then on 'ANYFAILURE'). If no dataflow happened, the job is handed to
               release_and_age_job(), which increases the retry_count and decides between 'READY' and 'FAILED'.
               Each job gets its own log message: the base message plus a note about the dataflow performed for that job.
  Exceptions : $role must be defined
  Caller     : Bio::EnsEMBL::Hive::Queen

=cut

sub release_undone_jobs_from_role {
    my ($self, $role, $msg) = @_;

    my $role_id         = $role->dbID;
    my $analysis        = $role->analysis;
    my $max_retry_count = $analysis->max_retry_count;
    my $worker          = $role->worker;

        #first just reset the claimed jobs, these don't need a retry_count index increment:
    $self->release_claimed_jobs_from_role( $role );

    my $sth = $self->prepare( qq{
        SELECT job_id
          FROM job
         WHERE role_id=?
           AND status in ('PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP')
    } );
    $sth->execute( $role_id );

    my $cod = $worker->cause_of_death() || 'UNKNOWN';
    $msg ||= "GarbageCollector: The worker died because of $cod";

    my $resource_overusage = ($cod eq 'MEMLIMIT') || ($cod eq 'RUNLIMIT' and $worker->work_done()==0);

    while(my ($job_id) = $sth->fetchrow_array()) {

        my $passed_on = 0;      # the flag indicating that the garbage_collection was attempted and was successful
        my $job_msg   = $msg;   # a per-job copy, so that notes about one job's dataflow do not leak into the next job's message

        if( $resource_overusage ) {
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, $cod )) {
                $job_msg .= ', performing gc_dataflow';
            }
        }
        unless($passed_on) {
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, 'ANYFAILURE' )) {
                $job_msg .= ", performing 'ANYFAILURE' gc_dataflow";
            }
        }

            # the message is flagged as an error only if the job could not be passed on:
        $self->db()->get_LogMessageAdaptor()->store_job_message($job_id, $job_msg, not $passed_on );

        unless($passed_on) {
            $self->release_and_age_job( $job_id, $max_retry_count, not $resource_overusage );
        }

        $role->register_attempt( 0 );
    }
    $sth->finish();
}

562

Leo Gordon's avatar
Leo Gordon committed
563
sub release_and_age_job {
    my ($self, $job_id, $max_retry_count, $may_retry, $runtime_msec) = @_;

    $may_retry ||= 0;                                       # so that it can be interpolated as a number
    $runtime_msec = "NULL" unless(defined $runtime_msec);   # so that it can be interpolated as an SQL value

        # NB: The order of updated fields IS important. Here we first find out the new status and then increment the retry_count:
        #
        # FIXME: would it be possible to retain role_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
        #
    my $status_assignment = ($self->dbc->driver eq 'pgsql')
            ? "SET status = CAST(CASE WHEN ($may_retry != 0) AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END AS jw_status), "
            : "SET status =      CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END, ";

    my $update_sql = "UPDATE job "
        .$status_assignment
        .qq{
               retry_count=retry_count+1,
               runtime_msec=$runtime_msec
         WHERE job_id=$job_id
           AND status in ('CLAIMED','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP')
    };

    $self->dbc->do( $update_sql );

        # FIXME: move the decision making completely to the API side and so avoid the potential race condition.
        # The job is read back to learn which status the database has settled on:
    my $aged_job        = $self->fetch_by_dbID( $job_id );
    my $counter_name    = ($aged_job->status eq 'FAILED') ? 'failed_job_count' : 'ready_job_count';

    $self->db->get_AnalysisStatsAdaptor->increment_a_counter( $counter_name, 1, $aged_job->analysis_id );
}

589

Leo Gordon's avatar
Leo Gordon committed
590
=head2 gc_dataflow

    Arg [1]     :   the analysis object of the dead job
    Arg [2]     :   int $job_id
    Arg [3]     :   string $branch_name (for example the cause of death, or 'ANYFAILURE')
    Description :   perform automatic dataflow from a dead job that overused resources if a corresponding dataflow rule was provided
                    Should only be called once during garbage collection phase, when the job is definitely 'abandoned' and not being worked on.
                    On success the job becomes 'PASSED_ON', is counted in done_job_count, and the semaphore it was holding (if any) is decreased.
    Returntype  :   1 if the dataflow was performed, 0 if the analysis has no dataflow rule on that branch

=cut

sub gc_dataflow {
    my ($self, $analysis, $job_id, $branch_name) = @_;

    my $branch_code = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name);
    my $rule_count  = $self->db->get_DataflowRuleAdaptor->count_all_by_from_analysis_id_AND_branch_code($analysis->dbID, $branch_code);

        # just return if no corresponding gc_dataflow rule has been defined
    return 0 if(!$rule_count);

    my $dead_job = $self->fetch_by_dbID($job_id);
    $dead_job->analysis( $analysis );
    $dead_job->load_parameters();    # input_id_templates still supported, however to a limited extent

        # the job's own input_id is what gets flown down the garbage-collection branch:
    $dead_job->dataflow_output_id( $dead_job->input_id() , $branch_name );
    $dead_job->set_and_update_status('PASSED_ON');

        # PASSED_ON jobs are included in done_job_count
    $self->db->get_AnalysisStatsAdaptor->increment_a_counter( 'done_job_count', 1, $analysis->dbID );

    my $semaphored_job_id = $dead_job->semaphored_job_id;
    if($semaphored_job_id) {
        $self->decrease_semaphore_count_for_jobid( $semaphored_job_id );    # step-unblock the semaphore
    }

    return 1;
}

625

626
=head2 reset_jobs_for_analysis_id

  Arg [1]    : arrayref $list_of_analyses (objects answering to dbID),
               or a single int $analysis_id (compatibility mode, to be deprecated)
  Arg [2]    : arrayref $input_statuses - only jobs in one of these statuses are reset.
               Compatibility mode (to be deprecated): a false value resets only 'FAILED' jobs,
               any other true value resets ALL jobs of the analyses.
  Description: Resets the selected jobs to 'READY' (or to 'SEMAPHORED' if their semaphore_count is positive).
               Their retry_count becomes 0 if they were 'READY' or 'CLAIMED', and 1 otherwise.
               The analysis_stats status of each given analysis is then set to 'LOADING'.
               Nothing is done if either list is given as an empty arrayref.
  Caller     : beekeeper.pl

=cut

sub reset_jobs_for_analysis_id {
    my ($self, $list_of_analyses, $input_statuses) = @_;

    my $analyses_given_as_list  = ( ref($list_of_analyses) eq 'ARRAY' );
    my $statuses_given_as_list  = ( ref($input_statuses) eq 'ARRAY' );

        # an empty list selects nothing, and an empty "IN ()" is not valid SQL:
    return if($analyses_given_as_list and !@$list_of_analyses);
    return if($statuses_given_as_list and !@$input_statuses);

    my $analyses_filter = $analyses_given_as_list
        ? 'analysis_id IN ('.join(',', map { $_->dbID } @$list_of_analyses).')'
        : 'analysis_id='.$list_of_analyses;     # compatibility mode (to be deprecated)

    my $statuses_filter = $statuses_given_as_list
        ? 'AND status IN ('.join(', ', map { "'$_'" } @$input_statuses).')'
        : (!$input_statuses)
            ? "AND status='FAILED'"             # compatibility mode (to be deprecated)
            : '';

        # retry_count is listed first, so that it is computed from the status the job had before the reset:
    my $sql = qq{
            UPDATE job
           SET retry_count = CASE WHEN (status='READY' OR status='CLAIMED') THEN 0 ELSE 1 END,
        }. ( ($self->dbc->driver eq 'pgsql')
        ? "status = CAST(CASE WHEN semaphore_count>0 THEN 'SEMAPHORED' ELSE 'READY' END AS jw_status) "
        : "status =      CASE WHEN semaphore_count>0 THEN 'SEMAPHORED' ELSE 'READY' END "
        )." WHERE ".$analyses_filter
        .' '. $statuses_filter;

    my $sth = $self->prepare($sql);
    $sth->execute();
    $sth->finish;

    if( $analyses_given_as_list ) {
        foreach my $analysis ( @$list_of_analyses ) {
            $self->db->get_AnalysisStatsAdaptor->update_status($analysis->dbID, 'LOADING');
        }
    } else {
        $self->db->get_AnalysisStatsAdaptor->update_status($list_of_analyses, 'LOADING');   # compatibility mode (to be deprecated)
    }
}

671

672 673 674 675 676 677 678
=head2 balance_semaphores

  Arg [1]    : (optional) listref $list_of_analyses - only funnel jobs of these analyses are examined
  Description: Reset all semaphore_counts to the numbers of unDONE semaphoring jobs.
               A counter that is too high is left alone (and only reported) while there are still unDONE semaphoring jobs.
  Returntype : int, the number of jobs whose semaphore_count was actually updated

=cut

sub balance_semaphores {
    my ($self, $list_of_analyses) = @_;

    my $analysis_filter = '';
    if($list_of_analyses) {
        $analysis_filter = "funnel.analysis_id IN (".join(',', map { $_->dbID } @$list_of_analyses).") AND";
    }

        # funnel jobs whose recorded counter ('was') differs from the number of their fan jobs
        # that are neither DONE nor PASSED_ON ('should'), plus those that should not be blocked at all:
    my $find_sql    = qq{
                        SELECT * FROM (
                            SELECT funnel.job_id, funnel.semaphore_count AS was, COALESCE(COUNT(CASE WHEN fan.status!='DONE' AND fan.status!='PASSED_ON' THEN 1 ELSE NULL END),0) AS should
                            FROM job funnel
                            LEFT JOIN job fan ON (funnel.job_id=fan.semaphored_job_id)
                            WHERE $analysis_filter
                            funnel.status in ('SEMAPHORED', 'READY')
                            GROUP BY funnel.job_id
                         ) AS internal WHERE was<>should OR should=0
                     };

    my $status_assignment = ($self->dbc->driver eq 'pgsql')
        ? "status = CAST(CASE WHEN semaphore_count>0 THEN 'SEMAPHORED' ELSE 'READY' END AS jw_status) "
        : "status =      CASE WHEN semaphore_count>0 THEN 'SEMAPHORED' ELSE 'READY' END ";

    my $update_sql  = "UPDATE job SET "
        ." semaphore_count=semaphore_count+? , "
        .$status_assignment
        ." WHERE job_id=? AND status IN ('SEMAPHORED', 'READY')";

    my $find_sth    = $self->prepare($find_sql);
    my $update_sth  = $self->prepare($update_sql);

    my $rebalanced_jobs_counter = 0;

    $find_sth->execute();
    while(my ($job_id, $was, $should) = $find_sth->fetchrow_array()) {
        my $can_wait = (0<$should and $should<$was);    # we choose not to lower the counter if it's not time to unblock yet
        my $msg;

        if(!$can_wait) {
            $update_sth->execute($should-$was, $job_id);
            $msg = "Semaphore count needed rebalancing now, so performing: $was -> $should";
            $self->db->get_LogMessageAdaptor->store_job_message( $job_id, $msg, 1 );
            $rebalanced_jobs_counter++;
        } else {
            $msg = "Semaphore count may need rebalancing, but it is not critical now, so leaving it on automatic: $was -> $should";
            $self->db->get_LogMessageAdaptor->store_job_message( $job_id, $msg, 0 );
        }

        warn "[Job $job_id] $msg\n";    # TODO: integrate the STDERR diagnostic output with LogMessageAdaptor calls in general
    }
    $find_sth->finish;
    $update_sth->finish;

    return $rebalanced_jobs_counter;
}


729 730 731 732 733 734 735 736 737 738 739 740 741 742
sub fetch_input_ids_for_job_ids {
    my ($self, $job_ids_csv, $id_scale, $id_offset) = @_;

    $id_scale   ||= 1;
    $id_offset  ||= 0;

    my %input_id_for = ();

    return \%input_id_for unless( $job_ids_csv );   # nothing was asked for

    my $sth = $self->prepare( "SELECT job_id, input_id FROM job WHERE job_id in ($job_ids_csv)" );
    $sth->execute();

    while(my ($job_id, $input_id) = $sth->fetchrow_array() ) {

            # an input_id of this shape is only a pointer; the actual content is fetched through the AnalysisDataAdaptor:
        if($input_id =~ /^_ext(?:\w+)_data_id (\d+)$/) {
            $input_id = $self->db->get_AnalysisDataAdaptor->fetch_by_analysis_data_id_TO_data($1);
        }

            # the result is keyed by the scaled and shifted job_id:
        my $result_key = $job_id * $id_scale + $id_offset;
        $input_id_for{ $result_key } = $input_id;
    }

    return \%input_id_for;
}


Jessica Severin's avatar
Jessica Severin committed
753 754
1;