Commit 1da00c4c authored by Matthieu Muffato's avatar Matthieu Muffato
Browse files

New attempt table that stores job attempts

- The job's life-cycle is now tracked in attempt.status, which simplifies
  job.status (a single 'IN_PROGRESS' status).
- The job_file functionality is now tracked in the attempt table, and
  attempt_id is used in several tables instead of job_id/role_id
- job.when_completed is the same as attempt.when_updated (for the last
  successful) attempt)
- job.retry_count is removed as it is very tricky to maintain. Instead it
  is computed on the fly when needed
- last_attempt_id tells us whether the job has ever been tried or not
- Remove the unused 'COMPILATION' job status
parent c16e6de4
......@@ -206,14 +206,14 @@ running about 2,900 jobs.
Finally, the "progress" view tells you how your jobs are doing::
> SELECT * FROM progress;
+----------------------+----------------+--------+-------------+-----+----------------+
| analysis_name_and_id | resource_class | status | retry_count | cnt | example_job_id |
+----------------------+----------------+--------+-------------+-----+----------------+
| chrom_sizes(1) | default | DONE | 0 | 1 | 1 |
| base_age_factory(2) | 100Mb | DONE | 0 | 1 | 2 |
| base_age(3) | 3.6Gb | DONE | 0 | 25 | 4 |
| big_bed(4) | 1.8Gb | DONE | 0 | 1 | 3 |
+----------------------+----------------+--------+-------------+-----+----------------+
+----------------------+----------------+--------+----------+-----+----------------+--------------------+
| analysis_name_and_id | resource_class | status | is_retry | cnt | example_job_id | example_attempt_id |
+----------------------+----------------+--------+----------+-----+----------------+--------------------+
| chrom_sizes(1) | default | DONE | 0 | 1 | 1 | 1 |
| base_age_factory(2) | 100Mb | DONE | 0 | 1 | 2 | 2 |
| base_age(3) | 3.6Gb | DONE | 0 | 25 | 4 | 3 |
| big_bed(4) | 1.8Gb | DONE | 0 | 1 | 3 | 28 |
+----------------------+----------------+--------+----------+-----+----------------+--------------------+
If you see Jobs in :hivestatus:`<FAILED>[ FAILED ]` state or Jobs with
retry\_count > 0 (which means they have failed at least once and had
......
......@@ -40,6 +40,7 @@ package Bio::EnsEMBL::Hive::AnalysisJob;
use strict;
use warnings;
use Bio::EnsEMBL::Hive::Attempt;
use Bio::EnsEMBL::Hive::Utils ('stringify', 'destringify', 'throw');
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::TheApiary;
......@@ -55,6 +56,8 @@ use base ( 'Bio::EnsEMBL::Hive::Storable', # inherit dbID(), adaptor() and new(
analysis_id / analysis
last_attempt_id / last_attempt
controlled_semaphore_id / controlled_semaphore
=cut
......@@ -96,34 +99,22 @@ sub status {
return $self->{'_status'} || 'READY';
}
sub retry_count {
my $self = shift;
$self->{'_retry_count'} = shift if(@_);
$self->{'_retry_count'} = 0 unless(defined($self->{'_retry_count'}));
return $self->{'_retry_count'};
}
sub when_completed {
my $self = shift;
$self->{'_when_completed'} = shift if(@_);
return $self->{'_when_completed'};
}
sub runtime_msec {
sub attempt_count {
my $self = shift;
$self->{'_runtime_msec'} = shift if(@_);
$self->{'_runtime_msec'} = 0 unless(defined($self->{'_runtime_msec'}));
return $self->{'_runtime_msec'};
# Lazy-loaded
if ((not exists $self->{'_attempt_count'}) and $self->adaptor) {
# Cached because we don't expect the same AnalysisJob instance to live across multiple attempts
$self->{'_attempt_count'} = $self->adaptor->db->get_AttemptAdaptor->count_all_by_job_id($self->dbID);
}
return $self->{'_attempt_count'};
}
sub query_count {
sub when_created {
my $self = shift;
$self->{'_query_count'} = shift if(@_);
$self->{'_query_count'} = 0 unless(defined($self->{'_query_count'}));
return $self->{'_query_count'};
$self->{'_when_created'} = shift if(@_);
return $self->{'_when_created'};
}
sub set_and_update_status {
my ($self, $status ) = @_;
......@@ -134,18 +125,6 @@ sub set_and_update_status {
}
}
sub stdout_file {
my $self = shift;
$self->{'_stdout_file'} = shift if(@_);
return $self->{'_stdout_file'};
}
sub stderr_file {
my $self = shift;
$self->{'_stderr_file'} = shift if(@_);
return $self->{'_stderr_file'};
}
sub accu_hash {
my $self = shift;
$self->{'_accu_hash'} = shift if(@_);
......@@ -175,6 +154,24 @@ sub autoflow {
}
sub create_new_attempt {
my ($self, $role) = @_;
my $attempt = Bio::EnsEMBL::Hive::Attempt->new(
'role' => $role,
'job' => $self,
);
my $job_adaptor = $self->adaptor;
$job_adaptor->db->get_AttemptAdaptor->store($attempt) if $job_adaptor;
$self->last_attempt($attempt);
$job_adaptor->update_last_attempt_id($self) if $job_adaptor;
return $attempt;
}
##-----------------[indicators to the Worker]--------------------------------
......@@ -407,7 +404,7 @@ sub toString {
? ( $self->analysis->logic_name.'('.$self->analysis_id.')' )
: '(NULL)';
return 'Job dbID='.($self->dbID || '(NULL)')." analysis=$analysis_label, input_id='".$self->input_id."', status=".$self->status.", retry_count=".$self->retry_count;
return 'Job dbID='.($self->dbID || '(NULL)')." analysis=$analysis_label, input_id='".$self->input_id."', status=".$self->status.", attempt_count=".$self->attempt_count;
}
......
=pod
=head1 NAME
Bio::EnsEMBL::Hive::Attempt
=head1 DESCRIPTION
An object to describe an attempt of a job.
It is stored in its own table (attempt) indexed by a dbID, and is thus Storable
=head1 LICENSE
Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2019] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
=head1 CONTACT
Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
=cut
package Bio::EnsEMBL::Hive::Attempt;
use strict;
use warnings;
use base ( 'Bio::EnsEMBL::Hive::Storable' );
# ----------------------------- Cacheable implementation -----------------------------------
sub unikey {
return [ 'dbID' ];
}
# --------------------------------- Getter / Setters ---------------------------------------
sub status {
my $self = shift;
$self->{'_status'} = shift if(@_);
return $self->{'_status'} || 'INITIALIZATION';
}
sub when_initialized {
my $self = shift;
$self->{'_when_initialized'} = shift if(@_);
return $self->{'_when_initialized'};
}
sub when_updated {
my $self = shift;
$self->{'_when_updated'} = shift if(@_);
return $self->{'_when_updated'};
}
sub when_ended {
my $self = shift;
$self->{'_when_ended'} = shift if(@_);
return $self->{'_when_ended'};
}
sub runtime_msec {
my $self = shift;
$self->{'_runtime_msec'} = shift if(@_);
return $self->{'_runtime_msec'};
}
sub is_success {
my $self = shift;
$self->{'_is_success'} = shift if(@_);
return $self->{'_is_success'};
}
sub query_count {
my $self = shift;
$self->{'_query_count'} = shift if(@_);
return $self->{'_query_count'};
}
sub stdout_file {
my $self = shift;
$self->{'_stdout_file'} = shift if(@_);
return $self->{'_stdout_file'};
}
sub stderr_file {
my $self = shift;
$self->{'_stderr_file'} = shift if(@_);
return $self->{'_stderr_file'};
}
# --------------------------------- Compound methods ---------------------------------------
sub toString {
my $self = shift @_;
my $attempt_count = $self->job->attempt_count;
my $suffix = 'th';
$suffix = 'st' if ($attempt_count % 10 == 1) && ($attempt_count != 11);
$suffix = 'nd' if ($attempt_count % 10 == 2) && ($attempt_count != 12);
$suffix = 'rd' if ($attempt_count % 10 == 3) && ($attempt_count != 13);
return $attempt_count.$suffix.' attempt of '.$self->job->toString;
}
# -------------------------------- Convenient methods --------------------------------------
=head2 set_and_update_status
Example : $attempt->set_and_update_status('WRITE_OUTPUT');
Description : Sets the status of the attempt (within the job's life-cycle) and updates
the database accordingly.
Returntype : none
Exceptions : none
Caller : general
Status : Stable
=cut
sub set_and_update_status {
my ($self, $status) = @_;
$self->status($status);
if(my $adaptor = $self->adaptor) {
$adaptor->check_in_attempt($self);
}
}
1;
......@@ -55,10 +55,9 @@ use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
# NOTE: These lists must be kept in sync with the schema !
# They are used in a number of queries.
our $ALL_STATUSES_OF_RUNNING_JOBS = q{'PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_HEALTHCHECK','POST_CLEANUP'};
our $ALL_STATUSES_OF_TAKEN_JOBS = qq{'CLAIMED',$ALL_STATUSES_OF_RUNNING_JOBS};
our $ALL_STATUSES_OF_TAKEN_JOBS = qq{'CLAIMED','IN_PROGRESS'};
our $ALL_STATUSES_OF_COMPLETE_JOBS = q{'DONE','PASSED_ON'};
# Not in any list: SEMAPHORED, READY, COMPILATION (this one is actually not used), FAILED
# Not in any list: SEMAPHORED, READY, FAILED
sub default_table_name {
return 'job';
......@@ -360,7 +359,6 @@ sub store_a_semaphored_group_of_jobs {
Arg [1] : (optional) listref $list_of_analyses
Arg [2] : (optional) string $status
Arg [3] : (optional) int $retry_at_least
Example : $all_failed_jobs = $adaptor->fetch_all_by_analysis_id_status(undef, 'FAILED');
$analysis_done_jobs = $adaptor->fetch_all_by_analysis_id_status( $list_of_analyses, 'DONE');
Description: Returns a list of all jobs filtered by given analysis_id (if specified) and given status (if specified).
......@@ -369,7 +367,7 @@ sub store_a_semaphored_group_of_jobs {
=cut
sub fetch_all_by_analysis_id_status {
my ($self, $list_of_analyses, $status, $retry_count_at_least) = @_;
my ($self, $list_of_analyses, $status) = @_;
my @constraints = ();
......@@ -382,7 +380,6 @@ sub fetch_all_by_analysis_id_status {
}
push @constraints, "status='$status'" if ($status);
push @constraints, "retry_count >= $retry_count_at_least" if ($retry_count_at_least);
return $self->fetch_all( join(" AND ", @constraints) );
}
......@@ -455,7 +452,7 @@ sub semaphore_job_by_id { # used in the end of reblocking a semaphore chain
my $self = shift @_;
my $job_id = shift @_ or return;
my $sql = "UPDATE job SET status = 'SEMAPHORED' WHERE job_id=? AND status NOT IN ('COMPILATION', $ALL_STATUSES_OF_TAKEN_JOBS)";
my $sql = "UPDATE job SET status = 'SEMAPHORED' WHERE job_id=? AND status NOT IN ($ALL_STATUSES_OF_TAKEN_JOBS)";
$self->dbc->protected_prepare_execute( [ $sql, $job_id ],
sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'semaphoring a job'.$after, 'INFO' ); }
......@@ -504,18 +501,7 @@ sub check_in_job {
my $job_id = $job->dbID;
my $sql = "UPDATE job SET status='".$job->status."' ";
if($job->status eq 'DONE') {
$sql .= ",when_completed=CURRENT_TIMESTAMP";
$sql .= ",runtime_msec=".$job->runtime_msec;
$sql .= ",query_count=".$job->query_count;
} elsif($job->status eq 'PASSED_ON') {
$sql .= ", when_completed=CURRENT_TIMESTAMP";
} elsif($job->status eq 'READY') {
}
$sql .= " WHERE job_id='$job_id' ";
my $sql = "UPDATE job SET status='".$job->status."' WHERE job_id='$job_id' ";
# This particular query is infamous for collisions and 'deadlock' situations; let's wait and retry:
$self->dbc->protected_prepare_execute( [ $sql ],
......@@ -524,42 +510,12 @@ sub check_in_job {
}
=head2 store_out_files
Arg [1] : Bio::EnsEMBL::Hive::AnalysisJob $job
Example :
Description: update locations of log files, if present
Returntype :
Exceptions :
Caller : Bio::EnsEMBL::Hive::Worker
=cut
sub store_out_files {
my ($self, $job) = @_;
# FIXME: An UPSERT would be better here, but it is only promised in PostgreSQL starting from 9.5, which is not officially out yet.
my $delete_sql = 'DELETE from job_file WHERE job_id=' . $job->dbID . ' AND retry='.$job->retry_count;
$self->dbc->do( $delete_sql );
if($job->stdout_file or $job->stderr_file) {
my $insert_sql = 'INSERT INTO job_file (job_id, retry, role_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)';
my $insert_sth = $self->dbc->prepare($insert_sql);
$insert_sth->execute( $job->dbID, $job->retry_count, $job->role_id, $job->stdout_file, $job->stderr_file );
$insert_sth->finish();
}
}
=head2 reset_or_grab_job_by_dbID
Arg [1] : int $job_id
Arg [2] : int $role_id (optional)
Description: resets a job to to 'READY' (if no $role_id given) or directly to 'CLAIMED' so it can be run again, and fetches it.
NB: Will also reset a previously 'SEMAPHORED' job to READY.
The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
but will not change retry_count if a job has never *really* started.
Returntype : Bio::EnsEMBL::Hive::AnalysisJob or undef
=cut
......@@ -572,8 +528,7 @@ sub reset_or_grab_job_by_dbID {
# Note: the order of the fields being updated is critical!
my $sql = qq{
UPDATE job
SET retry_count = CASE WHEN (status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END
, status=?
SET status=?
, role_id=?
WHERE job_id=?
};
......@@ -632,7 +587,7 @@ sub grab_jobs_for_role {
WHERE analysis_id='$analysis_id'
AND status='READY'
};
my $virgin_sql = qq{ AND retry_count=0 };
my $virgin_sql = qq{ AND last_attempt_id IS NULL };
my $limit_sql = qq{ LIMIT $how_many_this_batch };
my $offset_sql = qq{ OFFSET $offset };
my $suffix_sql = ($self->dbc->driver eq 'mysql') ? qq{
......@@ -691,9 +646,9 @@ sub release_claimed_jobs_from_role {
Description: If a Worker has died some of its jobs need to be reset back to 'READY'
so they can be rerun.
Jobs in state CLAIMED as simply reset back to READY.
If jobs was 'in progress' (see the $ALL_STATUSES_OF_RUNNING_JOBS variable)
the retry_count is increased and the status set back to READY.
If the retry_count >= $max_retry_count (3 by default) the job is set
If jobs was IN_PROGRESS, the attempt is marked as failed and the job
status set back to READY.
If the attempt_count > $max_retry_count (3 by default) the job is set
to 'FAILED' and not rerun again.
Exceptions : $role must be defined
Caller : Bio::EnsEMBL::Hive::Queen
......@@ -705,17 +660,16 @@ sub release_undone_jobs_from_role {
my $role_id = $role->dbID;
my $analysis = $role->analysis;
my $max_retry_count = $analysis->max_retry_count;
my $worker = $role->worker;
#first just reset the claimed jobs, these don't need a retry_count index increment:
#first just reset the claimed jobs
$self->release_claimed_jobs_from_role( $role );
my $sth = $self->prepare( qq{
SELECT job_id
SELECT job_id, last_attempt_id
FROM job
WHERE role_id='$role_id'
AND status in ($ALL_STATUSES_OF_RUNNING_JOBS)
AND status = 'IN_PROGRESS'
} );
$sth->execute();
......@@ -723,8 +677,9 @@ sub release_undone_jobs_from_role {
$msg ||= "GarbageCollector: The worker died because of $cod";
my $resource_overusage = ($cod eq 'MEMLIMIT') || ($cod eq 'RUNLIMIT' and $worker->work_done()==0);
my $attempt_adaptor = $self->db->get_AttemptAdaptor;
while(my ($job_id) = $sth->fetchrow_array()) {
while(my ($job_id, $attempt_id) = $sth->fetchrow_array()) {
my $passed_on = 0; # the flag indicating that the garbage_collection was attempted and was successful
......@@ -739,10 +694,11 @@ sub release_undone_jobs_from_role {
}
}
$self->db()->get_LogMessageAdaptor()->store_job_message($job_id, $msg, $passed_on ? 'INFO' : 'WORKER_ERROR');
$self->db()->get_LogMessageAdaptor()->store_attempt_message($attempt_id, $msg, $passed_on ? 'INFO' : 'WORKER_ERROR');
unless($passed_on) {
$self->release_and_age_job( $job_id, $max_retry_count, not $resource_overusage );
$attempt_adaptor->record_attempt_interruption($attempt_id);
$self->release_and_age_job( $job_id, $analysis, not $resource_overusage );
}
$role->register_attempt( 0 );
......@@ -752,33 +708,19 @@ sub release_undone_jobs_from_role {
sub release_and_age_job {
my ($self, $job_id, $max_retry_count, $may_retry, $runtime_msec) = @_;
my ($self, $job_id, $analysis, $may_retry) = @_;
# Default values
$max_retry_count //= $self->db->hive_pipeline->hive_default_max_retry_count;
$may_retry ||= 0;
$runtime_msec = "NULL" unless(defined $runtime_msec);
my $max_retry_count = $analysis->max_retry_count // $self->db->hive_pipeline->hive_default_max_retry_count;
my $n_attempts = $self->db->get_AttemptAdaptor->count_all_by_job_id($job_id);
my $new_status = ($may_retry && $n_attempts <= $max_retry_count) ? 'READY' : 'FAILED';
# NB: The order of updated fields IS important. Here we first find out the new status and then increment the retry_count:
#
# FIXME: would it be possible to retain role_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
#
$self->dbc->do(
"UPDATE job "
.( ($self->dbc->driver eq 'pgsql')
? "SET status = CAST(CASE WHEN ($may_retry != 0) AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END AS job_status), "
: "SET status = CASE WHEN $may_retry AND (retry_count<$max_retry_count) THEN 'READY' ELSE 'FAILED' END, "
).qq{
retry_count=retry_count+1,
runtime_msec=$runtime_msec
qq{UPDATE job SET status = '$new_status'
WHERE job_id=$job_id
AND status in ($ALL_STATUSES_OF_TAKEN_JOBS)
} );
# FIXME: move the decision making completely to the API side and so avoid the potential race condition.
my $job = $self->fetch_by_dbID( $job_id );
$self->db->get_AnalysisStatsAdaptor->increment_a_counter( ($job->status eq 'FAILED') ? 'failed_job_count' : 'ready_job_count', 1, $job->analysis_id );
$self->db->get_AnalysisStatsAdaptor->increment_a_counter( ($new_status eq 'FAILED') ? 'failed_job_count' : 'ready_job_count', 1, $analysis->dbID );
}
......@@ -805,6 +747,7 @@ sub gc_dataflow {
$job->dataflow_output_id( undef, $branch_name );
$self->db->get_AttemptAdaptor->record_attempt_interruption($job->last_attempt_id);
$job->set_and_update_status('PASSED_ON');
# PASSED_ON jobs are included in done_job_count
......@@ -823,7 +766,7 @@ sub gc_dataflow {
Arg [1] : arrayref of Analyses
Arg [2] : arrayref of job statuses $input_statuses
Description: Resets all the jobs of the selected analyses that have one of the
required statuses to 'READY' and their retry_count to 0.
required statuses to 'READY'
Semaphores are updated accordingly.
Caller : beekeeper.pl and guiHive
......@@ -869,12 +812,12 @@ sub reset_jobs_for_analysis_id {
UPDATE job j
LEFT JOIN semaphore s
ON (j.job_id=s.dependent_job_id)
SET j.retry_count = CASE WHEN j.status='READY' THEN 0 ELSE 1 END,
SET
j.status = }.$self->job_status_cast("CASE WHEN s.local_jobs_counter+s.remote_jobs_counter>0 THEN 'SEMAPHORED' ELSE 'READY' END").qq{
WHERE $analyses_filter $statuses_filter
} : ($self->dbc->driver eq 'pgsql') ? qq{
UPDATE job
SET retry_count = CASE WHEN j.status='READY' THEN 0 ELSE 1 END,
SET
status = }.$self->job_status_cast("CASE WHEN s.local_jobs_counter+s.remote_jobs_counter>0 THEN 'SEMAPHORED' ELSE 'READY' END").qq{
FROM job j
LEFT JOIN semaphore s
......@@ -882,7 +825,7 @@ sub reset_jobs_for_analysis_id {
WHERE job.job_id=j.job_id AND $analyses_filter $statuses_filter
} : qq{
REPLACE INTO job (job_id, prev_job_id, analysis_id, input_id, param_id_stack, accu_id_stack, role_id, status, retry_count, when_completed, runtime_msec, query_count, controlled_semaphore_id)
REPLACE INTO job (job_id, prev_job_id, analysis_id, input_id, param_id_stack, accu_id_stack, role_id, last_attempt_id, status, controlled_semaphore_id)
SELECT j.job_id,
j.prev_job_id,
j.analysis_id,
......@@ -890,11 +833,8 @@ sub reset_jobs_for_analysis_id {
j.param_id_stack,
j.accu_id_stack,
j.role_id,
j.last_attempt_id,
CASE WHEN s.local_jobs_counter+s.remote_jobs_counter>0 THEN 'SEMAPHORED' ELSE 'READY' END,
CASE WHEN j.status='READY' THEN 0 ELSE 1 END,
j.when_completed,
j.runtime_msec,
j.query_count,
j.controlled_semaphore_id
FROM job j
LEFT JOIN semaphore s
......
=pod
=head1 NAME
Bio::EnsEMBL::Hive::DBSQL::AttemptAdaptor
=head1 DESCRIPTION
Module to encapsulate all db access for class Attempt.
=head1 LICENSE
Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2019] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
=head1 CONTACT
Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
=cut
package Bio::EnsEMBL::Hive::DBSQL::AttemptAdaptor;
use strict;
use warnings;
use Bio::EnsEMBL::Hive::Attempt;
use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
# ----------------------------- ObjectAdaptor implementation -----------------------------------
sub default_table_name {
return '