Commit b4743e73 authored by Leo Gordon
Browse files

Dropped 'BLOCKED' job status and introduced 'SEMAPHORED' status that is...

Dropped 'BLOCKED' job status and introduced 'SEMAPHORED' status that is maintained in sync with semaphore_counts; less confusing and more efficient (with new 3-part index)
parent 01e034eb
...@@ -79,10 +79,8 @@ use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor'); ...@@ -79,10 +79,8 @@ use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
sub CreateNewJob { sub CreateNewJob {
my ($class, @args) = @_; my ($class, @args) = @_;
return undef unless(scalar @args); my ($input_id, $analysis, $prev_job, $prev_job_id, $semaphore_count, $semaphored_job_id, $push_new_semaphore) =
rearrange([qw(INPUT_ID ANALYSIS PREV_JOB INPUT_JOB_ID SEMAPHORE_COUNT SEMAPHORED_JOB_ID PUSH_NEW_SEMAPHORE)], @args);
my ($input_id, $analysis, $prev_job, $prev_job_id, $blocked, $semaphore_count, $semaphored_job_id, $push_new_semaphore) =
rearrange([qw(INPUT_ID ANALYSIS PREV_JOB INPUT_JOB_ID BLOCK SEMAPHORE_COUNT SEMAPHORED_JOB_ID PUSH_NEW_SEMAPHORE)], @args);
throw("must define input_id") unless($input_id); throw("must define input_id") unless($input_id);
throw("must define analysis") unless($analysis); throw("must define analysis") unless($analysis);
...@@ -103,10 +101,12 @@ sub CreateNewJob { ...@@ -103,10 +101,12 @@ sub CreateNewJob {
$input_id = "_ext_input_analysis_data_id $input_data_id"; $input_id = "_ext_input_analysis_data_id $input_data_id";
} }
$semaphore_count ||= 0;
my $dba = $analysis->adaptor->db; my $dba = $analysis->adaptor->db;
my $dbc = $dba->dbc; my $dbc = $dba->dbc;
my $insertion_method = ($dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE'; my $insertion_method = ($dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
my $status = $blocked ? 'BLOCKED' : 'READY'; my $status = ($semaphore_count>0) ? 'SEMAPHORED' : 'READY';
my $analysis_id = $analysis->dbID(); my $analysis_id = $analysis->dbID();
my $sql = qq{$insertion_method INTO job my $sql = qq{$insertion_method INTO job
...@@ -114,7 +114,7 @@ sub CreateNewJob { ...@@ -114,7 +114,7 @@ sub CreateNewJob {
VALUES (?,?,?,?,?,?)}; VALUES (?,?,?,?,?,?)};
my $sth = $dbc->prepare($sql); my $sth = $dbc->prepare($sql);
my @values = ($input_id, $prev_job_id, $analysis_id, $status, $semaphore_count || 0, $semaphored_job_id); my @values = ($input_id, $prev_job_id, $analysis_id, $status, $semaphore_count, $semaphored_job_id);
my $return_code = $sth->execute(@values) my $return_code = $sth->execute(@values)
# using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true: # using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
...@@ -359,12 +359,18 @@ sub _objs_from_sth { ...@@ -359,12 +359,18 @@ sub _objs_from_sth {
# #
################ ################
sub decrease_semaphore_count_for_jobid { # used in semaphore annihilation or unsuccessful creation sub decrease_semaphore_count_for_jobid { # used in semaphore annihilation or unsuccessful creation
my $self = shift @_; my $self = shift @_;
my $jobid = shift @_ or return; my $jobid = shift @_ or return;
my $dec = shift @_ || 1; my $dec = shift @_ || 1;
my $sql = "UPDATE job SET semaphore_count=semaphore_count-? WHERE job_id=?"; my $sql = qq{
UPDATE job
SET semaphore_count=semaphore_count-?,
status=(CASE WHEN semaphore_count>0 THEN 'SEMAPHORED' ELSE 'READY' END)
WHERE job_id=? AND status='SEMAPHORED'
};
my $sth = $self->prepare($sql); my $sth = $self->prepare($sql);
$sth->execute($dec, $jobid); $sth->execute($dec, $jobid);
...@@ -376,7 +382,11 @@ sub increase_semaphore_count_for_jobid { # used in semaphore propagation ...@@ -376,7 +382,11 @@ sub increase_semaphore_count_for_jobid { # used in semaphore propagation
my $jobid = shift @_ or return; my $jobid = shift @_ or return;
my $inc = shift @_ || 1; my $inc = shift @_ || 1;
# Raise the semaphore counter only for a job that is still 'SEMAPHORED'.
# The table is `job` (singular), matching every other statement and the schema.
my $sql = "UPDATE job SET semaphore_count=semaphore_count+? WHERE job_id=?"; my $sql = qq{
UPDATE job
SET semaphore_count=semaphore_count+?
WHERE job_id=? AND status='SEMAPHORED'
};
my $sth = $self->prepare($sql); my $sth = $self->prepare($sql);
$sth->execute($inc, $jobid); $sth->execute($inc, $jobid);
...@@ -641,7 +651,7 @@ sub gc_dataflow { ...@@ -641,7 +651,7 @@ sub gc_dataflow {
Arg [1] : int $job_id Arg [1] : int $job_id
Example : Example :
Description: Forces a job to be reset to 'READY' so it can be run again. Description: Forces a job to be reset to 'READY' so it can be run again.
Will also reset previously 'BLOCKED' jobs to READY. Will also reset previously 'SEMAPHORED' jobs to READY (FIXME?).
The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them, The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
but will not change retry_count if a job has never *really* started. but will not change retry_count if a job has never *really* started.
Exceptions : $job_id must not be false or zero Exceptions : $job_id must not be false or zero
...@@ -656,7 +666,7 @@ sub reset_job_by_dbID { ...@@ -656,7 +666,7 @@ sub reset_job_by_dbID {
# Note: the order of the fields being updated is critical! # Note: the order of the fields being updated is critical!
$self->dbc->do( qq{ $self->dbc->do( qq{
UPDATE job UPDATE job
SET retry_count = (CASE WHEN (status='COMPILATION' OR status='READY' OR status='CLAIMED' OR status='BLOCKED') THEN retry_count ELSE 1 END) SET retry_count = (CASE WHEN (status='COMPILATION' OR status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END)
, status='READY' , status='READY'
WHERE job_id=$job_id WHERE job_id=$job_id
} ); } );
......
...@@ -128,10 +128,10 @@ sub pipeline_analyses { ...@@ -128,10 +128,10 @@ sub pipeline_analyses {
{ 'a_multiplier' => $self->o('second_mult'), 'b_multiplier' => $self->o('first_mult') }, { 'a_multiplier' => $self->o('second_mult'), 'b_multiplier' => $self->o('first_mult') },
], ],
-flow_into => { -flow_into => {
2 => [ 'part_multiply' ], # will create a fan of jobs # 2 => [ 'part_multiply' ], # will create a fan of jobs
1 => [ 'add_together' ], # will create a funnel job to wait for the fan to complete and add the results # 1 => [ 'add_together' ], # will create a funnel job to wait for the fan to complete and add the results
# '2->A' => [ 'part_multiply' ], # will create a semaphored fan of jobs (comment out the -wait_for rule from 'add_together') '2->A' => [ 'part_multiply' ], # will create a semaphored fan of jobs (comment out the -wait_for rule from 'add_together')
# 'A->1' => [ 'add_together' ], # will create a semaphored funnel job to wait for the fan to complete and add the results 'A->1' => [ 'add_together' ], # will create a semaphored funnel job to wait for the fan to complete and add the results
}, },
}, },
...@@ -153,7 +153,7 @@ sub pipeline_analyses { ...@@ -153,7 +153,7 @@ sub pipeline_analyses {
-input_ids => [ -input_ids => [
# (jobs for this analysis will be flown_into via branch-1 from 'start' jobs above) # (jobs for this analysis will be flown_into via branch-1 from 'start' jobs above)
], ],
-wait_for => [ 'part_multiply' ], # we can only start adding when all partial products have been computed # -wait_for => [ 'part_multiply' ], # we can only start adding when all partial products have been computed
-flow_into => { -flow_into => {
1 => [ ':////final_result' ], 1 => [ ':////final_result' ],
}, },
......
...@@ -580,21 +580,20 @@ sub synchronize_AnalysisStats { ...@@ -580,21 +580,20 @@ sub synchronize_AnalysisStats {
$analysisStats->failed_job_count(0); $analysisStats->failed_job_count(0);
$analysisStats->num_required_workers(0); $analysisStats->num_required_workers(0);
my $sql = "SELECT status, semaphore_count, count(*) FROM job ". # ask for analysis_id to force MySQL to use existing index on (analysis_id, status)
"WHERE analysis_id=? GROUP BY status, semaphore_count"; my $sql = "SELECT analysis_id, status, count(*) FROM job WHERE analysis_id=? GROUP BY status";
my $sth = $self->prepare($sql); my $sth = $self->prepare($sql);
$sth->execute($analysisStats->analysis_id); $sth->execute($analysisStats->analysis_id);
my $done_here = 0; my $done_here = 0;
my $done_elsewhere = 0; my $done_elsewhere = 0;
my $total_job_count = 0; my $total_job_count = 0;
while (my ($status, $semaphore_count, $job_count)=$sth->fetchrow_array()) { while (my ($dummy_analysis_id, $status, $job_count)=$sth->fetchrow_array()) {
# print STDERR "$status: $job_count\n"; # print STDERR "$status: $job_count\n";
$total_job_count += $job_count; $total_job_count += $job_count;
if(($status eq 'READY') and ($semaphore_count<=0)) { if($status eq 'READY') {
$analysisStats->unclaimed_job_count($job_count); $analysisStats->unclaimed_job_count($job_count);
my $required_workers = POSIX::ceil( $job_count / $analysisStats->get_or_estimate_batch_size() ); my $required_workers = POSIX::ceil( $job_count / $analysisStats->get_or_estimate_batch_size() );
......
## Dropping 'BLOCKED' state (of Jobs) and adding 'SEMAPHORED' state that should simplify several things.
## NOTE(review): rows still holding status='BLOCKED' have no counterpart in the new enum -- confirm none remain before applying.
ALTER TABLE job MODIFY COLUMN status enum('SEMAPHORED','READY','CLAIMED','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL;
## Replace the older 4-part index with a more efficient 3-part index (based on the schema/API change).
## The old index is dropped explicitly so that patched databases match a freshly created schema:
ALTER TABLE job DROP INDEX analysis_status_sema_retry;
ALTER TABLE job ADD INDEX analysis_status_retry (analysis_id, status, retry_count);
...@@ -187,7 +187,7 @@ CREATE TABLE job ( ...@@ -187,7 +187,7 @@ CREATE TABLE job (
analysis_id int(10) unsigned NOT NULL, analysis_id int(10) unsigned NOT NULL,
input_id char(255) NOT NULL, input_id char(255) NOT NULL,
worker_id int(10) unsigned DEFAULT NULL, worker_id int(10) unsigned DEFAULT NULL,
status enum('READY','BLOCKED','CLAIMED','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL, status enum('SEMAPHORED','READY','CLAIMED','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL,
retry_count int(10) default 0 NOT NULL, retry_count int(10) default 0 NOT NULL,
completed datetime DEFAULT NULL, completed datetime DEFAULT NULL,
runtime_msec int(10) default NULL, runtime_msec int(10) default NULL,
...@@ -198,7 +198,7 @@ CREATE TABLE job ( ...@@ -198,7 +198,7 @@ CREATE TABLE job (
PRIMARY KEY (job_id), PRIMARY KEY (job_id),
UNIQUE KEY input_id_analysis (input_id, analysis_id), UNIQUE KEY input_id_analysis (input_id, analysis_id),
INDEX analysis_status_sema_retry (analysis_id, status, semaphore_count, retry_count), INDEX analysis_status_retry (analysis_id, status, retry_count),
INDEX worker_id (worker_id) INDEX worker_id (worker_id)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB; ) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......
...@@ -174,7 +174,7 @@ CREATE TABLE job ( ...@@ -174,7 +174,7 @@ CREATE TABLE job (
analysis_id INTEGER NOT NULL, analysis_id INTEGER NOT NULL,
input_id char(255) NOT NULL, input_id char(255) NOT NULL,
worker_id INTEGER DEFAULT NULL, worker_id INTEGER DEFAULT NULL,
status TEXT DEFAULT 'READY' NOT NULL, /* enum('READY','BLOCKED','CLAIMED','COMPILATION','FETCH_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL, */ status TEXT DEFAULT 'READY' NOT NULL, /* enum('SEMAPHORED','READY','CLAIMED','COMPILATION','FETCH_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL, */
retry_count int(10) DEFAULT 0 NOT NULL, retry_count int(10) DEFAULT 0 NOT NULL,
completed datetime DEFAULT NULL, completed datetime DEFAULT NULL,
runtime_msec int(10) DEFAULT NULL, runtime_msec int(10) DEFAULT NULL,
...@@ -184,7 +184,7 @@ CREATE TABLE job ( ...@@ -184,7 +184,7 @@ CREATE TABLE job (
semaphored_job_id INTEGER DEFAULT NULL semaphored_job_id INTEGER DEFAULT NULL
); );
CREATE UNIQUE INDEX IF NOT EXISTS input_id_analysis_id_idx ON job (input_id, analysis_id); CREATE UNIQUE INDEX IF NOT EXISTS input_id_analysis_id_idx ON job (input_id, analysis_id);
CREATE INDEX IF NOT EXISTS analysis_status_sema_retry_idx ON job (analysis_id, status, semaphore_count, retry_count); CREATE INDEX IF NOT EXISTS analysis_status_retry_idx ON job (analysis_id, status, retry_count);
CREATE INDEX IF NOT EXISTS worker_idx ON job (worker_id); CREATE INDEX IF NOT EXISTS worker_idx ON job (worker_id);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment