Commit dc123f42 authored by Leo Gordon

schema_change: both job and job_file now point to role instead of worker

parent 27be78f7
@@ -80,10 +80,10 @@ sub accu_id_stack {
     return $self->{'_accu_id_stack'};
 }
-sub worker_id {
+sub role_id {
     my $self = shift;
-    $self->{'_worker_id'} = shift if(@_);
-    return $self->{'_worker_id'};
+    $self->{'_role_id'} = shift if(@_);
+    return $self->{'_role_id'};
 }
 sub status {
......
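For orientation, a minimal sketch of how the renamed combined getter/setter is used (illustrative only; constructing a job by hand like this, outside of the adaptors, is an assumption and not part of this commit):

    use strict;
    use warnings;
    use Bio::EnsEMBL::Hive::AnalysisJob;

    my $job = Bio::EnsEMBL::Hive::AnalysisJob->new();
    $job->role_id(42);                              # setter: stores into $self->{'_role_id'}
    printf "claimed by Role %d\n", $job->role_id;   # getter: returns 42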
@@ -166,10 +166,10 @@ sub fetch_some_by_analysis_id_limit {
 }
-sub fetch_all_incomplete_jobs_by_worker_id {
-    my ($self, $worker_id) = @_;
+sub fetch_all_incomplete_jobs_by_role_id {
+    my ($self, $role_id) = @_;
-    my $constraint = "status IN ('CLAIMED','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP') AND worker_id='$worker_id'";
+    my $constraint = "status IN ('CLAIMED','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP') AND role_id='$role_id'";
     return $self->fetch_all($constraint);
 }
@@ -298,12 +298,12 @@ sub store_out_files {
     my ($self, $job) = @_;
     if($job->stdout_file or $job->stderr_file) {
-        my $insert_sql = 'REPLACE INTO job_file (job_id, retry, worker_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)';
+        my $insert_sql = 'REPLACE INTO job_file (job_id, retry, role_id, stdout_file, stderr_file) VALUES (?,?,?,?,?)';
         my $sth = $self->dbc()->prepare($insert_sql);
-        $sth->execute($job->dbID(), $job->retry_count(), $job->worker_id(), $job->stdout_file(), $job->stderr_file());
+        $sth->execute($job->dbID(), $job->retry_count(), $job->role_id(), $job->stdout_file(), $job->stderr_file());
         $sth->finish();
     } else {
-        my $sql = 'DELETE from job_file WHERE worker_id='.$job->worker_id.' AND job_id='.$job->dbID;
+        my $sql = 'DELETE from job_file WHERE role_id='.$job->role_id.' AND job_id='.$job->dbID;
         $self->dbc->do($sql);
     }
 }
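A note on the statement above: REPLACE INTO is a MySQL extension that first deletes any existing row with the same primary key, here (job_id, retry), and then inserts the new one, so re-registering a job attempt overwrites its earlier stdout/stderr paths instead of failing on a duplicate key. A sketch with made-up values:

    REPLACE INTO job_file (job_id, retry, role_id, stdout_file, stderr_file)
    VALUES (1, 0, 7, '/tmp/job_1.out', '/tmp/job_1.err');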
@@ -312,8 +312,8 @@ sub store_out_files {
 =head2 reset_or_grab_job_by_dbID
   Arg [1]    : int $job_id
-  Arg [2]    : int $worker_id (optional)
-  Description: resets a job to 'READY' (if no $worker_id given) or directly to 'CLAIMED' so it can be run again, and fetches it.
+  Arg [2]    : int $role_id (optional)
+  Description: resets a job to 'READY' (if no $role_id given) or directly to 'CLAIMED' so it can be run again, and fetches it.
                NB: Will also reset a previously 'SEMAPHORED' job to READY.
                The retry_count will be set to 1 for previously run jobs (partially or wholly) to trigger PRE_CLEANUP for them,
                but will not change retry_count if a job has never *really* started.
@@ -322,21 +322,19 @@ sub store_out_files {
 =cut
 sub reset_or_grab_job_by_dbID {
-    my $self = shift;
-    my $job_id = shift;
-    my $worker_id = shift;
+    my ($self, $job_id, $role_id) = @_;
-    my $new_status = ($worker_id?'CLAIMED':'READY');
+    my $new_status = $role_id ? 'CLAIMED' : 'READY';
     # Note: the order of the fields being updated is critical!
     my $sql = qq{
         UPDATE job
            SET retry_count = CASE WHEN (status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END
              , status=?
-             , worker_id=?
+             , role_id=?
         WHERE job_id=?
     };
-    my @values = ($new_status, $worker_id, $job_id);
+    my @values = ($new_status, $role_id, $job_id);
     my $sth = $self->prepare( $sql );
     my $return_code = $sth->execute( @values )
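The "order is critical" comment depends on MySQL evaluating single-table UPDATE assignments from left to right: retry_count must be computed from the job's pre-update status before status itself is overwritten (standard SQL semantics would use the pre-update value everywhere, which is why this MySQL quirk is worth flagging). A minimal sketch of the distinction, with made-up values:

    -- Correct: the CASE still sees the OLD status, so a previously-run job
    -- gets retry_count=1 while a READY/CLAIMED job keeps its current count:
    UPDATE job
       SET retry_count = CASE WHEN (status='READY' OR status='CLAIMED') THEN retry_count ELSE 1 END
         , status='CLAIMED'
         , role_id=7
     WHERE job_id=1;

    -- Wrong on MySQL: if status were assigned first, the CASE would always
    -- see 'CLAIMED' and retry_count would never be set to 1.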
@@ -349,13 +347,14 @@ sub reset_or_grab_job_by_dbID {
 }
-=head2 grab_jobs_for_worker
+=head2 grab_jobs_for_role
-  Arg [1]           : Bio::EnsEMBL::Hive::Worker object $worker
+  Arg [1]           : Bio::EnsEMBL::Hive::Role object $role
+  Arg [2]           : int $how_many_this_role
   Example:
-    my $jobs  = $job_adaptor->grab_jobs_for_worker( $worker );
+    my $jobs  = $job_adaptor->grab_jobs_for_role( $role, $how_many );
   Description:
-    For the specified worker, it will search available jobs,
+    For the specified Role, it will search available jobs,
     and using the how_many_this_batch parameter, claim/fetch that
     number of jobs, and then return them.
   Returntype :
@@ -364,13 +363,13 @@ sub reset_or_grab_job_by_dbID {
 =cut
-sub grab_jobs_for_worker {
-    my ($self, $worker, $how_many_this_batch, $workers_rank) = @_;
+sub grab_jobs_for_role {
+    my ($self, $role, $how_many_this_batch) = @_;
-    my $current_role = $worker->current_role;
-    my $analysis_id  = $current_role->analysis_id();
-    my $worker_id    = $worker->dbID();
-    my $offset       = $how_many_this_batch*$workers_rank;
+    my $analysis_id  = $role->analysis_id;
+    my $role_id      = $role->dbID;
+    my $role_rank    = $self->db->get_RoleAdaptor->get_role_rank( $role );
+    my $offset       = $how_many_this_batch * $role_rank;
     my $prefix_sql = ($self->dbc->driver eq 'mysql') ? qq{
         UPDATE job j
@@ -381,7 +380,7 @@ sub grab_jobs_for_role {
            AND status='READY'
     } : qq{
         UPDATE job
-           SET worker_id='$worker_id', status='CLAIMED'
+           SET role_id='$role_id', status='CLAIMED'
         WHERE job_id in (
             SELECT job_id
             FROM job
@@ -394,7 +393,7 @@ sub grab_jobs_for_role {
     my $suffix_sql = ($self->dbc->driver eq 'mysql') ? qq{
         ) as x
         USING (job_id)
-           SET j.worker_id='$worker_id', j.status='CLAIMED'
+           SET j.role_id='$role_id', j.status='CLAIMED'
         WHERE j.status='READY'
     } : qq{
         )
@@ -408,48 +407,48 @@ sub grab_jobs_for_role {
         }
     }
-    return $self->fetch_all_by_worker_id_AND_status($worker_id, 'CLAIMED') ;
+    return $self->fetch_all_by_role_id_AND_status($role_id, 'CLAIMED') ;
 }
-=head2 release_undone_jobs_from_worker
+=head2 release_undone_jobs_from_role
-  Arg [1]    : Bio::EnsEMBL::Hive::Worker object
+  Arg [1]    : Bio::EnsEMBL::Hive::Role object
   Arg [2]    : optional message to be recorded in 'job_message' table
   Example    :
-  Description: If a worker has died some of its jobs need to be reset back to 'READY'
+  Description: If a Worker has died, some of its jobs need to be reset back to 'READY'
                so they can be rerun.
                Jobs in state CLAIMED are simply reset back to READY.
                If a job was 'in progress' (PRE_CLEANUP, FETCH_INPUT, RUN, WRITE_OUTPUT, POST_CLEANUP)
                the retry_count is increased and the status set back to READY.
                If the retry_count >= $max_retry_count (3 by default) the job is set
                to 'FAILED' and not rerun again.
-  Exceptions : $worker must be defined
+  Exceptions : $role must be defined
   Caller     : Bio::EnsEMBL::Hive::Queen
 =cut
-sub release_undone_jobs_from_worker {
-    my ($self, $worker, $msg) = @_;
+sub release_undone_jobs_from_role {
+    my ($self, $role, $msg) = @_;
-    my $current_role    = $worker->current_role;
-    my $analysis        = $current_role->analysis;
-    my $max_retry_count = $analysis->max_retry_count();
-    my $worker_id       = $worker->dbID();
+    my $role_id         = $role->dbID;
+    my $analysis        = $role->analysis;
+    my $max_retry_count = $analysis->max_retry_count;
+    my $worker          = $role->worker;
     #first just reset the claimed jobs, these don't need a retry_count index increment:
-    # (previous worker_id does not matter, because that worker has never had a chance to run the job)
+    # (previous role_id does not matter, because that Role has never had a chance to run the job)
     $self->dbc->do( qq{
         UPDATE job
-           SET status='READY', worker_id=NULL
-         WHERE worker_id='$worker_id'
+           SET status='READY', role_id=NULL
+         WHERE role_id='$role_id'
           AND status='CLAIMED'
     } );
     my $sth = $self->prepare( qq{
         SELECT job_id
          FROM job
-        WHERE worker_id='$worker_id'
+        WHERE role_id='$role_id'
          AND status in ('PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP')
     } );
     $sth->execute();
@@ -490,7 +489,7 @@ sub release_and_age_job {
     $runtime_msec = "NULL" unless(defined $runtime_msec);
     # NB: The order of updated fields IS important. Here we first find out the new status and then increment the retry_count:
     #
-    # FIXME: would it be possible to retain worker_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
+    # FIXME: would it be possible to retain role_id for READY jobs in order to temporarily keep track of the previous (failed) worker?
     #
     $self->dbc->do(
         "UPDATE job "
......
@@ -55,9 +55,11 @@ sub store_job_message {
     # Note: the timestamp 'time' column will be set automatically
     my $sql = qq{
-        INSERT INTO $table_name (job_id, worker_id, retry, status, msg, is_error)
-               SELECT job_id, worker_id, retry_count, status, ?, ?
-                 FROM job WHERE job_id=?
+        INSERT INTO $table_name (job_id, role_id, worker_id, retry, status, msg, is_error)
+               SELECT job_id, role_id, worker_id, retry_count, status, ?, ?
+                 FROM job
+                 JOIN role USING(role_id)
+                WHERE job_id=?
     };
     my $sth = $self->prepare( $sql );
......
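Because the job table no longer carries a worker_id, the statement above recovers it by joining through role (job.role_id leads to role.worker_id). A sketch of the assembled query, assuming $table_name resolves to the log_message table and with the two placeholders for msg/is_error filled in:

    INSERT INTO log_message (job_id, role_id, worker_id, retry, status, msg, is_error)
         SELECT job_id, role_id, worker_id, retry_count, status, 'example message', 0
           FROM job
           JOIN role USING(role_id)
          WHERE job_id=1;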
@@ -134,5 +134,12 @@ sub print_active_role_counts {
 }
+sub fetch_all_finished_roles_with_unfinished_jobs {
+    my $self = shift;
+    return $self->fetch_all( "JOIN job USING(role_id) WHERE when_finished IS NOT NULL AND status NOT IN ('DONE', 'READY', 'FAILED', 'PASSED_ON') GROUP BY role_id" );
+}
 1;
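The constraint string in the new method is handed to the generic fetch_all(), which presumably prepends the SELECT ... FROM role part; the effective query is then roughly as follows (a sketch; note that when_finished comes from role while status comes from job):

    SELECT role.*                       -- role columns, added by the base adaptor
      FROM role
      JOIN job USING(role_id)
     WHERE when_finished IS NOT NULL    -- the Role has already finished...
       AND status NOT IN ('DONE', 'READY', 'FAILED', 'PASSED_ON')   -- ...but left jobs mid-flight
     GROUP BY role_id;                  -- one row per such Role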
@@ -210,7 +210,7 @@ sub specialize_new_worker {
         die "At most one of the options {-analysis_id, -logic_name, -job_id} can be set to pre-specialize a Worker";
     }
-    my ($analysis, $stats, $special_batch);
+    my ($analysis, $stats);
     my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
     if($job_id or $analysis_id or $logic_name) {    # probably pre-specialized from command-line
@@ -234,14 +234,7 @@ sub specialize_new_worker {
                 warn "Increasing the semaphore count of the dependent job";
                 $job_adaptor->increase_semaphore_count_for_jobid( $job->semaphored_job_id );
             }
-            my $worker_id = $worker->dbID;
-            if($job = $job_adaptor->reset_or_grab_job_by_dbID($job_id, $worker_id)) {
-                $special_batch = [ $job ];
-                $analysis_id = $job->analysis_id;
-            } else {
-                die "Could not claim job with dbID='$job_id' for worker with dbID='$worker_id'";
-            }
+            $analysis_id = $job->analysis_id;
         }
         if($logic_name) {
@@ -263,7 +256,7 @@ sub specialize_new_worker {
     $stats = $analysis->stats;
     $self->safe_synchronize_AnalysisStats($stats);
-    unless($special_batch or $force) {    # do we really need to run this analysis?
+    unless($job_id or $force) {    # do we really need to run this analysis?
         if($self->db->get_RoleAdaptor->get_hive_current_load() >= 1.1) {
             $worker->cause_of_death('HIVE_OVERLOAD');
             die "Hive is overloaded, can't specialize a worker";
@@ -298,8 +291,15 @@ sub specialize_new_worker {
     $role_adaptor->store( $new_role );
     $worker->current_role( $new_role );
-    if($special_batch) {
-        $worker->special_batch( $special_batch );
+    if($job_id) {
+        my $role_id = $new_role->dbID;
+        if( my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id, $role_id) ) {
+            $worker->special_batch( [ $job ] );
+        } else {
+            die "Could not claim job with dbID='$job_id' for Role with dbID='$role_id'";
+        }
     } else {    # count it as autonomous worker sharing the load of that analysis:
         $analysis_stats_adaptor->update_status($analysis_id, 'WORKING');
@@ -355,7 +355,7 @@ sub register_worker_death {
             or $cause_of_death eq 'HIVE_OVERLOAD'
             or $cause_of_death eq 'LIFESPAN'
     ) {
-        $self->db->get_AnalysisJobAdaptor->release_undone_jobs_from_worker($worker);
+        $self->db->get_AnalysisJobAdaptor->release_undone_jobs_from_role( $current_role );
     }
     # re-sync the analysis_stats when a worker dies as part of dynamic sync system
@@ -444,14 +444,14 @@ sub check_for_dead_workers {    # scans the whole Valley for lost Workers (but i
     # the following bit is completely Meadow-agnostic and only restores database integrity:
     if($check_buried_in_haste) {
-        warn "GarbageCollector:\tChecking for Workers buried in haste...\n";
-        my $buried_in_haste_list = $self->fetch_all_dead_workers_with_jobs();
+        warn "GarbageCollector:\tChecking for Workers/Roles buried in haste...\n";
+        my $buried_in_haste_list = $self->db->get_RoleAdaptor->fetch_all_finished_roles_with_unfinished_jobs();
         if(my $bih_number = scalar(@$buried_in_haste_list)) {
             warn "GarbageCollector:\tfound $bih_number jobs, reclaiming.\n\n";
+            if($bih_number) {
-            my $job_adaptor = $self->db->get_AnalysisJobAdaptor();
-            foreach my $worker (@$buried_in_haste_list) {
-                $job_adaptor->release_undone_jobs_from_worker($worker);
+                my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
+                foreach my $role (@$buried_in_haste_list) {
+                    $job_adaptor->release_undone_jobs_from_role( $role );
                 }
             }
         } else {
@@ -522,13 +522,6 @@ sub fetch_overdue_workers {
 }
-sub fetch_all_dead_workers_with_jobs {
-    my $self = shift;
-    return $self->fetch_all( "JOIN job j USING(worker_id) WHERE worker.status='DEAD' AND j.status NOT IN ('DONE', 'READY', 'FAILED', 'PASSED_ON') GROUP BY worker_id" );
-}
 =head2 synchronize_hive
   Arg [1]    : $filter_analysis (optional)
......
@@ -156,7 +156,7 @@ sub AUTOLOAD {
         # attempt to lazy-load:
     } elsif( !$self->{$foo_obj_method_name} and my $foo_object_id = $self->{$foo_id_method_name}) {
         my $foo_class = 'Bio::EnsEMBL::Hive::'.$AdaptorType;
-        my $collection = $foo_class->collection();
+        my $collection = $foo_class->can('collection') && $foo_class->collection();
         if( $collection and $self->{$foo_obj_method_name} = $collection->find_one_by('dbID', $foo_object_id) ) {    # careful: $AdaptorType may not be unique (aliases)
             # warn "Lazy-loading object from $AdaptorType collection\n";
         } elsif(my $adaptor = $self->adaptor) {
......
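The guard added above relies on UNIVERSAL::can(), which returns a code reference when the class implements the method and undef otherwise, so the && short-circuits instead of dying with "Can't locate object method". A self-contained sketch of the idiom (the class names are made up):

    use strict;
    use warnings;

    package WithCollection;
    sub collection { return ['a cached object'] }

    package main;
    foreach my $class ('WithCollection', 'WithoutCollection') {
        my $collection = $class->can('collection') && $class->collection();
        print "$class => ", ($collection ? 'got a collection' : 'no collection'), "\n";
    }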
@@ -456,12 +456,13 @@ sub run {
         } else {    # a proper "BATCHES" loop
             while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {
+                my $current_role = $self->current_role;
-                if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_worker_id( $self->dbID ) }) ) {
+                if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_role_id( $current_role->dbID ) }) ) {
                     my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
                     $self->worker_say( $msg );
                     $self->cause_of_death('CONTAMINATED');
-                    $job_adaptor->release_undone_jobs_from_worker($self, $msg);
+                    $job_adaptor->release_undone_jobs_from_role($current_role, $msg);
                 } elsif( $self->job_limiter->reached()) {
                     $self->worker_say( "job_limit reached (".$self->work_done." jobs completed)" );
@@ -472,12 +473,10 @@ sub run {
                     $self->cause_of_death('LIFESPAN');
                 } else {
-                    my $current_role = $self->current_role;
                     my $desired_batch_size = $current_role->analysis->stats->get_or_estimate_batch_size();
                     $desired_batch_size = $self->job_limiter->preliminary_offer( $desired_batch_size );
-                    my $role_rank = $self->adaptor->db->get_RoleAdaptor->get_role_rank( $current_role );
-                    my $actual_batch = $job_adaptor->grab_jobs_for_worker( $self, $desired_batch_size, $role_rank );
+                    my $actual_batch = $job_adaptor->grab_jobs_for_role( $current_role, $desired_batch_size );
                     if(scalar(@$actual_batch)) {
                         my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
                         $jobs_done_by_batches_loop += $jobs_done_by_this_batch;
......
@@ -47,10 +47,10 @@ ALTER TABLE analysis_base ADD FOREIGN KEY (resource_class_id)
 ALTER TABLE resource_description ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
 ALTER TABLE worker ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
+ALTER TABLE job ADD FOREIGN KEY (role_id) REFERENCES role(role_id) ON DELETE CASCADE;
+ALTER TABLE job_file ADD FOREIGN KEY (role_id) REFERENCES role(role_id) ON DELETE CASCADE;
+ALTER TABLE log_message ADD FOREIGN KEY (role_id) REFERENCES role(role_id) ON DELETE CASCADE;
-ALTER TABLE job ADD FOREIGN KEY (worker_id) REFERENCES worker(worker_id) ON DELETE CASCADE;
-ALTER TABLE job_file ADD FOREIGN KEY (worker_id) REFERENCES worker(worker_id) ON DELETE CASCADE;
 ALTER TABLE log_message ADD FOREIGN KEY (worker_id) REFERENCES worker(worker_id) ON DELETE CASCADE;
 ALTER TABLE role ADD FOREIGN KEY (worker_id) REFERENCES worker(worker_id) ON DELETE CASCADE;
 ALTER TABLE worker_resource_usage ADD FOREIGN KEY (worker_id) REFERENCES worker(worker_id) ON DELETE CASCADE;
......
+-- First remove the ForeignKeys from job.worker_id and job_file.worker_id:
+ALTER TABLE job DROP FOREIGN KEY job_ibfk_4;
+ALTER TABLE job_file DROP FOREIGN KEY job_file_ibfk_2;
+-- Also remove Indices from the old columns:
+ALTER TABLE job DROP INDEX worker_id;
+ALTER TABLE job_file DROP INDEX worker_id;
+-- Use the existing worker_id columns as reference to add role_id columns:
+ALTER TABLE job ADD COLUMN role_id INTEGER DEFAULT NULL AFTER worker_id;
+ALTER TABLE job_file ADD COLUMN role_id INTEGER DEFAULT NULL AFTER worker_id;
+-- Pretend we had role entries from the very beginning (the data is very approximately correct!):
+UPDATE job j, role r SET j.role_id=r.role_id WHERE r.worker_id=j.worker_id AND CASE WHEN completed IS NOT NULL THEN when_started<=completed AND (when_finished IS NULL OR completed<=when_finished) ELSE when_finished IS NULL END;
+UPDATE job_file jf, job j SET jf.role_id=j.role_id WHERE jf.job_id=j.job_id;
+-- Now we can drop the columns themselves:
+ALTER TABLE job DROP COLUMN worker_id;
+ALTER TABLE job_file DROP COLUMN worker_id;
+-- Add new Indices:
+ALTER TABLE job ADD INDEX role_status (role_id, status);
+ALTER TABLE job_file ADD INDEX role (role_id);
+-- Add ForeignKeys on the new columns:
+ALTER TABLE job ADD FOREIGN KEY (role_id) REFERENCES role(role_id) ON DELETE CASCADE;
+ALTER TABLE job_file ADD FOREIGN KEY (role_id) REFERENCES role(role_id) ON DELETE CASCADE;
+-- UPDATE hive_sql_schema_version
+UPDATE hive_meta SET meta_value=62 WHERE meta_key='hive_sql_schema_version' AND meta_value='61';
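The backfill UPDATE in the patch above is dense, so here is the same logic restated with qualified column names and comments (a sketch: completed lives on job, when_started/when_finished on role, as the surrounding patch implies). Each job is attributed to the Role of its old Worker whose activity window covers it:

    UPDATE job j, role r
       SET j.role_id=r.role_id
     WHERE r.worker_id=j.worker_id              -- candidate Roles of the job's former Worker
       AND CASE WHEN j.completed IS NOT NULL    -- a finished job must fall inside the Role's window:
                THEN r.when_started<=j.completed AND (r.when_finished IS NULL OR j.completed<=r.when_finished)
                ELSE r.when_finished IS NULL    -- an unfinished job matches the still-open Role
           END;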
+-- First remove the ForeignKeys from job.worker_id and job_file.worker_id:
+ALTER TABLE job DROP CONSTRAINT job_worker_id_fkey;
+ALTER TABLE job_file DROP CONSTRAINT job_file_worker_id_fkey;
+-- Also remove Indices from the old columns:
+DROP INDEX job_worker_id_status_idx;
+DROP INDEX job_file_worker_id_idx;
+-- Add role_id columns:
+ALTER TABLE job ADD COLUMN role_id INTEGER DEFAULT NULL;
+ALTER TABLE job_file ADD COLUMN role_id INTEGER DEFAULT NULL;
+-- Pretend we had role entries from the very beginning (the data is very approximately correct!):
+UPDATE job j SET role_id = (SELECT r.role_id FROM role r WHERE r.worker_id=j.worker_id AND CASE WHEN completed IS NOT NULL THEN when_started<=completed AND (when_finished IS NULL OR completed<=when_finished) ELSE when_finished IS NULL END);
+UPDATE job_file jf SET role_id = (SELECT role_id FROM job j WHERE j.job_id=jf.job_id);
+-- Now we can drop the columns themselves:
+ALTER TABLE job DROP COLUMN worker_id;
+ALTER TABLE job_file DROP COLUMN worker_id;
+-- Add new Indices:
+CREATE INDEX ON job (role_id, status);
+CREATE INDEX ON job_file (role_id);
+-- Add ForeignKeys on the new columns:
+ALTER TABLE job ADD FOREIGN KEY (role_id) REFERENCES role(role_id) ON DELETE CASCADE;
+ALTER TABLE job_file ADD FOREIGN KEY (role_id) REFERENCES role(role_id) ON DELETE CASCADE;
+-- UPDATE hive_sql_schema_version
+UPDATE hive_meta SET meta_value=62 WHERE meta_key='hive_sql_schema_version' AND meta_value='61';
@@ -312,7 +312,7 @@ CREATE TABLE resource_description (
 @column input_id                input data passed into Analysis:RunnableDB to control the work
 @column param_id_stack          a CSV of job_ids whose input_ids contribute to the stack of local variables for the job
 @column accu_id_stack           a CSV of job_ids whose accu's contribute to the stack of local variables for the job
-@column worker_id               link to worker table to define which worker claimed this job
+@column role_id                 links to the Role that claimed this job (NULL means it has never been claimed)
 @column status                  state the job is in
 @column retry_count             number times job had to be reset when worker failed to run it
 @column completed               when the job was completed
@@ -329,7 +329,7 @@ CREATE TABLE job (
     input_id                CHAR(255) NOT NULL,
     param_id_stack          CHAR(64) NOT NULL DEFAULT '',
     accu_id_stack           CHAR(64) NOT NULL DEFAULT '',
-    worker_id               INTEGER DEFAULT NULL,
+    role_id                 INTEGER DEFAULT NULL,
     status                  ENUM('SEMAPHORED','READY','CLAIMED','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL,
     retry_count             INTEGER NOT NULL DEFAULT 0,
     completed               TIMESTAMP NULL,    -- mysql's special for "TIMESTAMP DEFAULT NULL"
@@ -341,7 +341,7 @@ CREATE TABLE job (
     UNIQUE KEY input_id_stacks_analysis (input_id, param_id_stack, accu_id_stack, analysis_id),    -- to avoid repeating tasks
     KEY analysis_status_retry (analysis_id, status, retry_count),    -- for claiming jobs
-    KEY worker_id (worker_id, status)    -- for fetching and releasing claimed jobs
+    KEY role_status (role_id, status)    -- for fetching and releasing claimed jobs
 ) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
@@ -356,8 +356,8 @@ CREATE TABLE job (
 There is max one entry per job_id and retry.
 @column job_id             foreign key
-@column worker_id          link to worker table to define which worker claimed this job
 @column retry              copy of retry_count of job as it was run
+@column role_id            links to the Role that claimed this job
 @column stdout_file        path to the job's STDOUT log
 @column stderr_file        path to the job's STDERR log
 */
@@ -365,12 +365,12 @@ CREATE TABLE job (
 CREATE TABLE job_file (
     job_id                  INTEGER NOT NULL,
     retry                   INTEGER NOT NULL,
-    worker_id               INTEGER NOT NULL,
+    role_id                 INTEGER NOT NULL,
     stdout_file             VARCHAR(255),
     stderr_file             VARCHAR(255),
     PRIMARY KEY job_retry (job_id, retry),
-    KEY worker_id (worker_id)
+    KEY role (role_id)
 ) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......
@@ -315,7 +315,7 @@ CREATE TABLE resource_description (
 @column input_id                input data passed into Analysis:RunnableDB to control the work
 @column param_id_stack          a CSV of job_ids whose input_ids contribute to the stack of local variables for the job
 @column accu_id_stack           a CSV of job_ids whose accu's contribute to the stack of local variables for the job
-@column worker_id               link to worker table to define which worker claimed this job
+@column role_id                 links to the Role that claimed this job (NULL means it has never been claimed)
 @column status                  state the job is in
 @column retry_count             number times job had to be reset when worker failed to run it
 @column completed               when the job was completed
@@ -335,7 +335,7 @@ CREATE TABLE job (
     input_id                TEXT NOT NULL,
     param_id_stack          TEXT NOT NULL DEFAULT '',
     accu_id_stack           TEXT NOT NULL DEFAULT '',
-    worker_id               INTEGER DEFAULT NULL,
+    role_id                 INTEGER DEFAULT NULL,
     status                  jw_status NOT NULL DEFAULT 'READY',
     retry_count             INTEGER NOT NULL DEFAULT 0,
     completed               TIMESTAMP DEFAULT NULL,
@@ -348,7 +348,7 @@ CREATE TABLE job (
     UNIQUE (input_id, param_id_stack, accu_id_stack, analysis_id)    -- to avoid repeating tasks
 );
 CREATE INDEX ON job (analysis_id, status, retry_count);    -- for claiming jobs
-CREATE INDEX ON job (worker_id, status);    -- for fetching and releasing claimed jobs
+CREATE INDEX ON job (role_id, status);    -- for fetching and releasing claimed jobs
 /**
@@ -362,7 +362,7 @@ CREATE INDEX ON job (worker_id, status);    -- for fetching and rele
 There is max one entry per job_id and retry.
 @column job_id             foreign key
-@column worker_id          link to worker table to define which worker claimed this job
+@column role_id            links to the Role that claimed this job
 @column retry              copy of retry_count of job as it was run
 @column stdout_file        path to the job's STDOUT log
 @column stderr_file        path to the job's STDERR log
@@ -371,13 +371,13 @@ CREATE INDEX ON job (worker_id, status);    -- for fetching and rele
 CREATE TABLE job_file (
     job_id                  INTEGER NOT NULL,
     retry                   INTEGER NOT NULL,
-    worker_id               INTEGER NOT NULL,
+    role_id                 INTEGER NOT NULL,
     stdout_file             VARCHAR(255),
     stderr_file             VARCHAR(255),
     PRIMARY KEY (job_id, retry)
 );
-CREATE INDEX ON job_file (worker_id);
+CREATE INDEX ON job_file (role_id);
 /**
......
@@ -310,7 +310,7 @@ CREATE TABLE resource_description (
 @column input_id                input data passed into Analysis:RunnableDB to control the work
 @column param_id_stack          a CSV of job_ids whose input_ids contribute to the stack of local variables for the job
 @column accu_id_stack           a CSV of job_ids whose accu's contribute to the stack of local variables for the job
-@column worker_id               link to worker table to define which worker claimed this job
+@column role_id                 links to the Role that claimed this job (NULL means it has never been claimed)
 @column status                  state the job is in
 @column retry_count             number times job had to be reset when worker failed to run it
 @column completed               when the job was completed
@@ -327,7 +327,7 @@ CREATE TABLE job (
     input_id                CHAR(255) NOT NULL,
     param_id_stack          CHAR(64) NOT NULL DEFAULT '',
     accu_id_stack           CHAR(64) NOT NULL DEFAULT '',
-    worker_id               INTEGER DEFAULT NULL,
+    role_id                 INTEGER DEFAULT NULL,
     status                  TEXT NOT NULL DEFAULT 'READY', /* enum('SEMAPHORED','READY','CLAIMED','COMPILATION','FETCH_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL, */
     retry_count             INTEGER NOT NULL DEFAULT 0,
     completed               TIMESTAMP DEFAULT NULL,
@@ -339,7 +339,7 @@ CREATE TABLE job (
 );
 CREATE UNIQUE INDEX job_input_id_stacks_analysis_idx ON job (input_id, param_id_stack, accu_id_stack, analysis_id);
 CREATE INDEX job_analysis_status_retry_idx ON job (analysis_id, status, retry_count);
-CREATE INDEX job_worker_idx ON job (worker_id);
+CREATE INDEX job_role_id_status_id_idx ON job (role_id);
 /**
@@ -353,8 +353,8 @@ CREATE INDEX job_worker_idx ON job (worker_id);
 There is max one entry per job_id and retry.
 @column job_id             foreign key
-@column worker_id          link to worker table to define which worker claimed this job
 @column retry              copy of retry_count of job as it was run
+@column role_id            links to the Role that claimed this job
 @column stdout_file        path to the job's STDOUT log
 @column stderr_file        path to the job's STDERR log
 */
@@ -362,13 +362,13 @@ CREATE INDEX job_worker_idx ON job (worker_id);
 CREATE TABLE job_file (
     job_id                  INTEGER NOT NULL,
     retry                   INTEGER NOT NULL,
-    worker_id               INTEGER NOT NULL,
+    role_id                 INTEGER NOT NULL,
     stdout_file             VARCHAR(255),
     stderr_file             VARCHAR(255),
     PRIMARY KEY (job_id, retry)
 );
-CREATE INDEX job_file_worker_idx ON job_file (worker_id);
+CREATE INDEX job_file_role_id_idx ON job_file (role_id);
 /**
......