Commit 1c3d8c6b authored by Leo Gordon's avatar Leo Gordon
Browse files

rationalized dynamic batch_size estimation: it is now done centrally both for...

rationalized dynamic batch_size estimation: it is now done centrally both for single Workers (claiming) and the Queen (sync procedure)
parent 404e8a07
......@@ -31,6 +31,13 @@ use Bio::EnsEMBL::Analysis;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::Worker;
## Minimum amount of time in msec that a worker should run before reporting
## back to the hive. This is used when setting the batch_size automatically.
sub min_batch_time {
return 2*60*1000;
}
sub new {
my ($class,@args) = @_;
my $self = bless {}, $class;
......@@ -74,12 +81,6 @@ sub increase_hive_capacity {
$self->adaptor->increase_hive_capacity($self->analysis_id);
}
sub get_running_worker_count {
my $self = shift;
return unless ($self->adaptor);
return $self->adaptor->get_running_worker_count($self);
}
sub analysis_id {
my $self = shift;
$self->{'_analysis_id'} = shift if(@_);
......@@ -104,10 +105,30 @@ sub status {
}
sub batch_size {
my $self = shift;
$self->{'_batch_size'} = shift if(@_);
$self->{'_batch_size'}=1 unless(defined($self->{'_batch_size'}));
return $self->{'_batch_size'};
my $self = shift;
$self->{'_batch_size'} = shift if(@_);
$self->{'_batch_size'} = 1 unless(defined($self->{'_batch_size'})); # do we need to initialize it at all?
return $self->{'_batch_size'};
}
sub get_or_estimate_batch_size {
my $self = shift;
if( (my $batch_size = $self->batch_size())>0 ) { # set to positive or not set (and auto-initialized within $self->batch_size)
return $batch_size;
# otherwise it is a request for dynamic estimation:
} elsif( my $avg_msec_per_job = $self->avg_msec_per_job() ) { # further estimations from collected stats
$avg_msec_per_job = 100 if($avg_msec_per_job<100);
return POSIX::ceil( $self->min_batch_time() / $avg_msec_per_job );
} else { # first estimation when no stats are available (take -$batch_size as first guess, if not zero)
return -$batch_size || 1;
}
}
sub avg_msec_per_job {
......@@ -279,16 +300,19 @@ sub print_stats {
$self->running_job_count,
$self->failed_job_count,
$self->avg_msec_per_job,
$self->num_required_workers, $self->hive_capacity,
$self->num_required_workers,
$self->hive_capacity,
$self->seconds_since_last_update,
);
);
} elsif ($mode == 2) {
printf("%-27s(%2d) %11s [%d/%d workers] (sync'd %d sec ago)\n",
$self->get_analysis->logic_name,
$self->analysis_id,
$self->status,
$self->num_required_workers, $self->hive_capacity,
$self->seconds_since_last_update);
$self->num_required_workers,
$self->hive_capacity,
$self->seconds_since_last_update
);
printf(" msec_per_job : %d\n", $self->avg_msec_per_job);
printf(" cpu_min_total : %d\n", $self->cpu_minutes_remaining);
......@@ -331,8 +355,7 @@ sub determine_status {
if($self->total_job_count == $self->unclaimed_job_count) {
$self->status('READY');
}
if($self->unclaimed_job_count>0 and
$self->total_job_count > $self->unclaimed_job_count) {
if( 0 < $self->unclaimed_job_count and $self->unclaimed_job_count < $self->total_job_count ) {
$self->status('WORKING');
}
}
......
......@@ -100,9 +100,9 @@ sub CreateNewJob {
}
my $dbc = $analysis->adaptor->db->dbc;
my $insertion_method = ($dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
my $status = $blocked ? 'BLOCKED' : 'READY';
my $insertion_method = ($dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
my $status = $blocked ? 'BLOCKED' : 'READY';
my $analysis_id = $analysis->dbID();
my $sql = qq{$insertion_method INTO job
(input_id, prev_job_id,analysis_id,status,semaphore_count,semaphored_job_id)
......@@ -110,15 +110,17 @@ sub CreateNewJob {
my $sth = $dbc->prepare($sql);
$sth->execute($input_id, $prev_job_id, $analysis->dbID, $status, $semaphore_count || 0, $semaphored_job_id);
$sth->execute($input_id, $prev_job_id, $analysis_id, $status, $semaphore_count || 0, $semaphored_job_id);
my $job_id = $dbc->db_handle->last_insert_id(undef, undef, 'job', 'job_id');
$sth->finish;
$dbc->do("UPDATE analysis_stats SET ".
"total_job_count=total_job_count+1 ".
",unclaimed_job_count=unclaimed_job_count+1 ".
",status='LOADING' ".
"WHERE status!='BLOCKED' and analysis_id='".$analysis->dbID ."'");
$dbc->do(qq{
UPDATE analysis_stats
SET total_job_count=total_job_count+1
,unclaimed_job_count=unclaimed_job_count+1
,status = (CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END)
WHERE analysis_id=$analysis_id
});
return $job_id;
}
......
......@@ -211,11 +211,14 @@ sub create_new_worker {
$worker->hive_output_dir($hive_output_dir);
if($batch_size) {
$worker->set_worker_batch_size($batch_size);
$worker->batch_size($batch_size);
}
if($job_limit) {
$worker->job_limit($job_limit);
$worker->life_span(0);
if($job_limit < $worker->batch_size()) {
$worker->batch_size( $job_limit );
}
}
if($life_span) {
$worker->life_span($life_span * 60);
......@@ -512,6 +515,8 @@ sub synchronize_AnalysisStats {
return $analysisStats unless($analysisStats);
return $analysisStats unless($analysisStats->analysis_id);
my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor or return undef;
$analysisStats->refresh(); ## Need to get the new hive_capacity for dynamic analyses
$analysisStats->total_job_count(0);
......@@ -529,34 +534,33 @@ sub synchronize_AnalysisStats {
my $done_here = 0;
my $done_elsewhere = 0;
while (my ($status, $count, $semaphore_count)=$sth->fetchrow_array()) {
# print STDERR "$status - $count\n";
while (my ($status, $job_count, $semaphore_count)=$sth->fetchrow_array()) {
# print STDERR "$status: $job_count\n";
my $total = $analysisStats->total_job_count();
$analysisStats->total_job_count($total + $count);
my $curr_total = $analysisStats->total_job_count();
$analysisStats->total_job_count($curr_total + $job_count);
if(($status eq 'READY') and ($semaphore_count<=0)) {
$analysisStats->unclaimed_job_count($count);
my $numWorkers;
if($analysisStats->batch_size > 0) {
$numWorkers = POSIX::ceil($count / $analysisStats->batch_size);
} else {
my $job_msec = $analysisStats->avg_msec_per_job;
$job_msec = 100 if($job_msec>0 and $job_msec<100);
$numWorkers = POSIX::ceil(($count * $job_msec) / (3*60*1000));
# guess num needed workers by total jobs / (num jobs a worker could do in 3 minutes)
}
$numWorkers=$count if($numWorkers==0);
if($analysisStats->hive_capacity>0 and $numWorkers > $analysisStats->hive_capacity) {
$numWorkers=$analysisStats->hive_capacity;
}
$analysisStats->num_required_workers($numWorkers);
$analysisStats->unclaimed_job_count($job_count);
my $required_workers = POSIX::ceil( $job_count / $analysisStats->get_or_estimate_batch_size() );
# adjust_stats_for_living_workers:
if($hive_capacity > 0) {
my $capacity_allows_to_add = $hive_capacity - $analysis_stats_adaptor->get_running_worker_count($analysisStats);
if($capacity_allows_to_add < $required_workers ) {
$required_workers = (0 < $capacity_allows_to_add) ? $capacity_allows_to_add : 0;
}
}
$analysisStats->num_required_workers( $required_workers );
} elsif($status eq 'DONE' and $semaphore_count<=0) {
$done_here = $count;
$done_here = $job_count;
} elsif($status eq 'PASSED_ON' and $semaphore_count<=0) {
$done_elsewhere = $count;
$done_elsewhere = $job_count;
} elsif ($status eq 'FAILED') {
$analysisStats->failed_job_count($count);
$analysisStats->failed_job_count($job_count);
}
}
$sth->finish;
......@@ -569,22 +573,6 @@ sub synchronize_AnalysisStats {
$analysisStats->determine_status();
}
#
# adjust_stats_for_living_workers
#
if($analysisStats->hive_capacity > 0) {
my $liveCount = $analysisStats->get_running_worker_count();
my $numWorkers = $analysisStats->num_required_workers;
my $capacityAdjust = ($numWorkers + $liveCount) - $analysisStats->hive_capacity;
$numWorkers -= $capacityAdjust if($capacityAdjust > 0);
$numWorkers=0 if($numWorkers<0);
$analysisStats->num_required_workers($numWorkers);
}
$analysisStats->update; #update and release sync_lock
return $analysisStats;
......@@ -692,17 +680,17 @@ sub get_num_needed_workers {
next if($analysis_stats->status eq 'BLOCKED');
next if($analysis_stats->num_required_workers == 0);
# FIXME: the following call sometimes returns a stale number greater than the number of workers actually needed for an analysis; resync fixes it
# FIXME: the following call *sometimes* returns a stale number greater than the number of workers actually needed for an analysis; -sync fixes it
my $workers_this_analysis = $analysis_stats->num_required_workers;
if($analysis_stats->hive_capacity > 0) { # if there is a limit, use it for cut-off
my $limit_workers_this_analysis = int($available_load * $analysis_stats->hive_capacity);
if((my $hive_capacity = $analysis_stats->hive_capacity) > 0) { # if there is a limit, use it for cut-off
my $limit_workers_this_analysis = int($available_load * $hive_capacity);
if($workers_this_analysis > $limit_workers_this_analysis) {
$workers_this_analysis = $limit_workers_this_analysis;
}
$available_load -= 1.0*$workers_this_analysis/$analysis_stats->hive_capacity;
$available_load -= 1.0*$workers_this_analysis/$hive_capacity;
}
$total_workers += $workers_this_analysis;
$rc2workers{$analysis_stats->rc_id} += $workers_this_analysis;
......@@ -776,14 +764,14 @@ sub print_running_worker_status {
"WHERE worker.analysis_id=analysis.analysis_id AND worker.cause_of_death='' ".
"GROUP BY worker.analysis_id";
my $total = 0;
my $total_workers = 0;
my $sth = $self->prepare($sql);
$sth->execute();
while((my $logic_name, my $count)=$sth->fetchrow_array()) {
printf("%20s : %d workers\n", $logic_name, $count);
$total += $count;
while((my $logic_name, my $worker_count)=$sth->fetchrow_array()) {
printf("%20s : %d workers\n", $logic_name, $worker_count);
$total_workers += $worker_count;
}
printf(" %d total workers\n", $total);
printf(" %d total workers\n", $total_workers);
print "===========================\n";
$sth->finish;
}
......@@ -814,7 +802,8 @@ sub monitor {
). qq{
group_concat(DISTINCT logic_name)
FROM worker left join analysis USING (analysis_id)
WHERE cause_of_death = ""};
WHERE cause_of_death = ''
};
my $sth = $self->prepare($sql);
$sth->execute();
......@@ -856,10 +845,10 @@ sub _pick_best_analysis_for_new_worker {
if($stats) {
#synchronize and double check that it can be run
$self->safe_synchronize_AnalysisStats($stats);
return $stats if(($stats->status ne 'BLOCKED') and ($stats->num_required_workers > 0));
return $stats if(($stats->status ne 'BLOCKED') and ($stats->num_required_workers > 0) and (!defined($rc_id) or ($stats->rc_id == $rc_id)));
}
# ok so no analyses 'need' workers.
# ok so no analyses 'need' workers with the given $rc_id.
if ($self->get_num_failed_analyses()) {
return undef;
}
......@@ -874,10 +863,9 @@ sub _pick_best_analysis_for_new_worker {
return $stats if(($stats->status ne 'BLOCKED') and ($stats->num_required_workers > 0) and (!defined($rc_id) or ($stats->rc_id == $rc_id)));
}
# does the following really ever help?
($stats) = @{$statsDBA->fetch_by_needed_workers(1,$self->{maximise_concurrency}, $rc_id)};
return $stats if($stats);
return undef;
return $stats;
}
......
......@@ -79,10 +79,6 @@ use Bio::EnsEMBL::Hive::Process;
use Bio::EnsEMBL::Hive::Utils ('dir_revhash'); # import dir_revhash
## Minimum amount of time in msec that a worker should run before reporting
## back to the hive. This is used when setting the batch_size automatically.
## 120000 msec = 2 minutes
my $MIN_BATCH_TIME = 2*60*1000;
sub new {
my ($class,@args) = @_;
......@@ -422,14 +418,6 @@ sub cleanup_worker_process_temp_directory {
#
###############################
sub set_worker_batch_size {
my $self = shift;
my $batch_size = shift;
if(defined($batch_size)) {
$self->{'_batch_size'} = $batch_size;
}
}
=head2 batch_size
Args : none
......@@ -444,24 +432,12 @@ sub set_worker_batch_size {
Returntype : integer scalar
=cut
sub batch_size {
my $self = shift;
my $self = shift;
my $stats = $self->analysis->stats;
my $batch_size = $stats->batch_size;
if(defined($self->{'_batch_size'})) {
$batch_size = $self->{'_batch_size'};
}
if(($batch_size <= 0) and ($stats->avg_msec_per_job)) {
$batch_size = POSIX::ceil($MIN_BATCH_TIME / $stats->avg_msec_per_job); # num jobs in $MIN_BATCH_TIME msecs
}
$batch_size = 1 if($batch_size < 1); # make sure we grab at least one job
if($self->job_limit and ($self->job_limit < $batch_size)) {
$batch_size = $self->job_limit;
}
return $batch_size;
$self->{'_batch_size'} = shift if(@_);
return $self->{'_batch_size'} || $self->analysis->stats->get_or_estimate_batch_size();
}
......@@ -504,7 +480,8 @@ sub run {
$self->db->dbc->disconnect_when_inactive(0);
my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
my $min_batch_time = $self->analysis->stats->min_batch_time();
my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
do { # Worker's lifespan loop (ends only when the worker dies)
my $batches_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
......@@ -518,7 +495,7 @@ sub run {
$self->cause_of_death('JOB_LIMIT');
} else { # a proper "BATCHES" loop
while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $MIN_BATCH_TIME) {
while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {
if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_worker_id( $self->dbID ) }) ) {
my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
......@@ -540,7 +517,7 @@ sub run {
}
# The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
# so they are not allowed to happen too frequently (not before $MIN_BATCH_TIME of work has been done)
# so they are not allowed to happen too frequently (not before $min_batch_time of work has been done)
#
$self->db->get_AnalysisStatsAdaptor->interval_update_work_done(
$self->analysis->dbID,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment