Commit a56e4824 authored by Leo Gordon's avatar Leo Gordon
Browse files

experimental: job_claim has been removed; tested on a small db - OK

parent 5671f069
No preview for this file type
docs/hive_schema.png

230 KB | W: | H:

docs/hive_schema.png

233 KB | W: | H:

docs/hive_schema.png
docs/hive_schema.png
docs/hive_schema.png
docs/hive_schema.png
  • 2-up
  • Swipe
  • Onion skin
......@@ -62,7 +62,6 @@
use strict;
use Sys::Hostname;
use Data::UUID;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
......
......@@ -36,13 +36,12 @@ sub new {
my $self = bless {}, $class;
my($dbID, $analysis_id, $input_id, $job_claim, $worker_id, $status, $retry_count, $completed, $runtime_msec, $query_count, $semaphore_count, $semaphored_job_id, $adaptor) =
rearrange([qw(dbID analysis_id input_id job_claim worker_id status retry_count completed runtime_msec query_count semaphore_count semaphored_job_id adaptor) ], @_);
my($dbID, $analysis_id, $input_id, $worker_id, $status, $retry_count, $completed, $runtime_msec, $query_count, $semaphore_count, $semaphored_job_id, $adaptor) =
rearrange([qw(dbID analysis_id input_id worker_id status retry_count completed runtime_msec query_count semaphore_count semaphored_job_id adaptor) ], @_);
$self->dbID($dbID) if(defined($dbID));
$self->analysis_id($analysis_id) if(defined($analysis_id));
$self->input_id($input_id) if(defined($input_id));
$self->job_claim($job_claim) if(defined($job_claim));
$self->worker_id($worker_id) if(defined($worker_id));
$self->status($status) if(defined($status));
$self->retry_count($retry_count) if(defined($retry_count));
......@@ -86,12 +85,6 @@ sub analysis_id {
return $self->{'_analysis_id'};
}
# Combined getter/setter for the job_claim attribute.
# When called with an argument (even an undefined one) that value is stored
# under the '_job_claim' key; the currently stored value is always returned.
sub job_claim {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_job_claim'} = $new_value[0];
    }

    return $self->{'_job_claim'};
}
sub status {
my $self = shift;
$self->{'_status'} = shift if(@_);
......
......@@ -39,7 +39,6 @@
package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use strict;
use Data::UUID;
use Bio::EnsEMBL::DBSQL::BaseAdaptor;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
......@@ -293,7 +292,6 @@ sub _columns {
a.prev_analysis_job_id
a.analysis_id
a.input_id
a.job_claim
a.worker_id
a.status
a.retry_count
......@@ -335,7 +333,6 @@ sub _objs_from_sth {
-DBID => $column{'analysis_job_id'},
-ANALYSIS_ID => $column{'analysis_id'},
-INPUT_ID => $input_id,
-JOB_CLAIM => $column{'job_claim'},
-WORKER_ID => $column{'worker_id'},
-STATUS => $column{'status'},
-RETRY_COUNT => $column{'retry_count'},
......@@ -405,12 +402,9 @@ sub update_status {
$sql .= ",completed=now()";
$sql .= ",runtime_msec=".$job->runtime_msec;
$sql .= ",query_count=".$job->query_count;
} elsif($job->status eq 'READY') {
$sql .= ",job_claim=''";
} elsif($job->status eq 'PASSED_ON') {
$sql .= ",job_claim='', completed=now()";
$sql .= ", completed=now()";
} elsif($job->status eq 'READY') {
}
$sql .= " WHERE analysis_job_id='".$job->dbID."' ";
......@@ -420,22 +414,6 @@ sub update_status {
$sth->finish;
}
# Marks a single job as CLAIMED in the analysis_job table.
#
# Arg [1] : job object providing job_claim(), worker_id() and dbID()
#
# A freshly generated UUID string is stored on the job object via job_claim(),
# then the job's row (matched by analysis_job_id = $job->dbID) is updated with
# status='CLAIMED', that claim string and the job's current worker_id.
# The value of the final $sth->finish call is what the sub evaluates to.
sub reclaim_job {
    my ($self, $job) = @_;

    # Direct class-method call rather than the indirect-object form
    # "new Data::UUID", which perl may misparse if a sub named "new"
    # is visible in the current package.
    my $uuid_generator = Data::UUID->new;
    $job->job_claim( $uuid_generator->to_string( $uuid_generator->create() ) );

    my $sql = "UPDATE analysis_job SET status='CLAIMED', job_claim=?, worker_id=? WHERE analysis_job_id=?";
    my $sth = $self->prepare($sql);
    $sth->execute($job->job_claim, $job->worker_id, $job->dbID);
    $sth->finish;
}
=head2 store_out_files
......@@ -491,16 +469,14 @@ sub store_out_files {
sub grab_jobs_for_worker {
my ($self, $worker) = @_;
my $ug = new Data::UUID;
my $uuid = $ug->create();
my $claim = $ug->to_string( $uuid );
my $analysis_id = $worker->analysis->dbID();
my $worker_id = $worker->worker_id();
my $sql_base = "UPDATE analysis_job SET job_claim='$claim'".
" , worker_id='". $worker->worker_id ."'".
" , status='CLAIMED'".
" WHERE job_claim='' AND status='READY' AND semaphore_count<=0 ".
" AND analysis_id='$analysis_id'";
my $sql_base = qq{
UPDATE analysis_job
SET worker_id='$worker_id', status='CLAIMED'
WHERE analysis_id='$analysis_id' AND status='READY' AND semaphore_count<=0
};
my $sql_virgin = $sql_base .
" AND retry_count=0".
......@@ -514,11 +490,30 @@ sub grab_jobs_for_worker {
$claim_count = $self->dbc->do($sql_any);
}
my $constraint = "a.status='CLAIMED' AND a.job_claim='$claim' AND a.analysis_id='$analysis_id'";
my $constraint = "a.analysis_id='$analysis_id' AND a.worker_id='$worker_id' AND a.status='CLAIMED'";
return $self->_generic_fetch($constraint);
}
# Attempts to claim one specific job on behalf of a worker.
#
# Arg [1] : worker object providing worker_id()
# Arg [2] : job object providing dbID()
#
# Returns nothing when either argument is missing or false. Otherwise the
# job's row is switched to status='CLAIMED' with the worker's id, but only
# if the row is currently 'READY'; the sub then returns whatever
# _generic_fetch yields for "this job, this worker, status CLAIMED".
sub reclaim_job_for_worker {
    my ($self, $worker, $job) = @_;

    return unless $worker;
    return unless $job;

    my $claiming_worker_id = $worker->worker_id();
    my $target_job_id      = $job->dbID;

    my $claim_sth = $self->prepare(
        "UPDATE analysis_job SET status='CLAIMED', worker_id=? WHERE analysis_job_id=? AND status='READY'"
    );
    $claim_sth->execute($claiming_worker_id, $target_job_id);
    $claim_sth->finish;

    # Fetch back through the same three conditions, so the caller only sees
    # the job when the row is CLAIMED under this worker's id.
    return $self->_generic_fetch(
        "a.analysis_job_id='$target_job_id' AND a.worker_id='$claiming_worker_id' AND a.status='CLAIMED'"
    );
}
=head2 release_undone_jobs_from_worker
Arg [1] : Bio::EnsEMBL::Hive::Worker object
......@@ -543,9 +538,10 @@ sub release_undone_jobs_from_worker {
my $worker_id = $worker->worker_id();
#first just reset the claimed jobs, these don't need a retry_count index increment:
# (previous worker_id does not matter, because that worker has never had a chance to run the job)
$self->dbc->do( qq{
UPDATE analysis_job
SET job_claim='', status='READY'
SET status='READY', worker_id=NULL
WHERE status='CLAIMED'
AND worker_id='$worker_id'
} );
......@@ -594,11 +590,14 @@ sub release_and_age_job {
$may_retry ||= 0;
# NB: The order of updated fields IS important. Here we first find out the new status and then increment the retry_count:
#
# FIXME: would it be possible to retain worker_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
#
$self->dbc->do( qq{
UPDATE analysis_job
SET worker_id=NULL, job_claim='', status=IF( $may_retry AND (retry_count<$max_retry_count), 'READY', 'FAILED'), retry_count=retry_count+1
WHERE status in ('COMPILATION','GET_INPUT','RUN','WRITE_OUTPUT')
AND analysis_job_id=$job_id
SET status=IF( $may_retry AND (retry_count<$max_retry_count), 'READY', 'FAILED'), retry_count=retry_count+1
WHERE analysis_job_id=$job_id
AND status in ('COMPILATION','GET_INPUT','RUN','WRITE_OUTPUT')
} );
}
......@@ -645,7 +644,7 @@ sub reset_job_by_dbID {
$self->dbc->do( qq{
UPDATE analysis_job
SET worker_id=NULL, job_claim='', status='READY', retry_count=0
SET status='READY', retry_count=0
WHERE analysis_job_id=$job_id
} );
}
......@@ -675,7 +674,7 @@ sub reset_all_jobs_for_analysis_id {
throw("must define analysis_id") unless($analysis_id);
my ($sql, $sth);
$sql = "UPDATE analysis_job SET job_claim='', status='READY' WHERE status!='BLOCKED' and analysis_id=?";
$sql = "UPDATE analysis_job SET status='READY' WHERE status!='BLOCKED' and analysis_id=?";
$sth = $self->prepare($sql);
$sth->execute($analysis_id);
$sth->finish;
......
......@@ -377,18 +377,6 @@ sub reset_and_fetch_job_by_dbID {
}
# Assigns a specific job to a worker and has the job adaptor claim it.
#
# Arg [1] : worker object providing worker_id()
# Arg [2] : job object providing worker_id() as a setter
#
# Returns the job after its worker_id has been set and the adaptor's
# reclaim_job() has been called on it. When either argument is missing the
# sub returns nothing: undef in scalar context, an empty list in list context.
sub worker_reclaim_job {
    my ($self, $worker, $job) = @_;

    # A bare return is used instead of "return undef" so that callers which
    # collect the result in list context (e.g. inside [ ... ]) get an empty
    # list rather than a one-element list holding undef.
    return unless $job and $worker;

    $job->worker_id($worker->worker_id);
    $self->db->get_AnalysisJobAdaptor->reclaim_job($job);

    return $job;
}
######################################
#
# Public API interface for beekeeper
......
......@@ -515,7 +515,7 @@ sub run {
my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop
if( my $specific_job = $self->_specific_job() ) {
$jobs_done_by_batches_loop += $self->run_one_batch( [ $self->queen->worker_reclaim_job($self, $specific_job) ] );
$jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->reclaim_job_for_worker($self, $specific_job) );
$self->cause_of_death('JOB_LIMIT');
} else { # a proper "BATCHES" loop
......
......@@ -44,7 +44,7 @@ CREATE PROCEDURE show_progress_analysis(IN param_logic_name char(64))
DROP PROCEDURE IF EXISTS reset_failed_jobs_for_analysis;
CREATE PROCEDURE reset_failed_jobs_for_analysis(IN param_logic_name char(64))
UPDATE analysis_job j, analysis a
SET j.status='READY', j.retry_count=0, j.job_claim=''
SET j.status='READY', j.retry_count=0
WHERE a.logic_name=param_logic_name
AND a.analysis_id=j.analysis_id
AND j.status='FAILED';
......
......@@ -206,16 +206,14 @@ CREATE TABLE analysis_ctrl_rule (
-- overview:
-- The analysis_job is the heart of this system. It is the kiosk or blackboard
-- where workers find things to do and then post work for other workers to do.
-- The job_claim is a UUID set with an UPDATE LIMIT by worker as they fight
-- over the work. These jobs are created prior to work being done, are claimed
-- by workers, are updated as the work is done, with a final update on completion.
-- These jobs are created prior to work being done, are claimed by workers,
-- are updated as the work is done, with a final update on completion.
--
-- semantics:
-- analysis_job_id - autoincrement id
-- prev_analysis_job_id - previous analysis_job which created this one (and passed input_id)
-- analysis_id - the analysis_id needed to accomplish this job.
-- input_id - input data passed into Analysis:RunnableDB to control the work
-- job_claim - UUID set by workers as they fight over jobs
-- worker_id - link to hive table to define which worker claimed this job
-- status - state the job is in
-- retry_count - number of times the job had to be reset when a worker failed to run it
......@@ -231,7 +229,6 @@ CREATE TABLE analysis_job (
prev_analysis_job_id int(10) DEFAULT NULL, #analysis_job which created this from rules
analysis_id int(10) unsigned NOT NULL,
input_id char(255) NOT NULL,
job_claim char(40) NOT NULL DEFAULT '', #UUID
worker_id int(10) unsigned DEFAULT NULL,
status enum('READY','BLOCKED','CLAIMED','COMPILATION','GET_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL,
retry_count int(10) default 0 not NULL,
......@@ -248,8 +245,7 @@ CREATE TABLE analysis_job (
PRIMARY KEY (analysis_job_id),
UNIQUE KEY input_id_analysis (input_id, analysis_id),
INDEX claim_analysis_status (job_claim, analysis_id, status, semaphore_count),
INDEX analysis_status (analysis_id, status, semaphore_count),
INDEX analysis_status_sema (analysis_id, status, semaphore_count),
INDEX worker_id (worker_id)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......@@ -460,7 +456,7 @@ CREATE TABLE monitor (
workers int(10) NOT NULL default '0',
throughput float default NULL,
per_worker float default NULL,
analysis varchar(255) default NULL, # not just one, but a list of logic_names
analysis varchar(255) default NULL # not just one, but a list of logic_names
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment