Commit e3d44c7e authored by Jessica Severin's avatar Jessica Severin
Browse files

New distributed Queen system. Queen/hive updates its state in an incremental

and distributed manner as it interacts with the workers over the course of its life.
When a script starts and asks a queen to create a worker the queen has
a list of known analyses which are 'above the surface' where full hive analysis has
been done and the number of needed workers has been calculated. Full synch requires
joining data between the analysis, analysis_job, analysis_stats, and hive tables.
When this reached 10e7 jobs, 10e4 analyses, 10e3 workers a full hard sync took minutes
and it was clear this bit of the system wasn't scaling and wasn't going to make it
to the next order of magnitude. This occurred in the compara blastz pipeline between
mouse and rat.
Now there are some analyses 'below the surface' that have partial synchronization.
These analyses have been flagged as having 'x' new jobs (AnalysisJobAdaptor updating
analysis_stats on job insert).  If no analysis is found to asign to the newly
created worker, the queen will dip below the surface and start checking
the analyses with the highest probablity of needing the most workers.
This incremental sync is also done in Queen::get_num_needed_workers
When calculating ahead a total worker count, this routine will also dip below
the surface until the hive reaches it's current defined worker saturation.
A beekeeper is no longer a required component for the system to function.
If workers can get onto cpus the hive will run.  The beekeeper is now mainly a
user display program showing the status of the hive.  There is no longer any
central process doing work and one hive can potentially scale
beyond 10e9 jobs in graphs of 10e6 analysis nodes and 10e6 running workers.
parent af03e291
......@@ -110,10 +110,11 @@ sub create_new_worker {
if($analysis_id) {
$analysisStats = $analStatsDBA->fetch_by_analysis_id($analysis_id);
} else {
($analysisStats) = @{$analStatsDBA->fetch_by_needed_workers(1)};
$analysisStats = $self->_pick_best_analysis_for_new_worker;
return undef unless($analysisStats);
......@@ -259,10 +260,6 @@ sub synchronize_AnalysisStats {
return $analysisStats unless($analysisStats);
return $analysisStats unless($analysisStats->analysis_id);
return $analysisStats if($analysisStats->status eq 'BLOCKED');
return $analysisStats if($analysisStats->status eq 'SYNCHING');
......@@ -293,7 +290,9 @@ sub synchronize_AnalysisStats {
if($status eq 'FAILED') { $analysisStats->failed_job_count($count); }
if($analysisStats->status ne 'BLOCKED') {
# adjust_stats_for_living_workers
......@@ -373,10 +372,27 @@ sub get_hive_current_load {
=head2 get_num_needed_workers
Example : $count = $queen->get_num_needed_workers();
Description: Runs through the analyses in the system which are waiting
for workers to be created for them. Calculates the maximum
number of workers needed to fill the current needs of the system
Exceptions : none
Caller : general
sub get_num_needed_workers {
my $self = shift;
my $neededAnals = $self->db->get_AnalysisStatsAdaptor->fetch_by_needed_workers();
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
my $neededAnals = $statsDBA->fetch_by_needed_workers();
my $deeper_stats_list = $statsDBA->fetch_by_status('LOADING', 'BLOCKED');
push @$neededAnals, @$deeper_stats_list;
return 0 unless($neededAnals);
my $availableLoad = 1.0 - $self->get_hive_current_load();
......@@ -386,6 +402,15 @@ sub get_num_needed_workers {
foreach my $analysis_stats (@{$neededAnals}) {
#digging deeper under the surface so need to sync
if(($analysis_stats->status eq 'LOADING') or ($analysis_stats->status eq 'BLOCKED')) {
next if($analysis_stats->status eq 'BLOCKED');
next if($analysis_stats->num_required_workers == 0);
my $thisLoad = 0.0;
if($analysis_stats->hive_capacity>0) {
$thisLoad = $analysis_stats->num_required_workers * (1/$analysis_stats->hive_capacity);
......@@ -394,13 +419,13 @@ sub get_num_needed_workers {
if(($analysis_stats->hive_capacity<=0) or ($thisLoad < $availableLoad)) {
$numWorkers += $analysis_stats->num_required_workers;
$availableLoad -= $thisLoad;
printf(" %d (%1.9f) ", $numWorkers, $availableLoad);
printf("%5d (%1.3f) ", $numWorkers, $availableLoad);
} else {
my $workerCount = POSIX::ceil($availableLoad * $analysis_stats->hive_capacity );
$numWorkers += $workerCount;
$availableLoad -= $workerCount * (1/$analysis_stats->hive_capacity);
printf(" %d (%1.9f) use only %d ", $numWorkers, $availableLoad, $workerCount);
printf("%5d (%1.3f) use only %3d ", $numWorkers, $availableLoad, $workerCount);
......@@ -442,6 +467,37 @@ sub print_hive_status
sub _pick_best_analysis_for_new_worker {
my $self = shift;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
return undef unless($statsDBA);
my ($stats) = @{$statsDBA->fetch_by_needed_workers(1)};
return $stats if($stats);
# ok so no analyses 'need' workers.
# see if anything needs an update, in case there are
# hidden jobs that haven't made it into the summary stats
print("QUEEN: no obvious needed workers, need to dig deeper\n");
my $stats_list = $statsDBA->fetch_by_status('LOADING', 'BLOCKED');
foreach $stats (@$stats_list) {
return $stats if(($stats->status eq 'READY') and ($stats->num_required_workers > 0));
($stats) = @{$statsDBA->fetch_by_needed_workers(1)};
return $stats if($stats);
return undef;
=head2 _fetch_by_hive_id
Arg [1] : int $id
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment