Commit c540fa9e authored by Leo Gordon's avatar Leo Gordon
Browse files

change of default behaviour: hive_capacity is now off by default (=NULL);...

change of default behaviour: hive_capacity is now off by default (=NULL); setting hive_capacity=0 or analysis_capacity=0 stops scheduling AND specialization to a particular analysis
parent c704cbc3
......@@ -47,7 +47,7 @@ sub new {
$self->can_be_empty($can_be_empty) if($can_be_empty);
$self->priority($priority) if($priority);
$self->meadow_type($meadow_type) if($meadow_type);
$self->analysis_capacity($analysis_capacity) if($analysis_capacity);
$self->analysis_capacity($analysis_capacity) if( defined($analysis_capacity) );
return $self;
}
......
......@@ -84,7 +84,6 @@ sub batch_size {
sub hive_capacity {
my $self = shift;
$self->{'_hive_capacity'} = shift if(@_);
$self->{'_hive_capacity'} = 1 unless(defined($self->{'_hive_capacity'}));
return $self->{'_hive_capacity'};
}
......@@ -292,8 +291,10 @@ sub job_count_breakout {
sub print_stats {
my $self = shift;
printf("%-27s(%2d) %11s jobs(Sem:%d, Rdy:%d, InProg:%d, Done+Pass:%d, Fail:%d)=%d Ave_msec:%d, worker[%d/%d] (sync'd %d sec ago)\n",
$self->get_analysis->logic_name,
my $analysis = $self->get_analysis;
printf("%-27s(%2d) %11s jobs(Sem:%d, Rdy:%d, InProg:%d, Done+Pass:%d, Fail:%d)=%d Ave_msec:%d, ",
$analysis->logic_name,
$self->analysis_id,
$self->status,
......@@ -305,11 +306,11 @@ sub print_stats {
$self->total_job_count,
$self->avg_msec_per_job,
$self->num_required_workers,
$self->hive_capacity,
$self->seconds_since_last_update,
);
print 'req.workers:'.$self->num_required_workers
.' h.cap:' .( defined($self->hive_capacity) ? $self->hive_capacity : '-' )
.' a.cap:' .( defined($analysis->analysis_capacity) ? $analysis->analysis_capacity : '-')
." (sync'd " .$self->seconds_since_last_update." sec ago)\n",
}
......
......@@ -183,7 +183,7 @@ sub update {
my $sql = "UPDATE analysis_stats SET status='".$stats->status."' ";
$sql .= ",batch_size=" . $stats->batch_size();
$sql .= ",hive_capacity=" . $stats->hive_capacity();
$sql .= ",hive_capacity=" . (defined($stats->hive_capacity()) ? $stats->hive_capacity() : 'NULL');
$sql .= ",avg_msec_per_job=" . $stats->avg_msec_per_job();
$sql .= ",avg_input_msec_per_job=" . $stats->avg_input_msec_per_job();
......
......@@ -44,8 +44,11 @@ sub reached {
sub preliminary_offer {
my ($self, $slots_asked) = @_;
if( defined($self->available_capacity) and (my $multiplier = $self->multiplier) > 0.0 ) { # if multiplier is not positive, capacity stays unaffected
my $slots_available = int($self->available_capacity * $multiplier);
my $available_capacity = $self->available_capacity;
my $multiplier = $self->multiplier;
if( defined($available_capacity) and defined($multiplier) and ($multiplier >= 0.0) ) { # if multiplier is negative it is not limiting
my $slots_available = int($available_capacity * $multiplier);
return ($slots_available<$slots_asked) ? $slots_available : $slots_asked;
}
......@@ -57,9 +60,13 @@ sub preliminary_offer {
sub final_decision {
my ($self, $slots_agreed) = @_;
if( defined($self->available_capacity) and (my $multiplier = $self->multiplier) > 0.0 ) { # if multiplier is not positive, capacity stays unaffected
my $available_capacity = $self->available_capacity;
my $multiplier = $self->multiplier;
if( defined($available_capacity) and defined($multiplier) and ($multiplier > 0.0) ) { # if multiplier is not positive capacity stays unaffected
# and we should not arrive here if $multiplier==0
$self->available_capacity( $self->available_capacity - $slots_agreed/$multiplier );
$self->available_capacity( $available_capacity - $slots_agreed/$multiplier );
}
}
......
......@@ -144,6 +144,7 @@ sub pipeline_analyses {
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::Start',
-meadow_type=> 'LOCAL', # do not bother the farm with such a simple task (and get it done faster)
-parameters => {},
-analysis_capacity => 1, # use per-analysis limiter
-input_ids => [
{ 'a_multiplier' => $self->o('first_mult'), 'b_multiplier' => $self->o('second_mult') },
{ 'a_multiplier' => $self->o('second_mult'), 'b_multiplier' => $self->o('first_mult') },
......@@ -159,7 +160,6 @@ sub pipeline_analyses {
{ -logic_name => 'part_multiply',
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply',
-parameters => {},
-hive_capacity => -1, # turn off the reciprocal limiter
-analysis_capacity => 4, # use per-analysis limiter
-input_ids => [
# (jobs for this analysis will be flown_into via branch-2 from 'start' jobs above)
......@@ -172,6 +172,7 @@ sub pipeline_analyses {
{ -logic_name => 'add_together',
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::AddTogether',
-parameters => {},
# -analysis_capacity => 0, # this is a way to temporarily block a given analysis
-input_ids => [
# (jobs for this analysis will be flown_into via branch-1 from 'start' jobs above)
],
......
......@@ -573,21 +573,20 @@ sub synchronize_AnalysisStats {
return $analysisStats unless($analysisStats->analysis_id);
$analysisStats->refresh(); ## Need to get the new hive_capacity for dynamic analyses
my $hive_capacity = $analysisStats->hive_capacity;
my $analysis = $analysisStats->get_analysis();
my $scheduling_allowed = ( !defined( $analysisStats->hive_capacity ) or $analysisStats->hive_capacity )
&& ( !defined( $analysis->analysis_capacity ) or $analysis->analysis_capacity );
if($self->db->hive_use_triggers()) {
my $job_count = $analysisStats->ready_job_count();
my $required_workers = $hive_capacity && POSIX::ceil( $job_count / $analysisStats->get_or_estimate_batch_size() );
# adjust_stats_for_living_workers:
if($hive_capacity > 0) {
my $unfulfilled_capacity = $hive_capacity - $analysisStats->num_running_workers();
my $required_workers = $scheduling_allowed
&& ( POSIX::ceil( $job_count / $analysisStats->get_or_estimate_batch_size() )
- $analysisStats->num_running_workers() );
$required_workers = 0 if($required_workers < 0);
if($unfulfilled_capacity < $required_workers ) {
$required_workers = (0 < $unfulfilled_capacity) ? $unfulfilled_capacity : 0;
}
}
$analysisStats->num_required_workers( $required_workers );
} else {
......@@ -614,16 +613,11 @@ sub synchronize_AnalysisStats {
if($status eq 'READY') {
$analysisStats->ready_job_count($job_count);
my $required_workers = $hive_capacity && POSIX::ceil( $job_count / $analysisStats->get_or_estimate_batch_size() );
my $required_workers = $scheduling_allowed
&& ( POSIX::ceil( $job_count / $analysisStats->get_or_estimate_batch_size() )
- $analysisStats->num_running_workers() );
$required_workers = 0 if($required_workers < 0);
# adjust_stats_for_living_workers:
if($hive_capacity > 0) {
my $unfulfilled_capacity = $hive_capacity - $self->count_running_workers( $analysisStats->analysis_id() );
if($unfulfilled_capacity < $required_workers ) {
$required_workers = (0 < $unfulfilled_capacity) ? $unfulfilled_capacity : 0;
}
}
$analysisStats->num_required_workers( $required_workers );
} elsif($status eq 'SEMAPHORED') {
......@@ -696,6 +690,7 @@ sub get_hive_current_load {
FROM worker w
JOIN analysis_stats USING(analysis_id)
WHERE w.status!='DEAD'
AND hive_capacity IS NOT NULL
AND hive_capacity>0
};
my $sth = $self->prepare($sql);
......
......@@ -589,8 +589,11 @@ sub run {
);
}
# A mechanism whereby workers can be caused to exit even if they were doing fine:
#
# FIXME: The following check is not *completely* correct, as it assumes hive_capacity is "local" to the analysis:
if (!$self->cause_of_death
and defined($self->analysis->stats->hive_capacity)
and 0 <= $self->analysis->stats->hive_capacity
and $self->analysis->stats->hive_capacity < $self->analysis->stats->num_running_workers
) {
......
# make NULL to be the default valid value for hive_capacity :
ALTER TABLE analysis_stats MODIFY COLUMN hive_capacity int(10) DEFAULT NULL;
ALTER TABLE analysis_stats_monitor MODIFY COLUMN hive_capacity int(10) DEFAULT NULL;
......@@ -335,7 +335,7 @@ CREATE TABLE resource_description (
CREATE TABLE analysis_stats (
analysis_id int(10) unsigned NOT NULL,
batch_size int(10) DEFAULT 1 NOT NULL,
hive_capacity int(10) DEFAULT 1,
hive_capacity int(10) DEFAULT NULL,
status enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL,
total_job_count int(10) DEFAULT 0 NOT NULL,
......@@ -369,7 +369,7 @@ CREATE TABLE analysis_stats_monitor (
analysis_id int(10) unsigned NOT NULL,
batch_size int(10) DEFAULT 1 NOT NULL,
hive_capacity int(10) DEFAULT 1,
hive_capacity int(10) DEFAULT NULL,
status enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL,
total_job_count int(10) DEFAULT 0 NOT NULL,
......
......@@ -315,7 +315,7 @@ CREATE TABLE resource_description (
CREATE TABLE analysis_stats (
analysis_id INTEGER NOT NULL,
batch_size int(10) default 1 NOT NULL,
hive_capacity int(10) default 1,
hive_capacity int(10) default NULL,
status TEXT DEFAULT 'EMPTY' NOT NULL, /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, */
total_job_count int(10) NOT NULL DEFAULT 0,
......@@ -347,7 +347,7 @@ CREATE TABLE analysis_stats_monitor (
analysis_id INTEGER NOT NULL,
batch_size int(10) default 1 NOT NULL,
hive_capacity int(10) default 1,
hive_capacity int(10) default NULL,
status TEXT DEFAULT 'EMPTY' NOT NULL, /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, */
total_job_count int(10) NOT NULL DEFAULT 0,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment