AnalysisJobAdaptor.pm 43.1 KB
Newer Older
1
=pod
2 3 4

=head1 NAME

5
    Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
6 7 8

=head1 SYNOPSIS

9 10
    $analysisJobAdaptor = $db_adaptor->get_AnalysisJobAdaptor;
    $analysisJobAdaptor = $analysisJob->adaptor;
11 12 13

=head1 DESCRIPTION

14 15 16 17 18
    Module to encapsulate all db access for persistent class AnalysisJob.
    There should be just one per application and database connection.

=head1 LICENSE

19
    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
nwillhoft's avatar
nwillhoft committed
20
    Copyright [2016-2021] EMBL-European Bioinformatics Institute
21 22 23 24 25 26 27 28 29

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.
30 31 32

=head1 CONTACT

33
    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates
34 35 36

=head1 APPENDIX

37 38
    The rest of the documentation details each of the object methods.
    Internal methods are preceded with a _
39 40 41 42 43

=cut


package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
Jessica Severin's avatar
Jessica Severin committed
44 45

use strict;
46
use warnings;
47

48
use Bio::EnsEMBL::Hive::Cacheable;
49
use Bio::EnsEMBL::Hive::Semaphore;
50
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
51
use Bio::EnsEMBL::Hive::Utils ('stringify', 'destringify');
52

53 54 55
use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');


56 57 58 59 60
# This variable must be kept up-to-date ! It is used in a number of queries.
# CLAIMED is missing on purpose because not all the queries actually need it.
my $ALL_STATUSES_OF_RUNNING_JOBS = q{'PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_HEALTHCHECK','POST_CLEANUP'};


61 62 63 64 65
# Returns the default table name for this adaptor: the string 'job'.
sub default_table_name {
    my $table_name = 'job';

    return $table_name;
}


66
# Returns the default insertion method for this adaptor: the string 'INSERT'.
sub default_insertion_method {
    my $insertion_method = 'INSERT';

    return $insertion_method;
}


71 72 73 74 75 76 77 78 79 80 81 82 83 84
# Returns the name of the class of objects handled by this adaptor.
sub object_class {
    my $class_name = 'Bio::EnsEMBL::Hive::AnalysisJob';

    return $class_name;
}


# Returns a reference to a fresh hash mapping a column name to its overflow limit.
# The 'input_id' entry is the length threshold used by fetch_by_analysis_id_and_input_id().
sub default_overflow_limit {
    my %overflow_limit_for = (
        'input_id'       => 255,
        'param_id_stack' => 64,
        'accu_id_stack'  => 64,
    );

    return \%overflow_limit_for;
}


85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
=head2 job_status_cast

  Example     : $job_adaptor->job_status_cast();
  Description : Returns a job-status expression that the SQL driver understands.
                This is needed for PostgreSQL
  Returntype  : String
  Exceptions  : none

=cut

sub job_status_cast {
    my ($self, $status_string) = @_;

        # Only the 'pgsql' driver gets the expression wrapped into an explicit cast to the job_status type:
    my $needs_cast = ( $self->dbc->driver eq 'pgsql' );

    return $needs_cast
        ? "CAST($status_string AS job_status)"
        : $status_string;
}


105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
=head2 fetch_by_analysis_id_and_input_id

  Arg [1]    : Integer $analysis_id
  Arg [2]    : String $input_id
  Example    : $funnel_job = $job_adaptor->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id);
  Description: Attempts to find the job by contents, then makes another attempt if the input_id is expected to have overflown into analysis_data
  Returntype : AnalysisJob object

=cut

sub fetch_by_analysis_id_and_input_id {     # It is a special case not covered by AUTOLOAD; note the lowercase _and_
    my ($self, $analysis_id, $input_id) = @_;

        # First attempt: look the job up by the literal input_id
    my $found_job = $self->fetch_by_analysis_id_AND_input_id( $analysis_id, $input_id );
    return $found_job if $found_job;

        # Second attempt: an input_id longer than the overflow limit may have been stored in analysis_data,
        # in which case the job row holds an "_extended_data_id <id>" reference instead of the content
    my $overflow_limit = $self->default_overflow_limit->{input_id};

    if( length($input_id) > $overflow_limit ) {
        my $ext_data_id = $self->db->get_AnalysisDataAdaptor->fetch_by_data_to_analysis_data_id( $input_id );

        if( $ext_data_id ) {
            $found_job = $self->fetch_by_analysis_id_AND_input_id( $analysis_id, "_extended_data_id $ext_data_id" );
        }
    }

    return $found_job;
}


129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
# Executes the statement through the parent class and implements 'INSERT IGNORE' of Jobs on the API side:
# a failure recognised as a duplicate-key error is logged as a job message and turned into the '0E0' return code
# ("no rows affected"); any other failure is rethrown unchanged.
#
#   Arg [1]    : the object being stored (prev_job_id, analysis_id and input_id are read from it when logging)
#   Arg [2]    : the statement handle, passed through to the parent implementation
#   Arg [3]    : the values, passed through to the parent implementation
#   Returntype : whatever the parent implementation returns, or '0E0' for an intercepted duplicate
#   Exceptions : rethrows any error that is not a recognised duplicate-key error
sub class_specific_execute {
    my ($self, $object, $sth, $values) = @_;

    my $return_code;

    eval {
        $return_code = $self->SUPER::class_specific_execute($object, $sth, $values);
        1;
    } or do {
        my $error = $@;     # copy right away: $@ is global and could be clobbered by any eval run by the calls below

        my %duplicate_regex_for = (
            'mysql'     => qr/Duplicate entry.+?for key/s,
            'sqlite'    => qr/columns.+?are not unique|UNIQUE constraint failed/s,  # versions around 3.8 spit the first msg, versions around 3.15 - the second
            'pgsql'     => qr/duplicate key value violates unique constraint/s,
        );
        my $duplicate_regex = $duplicate_regex_for{ $self->db->dbc->driver };

            # A driver that is not in the table has no pattern. Matching against an undefined pattern would be
            # an empty-pattern match, which could classify an arbitrary error as a duplicate and swallow it,
            # so such errors are always rethrown.
        if( defined($duplicate_regex) and $error =~ $duplicate_regex ) {      # implementing 'INSERT IGNORE' of Jobs on the API side
            my $emitting_job_id = $object->prev_job_id;
            my $analysis_id     = $object->analysis_id;
            my $input_id        = $object->input_id;
            my $msg             = "Attempt to insert a duplicate job (analysis_id=$analysis_id, input_id=$input_id) intercepted and ignored";

            $self->db->get_LogMessageAdaptor->store_job_message( $emitting_job_id, $msg, 'INFO' );

            $return_code = '0E0';
        } else {
            die $error;
        }
    };

    return $return_code;
}


162 163 164 165
=head2 store_jobs_and_adjust_counters

  Arg [1]    : arrayref of Bio::EnsEMBL::Hive::AnalysisJob $jobs_to_store
  Arg [2]    : (optional) boolean $push_new_semaphore
166
  Arg [3]    : (optional) Int $emitting_job_id
167 168 169 170 171 172
  Example    : my @output_job_ids = @{ $job_adaptor->store_jobs_and_adjust_counters( \@jobs_to_store ) };
  Description: Attempts to store a list of jobs, returns an arrayref of successfully stored job_ids
  Returntype : Reference to list of job_dbIDs

=cut

173
sub store_jobs_and_adjust_counters {
    my ($self, $jobs, $push_new_semaphore, $emitting_job_id) = @_;

    my @stored_job_ids = ();
    my @rejected_jobs  = ();

        # NB: our use patterns assume all jobs from the same storing batch share the same controlled_semaphore:
    my $controlled_semaphore = scalar(@$jobs) && $jobs->[0]->controlled_semaphore;

        # "pre-increase" the semaphore counts before creating the controlling jobs (only if it has not been done yet):
    if( $controlled_semaphore and not $push_new_semaphore ) {
        $controlled_semaphore->increase_by( $jobs );
    }

    for my $candidate_job (@$jobs) {

        my $analysis     = $candidate_job->analysis;
            # if analysis object is undefined, consider the job local:
        my $job_adaptor  = $analysis ? $analysis->adaptor->db->get_AnalysisJobAdaptor : $self;
        my $prev_adaptor = ($candidate_job->prev_job && $candidate_job->prev_job->adaptor) || '';
        my $job_is_local_to_parent = ( $prev_adaptor eq $job_adaptor );

        if( $controlled_semaphore ) {
            my $job_hive_pipeline = $candidate_job->hive_pipeline;

                # if the job happens to be remote to $controlled_semaphore,
                # introduce another job-local semaphore between the job and $controlled_semaphore:
            if( $controlled_semaphore->hive_pipeline ne $job_hive_pipeline ) {
                my $job_local_semaphore = Bio::EnsEMBL::Hive::Semaphore->new(
                    'hive_pipeline'             => $job_hive_pipeline,
                    'dependent_semaphore_url'   => $controlled_semaphore->relative_url( $job_hive_pipeline ),
                    'local_jobs_counter'        => 1,
                    'remote_jobs_counter'       => 0,
                );
                $job_adaptor->db->get_SemaphoreAdaptor->store( $job_local_semaphore );

                $candidate_job->controlled_semaphore( $job_local_semaphore );
            }
        }

            # job_ids are local, so for remote jobs they have to be cleaned up before storing:
        $candidate_job->prev_job_id( undef ) if( $job_adaptor ne $prev_adaptor );

            # From here on only the object handed back by store() is used:
        my ($stored_job, $stored_this_time) = $job_adaptor->store( $candidate_job );

        if( !$stored_this_time ) {
            push @rejected_jobs, $stored_job;

            my $locality = $job_is_local_to_parent ? 'local' : 'remote';
            my $msg      = "JobAdaptor failed to store the "
                         . $locality
                         . " Job( analysis_id=".$stored_job->analysis_id.', '.$stored_job->input_id." ), possibly due to a collision";

            my $log_message_adaptor = $self->db->get_LogMessageAdaptor;
            if( $job_is_local_to_parent && $emitting_job_id ) {
                $log_message_adaptor->store_job_message( $emitting_job_id, $msg, 'PIPELINE_CAUTION' );
            } else {
                $log_message_adaptor->store_hive_message( $msg, 'PIPELINE_CAUTION' );
            }

            next;
        }

            # Adjust the analysis_stats counters by hand unless the pipeline reports that it uses triggers:
        if( !$job_adaptor->db->hive_pipeline->hive_use_triggers() ) {
            my $counter_clause = ($stored_job->status eq 'READY')
                ? " ,ready_job_count=ready_job_count+1 "
                : " ,semaphored_job_count=semaphored_job_count+1 ";

            my $status_clause = ($job_adaptor->dbc->driver eq 'pgsql')
                ? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
                : " ,status =      CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END ";

            my $update_sql = qq{
                        UPDATE analysis_stats
                        SET total_job_count=total_job_count+1
                    }
                . $counter_clause
                . $status_clause
                . " WHERE analysis_id=" . $stored_job->analysis_id;

            $job_adaptor->dbc->do( $update_sql );
        }

        push @stored_job_ids, $stored_job->dbID();     # FIXME: this ID may not make much cross-db sense
    }

        # Take back the pre-increase for the jobs that did not make it into the database:
    if( $controlled_semaphore && scalar(@rejected_jobs) ) {
        $controlled_semaphore->decrease_by( \@rejected_jobs );
    }

    return \@stored_job_ids;
}


258 259 260 261 262 263
=head2 store_a_semaphored_group_of_jobs

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $funnel_job
  Arg [2]    : arrayref of Bio::EnsEMBL::Hive::AnalysisJob $fan_jobs
  Arg [3]    : (optional) Bio::EnsEMBL::Hive::AnalysisJob $emitting_job
  Arg [4]    : (optional) boolean $no_leeching
264
  Example    : my ($funnel_semaphore_id, $funnel_job_id, @fan_job_ids) = $job_adaptor->store_a_semaphored_group_of_jobs( $funnel_job, $fan_jobs, $emitting_job );
265
  Description: Attempts to store a semaphored group of jobs, returns a list of successfully stored job_ids
266
  Returntype : ($funnel_semaphore_id, $funnel_job_id, @fan_job_ids)
267 268 269 270 271 272 273 274 275

=cut

sub store_a_semaphored_group_of_jobs {
    my ($self, $funnel_job, $fan_jobs, $emitting_job, $no_leeching) = @_;

    if( $emitting_job and $funnel_job ) {
        $funnel_job->prev_job( $emitting_job );
        $funnel_job->controlled_semaphore( $emitting_job->controlled_semaphore );   # propagate parent's semaphore if any
    }
    my $emitting_job_id = $emitting_job ? $emitting_job->dbID : undef;

    my $funnel_semaphore;
    my $funnel_semaphore_adaptor = $self->db->get_SemaphoreAdaptor;  # assuming $self was $funnel_job_adaptor

    my $funnel_job_id;
    if( $funnel_job ) {
        ($funnel_job_id) = @{ $self->store_jobs_and_adjust_counters( [ $funnel_job ], 0, $emitting_job_id ) };
    }

    my $funnel_was_rejected = ( $funnel_job && !$funnel_job_id );

    if( !$funnel_was_rejected ) {   # Either the $funnel_job was successfully stored, or there wasn't any $funnel_job to start with:

        my $whose_hive_pipeline = $funnel_job || $self->db;

        my ($local_count, $remote_count) = Bio::EnsEMBL::Hive::Cacheable::count_local_and_remote_objects( $whose_hive_pipeline, $fan_jobs );

        $funnel_semaphore = Bio::EnsEMBL::Hive::Semaphore->new(
            'hive_pipeline'         => $whose_hive_pipeline->hive_pipeline,
            'dependent_job_id'      => $funnel_job_id,
            'local_jobs_counter'    => $local_count,
            'remote_jobs_counter'   => $remote_count,
        );
        $funnel_semaphore_adaptor->store( $funnel_semaphore );

        $funnel_semaphore->release_if_ripe();

    } else {                        # apparently the funnel_job has been created previously, trying to leech to it:

        die "The funnel job could not be stored, but leeching was not allowed, so bailing out" if( $no_leeching );

        $funnel_job = $self->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id )
            or die "The funnel job could neither be stored nor fetched";

        $funnel_job_id = $funnel_job->dbID;

            # If the job hasn't run yet, we can still block it
        if( $funnel_job->status eq 'READY' ) {
                # Mark the job as SEMAPHORED to make sure it's not taken by any worker
            $self->semaphore_job_by_id( $funnel_job_id );
            $self->refresh( $funnel_job );
        }

        if( $funnel_job->status ne 'SEMAPHORED' ) {
            die "The funnel job (id=$funnel_job_id) fetched from the database was not in SEMAPHORED status";
        }

        $funnel_semaphore = $funnel_job->fetch_local_blocking_semaphore();

            # Create if it was missing
        if( !$funnel_semaphore ) {
            $funnel_semaphore = Bio::EnsEMBL::Hive::Semaphore->new(
                'hive_pipeline'         => $funnel_job->hive_pipeline,
                'dependent_job_id'      => $funnel_job_id,
                'local_jobs_counter'    => 0,   # Will be updated below
                'remote_jobs_counter'   => 0,   # Will be updated below
            );
            $funnel_semaphore_adaptor->store( $funnel_semaphore );
        }

        $funnel_semaphore->increase_by( $fan_jobs );  # "pre-increase" the semaphore counts before creating the controlling jobs

        $self->db->get_LogMessageAdaptor->store_job_message($emitting_job_id, "Discovered and using an existing funnel ".$funnel_job->toString, 'INFO');
    }

        # set the funnel in every fan's job:
    $_->controlled_semaphore( $funnel_semaphore ) for @$fan_jobs;

        # the second argument tells store_jobs_and_adjust_counters() not to pre-increase the semaphore again:
    my @fan_job_ids = @{ $self->store_jobs_and_adjust_counters( $fan_jobs, 1, $emitting_job_id ) };

    return ($funnel_semaphore->dbID, $funnel_job_id, @fan_job_ids);
}



355
=head2 fetch_all_by_analysis_id_status
356

357
  Arg [1]    : (optional) listref $list_of_analyses
358 359 360
  Arg [2]    : (optional) string $status
  Arg [3]    : (optional) int $retry_count_at_least
  Example    : $all_failed_jobs = $adaptor->fetch_all_by_analysis_id_status(undef, 'FAILED');
361
               $analysis_done_jobs = $adaptor->fetch_all_by_analysis_id_status( $list_of_analyses, 'DONE');
362
  Description: Returns a list of all jobs filtered by given analysis_id (if specified) and given status (if specified).
363 364 365 366
  Returntype : reference to list of Bio::EnsEMBL::Hive::AnalysisJob objects

=cut

367
sub fetch_all_by_analysis_id_status {
    my ($self, $list_of_analyses, $status, $retry_count_at_least) = @_;

    my @constraints = ();

    if($list_of_analyses) {
        if(ref($list_of_analyses) eq 'ARRAY') {
                # An empty list would generate "analysis_id IN ()", which is not valid SQL on every driver.
                # Filtering by no analyses at all cannot match any job, so answer without querying:
            return [] unless @$list_of_analyses;

            push @constraints, "analysis_id IN (".(join(',', map {$_->dbID} @$list_of_analyses)).")";
        } else {
            push @constraints, "analysis_id=$list_of_analyses"; # for compatibility with old interface
        }
    }

        # Each optional filter is only applied when a true value was given for it:
    push @constraints, "status='$status'"                     if ($status);
    push @constraints, "retry_count >= $retry_count_at_least" if ($retry_count_at_least);

        # With no constraints at all the constraint string is empty
    return $self->fetch_all( join(" AND ", @constraints) );
}


387 388
# Fetches jobs of the given analysis_id, with a LIMIT clause appended to the constraint.
# Both arguments are interpolated into the constraint string as they are.
sub fetch_some_by_analysis_id_limit {
    my ($self, $analysis_id, $limit) = @_;

    my $constraint = sprintf( "analysis_id = '%s' LIMIT %s", $analysis_id, $limit );

    return $self->fetch_all( $constraint );
}
Jessica Severin's avatar
Jessica Severin committed
392 393


394 395
# Fetches the jobs of the given role_id whose status is either 'CLAIMED'
# or one of those listed in $ALL_STATUSES_OF_RUNNING_JOBS.
sub fetch_all_incomplete_jobs_by_role_id {
    my ($self, $role_id) = @_;

    return $self->fetch_all( "status IN ('CLAIMED',$ALL_STATUSES_OF_RUNNING_JOBS) AND role_id='$role_id'" );
}


402 403 404 405 406 407 408 409
# Fetches the jobs that have no role_id and whose status is none of the "not in progress" ones.
sub fetch_all_unfinished_jobs_with_no_roles {
    my ($self) = @_;

        # the list should contain all status'es that are not "in progress":
    my $constraint = "role_id IS NULL AND status NOT IN ('DONE', 'READY', 'FAILED', 'PASSED_ON', 'SEMAPHORED')";

    return $self->fetch_all( $constraint );
}


410 411 412 413
# Resolves a (field_name, field_value) pair into an object.
# Only the 'dbID' field with a true value is supported; anything else returns nothing.
sub fetch_by_url_query {
    my ($self, $field_name, $field_value) = @_;

    return unless( $field_name eq 'dbID' and $field_value );

    return $self->fetch_by_dbID( $field_value );
}

424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444

# Counts the jobs of one analysis per status.
# Returns a reference to a hash { status => number_of_jobs }; statuses without jobs are absent.
sub fetch_job_counts_hashed_by_status {
    my ($self, $requested_analysis_id) = @_;

        # Note: the seemingly useless analysis_id column in the SELECT/GROUP BY is here to force MySQL use existing index on (analysis_id, status)
    my $sth = $self->prepare( "SELECT analysis_id, status, count(*) FROM job WHERE analysis_id=? GROUP BY analysis_id, status" );
    $sth->execute( $requested_analysis_id );

    my %count_of_status;
    while( my @row = $sth->fetchrow_array() ) {
        my (undef, $status, $job_count) = @row;     # the first column is only there for the index (see above)
        $count_of_status{ $status } = $job_count;
    }

    $sth->finish;

    return \%count_of_status;
}


445
########################
Jessica Severin's avatar
Jessica Severin committed
446 447 448
#
# STORE / UPDATE METHODS
#
449
########################
Jessica Severin's avatar
Jessica Severin committed
450

451 452 453
sub semaphore_job_by_id {    # used in the end of reblocking a semaphore chain
    my ($self, $job_id) = @_;

    return unless $job_id;

        # Jobs that are 'CLAIMED', in 'COMPILATION' or in one of the running statuses are left untouched:
    my $sql = "UPDATE job SET status = 'SEMAPHORED' WHERE job_id=? AND status NOT IN ('CLAIMED', 'COMPILATION', $ALL_STATUSES_OF_RUNNING_JOBS)";

        # Callback handed to protected_prepare_execute(); records a hive message built from the suffix it receives:
    my $log_callback = sub {
        my ($after) = @_;
        $self->db->get_LogMessageAdaptor->store_hive_message( 'semaphoring a job'.$after, 'INFO' );
    };

    return $self->dbc->protected_prepare_execute( [ $sql, $job_id ], $log_callback );
}

462 463 464
sub unsemaphore_job_by_id {    # used in semaphore annihilation or unsuccessful creation
    my ($self, $job_id) = @_;

    return unless $job_id;

        # Only a job that is still 'SEMAPHORED' is switched back to 'READY':
    my $sql = "UPDATE job SET status = 'READY' WHERE job_id=? AND status='SEMAPHORED'";

        # Callback handed to protected_prepare_execute(); records a hive message built from the suffix it receives:
    my $log_callback = sub {
        my ($after) = @_;
        $self->db->get_LogMessageAdaptor->store_hive_message( 'unsemaphoring a job'.$after, 'INFO' );
    };

    return $self->dbc->protected_prepare_execute( [ $sql, $job_id ], $log_callback );
}

473

474
sub prelock_semaphore_for_update {  # currently defunct, but may be needed to resolve situations of heavy load on semaphore/job tables
    my ($self, $job_id) = @_;

    return unless $job_id;

    my $dbc = $self->dbc;

        # The SELECT ... FOR UPDATE is skipped when there is no connection or the driver is 'sqlite':
    if( $dbc and $dbc->driver ne 'sqlite' ) {
        my $log_callback = sub {
            my ($after) = @_;
            $self->db->get_LogMessageAdaptor->store_hive_message( "prelocking semaphore job_id=$job_id".$after, 0 );
        };

        $self->dbc->protected_prepare_execute( [ "SELECT 1 FROM job WHERE job_id=? FOR UPDATE", $job_id ], $log_callback );
    }
}


488
=head2 check_in_job
Jessica Severin's avatar
Jessica Severin committed
489

Jessica Severin's avatar
Jessica Severin committed
490 491
  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    :
492
  Description: updates the job.status in the database
Jessica Severin's avatar
Jessica Severin committed
493
  Returntype : 
Jessica Severin's avatar
Jessica Severin committed
494
  Exceptions :
Jessica Severin's avatar
Jessica Severin committed
495 496
  Caller     : general

Jessica Severin's avatar
Jessica Severin committed
497
=cut
Jessica Severin's avatar
Jessica Severin committed
498

499
sub check_in_job {
    my ($self, $job) = @_;

    my $job_id = $job->dbID;
    my $status = $job->status;

        # All values travel as bind parameters, so an undefined runtime_msec or query_count
        # is stored as NULL instead of producing malformed SQL.
    my @assignments = ( 'status=?' );
    my @values      = ( $status );

    if($status eq 'DONE') {
            # a finished job also records its completion time, runtime and query count:
        push @assignments, 'when_completed=CURRENT_TIMESTAMP', 'runtime_msec=?', 'query_count=?';
        push @values,      $job->runtime_msec, $job->query_count;
    } elsif($status eq 'PASSED_ON') {
            # a passed-on job only records its completion time:
        push @assignments, 'when_completed=CURRENT_TIMESTAMP';
    }

    my $sql = 'UPDATE job SET ' . join(', ', @assignments) . ' WHERE job_id=?';

        # This particular query is infamous for collisions and 'deadlock' situations; let's wait and retry:
    return $self->dbc->protected_prepare_execute( [ $sql, @values, $job_id ],
        sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_job_message( $job_id, "checking the job in".$after, 'INFO' ); }
    );
}

523

Jessica Severin's avatar
Jessica Severin committed
524
=head2 store_out_files
Jessica Severin's avatar
Jessica Severin committed
525

526
  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
Jessica Severin's avatar
Jessica Severin committed
527
  Example    :
528
  Description: update locations of log files, if present
Jessica Severin's avatar
Jessica Severin committed
529 530
  Returntype : 
  Exceptions :
Jessica Severin's avatar
Jessica Severin committed
531 532
  Caller     : Bio::EnsEMBL::Hive::Worker

Jessica Severin's avatar
Jessica Severin committed
533 534 535
=cut

sub store_out_files {
    my ($self, $job) = @_;

    # FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet.

        # Step 1: drop whatever was recorded for this (job_id, retry) pair
    my $job_id      = $job->dbID;
    my $retry_count = $job->retry_count;
    $self->dbc->do( "DELETE from job_file WHERE job_id=$job_id AND retry=$retry_count" );

        # Step 2: record the current locations, but only if there is at least one file to record
    if( $job->stdout_file or $job->stderr_file ) {
        my $insert_sth = $self->dbc->prepare( 'INSERT INTO job_file (job_id, retry, role_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)' );

        my @insert_values = ( $job->dbID, $job->retry_count, $job->role_id, $job->stdout_file, $job->stderr_file );
        $insert_sth->execute( @insert_values );
        $insert_sth->finish();
    }
}


552 553 554
=head2 reset_or_grab_job_by_dbID

  Arg [1]    : int $job_id
555 556
  Arg [2]    : int $role_id (optional)
  Description: resets a job to 'READY' (if no $role_id given) or directly to 'CLAIMED' so it can be run again, and fetches it.
557 558 559 560 561 562 563 564
               NB: Will also reset a previously 'SEMAPHORED' job to READY.
               The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
               but will not change retry_count if a job has never *really* started.
  Returntype : Bio::EnsEMBL::Hive::AnalysisJob or undef

=cut

sub reset_or_grab_job_by_dbID {
    my ($self, $job_id, $role_id) = @_;

        # With a role the job goes straight to that role as 'CLAIMED', otherwise it becomes 'READY':
    my $new_status;
    if( $role_id ) {
        $new_status = 'CLAIMED';
    } else {
        $new_status = 'READY';
    }

        # Note: the order of the fields being updated is critical!
    my $sql = qq{
        UPDATE job
           SET retry_count = CASE WHEN (status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END
             , status=?
             , role_id=?
         WHERE job_id=?
    };
    my @values = ($new_status, $role_id, $job_id);

    my $sth = $self->prepare( $sql );
    unless( $sth->execute( @values ) ) {
        die "Could not run\n\t$sql\nwith data:\n\t(".join(',', @values).')';
    }
    $sth->finish;

        # Fetch the job back, but only if it really ended up in the requested status:
    my $job = $self->fetch_by_job_id_AND_status( $job_id, $new_status );

    return $job;
}


590
=head2 grab_jobs_for_role
Jessica Severin's avatar
Jessica Severin committed
591

592 593
  Arg [1]           : Bio::EnsEMBL::Hive::Role object $role
  Arg [2]           : int $how_many_this_role
Leo Gordon's avatar
Leo Gordon committed
594
  Example: 
595
    my $jobs  = $job_adaptor->grab_jobs_for_role( $role, $how_many );
Leo Gordon's avatar
Leo Gordon committed
596
  Description: 
597
    For the specified Role, it will search available jobs, 
598
    and using the how_many_this_batch parameter, claim/fetch that
Leo Gordon's avatar
Leo Gordon committed
599 600 601 602 603 604
    number of jobs, and then return them.
  Returntype : 
    reference to array of Bio::EnsEMBL::Hive::AnalysisJob objects
  Caller     : Bio::EnsEMBL::Hive::Worker::run

=cut
Jessica Severin's avatar
Jessica Severin committed
605

606 607
sub grab_jobs_for_role {
    my ($self, $role, $how_many_this_batch) = @_;

    $how_many_this_batch or return [];

    my $analysis_id     = $role->analysis_id;
    my $role_id         = $role->dbID;
    my $role_rank       = $self->db->get_RoleAdaptor->get_role_rank( $role );
    my $offset          = $how_many_this_batch * $role_rank;

    my $is_mysql        = ( $self->dbc->driver eq 'mysql' );

        # The claiming statement is assembled from fragments; MySQL gets an UPDATE ... JOIN form,
        # the other drivers get an UPDATE ... WHERE job_id IN (subquery) form:
    my $prefix_sql = $is_mysql ? qq{
         UPDATE job j
           JOIN (
                            SELECT job_id
                              FROM job
                             WHERE analysis_id='$analysis_id'
                               AND status='READY'
    } : qq{
         UPDATE job
           SET role_id='$role_id', status='CLAIMED'
         WHERE job_id in (
                            SELECT job_id
                              FROM job
                             WHERE analysis_id='$analysis_id'
                               AND status='READY'
    };
    my $virgin_sql = qq{       AND retry_count=0 };
    my $limit_sql  = qq{     LIMIT $how_many_this_batch };
    my $offset_sql = qq{    OFFSET $offset };
    my $suffix_sql = $is_mysql ? qq{
                 ) as x
         USING (job_id)
           SET j.role_id='$role_id', j.status='CLAIMED'
         WHERE j.status='READY'
    } : qq{
                 )
           AND status='READY'
    };

        # Attempts in order of preference; the next one is only tried when the previous one claimed nothing:
    my @attempts = (
        [ $prefix_sql . $virgin_sql . $limit_sql . $offset_sql . $suffix_sql, "grabbing a virgin batch of offset jobs"         ],
        [ $prefix_sql .               $limit_sql . $offset_sql . $suffix_sql, "grabbing a non-virgin batch of offset jobs"     ],
        [ $prefix_sql .               $limit_sql .               $suffix_sql, "grabbing a non-virgin batch of non-offset jobs" ],
    );

    my $claim_count;

    for my $attempt (@attempts) {
        my ($sql, $activity) = @$attempt;

        $claim_count = $self->dbc->protected_prepare_execute( [ $sql ],
            sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker, $activity.$after, 'INFO' ); }
        );

            # we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
        last unless( 0 == $claim_count );
    }

    $self->db->get_AnalysisStatsAdaptor->increment_a_counter( 'ready_job_count', -$claim_count, $analysis_id );

    return $claim_count ? $self->fetch_all_by_role_id_AND_status($role_id, 'CLAIMED') : [];
}


666 667 668 669 670
# Puts the jobs that the given role had 'CLAIMED' back to 'READY' with no role,
# then adds the number of released jobs to the analysis' ready_job_count.
sub release_claimed_jobs_from_role {
    my ($self, $role) = @_;

        # previous value of role_id is not important, because that Role never had a chance to run the jobs
    my $release_sql  = "UPDATE job SET status='READY', role_id=NULL WHERE role_id=? AND status='CLAIMED'";
    my $log_callback = sub {
        my ($after) = @_;
        $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker, "releasing claimed jobs from role".$after, 'INFO' );
    };

    my $num_released_jobs = $self->dbc->protected_prepare_execute( [ $release_sql, $role->dbID ], $log_callback );

    my $analysis_stats_adaptor  = $self->db->get_AnalysisStatsAdaptor;
    my $analysis_id             = $role->analysis_id;

#    $analysis_stats_adaptor->update_status( $analysis_id, 'LOADING' );

    return $analysis_stats_adaptor->increment_a_counter( 'ready_job_count', $num_released_jobs, $analysis_id );
}


683
=head2 release_undone_jobs_from_role
684

685
  Arg [1]    : Bio::EnsEMBL::Hive::Role object
686
  Arg [2]    : optional message to be recorded in 'job_message' table
687
  Example    :
688
  Description: If a Worker has died some of its jobs need to be reset back to 'READY'
689 690
               so they can be rerun.
               Jobs in state CLAIMED are simply reset back to READY.
691
               If a job was 'in progress' (see the $ALL_STATUSES_OF_RUNNING_JOBS variable)
692
               the retry_count is increased and the status set back to READY.
693 694
               If the retry_count >= $max_retry_count (3 by default) the job is set
               to 'FAILED' and not rerun again.
695
  Exceptions : $role must be defined
696 697 698
  Caller     : Bio::EnsEMBL::Hive::Queen

=cut
699

700 701
sub release_undone_jobs_from_role {
    my ($self, $role, $msg) = @_;

    my $role_id         = $role->dbID;
    my $analysis        = $role->analysis;
    my $max_retry_count = $analysis->max_retry_count;
    my $worker          = $role->worker;

        #first just reset the claimed jobs, these don't need a retry_count index increment:
    $self->release_claimed_jobs_from_role( $role );

        # then go through the jobs of this role that were left in one of the running statuses:
    my $sth = $self->prepare( qq{
        SELECT job_id
          FROM job
         WHERE role_id=?
           AND status in ($ALL_STATUSES_OF_RUNNING_JOBS)
    } );
    $sth->execute( $role_id );

    my $cod = $worker->cause_of_death() || 'UNKNOWN';
    $msg ||= "GarbageCollector: The worker died because of $cod";

    my $resource_overusage = ($cod eq 'MEMLIMIT') || ($cod eq 'RUNLIMIT' and $worker->work_done()==0);

    while(my ($job_id) = $sth->fetchrow_array()) {

            # Work on a per-job copy of the message: appending to $msg itself would make the text grow
            # from one job to the next and log gc_dataflow notes against jobs they do not belong to.
        my $job_msg   = $msg;
        my $passed_on = 0;  # the flag indicating that the garbage_collection was attempted and was successful

        if( $resource_overusage ) {
                # try the dataflow that is specific to the cause of death first:
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, $cod )) {
                $job_msg .= ', performing gc_dataflow';
            }
        }
        unless($passed_on) {
                # then fall back to the generic 'ANYFAILURE' dataflow:
            if($passed_on = $self->gc_dataflow( $analysis, $job_id, 'ANYFAILURE' )) {
                $job_msg .= ", performing 'ANYFAILURE' gc_dataflow";
            }
        }

        $self->db()->get_LogMessageAdaptor()->store_job_message($job_id, $job_msg, $passed_on ? 'INFO' : 'WORKER_ERROR');

        unless($passed_on) {
                # the third argument (may_retry) is false when the job died of resource overusage:
            $self->release_and_age_job( $job_id, $max_retry_count, !$resource_overusage );
        }

        $role->register_attempt( 0 );
    }
    $sth->finish();
}

750

Leo Gordon's avatar
Leo Gordon committed
751
# release_and_age_job
#
#   Arg [1]    : job_id of the job to be released
#   Arg [2]    : (optional) $max_retry_count; if undefined, the pipeline-wide
#                hive_default_max_retry_count is used
#   Arg [3]    : (optional) $may_retry, any true value allows the job to go back to 'READY'
#                (subject to the retry_count check), any false value forces 'FAILED'
#   Arg [4]    : (optional) $runtime_msec to be recorded; stored as NULL if undefined
#   Description: Moves a job that is 'CLAIMED' or in one of the $ALL_STATUSES_OF_RUNNING_JOBS
#                either back to 'READY' or to 'FAILED', increments its retry_count
#                and then increments the matching counter (ready_job_count or failed_job_count)
#                of the job's analysis.
#
sub release_and_age_job {
    my ($self, $job_id, $max_retry_count, $may_retry, $runtime_msec) = @_;

    # Default values
    $max_retry_count //= $self->db->hive_pipeline->hive_default_max_retry_count;
    $may_retry         = $may_retry ? 1 : 0;    # normalize to 0/1, since the value is interpolated into the SQL below
    $runtime_msec    //= 'NULL';

        # NB: The order of updated fields IS important. Here we first find out the new status and then increment the retry_count:
        #
        # FIXME: would it be possible to retain role_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
        #
    $self->dbc->do(
        "UPDATE job "
        .( ($self->dbc->driver eq 'pgsql')
            ? "SET status = CAST(CASE WHEN ($may_retry != 0) AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END AS job_status), "
            : "SET status =      CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END, "
         ).qq{
               retry_count=retry_count+1,
               runtime_msec=$runtime_msec
         WHERE job_id=$job_id
           AND status in ('CLAIMED',$ALL_STATUSES_OF_RUNNING_JOBS)
    } );

        # FIXME: move the decision making completely to the API side and so avoid the potential race condition.
    my $job         = $self->fetch_by_dbID( $job_id );

    unless( $job ) {
            # the job could not be re-read, so there is no status/analysis_id to base the counter update on
        warn "release_and_age_job: could not fetch job_id=$job_id after the update, analysis counters were not adjusted\n";
        return;
    }

    $self->db->get_AnalysisStatsAdaptor->increment_a_counter( ($job->status eq 'FAILED') ? 'failed_job_count' : 'ready_job_count', 1, $job->analysis_id );
}

=head2 gc_dataflow

    Description:    Perform automatic dataflow from a dead job that overused resources, if a corresponding dataflow rule was provided.
                    Should only be called once during the garbage collection phase, when the job is definitely 'abandoned' and not being worked on.

=cut

sub gc_dataflow {
    my ($self, $analysis, $job_id, $branch_name) = @_;

        # translate the branch name into the code under which the analysis indexes its dataflow rules
    my $code_of_branch  = Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor::branch_name_2_code($branch_name);
    my $rules_on_branch = $analysis->dataflow_rules_by_branch->{$code_of_branch};

        # nothing to do if no corresponding gc_dataflow rule has been defined
    return 0 unless $rules_on_branch;

    my $dead_job = $self->fetch_by_dbID($job_id);
    $dead_job->analysis( $analysis );

        # input_id_templates still supported, however to a limited extent
    $dead_job->load_parameters();

        # flow down the requested branch, then mark the job as handed over
    $dead_job->dataflow_output_id( undef, $branch_name );
    $dead_job->set_and_update_status('PASSED_ON');

        # PASSED_ON jobs are included in done_job_count
    my $stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
    $stats_adaptor->increment_a_counter( 'done_job_count', 1, $analysis->dbID );

        # if the job was controlling a semaphore, decrease it on the job's behalf
    my $semaphore = $dead_job->controlled_semaphore;
    $semaphore->decrease_by( [ $dead_job ] ) if $semaphore;

    return 1;
}

817

818
=head2 reset_jobs_for_analysis_id
819

Matthieu Muffato's avatar
Matthieu Muffato committed
820 821 822 823 824 825
  Arg [1]    : arrayref of Analyses
  Arg [2]    : arrayref of job statuses $input_statuses
  Description: Resets all the jobs of the selected analyses that have one of the
               required statuses to 'READY' and their retry_count to 0.
               Semaphores are updated accordingly.
  Caller     : beekeeper.pl and guiHive
826 827 828

=cut

829
sub reset_jobs_for_analysis_id {
830
    my ($self, $list_of_analyses, $input_statuses) = @_;
831

832
    return if !scalar(@$input_statuses);  # No statuses to reset
833

834 835
    my $analyses_filter = 'j.analysis_id IN ('.join(',', map { $_->dbID } @$list_of_analyses).')';
    my $statuses_filter = 'AND j.status IN ('.join(', ', map { "'$_'" } @$input_statuses).')';
836

837 838 839
    # Get the list of semaphores, and by how much their local_jobs_counter should be increased.
    # Only DONE and PASSED_ON jobs of the matching analyses and statuses should be counted
    #
840
    my $sql1 = qq{
841 842 843
        SELECT COUNT(*) AS local_delta, controlled_semaphore_id
        FROM job j
        WHERE controlled_semaphore_id IS NOT NULL
844
              AND $analyses_filter $statuses_filter AND status IN ('DONE', 'PASSED_ON')