Commit 7b5e6d1a authored by Albert Vilella
Browse files

adding a silent option (do not try to synchronize too much when the hive load...

Add a silent option (do not try to synchronize too much when the hive load is too high) and a maximise_concurrency option (try to run as many concurrent analyses as possible instead of many jobs of a single analysis).
parent f9a51707
......@@ -108,6 +108,7 @@ sub create_new_worker {
$analysis_id = $job->analysis_id if(defined($job));
my $analysisStats;
if($analysis_id) {
$analysisStats = $analStatsDBA->fetch_by_analysis_id($analysis_id);
$self->safe_synchronize_AnalysisStats($analysisStats);
......@@ -166,7 +167,6 @@ sub register_worker_death {
my ($self, $worker) = @_;
return unless($worker);
# if called without a defined cause_of_death, assume catastrophic failure
$worker->cause_of_death('FATALITY') unless(defined($worker->cause_of_death));
unless ($worker->cause_of_death() eq "HIVE_OVERLOAD") {
......@@ -436,6 +436,13 @@ sub synchronize_AnalysisStats {
my $self = shift;
my $analysisStats = shift;
# Trying to make hive not synchronize if there is a high load in the
# server, e.g. during blasts (max 450 workers). The best thing I
# could find is the combination of these two numbers
if (($self->get_hive_current_load("silent") > 0.9) && $self->get_num_running_workers("silent") > 400) {
return $analysisStats;
}
return $analysisStats unless($analysisStats);
return $analysisStats unless($analysisStats->analysis_id);
......@@ -446,8 +453,12 @@ sub synchronize_AnalysisStats {
$analysisStats->failed_job_count(0);
$analysisStats->num_required_workers(0);
my $sql = "SELECT status, count(*) FROM analysis_job ".
"WHERE analysis_id=? GROUP BY status";
# my $sql = "SELECT status, count(*) FROM analysis_job ".
# "WHERE analysis_id=? GROUP BY status";
# This should be better in terms of performance
# http://www.mysqlperformanceblog.com/2007/08/16/how-much-overhead-is-caused-by-on-disk-temporary-tables/
my $sql = "SELECT status, count(status) FROM analysis_job ".
"WHERE analysis_id=? GROUP BY status ORDER BY NULL LIMIT 10";
my $sth = $self->prepare($sql);
$sth->execute($analysisStats->analysis_id);
......@@ -577,7 +588,8 @@ sub get_num_failed_analyses
sub get_hive_current_load {
my $self = shift;
my $sql = "SELECT sum(1/analysis_stats.hive_capacity) FROM hive, analysis_stats ".
my $silent = shift;
my $sql = "SELECT /*! SQL_BUFFER_RESULT */ sum(1/analysis_stats.hive_capacity) FROM hive, analysis_stats ".
"WHERE hive.analysis_id=analysis_stats.analysis_id and cause_of_death ='' ".
"AND analysis_stats.hive_capacity>0";
my $sth = $self->prepare($sql);
......@@ -585,20 +597,22 @@ sub get_hive_current_load {
(my $load)=$sth->fetchrow_array();
$sth->finish;
$load=0 unless($load);
print("current hive load = $load\n");
print("current hive load = $load\n") unless (defined($silent));
print("*") if ($silent eq 'silent');
return $load;
}
# Counts the rows of the `hive` table whose cause_of_death is the empty string.
#
#   Arg [1]    : (optional) any defined value suppresses the progress line
#                normally printed to STDOUT (callers pass the string "silent")
#   Returntype : the count reported by the query, or 0 when the query
#                yields no row / a false value
sub get_num_running_workers {
  my $self   = shift;
  my $silent = shift;

  my $sql = "SELECT count(*) FROM hive WHERE cause_of_death =''";
  my $sth = $self->prepare($sql);
  $sth->execute();
  my ($runningCount) = $sth->fetchrow_array();
  $sth->finish;

  # Normalise undef (no row fetched) to a numeric zero.
  $runningCount = 0 unless($runningCount);

  # Report exactly once, and only when the caller did not ask for silence;
  # an unconditional print here would defeat the silent option.
  print("current hive num_running_workers = $runningCount\n") unless (defined($silent));

  return $runningCount;
}
......@@ -626,7 +640,7 @@ sub get_num_needed_workers {
my $analysis = shift;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
my $neededAnals = $statsDBA->fetch_by_needed_workers();
my $neededAnals = $statsDBA->fetch_by_needed_workers(undef,$self->{maximise_concurrency});
my $deeper_stats_list = $statsDBA->fetch_by_status('LOADING', 'BLOCKED');
push @$neededAnals, @$deeper_stats_list;
......@@ -678,7 +692,7 @@ sub get_num_needed_workers {
sub get_hive_progress
{
my $self = shift;
my $sql = "SELECT sum(done_job_count), sum(failed_job_count), sum(total_job_count), ".
my $sql = "SELECT /*! SQL_BUFFER_RESULT */ sum(done_job_count), sum(failed_job_count), sum(total_job_count), ".
"sum(unclaimed_job_count * analysis_stats.avg_msec_per_job)/1000/60/60 ".
"FROM analysis_stats";
my $sth = $self->prepare($sql);
......@@ -777,7 +791,7 @@ sub _pick_best_analysis_for_new_worker {
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
return undef unless($statsDBA);
my ($stats) = @{$statsDBA->fetch_by_needed_workers(1)};
my ($stats) = @{$statsDBA->fetch_by_needed_workers(1,$self->{maximise_concurrency})};
if($stats) {
#synchronize and double check that it can be run
$self->safe_synchronize_AnalysisStats($stats);
......@@ -801,7 +815,7 @@ sub _pick_best_analysis_for_new_worker {
return $stats if(($stats->status ne 'BLOCKED') and ($stats->num_required_workers > 0));
}
($stats) = @{$statsDBA->fetch_by_needed_workers(1)};
($stats) = @{$statsDBA->fetch_by_needed_workers(1,$self->{maximise_concurrency})};
return $stats if($stats);
return undef;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment