Commit 85d85982 authored by Matthieu Muffato

Temporary commit (not fully tested): consider the job runtime to follow a log-normal distribution, and take into account active jobs
parent 05088da1
......@@ -39,6 +39,7 @@ package Bio::EnsEMBL::Hive::AnalysisStats;
use strict;
use warnings;
use List::Util 'sum';
use Math::Trig;
use POSIX;
use Term::ANSIColor;
......@@ -89,6 +90,13 @@ sub max_lock_sec {
}
# How much time the worker will be pending. Queen will use this value when
# there is no empirical data
sub default_pending_time_sec {
return 30;
}
=head1 AUTOLOADED
analysis_id / analysis
......@@ -305,6 +313,8 @@ sub estimate_num_required_workers { # this doesn't count the workers that ar
# NOTE: This is only greater than zero when jobs are quicker than $self->min_worker_runtime
my $jobs_for_running_workers = $self->num_running_workers * ($num_jobs_per_worker - 1);
# FIXME add the concept of startup cost in seconds and target worker lifespan
# NOTE: This means 1 worker per job when jobs_for_running_workers = 0 (no running workers or jobs take more than min_worker_runtime)
my $num_required_workers = POSIX::ceil( ($jobs_to_do - $jobs_for_running_workers) / $num_jobs_per_worker);
# NOTE: Since it can be negative when there are fewer jobs to do than what we have earmarked for running workers,
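As a worked example of the arithmetic above, with assumed numbers: if 10 jobs remain, 2 workers are running and each worker is expected to take 4 jobs, then jobs_for_running_workers = 2 * (4 - 1) = 6 and num_required_workers = ceil((10 - 6) / 4) = 1; with no running workers (or jobs slower than min_worker_runtime) the same formula gives ceil(10 / 4) = 3, i.e. one worker per batch of jobs.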
......@@ -358,37 +368,77 @@ sub estimate_num_required_workers { # this doesn't count the workers that ar
}
# avg_msec_per_job, or _estimate_avg_msec_per_job, or min_job_runtime_msec
# avg_msec_per_job, or _estimate_avg_msec_per_job_from_active_roles, or min_job_runtime_msec
sub get_or_estimate_avg_msec_per_job {
my $self = shift;
# Cached value
return $self->{'_estimated_avg_msec_per_job'} if exists $self->{'_estimated_avg_msec_per_job'};
my $weight_factor = 3; # makes it more sensitive to the dynamics of the farm
my $avg_msec_per_job = $self->avg_msec_per_job;
my $job_count = $self->done_job_count;
my $tot_msec_of_jobs = $avg_msec_per_job * $job_count;
my $est_a = $self->_estimate_avg_msec_per_job_from_active_roles;
if ($est_a) {
$tot_msec_of_jobs += $est_a->[0];
$job_count += $est_a->[1];
}
unless ($self->avg_msec_per_job) {
unless (exists $self->{'_estimated_avg_msec_per_job'}) {
$self->{'_estimated_avg_msec_per_job'} = $self->_estimate_avg_msec_per_job;
$self->{'_estimated_avg_msec_per_job'} = $self->_estimate_avg_msec_per_job_from_active_roles;
}
return $self->{'_estimated_avg_msec_per_job'} // $self->min_job_runtime_msec;
}
return $self->avg_msec_per_job;
}
sub _estimate_avg_msec_per_job {
sub _estimate_avg_msec_per_job_from_active_roles {
my $self = shift;
return unless $self->num_running_workers;
# No role has yet been finalized otherwise avg_msec_per_job would be set
my $active_roles = $self->adaptor->db->get_RoleAdaptor->fetch_all( 'when_finished IS NULL AND analysis_id=' . $self->analysis_id );
my $active_roles = $self->adaptor->db->get_RoleAdaptor->fetch_all( 'when_finished IS NULL AND seconds_since_when_started > 0 AND analysis_id=' . $self->analysis_id );
return unless @$active_roles;
my $tot_lifespan_secs = sum( map {$_->seconds_since_when_started} @$active_roles );
return unless $tot_lifespan_secs;
# The distribution is log-normal
my $sum = 0;
my $n = 0;
foreach my $role (@$active_roles) {
$sum += log($role->seconds_since_when_started);
$n += $role->attempted_jobs + 0.5;
}
my $avg_msec_per_job = 100 * exp($sum / $n);
return $avg_msec_per_job;
}
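Purely to illustrate the arithmetic this estimator performs, with assumed numbers: two active roles that started 400 s and 900 s ago, with 3 and 5 attempted jobs respectively, give $sum = ln(400) + ln(900) ≈ 12.79 and $n = 3.5 + 5.5 = 9, so the estimate is 100 * exp(12.79 / 9) ≈ 414 msec per job.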
# Active roles are currently doing a job, which is not yet counted in attempted_jobs
my $tot_job_attempts = sum( map {$_->attempted_jobs+1} @$active_roles );
sub _reestimate_avg_msec_per_job_from_active_roles {
my $self = shift;
return 1000 * $tot_lifespan_secs / $tot_job_attempts;
}
# FIXME default return values
return unless $self->num_running_workers;
# No role has yet been finalized otherwise avg_msec_per_job would be set
my $active_roles = $self->adaptor->db->get_RoleAdaptor->fetch_all( 'when_finished IS NULL AND seconds_since_when_started > 0 AND analysis_id=' . $self->analysis_id );
return unless @$active_roles;
# The distribution is log-normal
my $sum = 0;
my $n = 0;
foreach my $role (@$active_roles) {
$sum += log(1000 * $role->seconds_since_when_started + $role->_estimate_current_job_runtime_msec($self));
$n += $role->attempted_jobs + 1;
}
my $curr_avg_msec_per_job = exp($sum / $n);
my $weight_factor = 3;
my $avg_msec_per_job = ($self->avg_msec_per_job * $self->done_job_count / $weight_factor + $curr_avg_msec_per_job)
/ ($self->done_job_count / $weight_factor + $n);
return $avg_msec_per_job;
}
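Written out, the blend computed above is (A * D / w + C) / (D / w + n), where A is avg_msec_per_job over the D jobs already done, C = exp($sum / $n) is the estimate from the n active job attempts, and w = 3 is the weight factor that discounts the historical data so the estimate is more sensitive to the current dynamics of the farm.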
sub get_job_throughput {
my $self = shift;
......@@ -520,6 +570,8 @@ sub _toString_fields {
# We don't want to default to min_job_runtime_msec
my ($avg_runtime, $avg_runtime_unit);
# FIXME: can these be joined ?
# This is to avoid the default min_job_runtime_msec
$self->get_or_estimate_avg_msec_per_job;
if ($self->avg_msec_per_job) {
($avg_runtime, $avg_runtime_unit) = $self->friendly_avg_job_runtime($self->avg_msec_per_job);
......
......@@ -522,6 +522,8 @@ sub add_objects_from_config {
$parameters_hash ||= {}; # in case nothing was given
die "'-parameters' has to be a hash" unless(ref($parameters_hash) eq 'HASH');
$batch_size = 0;
($analysis) = $pipeline->add_new_or_update( 'Analysis', $self->o('hive_debug_init'), # NB: add_new_or_update returns a list
'logic_name' => $logic_name,
'module' => $module,
......
......@@ -458,6 +458,26 @@ sub get_submitted_worker_counts_by_meadow_type_rc_name_for_meadow_user {
return \%counts_by_meadow_type_rc_name;
}
sub get_average_pending_time_by_resource_class {
my ($self, $resource_class_id) = @_;
my $timestamp_diff_seconds = $self->dbc->_interval_seconds_sql('when_submitted', 'IFNULL(when_born,CURRENT_TIMESTAMP)');
my $sql = qq{
SELECT AVG($timestamp_diff_seconds)
FROM worker
WHERE when_submitted IS NOT NULL
AND when_born IS NOT NULL
AND resource_class_id IS NOT NULL AND resource_class_id = ?
};
my $sth = $self->prepare($sql);
$sth->execute($resource_class_id);
my ($avg_pending_time) = $sth->fetchrow_array();
$sth->finish;
return $avg_pending_time;
}
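A hedged usage sketch only (not part of this commit; $queen, $resource_class_id and $analysis_stats are assumed to be in scope in the caller), showing how the new average could feed the pending-time estimate, with default_pending_time_sec as the fallback when no empirical data exists yet:
# Hypothetical caller: use the measured average pending time for this resource
# class if any worker of that class has already been born, otherwise fall back
# to the hard-coded default declared in AnalysisStats.
my $pending_sec = $queen->get_average_pending_time_by_resource_class($resource_class_id)
               // $analysis_stats->default_pending_time_sec;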
sub check_for_dead_workers { # scans the whole Valley for lost Workers (but ignores unreachable ones)
my ($self, $valley, $check_buried_in_haste, $bury_unkwn_workers) = @_;
......@@ -737,6 +757,8 @@ sub synchronize_hive {
}
# FIXME: These two synchronization methods would fit better in AnalysisStats itself
=head2 safe_synchronize_AnalysisStats
Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
......
......@@ -39,6 +39,10 @@ package Bio::EnsEMBL::Hive::Role;
use strict;
use warnings;
use POSIX;
use Math::Gauss;
use base ( 'Bio::EnsEMBL::Hive::Storable' );
......@@ -85,6 +89,67 @@ sub done_jobs {
}
# Estimate how much time is needed for the current job
sub _estimate_current_job_runtime_msec {
my $self = shift;
my $analysis_stats = shift;
# Guaranteed to be non-zero
my $avg_msec_per_job = $analysis_stats->avg_msec_per_job // $analysis_stats->min_job_runtime_msec;
# Will hold how much time has been spent on the current job
my $runtime_msec_of_current_job = $self->seconds_since_when_started * 1000;
if ($self->attempted_jobs) {
# The role has already attempted several jobs. We need to subtract that
$runtime_msec_of_current_job -= $avg_msec_per_job * $self->attempted_jobs;
# Can be negative if the jobs were faster than the average
$runtime_msec_of_current_job = 0 if $runtime_msec_of_current_job < 0;
}
# The job hasn't started yet
return $avg_msec_per_job unless $runtime_msec_of_current_job;
# Job runtimes are modelled by a log-normal distribution
# Empirical data show that sigma itself follows a log-normal distribution (-0.04219451,0.62547091) so we take the median
my $sigma = exp(-0.04219451);
# $avg_msec_per_job is the average job runtime
my $mu = log($avg_msec_per_job) - $sigma**2/2;
# This is E[X | X > $runtime_msec_of_current_job] * P(X > $runtime_msec_of_current_job)
# (see https://en.wikipedia.org/wiki/Log-normal_distribution#Partial_expectation)
my $k = $runtime_msec_of_current_job;
my $ecp = $avg_msec_per_job * cdf(($mu + $sigma**2 - log($k)) / $sigma);
# And this is P(X > $runtime_msec_of_current_job) using:
# # P(X > $runtime_msec_of_current_job) = 1 - P(X <= $runtime_msec_of_current_job)
# # https://en.wikipedia.org/wiki/Log-normal_distribution#Cumulative_distribution_function
# # the symmetry of cdf around 0, i.e. cdf(x) + cdf(-x) = 1
my $cp = cdf(-(log($k) - $mu) / $sigma);
my $remaining_runtime_msec_of_current_job = $ecp / $cp;
return $remaining_runtime_msec_of_current_job;
}
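Restated in formula form (this only rewrites what the code above computes; Φ is the standard normal CDF provided by Math::Gauss::cdf): with X ~ LogNormal(μ, σ²), k = $runtime_msec_of_current_job and E[X] = exp(μ + σ²/2) = $avg_msec_per_job,
\[
\mathbb{E}\!\left[X\,\mathbf{1}_{\{X>k\}}\right] = e^{\mu+\sigma^{2}/2}\,\Phi\!\left(\frac{\mu+\sigma^{2}-\ln k}{\sigma}\right),
\qquad
P(X>k) = \Phi\!\left(\frac{\mu-\ln k}{\sigma}\right),
\]
and the value returned is their ratio, i.e. the conditional expectation \(\mathbb{E}[X \mid X>k]\).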
# Estimate how many extra jobs this role can take in the next $interval_msec
sub _estimate_number_job_attempts {
my $self = shift;
my $analysis_stats = shift;
my $interval_msec = shift;
# Guaranteed to be non-zero
my $avg_msec_per_job = $analysis_stats->avg_msec_per_job // $analysis_stats->min_job_runtime_msec;
my $remaining_runtime_msec_of_current_job = $self->_estimate_current_job_runtime_msec($analysis_stats);
if ($remaining_runtime_msec_of_current_job > $interval_msec) {
return 0;
} else {
# We acknowledge that the current job will take a bit longer, and
# we fall back to $avg_msec_per_job for the rest of the interval
return POSIX::ceil(($interval_msec - $remaining_runtime_msec_of_current_job) / $avg_msec_per_job);
}
}
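As a worked example with assumed numbers: if the current job is expected to need another 8 000 msec, the interval is 60 000 msec and the average job takes 20 000 msec, the role is credited with ceil((60000 - 8000) / 20000) = 3 further job attempts; if the remaining runtime exceeded the interval it would be credited with none.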
sub register_attempt {
my $self = shift;
my $success = shift;
......
......@@ -522,6 +522,7 @@ CREATE TABLE worker (
temp_directory_name VARCHAR(255) DEFAULT NULL,
log_dir VARCHAR(255) DEFAULT NULL,
KEY resource_class_id (resource_class_id),
KEY meadow_process (meadow_type, meadow_name, process_id)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......@@ -593,7 +594,7 @@ CREATE TABLE role (
done_jobs INTEGER NOT NULL DEFAULT 0,
KEY worker (worker_id),
KEY analysis (analysis_id)
KEY analysis_when_finished (analysis_id,when_finished)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......@@ -728,7 +729,9 @@ CREATE TABLE analysis_stats_monitor (
when_updated TIMESTAMP NULL, -- mysql's special for "TIMESTAMP DEFAULT NULL"
sync_lock SMALLINT NOT NULL DEFAULT 0,
is_excluded SMALLINT NOT NULL DEFAULT 0
is_excluded SMALLINT NOT NULL DEFAULT 0,
KEY analysis_sync_lock (analysis_id, sync_lock, when_logged)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;