Commit 76b1900d authored by Leo Gordon

make num_running_workers updatable by triggers + better updates during worker check-in

parent 2d5ff61c
@@ -203,11 +203,11 @@ sub update {
         $sql .= ",unclaimed_job_count=" . $stats->unclaimed_job_count();
         $sql .= ",done_job_count=" . $stats->done_job_count();
         $sql .= ",failed_job_count=" . $stats->failed_job_count();
-    }
-    $stats->num_running_workers( $self->db->get_Queen->count_running_workers( $stats->analysis_id() ) );
-    $sql .= ",num_running_workers=" . $stats->num_running_workers();
+        $stats->num_running_workers( $self->db->get_Queen->count_running_workers( $stats->analysis_id() ) );
+        $sql .= ",num_running_workers=" . $stats->num_running_workers();
+    }
     $sql .= ",num_required_workers=" . $stats->num_required_workers();
     $sql .= ",last_update=CURRENT_TIMESTAMP";
     $sql .= ",sync_lock='0'";
@@ -308,18 +308,7 @@ sub decrease_running_workers
     $self->dbc->do($sql);
 }
 
-sub decrease_running_workers_on_hive_overload {
-    my $self = shift;
-    my $analysis_id = shift;
-
-    my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
-              "WHERE num_running_workers > hive_capacity AND analysis_id = $analysis_id ";
-
-    my $row_count = $self->dbc->do($sql);
-    return $row_count;
-}
-
-sub decrease_needed_workers
+sub decrease_required_workers
 {
     my $self = shift;
     my $analysis_id = shift;

@@ -331,7 +320,7 @@ sub decrease_needed_workers
 }
 
-sub increase_needed_workers
+sub increase_required_workers
 {
     my $self = shift;
     my $analysis_id = shift;
...
@@ -178,11 +178,21 @@ sub create_new_worker {
       return;
     }
 
-    $analysis_stats_adaptor->decrease_needed_workers($analysisStats->analysis_id);
-    $analysis_stats_adaptor->increase_running_workers($analysisStats->analysis_id);
+    $analysis_stats_adaptor->decrease_required_workers($analysisStats->analysis_id);
     $analysisStats->print_stats;
   }
 
+    # The following increment used to be done only when no specific task was given to the worker,
+    # thereby excluding such "special task" workers from being counted in num_running_workers.
+    #
+    # However this may be tricky to emulate by triggers that know nothing about "special tasks",
+    # so I am (temporarily?) simplifying the accounting algorithm.
+    #
+  unless( $self->db->hive_use_triggers() ) {
+      $analysis_stats_adaptor->increase_running_workers($analysisStats->analysis_id);
+  }
+
   my $sql = q{INSERT INTO worker
               (born, last_check_in, meadow_type, process_id, host, analysis_id)
               VALUES (CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, ?,?,?,?)};
@@ -243,11 +253,6 @@ sub register_worker_death {
   my $cod = $worker->cause_of_death();
 
-  unless ($cod eq 'HIVE_OVERLOAD') {
-    ## HIVE_OVERLOAD occurs after a successful update of the analysis_stats teble. (c.f. Worker.pm)
-    $worker->analysis->stats->adaptor->decrease_running_workers($worker->analysis->stats->analysis_id);
-  }
-
   my $sql = "UPDATE worker SET died=CURRENT_TIMESTAMP, last_check_in=CURRENT_TIMESTAMP";
   $sql .= " ,status='DEAD'";
   $sql .= " ,work_done='" . $worker->work_done . "'";

@@ -256,6 +261,10 @@ sub register_worker_death {
   $self->dbc->do( $sql );
 
+  unless( $self->db->hive_use_triggers() ) {
+      $worker->analysis->stats->adaptor->decrease_running_workers($worker->analysis->stats->analysis_id);
+  }
+
   if($cod eq 'NO_WORK') {
     $self->db->get_AnalysisStatsAdaptor->update_status($worker->analysis->dbID, 'ALL_CLAIMED');
   }
@@ -270,7 +279,7 @@ sub register_worker_death {
     if($self->safe_synchronize_AnalysisStats($worker->analysis->stats)->status ne 'DONE') {
       # since I'm dying I should make sure there is someone to take my place after I'm gone ...
       # above synch still sees me as a 'living worker' so I need to compensate for that
-      $self->db->get_AnalysisStatsAdaptor->increase_needed_workers($worker->analysis->dbID);
+      $self->db->get_AnalysisStatsAdaptor->increase_required_workers($worker->analysis->dbID);
     }
   }
@@ -334,17 +343,12 @@ sub check_for_dead_workers {    # a bit counter-intuitively only looks for curre
     }
 }
 
-sub worker_check_in {
-    my ($self, $worker) = @_;
-
-    return unless($worker);
-
-    my $sql = "UPDATE worker SET last_check_in=CURRENT_TIMESTAMP";
-    $sql .= " ,work_done='" . $worker->work_done . "'";
-    $sql .= " WHERE worker_id='" . $worker->dbID ."'";
-
-    my $sth = $self->prepare($sql);
-    $sth->execute();
-    $sth->finish;
+# a new version that both checks in and updates the status
+sub check_in_worker {
+    my ($self, $worker) = @_;
+
+    $self->dbc->do("UPDATE worker SET last_check_in=CURRENT_TIMESTAMP, status='".$worker->status."', work_done='".$worker->work_done."' WHERE worker_id='".$worker->dbID."'");
 }
@@ -527,10 +531,10 @@ sub synchronize_AnalysisStats {
       # adjust_stats_for_living_workers:
       if($hive_capacity > 0) {
-        my $capacity_allows_to_add = $hive_capacity - $self->count_running_workers( $analysisStats->analysis_id() );
+        my $unfulfilled_capacity = $hive_capacity - $analysisStats->num_running_workers();
 
-        if($capacity_allows_to_add < $required_workers ) {
-          $required_workers = (0 < $capacity_allows_to_add) ? $capacity_allows_to_add : 0;
+        if($unfulfilled_capacity < $required_workers ) {
+          $required_workers = (0 < $unfulfilled_capacity) ? $unfulfilled_capacity : 0;
         }
       }
       $analysisStats->num_required_workers( $required_workers );
@@ -563,10 +567,10 @@ sub synchronize_AnalysisStats {
       # adjust_stats_for_living_workers:
       if($hive_capacity > 0) {
-        my $capacity_allows_to_add = $hive_capacity - $self->count_running_workers( $analysisStats->analysis_id() );
+        my $unfulfilled_capacity = $hive_capacity - $self->count_running_workers( $analysisStats->analysis_id() );
 
-        if($capacity_allows_to_add < $required_workers ) {
-          $required_workers = (0 < $capacity_allows_to_add) ? $capacity_allows_to_add : 0;
+        if($unfulfilled_capacity < $required_workers ) {
+          $required_workers = (0 < $unfulfilled_capacity) ? $unfulfilled_capacity : 0;
         }
       }
       $analysisStats->num_required_workers( $required_workers );
@@ -653,12 +657,6 @@ sub count_running_workers {
 }
 
-sub enter_status {
-    my ($self, $worker, $status) = @_;
-
-    $self->dbc->do("UPDATE worker SET status = '$status' WHERE worker_id = ".$worker->dbID);
-}
-
 =head2 get_num_needed_workers
 
   Arg[1]     : Bio::EnsEMBL::Analysis object (optional)
...
@@ -532,12 +532,12 @@ sub run {
         }
 
         if (!$self->cause_of_death
-            and $self->analysis->stats->hive_capacity >= 0
-            and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity
-            and $self->analysis->stats->adaptor->decrease_running_workers_on_hive_overload( $self->analysis->dbID ) # careful with order, this operation has side-effect
+            and 0 <= $self->analysis->stats->hive_capacity
+            and $self->analysis->stats->hive_capacity < $self->analysis->stats->num_running_workers
         ) {
             $self->cause_of_death('HIVE_OVERLOAD');
         }
     } while (!$self->cause_of_death);   # /Worker's lifespan loop
 
     if($self->perform_cleanup) {
@@ -569,7 +569,7 @@ sub run_one_batch {
     my $max_retry_count = $self->analysis->stats->max_retry_count();  # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
 
-    $self->queen->worker_check_in($self);
+    $self->queen->check_in_worker( $self );
 
     $self->queen->safe_synchronize_AnalysisStats($self->analysis->stats);
 
     $self->cause_of_death('NO_WORK') unless(scalar @{$jobs});
@@ -609,20 +609,21 @@ sub run_one_batch {
                 or $self->prev_job_error            # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
                 or $job->lethal_for_worker ) {      # trust the job's expert knowledge
                     my $reason = ($self->status eq 'COMPILATION') ? 'compilation error'
                                : $self->prev_job_error            ? 'two failed jobs in a row'
                                :                                    'suggested by job itself';
                     warn "Job's error has contaminated the Worker ($reason), so the Worker will now die\n";
                     $self->cause_of_death('CONTAMINATED');
                     return $jobs_done_here;
                 }
             } else {    # job successfully completed:
-                if(my $semaphored_job_id = $job->semaphored_job_id) {
-                    $job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );    # step-unblock the semaphore
-                }
                 $self->more_work_done;
                 $jobs_done_here++;
                 $job->update_status('DONE');
+
+                if(my $semaphored_job_id = $job->semaphored_job_id) {
+                    $job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );    # step-unblock the semaphore
+                }
             }
 
             $self->prev_job_error( $job->incomplete );
...@@ -639,8 +640,7 @@ sub run_module_with_job { ...@@ -639,8 +640,7 @@ sub run_module_with_job {
$job->incomplete(1); $job->incomplete(1);
$job->autoflow(1); $job->autoflow(1);
$self->enter_status('COMPILATION'); $self->enter_status('COMPILATION', $job);
$job->update_status('COMPILATION');
my $runObj = $self->analysis->process or die "Unknown compilation error"; my $runObj = $self->analysis->process or die "Unknown compilation error";
my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart(); my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
@@ -662,26 +662,20 @@ sub run_module_with_job {
         $job->param_init( 0, $self->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), $job->input_id() ); # Well, why not?
     }
 
-    $self->enter_status('GET_INPUT');
-    $job->update_status('GET_INPUT');
-    print("\nGET_INPUT\n") if($self->debug);
+    $self->enter_status('GET_INPUT', $job);
 
     $self->{'fetching_stopwatch'}->continue();
     $runObj->fetch_input;
     $self->{'fetching_stopwatch'}->pause();
 
-    $self->enter_status('RUN');
-    $job->update_status('RUN');
-    print("\nRUN\n") if($self->debug);
+    $self->enter_status('RUN', $job);
 
     $self->{'running_stopwatch'}->continue();
     $runObj->run;
     $self->{'running_stopwatch'}->pause();
 
     if($self->execute_writes) {
-        $self->enter_status('WRITE_OUTPUT');
-        $job->update_status('WRITE_OUTPUT');
-        print("\nWRITE_OUTPUT\n") if($self->debug);
+        $self->enter_status('WRITE_OUTPUT', $job);
 
         $self->{'writing_stopwatch'}->continue();
         $runObj->write_output;

@@ -692,7 +686,7 @@ sub run_module_with_job {
             $job->dataflow_output_id();
         }
     } else {
-        print("\n\n!!!! NOT write_output\n\n\n") if($self->debug);
+        print("\n!!! *no* WRITE_OUTPUT and *no* AUTOFLOW\n") if($self->debug);
     }
 
     $job->query_count($self->queen->dbc->query_count);
@@ -702,8 +696,17 @@ sub run_module_with_job {
 }
 
 sub enter_status {
-    my ($self, $status) = @_;
-
-    return $self->queen->enter_status($self, $status);
+    my ($self, $status, $job) = @_;
+
+    if($self->debug) {
+        printf("\n%s : $status\n", $job ? 'job '.$job->dbID : 'worker');
+    }
+
+    if($job) {
+        $job->update_status( $status );
+    }
+    $self->status( $status );
+    $self->queen->check_in_worker( $self );
 }
 
 sub start_job_output_redirection {
...
@@ -18,16 +18,40 @@ FOR EACH ROW
         failed_job_count = failed_job_count - (CASE OLD.status WHEN 'FAILED' THEN 1 ELSE 0 END)
     WHERE analysis_id = OLD.analysis_id;
 
+DELIMITER $$
+
 CREATE TRIGGER update_job AFTER UPDATE ON job
 FOR EACH ROW
+BEGIN
+    UPDATE analysis_stats SET
+        total_job_count = total_job_count - 1,
+        unclaimed_job_count = unclaimed_job_count - (CASE OLD.status WHEN 'READY' THEN 1 ELSE 0 END),
+        done_job_count = done_job_count - (CASE OLD.status WHEN 'DONE' THEN 1 WHEN 'PASSED_ON' THEN 1 ELSE 0 END),
+        failed_job_count = failed_job_count - (CASE OLD.status WHEN 'FAILED' THEN 1 ELSE 0 END)
+    WHERE analysis_id = OLD.analysis_id;
     UPDATE analysis_stats SET
-        unclaimed_job_count = unclaimed_job_count + (CASE NEW.status WHEN 'READY' THEN 1 ELSE 0 END)
-                            - (CASE OLD.status WHEN 'READY' THEN 1 ELSE 0 END),
-        done_job_count = done_job_count + (CASE NEW.status WHEN 'DONE' THEN 1 WHEN 'PASSED_ON' THEN 1 ELSE 0 END)
-                        - (CASE OLD.status WHEN 'DONE' THEN 1 WHEN 'PASSED_ON' THEN 1 ELSE 0 END),
-        failed_job_count = failed_job_count + (CASE NEW.status WHEN 'FAILED' THEN 1 ELSE 0 END)
-                          - (CASE OLD.status WHEN 'FAILED' THEN 1 ELSE 0 END)
+        total_job_count = total_job_count + 1,
+        unclaimed_job_count = unclaimed_job_count + (CASE NEW.status WHEN 'READY' THEN 1 ELSE 0 END),
+        done_job_count = done_job_count + (CASE NEW.status WHEN 'DONE' THEN 1 WHEN 'PASSED_ON' THEN 1 ELSE 0 END),
+        failed_job_count = failed_job_count + (CASE NEW.status WHEN 'FAILED' THEN 1 ELSE 0 END)
     WHERE analysis_id = NEW.analysis_id;
+END$$
+
+DELIMITER ;
+
+CREATE TRIGGER add_worker AFTER INSERT ON worker
+FOR EACH ROW
+    UPDATE analysis_stats SET
+        num_running_workers = num_running_workers + 1
+    WHERE analysis_id = NEW.analysis_id;
+
+CREATE TRIGGER update_worker AFTER UPDATE ON worker
+FOR EACH ROW
+    UPDATE analysis_stats SET
+        num_running_workers = num_running_workers - 1
+    WHERE analysis_id = NEW.analysis_id
+        AND OLD.status <> 'DEAD'
+        AND NEW.status = 'DEAD';
+
 # inform the runtime part of the system that triggers are in place:
...
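A quick way to see the new worker triggers in action. This is an illustration only, not part of the commit: the analysis_id, meadow_type, process_id and host values are placeholders, and it assumes an analysis with analysis_id = 1 already exists and that the remaining worker columns have usable defaults.

-- Illustration only: with the triggers loaded, inserting a worker row should bump
-- num_running_workers via add_worker, and marking it DEAD should bring it back down
-- via update_worker. All literal values below are placeholders.
INSERT INTO worker (born, last_check_in, meadow_type, process_id, host, analysis_id)
VALUES (CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 'LOCAL', '12345', 'example_host', 1);

SELECT num_running_workers FROM analysis_stats WHERE analysis_id = 1;   -- incremented by add_worker

UPDATE worker SET status = 'DEAD' WHERE process_id = '12345' AND analysis_id = 1;

SELECT num_running_workers FROM analysis_stats WHERE analysis_id = 1;   -- decremented by update_worker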
@@ -26,14 +26,38 @@ CREATE TRIGGER update_job AFTER UPDATE ON job
 FOR EACH ROW
 BEGIN
     UPDATE analysis_stats SET
-        unclaimed_job_count = unclaimed_job_count + (CASE NEW.status WHEN 'READY' THEN 1 ELSE 0 END)
-                            - (CASE OLD.status WHEN 'READY' THEN 1 ELSE 0 END),
-        done_job_count = done_job_count + (CASE NEW.status WHEN 'DONE' THEN 1 WHEN 'PASSED_ON' THEN 1 ELSE 0 END)
-                        - (CASE OLD.status WHEN 'DONE' THEN 1 WHEN 'PASSED_ON' THEN 1 ELSE 0 END),
-        failed_job_count = failed_job_count + (CASE NEW.status WHEN 'FAILED' THEN 1 ELSE 0 END)
-                          - (CASE OLD.status WHEN 'FAILED' THEN 1 ELSE 0 END)
+        total_job_count = total_job_count - 1,
+        unclaimed_job_count = unclaimed_job_count - (CASE OLD.status WHEN 'READY' THEN 1 ELSE 0 END),
+        done_job_count = done_job_count - (CASE OLD.status WHEN 'DONE' THEN 1 WHEN 'PASSED_ON' THEN 1 ELSE 0 END),
+        failed_job_count = failed_job_count - (CASE OLD.status WHEN 'FAILED' THEN 1 ELSE 0 END)
+    WHERE analysis_id = OLD.analysis_id;
+    UPDATE analysis_stats SET
+        total_job_count = total_job_count + 1,
+        unclaimed_job_count = unclaimed_job_count + (CASE NEW.status WHEN 'READY' THEN 1 ELSE 0 END),
+        done_job_count = done_job_count + (CASE NEW.status WHEN 'DONE' THEN 1 WHEN 'PASSED_ON' THEN 1 ELSE 0 END),
+        failed_job_count = failed_job_count + (CASE NEW.status WHEN 'FAILED' THEN 1 ELSE 0 END)
     WHERE analysis_id = NEW.analysis_id;
 END;
 
+CREATE TRIGGER add_worker AFTER INSERT ON worker
+FOR EACH ROW
+BEGIN
+    UPDATE analysis_stats SET
+        num_running_workers = num_running_workers + 1
+    WHERE analysis_id = NEW.analysis_id;
+END;
+
+CREATE TRIGGER update_worker AFTER UPDATE ON worker
+FOR EACH ROW
+BEGIN
+    UPDATE analysis_stats SET
+        num_running_workers = num_running_workers - 1
+    WHERE analysis_id = NEW.analysis_id
+        AND OLD.status <> 'DEAD'
+        AND NEW.status = 'DEAD';
+END;
+
-# inform the runtime part of the system that triggers are in place:
+-- inform the runtime part of the system that triggers are in place:
 INSERT INTO meta (meta_key, meta_value) VALUES ('hive_use_triggers', '1');
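A related consistency check, again not part of the commit and written only against the worker and analysis_stats columns used above: any row returned by the query below means the trigger-maintained counter and the actual number of non-DEAD workers have drifted apart.

-- Consistency check sketch: compare the trigger-maintained counter with a live count.
SELECT s.analysis_id,
       s.num_running_workers,
       COUNT(w.worker_id) AS live_workers
FROM analysis_stats s
LEFT JOIN worker w ON w.analysis_id = s.analysis_id AND w.status <> 'DEAD'
GROUP BY s.analysis_id, s.num_running_workers
HAVING s.num_running_workers <> COUNT(w.worker_id);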