Commit e6fb56d1 authored by Jessica Severin's avatar Jessica Severin
Browse files

refactored synchronization logic to allow for worker distributed syncing.

The synchronization of the analysis_stat summary statistics was done by
the beekeeper at the top of it's loop.  For graphs with 40,000+ analyses
this centralized syncing became a bottle neck.  This new system allows
the Queen attached to each worker process to synchronize it's analysis.
Syncing happens when a worker 'checks in' and when it dies.  The sync on
'check in' only updates if the stats are >60secs out of date to prevent
over syncing.
The beekeeper still needs to do whole system syncs when a subsection has
finished and the next section needs to be 'unblocked'.  For homology this
will happen 2 times in a 16 hour run.
parent 9dc9569a
...@@ -6,12 +6,15 @@ ...@@ -6,12 +6,15 @@
=pod =pod
=head1 NAME =head1 NAME
Bio::EnsEMBL::Hive::Queen Bio::EnsEMBL::Hive::Queen
=head1 SYNOPSIS =head1 SYNOPSIS
The Queen of the Hive based job control system The Queen of the Hive based job control system
=head1 DESCRIPTION =head1 DESCRIPTION
The Queen of the Hive based job control system is responsible to 'birthing' the The Queen of the Hive based job control system is responsible to 'birthing' the
correct number of workers of the right type so that they can find jobs to do. correct number of workers of the right type so that they can find jobs to do.
It will also free up jobs of Workers that died unexpectantly so that other workers It will also free up jobs of Workers that died unexpectantly so that other workers
...@@ -26,9 +29,9 @@ ...@@ -26,9 +29,9 @@
into the Hive, creates a RunnableDB instance of the Analysis->module, into the Hive, creates a RunnableDB instance of the Analysis->module,
gets $worker->batch_size() jobs from the analysis_job table, does its gets $worker->batch_size() jobs from the analysis_job table, does its
work, creates the next layer of analysis_job entries by interfacing to work, creates the next layer of analysis_job entries by interfacing to
the DataflowRuleAdaptor to determine the analyses it needs to pass it's the DataflowRuleAdaptor to determine the analyses it needs to pass its
output data to and creates jobs on the next analysis's database. output data to and creates jobs on the next analysis database.
It repeats this cycle until it's lived it's lifetime or until there are no It repeats this cycle until it has lived its lifetime or until there are no
more jobs left. more jobs left.
The lifetime limit is just a safety limit to prevent these from 'infecting' The lifetime limit is just a safety limit to prevent these from 'infecting'
a system. a system.
...@@ -39,16 +42,18 @@ ...@@ -39,16 +42,18 @@
over the work. over the work.
The Beekeeper is in charge of interfacing between the Queen and a compute resource The Beekeeper is in charge of interfacing between the Queen and a compute resource
or 'compute farm'. It's job is to query Queens if they need any workers and to or 'compute farm'. Its job is to query Queens if they need any workers and to
send the requested number of workers to open machines via the runWorker.pl script. send the requested number of workers to open machines via the runWorker.pl script.
It is also responsible for interfacing with the Queen to identify worker which died It is also responsible for interfacing with the Queen to identify worker which died
unexpectantly. unexpectantly.
=head1 CONTACT =head1 CONTACT
Contact Jessica Severin on EnsEMBL::Hive implemetation/design detail: jessica@ebi.ac.uk Contact Jessica Severin on EnsEMBL::Hive implemetation/design detail: jessica@ebi.ac.uk
Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk
=head1 APPENDIX =head1 APPENDIX
The rest of the documentation details each of the object methods. The rest of the documentation details each of the object methods.
Internal methods are usually preceded with a _ Internal methods are usually preceded with a _
...@@ -76,9 +81,14 @@ our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor); ...@@ -76,9 +81,14 @@ our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
=head2 create_new_worker =head2 create_new_worker
Arg [1] : $analysis_id Arg [1] : $analysis_id (optional)
Example : Example :
Description: Description: If analysis_id is specified it will try to create a worker based
on that analysis. If not specified the queen will analyze the hive
and pick the analysis that has the most amount of work to be done.
It creates an entry in the hive table, and returns a Worker object
based on that insert. This guarantees that each worker registered
in this queens hive is properly registered.
Returntype : Bio::EnsEMBL::Hive::Worker Returntype : Bio::EnsEMBL::Hive::Worker
Exceptions : Exceptions :
Caller : Caller :
...@@ -96,15 +106,15 @@ sub create_new_worker { ...@@ -96,15 +106,15 @@ sub create_new_worker {
return undef if($self->get_hive_current_load() >= 1.5); return undef if($self->get_hive_current_load() >= 1.5);
unless($analysis_id) { my $analysisStats;
my ($anal_stats) = @{$analStatsDBA->fetch_by_needed_workers(1)}; if($analysis_id) {
return undef unless($anal_stats); $analysisStats = $analStatsDBA->fetch_by_analysis_id($analysis_id);
$analysis_id = $anal_stats->analysis_id; } else {
$analStatsDBA->decrement_needed_workers($analysis_id); ($analysisStats) = @{$analStatsDBA->fetch_by_needed_workers(1)};
} }
my $analysisStats = $analStatsDBA->fetch_by_analysis_id($analysis_id);
return undef unless($analysisStats); return undef unless($analysisStats);
$analStatsDBA->decrement_needed_workers($analysisStats->analysis_id);
$analysisStats->print_stats; $analysisStats->print_stats;
if($analysisStats->status eq 'BLOCKED') { if($analysisStats->status eq 'BLOCKED') {
...@@ -120,14 +130,11 @@ sub create_new_worker { ...@@ -120,14 +130,11 @@ sub create_new_worker {
$pid = getppid unless($pid); $pid = getppid unless($pid);
$beekeeper = '' unless($beekeeper); $beekeeper = '' unless($beekeeper);
my $sql = "INSERT INTO hive SET born=now(), last_check_in=now()". my $sql = "INSERT INTO hive SET born=now(), last_check_in=now(), " .
",process_id='$pid' ". "process_id=?, analysis_id=?, beekeeper=?, host=?";
",analysis_id='$analysis_id' ".
",beekeeper='$beekeeper' ".
",host='$host'";
my $sth = $self->prepare($sql); my $sth = $self->prepare($sql);
$sth->execute(); $sth->execute($pid, $analysisStats->analysis_id, $beekeeper, $host);
my $hive_id = $sth->{'mysql_insertid'}; my $hive_id = $sth->{'mysql_insertid'};
$sth->finish; $sth->finish;
...@@ -135,7 +142,7 @@ sub create_new_worker { ...@@ -135,7 +142,7 @@ sub create_new_worker {
$worker=undef unless($worker and $worker->analysis); $worker=undef unless($worker and $worker->analysis);
if($worker and $analysisStats) { if($worker and $analysisStats) {
$analStatsDBA->update_status($analysis_id, 'WORKING'); $analysisStats->update_status('WORKING');
} }
return $worker; return $worker;
} }
...@@ -165,6 +172,9 @@ sub register_worker_death { ...@@ -165,6 +172,9 @@ sub register_worker_death {
#print("FATAL DEATH Arrrrgggghhhhhhhh (hive_id=",$worker->hive_id,")\n"); #print("FATAL DEATH Arrrrgggghhhhhhhh (hive_id=",$worker->hive_id,")\n");
$self->db->get_AnalysisJobAdaptor->reset_dead_jobs_for_worker($worker); $self->db->get_AnalysisJobAdaptor->reset_dead_jobs_for_worker($worker);
} }
# always re-sync the analysis_stats when a worker dies
$self->synchronize_AnalysisStats($worker->analysis->stats);
} }
...@@ -179,6 +189,13 @@ sub worker_check_in { ...@@ -179,6 +189,13 @@ sub worker_check_in {
my $sth = $self->prepare($sql); my $sth = $self->prepare($sql);
$sth->execute(); $sth->execute();
$sth->finish; $sth->finish;
# if analysis_stats for this worker's analysis is more than a minutes
# out of date, then re-synchronize it
my $stats = $worker->analysis->stats;
if($stats->seconds_since_last_update >= 60) {
$self->synchronize_AnalysisStats($stats);
}
} }
...@@ -193,77 +210,151 @@ sub fetch_overdue_workers { ...@@ -193,77 +210,151 @@ sub fetch_overdue_workers {
} }
sub update_analysis_stats { =head2 synchronize_hive
Example : $queen->synchronize_hive();
Description: Runs through all analyses in the system and synchronizes
the analysis_stats summary with the states in the analysis_job
and hive tables. Then follows by checking all the blocking rules
and blocks/unblocks analyses as needed.
Exceptions : none
Caller : general
=cut
sub synchronize_hive {
my $self = shift; my $self = shift;
my $allAnalysis = $self->db->get_AnalysisAdaptor->fetch_all; my $start_time = time();
my $allAnalysis = $self->db->get_AnalysisAdaptor->fetch_all;
print("analyze ", scalar(@$allAnalysis), "\n");
foreach my $analysis (@$allAnalysis) {
my $stats = $analysis->stats;
$self->synchronize_AnalysisStats($stats);
}
foreach my $analysis (@$allAnalysis) { foreach my $analysis (@$allAnalysis) {
$self->check_blocking_control_rules_for_AnalysisStats($analysis->stats);
}
print((time() - $start_time), " secs to synchronize_hive\n");
}
my $analysisStats = $analysis->stats;
$analysisStats->total_job_count(0);
$analysisStats->unclaimed_job_count(0);
$analysisStats->done_job_count(0);
$analysisStats->failed_job_count(0);
$analysisStats->num_required_workers(0);
my $sql = "SELECT status, count(*) FROM analysis_job ".
"WHERE analysis_id=? GROUP BY status";
my $sth = $self->prepare($sql);
$sth->execute($analysis->dbID);
while (my ($status, $count)=$sth->fetchrow_array()) {
my $total = $analysisStats->total_job_count();
$analysisStats->total_job_count($total + $count);
if($status eq 'READY') {
$analysisStats->unclaimed_job_count($count);
my $numWorkers = $count/$analysisStats->batch_size;
$numWorkers=1 if($numWorkers<1);
if($analysisStats->hive_capacity>0 and $numWorkers > $analysisStats->hive_capacity) {
$numWorkers=$analysisStats->hive_capacity;
}
$analysisStats->num_required_workers($numWorkers);
}
if($status eq 'DONE') { $analysisStats->done_job_count($count); }
if($status eq 'FAILED') { $analysisStats->failed_job_count($count); }
}
$analysisStats->determine_status()->update() if($analysisStats);
$sth->finish;
} =head2 synchronize_AnalysisStats
$self->adjust_stats_for_living_workers(); Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
} Example : $self->synchronize($analysisStats);
Description: Queries the analysis_job and hive tables to get summary counts
and rebuilds the AnalysisStats object. Then updates the
analysis_stats table with the new summary info
Returntype : newly synced Bio::EnsEMBL::Hive::AnalysisStats object
Exceptions : none
Caller : general
=cut
sub adjust_stats_for_living_workers { sub synchronize_AnalysisStats {
my $self = shift; my $self = shift;
my $analysisStats = shift;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor; return $analysisStats unless($analysisStats);
return $analysisStats unless($analysisStats->analysis_id);
return $analysisStats if($analysisStats->status eq 'BLOCKED');
return $analysisStats if($analysisStats->status eq 'SYNCHING');
my $sql = "SELECT analysis_id, count(*) FROM hive ". $analysisStats->update_status('SYNCHING');
"WHERE cause_of_death='' GROUP BY analysis_id";
my $sth = $self->prepare($sql); $analysisStats->total_job_count(0);
$sth->execute(); $analysisStats->unclaimed_job_count(0);
while (my ($analysis_id, $liveCount)=$sth->fetchrow_array()) { $analysisStats->done_job_count(0);
$analysisStats->failed_job_count(0);
my $analysis_stats = $statsDBA->fetch_by_analysis_id($analysis_id); $analysisStats->num_required_workers(0);
my $sql = "SELECT status, count(*) FROM analysis_job ".
"WHERE analysis_id=? GROUP BY status";
my $sth = $self->prepare($sql);
$sth->execute($analysisStats->analysis_id);
while (my ($status, $count)=$sth->fetchrow_array()) {
my $total = $analysisStats->total_job_count();
$analysisStats->total_job_count($total + $count);
if($status eq 'READY') {
$analysisStats->unclaimed_job_count($count);
my $numWorkers = $count/$analysisStats->batch_size;
$numWorkers=1 if($numWorkers<1);
if($analysisStats->hive_capacity>0 and $numWorkers > $analysisStats->hive_capacity) {
$numWorkers=$analysisStats->hive_capacity;
}
$analysisStats->num_required_workers($numWorkers);
}
if($status eq 'DONE') { $analysisStats->done_job_count($count); }
if($status eq 'FAILED') { $analysisStats->failed_job_count($count); }
}
$sth->finish;
$analysisStats->determine_status();
#
# adjust_stats_for_living_workers
#
if($analysisStats->hive_capacity > 0) {
my $sql = "SELECT count(*) FROM hive WHERE cause_of_death='' and analysis_id=?";
$sth = $self->prepare($sql);
$sth->execute($analysisStats->analysis_id);
my($liveCount)=$sth->fetchrow_array();
$sth->finish;
my $numWorkers = $analysisStats->num_required_workers;
my $capacityAdjust = ($numWorkers + $liveCount) - $analysisStats->hive_capacity;
$numWorkers -= $capacityAdjust if($capacityAdjust > 0);
$numWorkers=0 if($numWorkers<0);
$analysisStats->num_required_workers($numWorkers);
}
$analysisStats->update;
return $analysisStats;
}
if($analysis_stats->hive_capacity > 0) {
my $numWorkers = $analysis_stats->num_required_workers;
my $capacityAdjust = ($numWorkers + $liveCount) - $analysis_stats->hive_capacity; sub check_blocking_control_rules_for_AnalysisStats
$numWorkers -= $capacityAdjust if($capacityAdjust > 0); {
$numWorkers=0 if($numWorkers<0); my $self = shift;
my $stats = shift;
return unless($stats);
#print("check ctrl on analysis "); $stats->print_stats;
my $ctrlRules = $self->db->get_AnalysisCtrlRuleAdaptor->
fetch_by_ctrled_analysis_id($stats->analysis_id);
my $allRulesDone = 1;
if(scalar @$ctrlRules > 0) {
#print("HAS blocking_ctrl_rules to check\n");
foreach my $ctrlrule (@{$ctrlRules}) {
#use this method because the condition_analysis objects can be
#network distributed to a different database so use it's adaptor to get
#the AnalysisStats object
#$ctrlrule->print_rule;
my $condAnalysis = $ctrlrule->condition_analysis;
my $condStats = $condAnalysis->stats if($condAnalysis);
$allRulesDone = 0 unless($condStats and $condStats->status eq 'DONE');
#print(" "); $condStats->print_stats;
}
$analysis_stats->num_required_workers($numWorkers); if($allRulesDone) {
$analysis_stats->update; if($stats->status eq 'BLOCKED') {
#print(" UNBLOCK analysis : all conditions met\n");
$stats->update_status('READY');
}
} else {
#print(" RE-BLOCK analysis : some conditions failed\n");
$stats->update_status('BLOCKED');
} }
} }
$sth->finish;
} }
...@@ -321,42 +412,6 @@ sub get_num_needed_workers { ...@@ -321,42 +412,6 @@ sub get_num_needed_workers {
} }
sub check_blocking_control_rules
{
my $self = shift;
my $analysisStatsList = $self->db->get_AnalysisStatsAdaptor->fetch_all();
foreach my $stats (@{$analysisStatsList}) {
#print("BLOCKED analysis "); $stats->print_stats;
my $ctrlRules = $self->db->get_AnalysisCtrlRuleAdaptor->
fetch_by_ctrled_analysis_id($stats->analysis_id);
my $allRulesDone = 1;
if(scalar @$ctrlRules > 0) {
#print("HAS blocking_ctrl_rules to check\n");
foreach my $ctrlrule (@{$ctrlRules}) {
#use this method because the condition_analysis objects can be
#network distributed to a different database so use it's adaptor to get
#the AnalysisStats object
#$ctrlrule->print_rule;
my $condAnalysis = $ctrlrule->condition_analysis;
my $condStats = $condAnalysis->stats if($condAnalysis);
$allRulesDone = 0 unless($condStats and $condStats->status eq 'DONE');
#print(" "); $condStats->print_stats;
}
if($allRulesDone) {
if($stats->status eq 'BLOCKED') {
#print(" UNBLOCK analysis : all conditions met\n");
$stats->adaptor->update_status($stats->analysis_id, 'READY');
}
} else {
#print(" RE-BLOCK analysis : all conditions met\n");
$stats->adaptor->update_status($stats->analysis_id, 'BLOCKED');
}
}
}
}
sub print_hive_status sub print_hive_status
{ {
my $self = shift; my $self = shift;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment