Commit d68fe01b authored by Leo Gordon's avatar Leo Gordon
Browse files

added a new per-analysis "analysis_capacity" limiter for cases where users...

added a new per-analysis "analysis_capacity" limiter for cases where users want to limit analyses independently
parent 24bfdb58
...@@ -35,8 +35,8 @@ sub new { ...@@ -35,8 +35,8 @@ sub new {
my $self = $class->SUPER::new( @_ ); # deal with Storable stuff my $self = $class->SUPER::new( @_ ); # deal with Storable stuff
my ($logic_name, $module, $parameters, $resource_class_id, $failed_job_tolerance, $max_retry_count, $can_be_empty, $priority, $meadow_type) = my ($logic_name, $module, $parameters, $resource_class_id, $failed_job_tolerance, $max_retry_count, $can_be_empty, $priority, $meadow_type, $analysis_capacity) =
rearrange([qw(logic_name module parameters resource_class_id failed_job_tolerance max_retry_count can_be_empty priority meadow_type) ], @_); rearrange([qw(logic_name module parameters resource_class_id failed_job_tolerance max_retry_count can_be_empty priority meadow_type analysis_capacity) ], @_);
$self->logic_name($logic_name) if($logic_name); $self->logic_name($logic_name) if($logic_name);
$self->module($module) if($module); $self->module($module) if($module);
...@@ -47,6 +47,7 @@ sub new { ...@@ -47,6 +47,7 @@ sub new {
$self->can_be_empty($can_be_empty) if($can_be_empty); $self->can_be_empty($can_be_empty) if($can_be_empty);
$self->priority($priority) if($priority); $self->priority($priority) if($priority);
$self->meadow_type($meadow_type) if($meadow_type); $self->meadow_type($meadow_type) if($meadow_type);
$self->analysis_capacity($analysis_capacity) if($analysis_capacity);
return $self; return $self;
} }
...@@ -119,6 +120,13 @@ sub meadow_type { ...@@ -119,6 +120,13 @@ sub meadow_type {
} }
sub analysis_capacity {
my $self = shift;
$self->{'_analysis_capacity'} = shift if(@_);
return $self->{'_analysis_capacity'};
}
=head2 process =head2 process
Arg [1] : none Arg [1] : none
......
...@@ -380,8 +380,10 @@ sub run { ...@@ -380,8 +380,10 @@ sub run {
my %seen_logic_name = (); my %seen_logic_name = ();
foreach my $aha (@{$self->pipeline_analyses}) { foreach my $aha (@{$self->pipeline_analyses}) {
my ($logic_name, $module, $parameters_hash, $input_ids, $blocked, $batch_size, $hive_capacity, $failed_job_tolerance, $max_retry_count, $can_be_empty, $rc_id, $rc_name, $priority, $meadow_type) = my ($logic_name, $module, $parameters_hash, $input_ids, $blocked, $batch_size, $hive_capacity, $failed_job_tolerance,
rearrange([qw(logic_name module parameters input_ids blocked batch_size hive_capacity failed_job_tolerance max_retry_count can_be_empty rc_id rc_name priority meadow_type)], %$aha); $max_retry_count, $can_be_empty, $rc_id, $rc_name, $priority, $meadow_type, $analysis_capacity)
= rearrange([qw(logic_name module parameters input_ids blocked batch_size hive_capacity failed_job_tolerance
max_retry_count can_be_empty rc_id rc_name priority meadow_type analysis_capacity)], %$aha);
unless($logic_name) { unless($logic_name) {
die "logic_name' must be defined in every analysis"; die "logic_name' must be defined in every analysis";
...@@ -425,6 +427,7 @@ sub run { ...@@ -425,6 +427,7 @@ sub run {
-can_be_empty => $can_be_empty, -can_be_empty => $can_be_empty,
-priority => $priority, -priority => $priority,
-meadow_type => $meadow_type, -meadow_type => $meadow_type,
-analysis_capacity => $analysis_capacity,
); );
$analysis_adaptor->store($analysis); $analysis_adaptor->store($analysis);
......
...@@ -159,7 +159,8 @@ sub pipeline_analyses { ...@@ -159,7 +159,8 @@ sub pipeline_analyses {
{ -logic_name => 'part_multiply', { -logic_name => 'part_multiply',
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply', -module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply',
-parameters => {}, -parameters => {},
-hive_capacity => 8, -hive_capacity => -1, # turn off the reciprocal limiter
-analysis_capacity => 4, # use per-analysis limiter
-input_ids => [ -input_ids => [
# (jobs for this analysis will be flown_into via branch-2 from 'start' jobs above) # (jobs for this analysis will be flown_into via branch-2 from 'start' jobs above)
], ],
......
...@@ -108,7 +108,7 @@ sub schedule_workers { ...@@ -108,7 +108,7 @@ sub schedule_workers {
my %meadow_capacity = map { $_ => Bio::EnsEMBL::Hive::Limiter->new( $available_worker_slots_by_meadow_type->{$_} ) } keys %$available_worker_slots_by_meadow_type; my %meadow_capacity = map { $_ => Bio::EnsEMBL::Hive::Limiter->new( $available_worker_slots_by_meadow_type->{$_} ) } keys %$available_worker_slots_by_meadow_type;
foreach my $analysis_stats (@suitable_analyses) { foreach my $analysis_stats (@suitable_analyses) {
last if( $submit_capacity->reached or $queen_capacity->reached); last if( $submit_capacity->reached );
my $analysis = $analysis_stats->get_analysis; # FIXME: if it proves too expensive we may need to consider caching my $analysis = $analysis_stats->get_analysis; # FIXME: if it proves too expensive we may need to consider caching
my $this_meadow_type = $analysis->meadow_type || $default_meadow_type; my $this_meadow_type = $analysis->meadow_type || $default_meadow_type;
...@@ -127,19 +127,25 @@ sub schedule_workers { ...@@ -127,19 +127,25 @@ sub schedule_workers {
# setting up all negotiating limiters: # setting up all negotiating limiters:
$queen_capacity->multiplier( $analysis_stats->hive_capacity ); $queen_capacity->multiplier( $analysis_stats->hive_capacity );
my @limiters = (
$submit_capacity,
$queen_capacity,
$meadow_capacity{$this_meadow_type},
defined($analysis->analysis_capacity) ? Bio::EnsEMBL::Hive::Limiter->new( $analysis->analysis_capacity ) : (),
);
# negotiations: # negotiations:
$workers_this_analysis = $submit_capacity->preliminary_offer( $workers_this_analysis ); foreach my $limiter (@limiters) {
$workers_this_analysis = $queen_capacity->preliminary_offer( $workers_this_analysis ); $workers_this_analysis = $limiter->preliminary_offer( $workers_this_analysis );
$workers_this_analysis = $meadow_capacity{$this_meadow_type}->preliminary_offer( $workers_this_analysis ); }
# do not continue with this analysis if haven't agreed on a positive number: # do not continue with this analysis if limiters haven't agreed on a positive number:
next unless($workers_this_analysis); next unless($workers_this_analysis);
# let all parties know the final decision of negotiations: # let all parties know the final decision of negotiations:
$submit_capacity->final_decision( $workers_this_analysis ); foreach my $limiter (@limiters) {
$queen_capacity->final_decision( $workers_this_analysis ); $limiter->final_decision( $workers_this_analysis );
$meadow_capacity{$this_meadow_type}->final_decision( $workers_this_analysis ); }
my $this_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id }; my $this_rc_name = $analysis_id2rc_name->{ $analysis_stats->analysis_id };
$workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $workers_this_analysis; $workers_to_submit_by_meadow_type_rc_name{ $this_meadow_type }{ $this_rc_name } += $workers_this_analysis;
......
# a new limiter column for the scheduler:
ALTER TABLE analysis_base ADD COLUMN analysis_capacity int(10) DEFAULT NULL;
# allow NULL to be a valid value for hive_capacity (however not yet default) :
ALTER TABLE analysis_stats MODIFY COLUMN hive_capacity int(10) DEFAULT 1;
ALTER TABLE analysis_stats_monitor MODIFY COLUMN hive_capacity int(10) DEFAULT 1;
...@@ -51,6 +51,7 @@ CREATE TABLE analysis_base ( ...@@ -51,6 +51,7 @@ CREATE TABLE analysis_base (
can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL, can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL,
priority TINYINT DEFAULT 0 NOT NULL, priority TINYINT DEFAULT 0 NOT NULL,
meadow_type varchar(40) DEFAULT NULL, meadow_type varchar(40) DEFAULT NULL,
analysis_capacity int(10) DEFAULT NULL,
PRIMARY KEY (analysis_id), PRIMARY KEY (analysis_id),
UNIQUE KEY logic_name_idx (logic_name) UNIQUE KEY logic_name_idx (logic_name)
...@@ -334,7 +335,7 @@ CREATE TABLE resource_description ( ...@@ -334,7 +335,7 @@ CREATE TABLE resource_description (
CREATE TABLE analysis_stats ( CREATE TABLE analysis_stats (
analysis_id int(10) unsigned NOT NULL, analysis_id int(10) unsigned NOT NULL,
batch_size int(10) DEFAULT 1 NOT NULL, batch_size int(10) DEFAULT 1 NOT NULL,
hive_capacity int(10) DEFAULT 1 NOT NULL, hive_capacity int(10) DEFAULT 1,
status enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, status enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL,
total_job_count int(10) DEFAULT 0 NOT NULL, total_job_count int(10) DEFAULT 0 NOT NULL,
...@@ -368,7 +369,7 @@ CREATE TABLE analysis_stats_monitor ( ...@@ -368,7 +369,7 @@ CREATE TABLE analysis_stats_monitor (
analysis_id int(10) unsigned NOT NULL, analysis_id int(10) unsigned NOT NULL,
batch_size int(10) DEFAULT 1 NOT NULL, batch_size int(10) DEFAULT 1 NOT NULL,
hive_capacity int(10) DEFAULT 1 NOT NULL, hive_capacity int(10) DEFAULT 1,
status enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, status enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL,
total_job_count int(10) DEFAULT 0 NOT NULL, total_job_count int(10) DEFAULT 0 NOT NULL,
......
...@@ -44,7 +44,9 @@ CREATE TABLE IF NOT EXISTS analysis_base ( ...@@ -44,7 +44,9 @@ CREATE TABLE IF NOT EXISTS analysis_base (
failed_job_tolerance int(10) NOT NULL DEFAULT 0, failed_job_tolerance int(10) NOT NULL DEFAULT 0,
max_retry_count int(10) default 3 NOT NULL, max_retry_count int(10) default 3 NOT NULL,
can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL, can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL,
priority TINYINT DEFAULT 0 NOT NULL priority TINYINT DEFAULT 0 NOT NULL,
meadow_type varchar(40) DEFAULT NULL,
analysis_capacity int(10) DEFAULT NULL
); );
CREATE UNIQUE INDEX IF NOT EXISTS logic_name_idx ON analysis_base (logic_name); CREATE UNIQUE INDEX IF NOT EXISTS logic_name_idx ON analysis_base (logic_name);
...@@ -313,7 +315,7 @@ CREATE TABLE resource_description ( ...@@ -313,7 +315,7 @@ CREATE TABLE resource_description (
CREATE TABLE analysis_stats ( CREATE TABLE analysis_stats (
analysis_id INTEGER NOT NULL, analysis_id INTEGER NOT NULL,
batch_size int(10) default 1 NOT NULL, batch_size int(10) default 1 NOT NULL,
hive_capacity int(10) default 1 NOT NULL, hive_capacity int(10) default 1,
status TEXT DEFAULT 'EMPTY' NOT NULL, /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, */ status TEXT DEFAULT 'EMPTY' NOT NULL, /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, */
total_job_count int(10) NOT NULL DEFAULT 0, total_job_count int(10) NOT NULL DEFAULT 0,
...@@ -345,7 +347,7 @@ CREATE TABLE analysis_stats_monitor ( ...@@ -345,7 +347,7 @@ CREATE TABLE analysis_stats_monitor (
analysis_id INTEGER NOT NULL, analysis_id INTEGER NOT NULL,
batch_size int(10) default 1 NOT NULL, batch_size int(10) default 1 NOT NULL,
hive_capacity int(10) default 1 NOT NULL, hive_capacity int(10) default 1,
status TEXT DEFAULT 'EMPTY' NOT NULL, /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, */ status TEXT DEFAULT 'EMPTY' NOT NULL, /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, */
total_job_count int(10) NOT NULL DEFAULT 0, total_job_count int(10) NOT NULL DEFAULT 0,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment