Commit 6a568379 authored by Matthieu Muffato's avatar Matthieu Muffato
Browse files

Moved hive_capacity to analysis_base

parent ac5a7081
......@@ -145,6 +145,12 @@ sub analysis_capacity {
return $self->{'_analysis_capacity'};
}
sub hive_capacity {
my $self = shift;
$self->{'_hive_capacity'} = shift if(@_);
return $self->{'_hive_capacity'};
}
sub get_compiled_module_name {
my $self = shift;
......@@ -219,12 +225,6 @@ sub batch_size {
return $self->stats->batch_size(@_);
}
sub hive_capacity {
my $self = shift @_;
return $self->stats->hive_capacity(@_);
}
# ------------------------------------------------------------------------------------------------------------------------------
......
......@@ -78,12 +78,6 @@ sub batch_size {
return $self->{'_batch_size'};
}
sub hive_capacity {
my $self = shift;
$self->{'_hive_capacity'} = shift if(@_);
return $self->{'_hive_capacity'};
}
sub status {
my $self = shift;
$self->{'_status'} = shift if(@_);
......@@ -262,7 +256,7 @@ sub estimate_num_required_workers { # this 'max allowed' total includes the
my $num_required_workers = $self->ready_job_count + $remaining_job_count; # this 'max' estimation can still be zero
my $h_cap = $self->hive_capacity;
my $h_cap = $self->analysis->hive_capacity;
if( defined($h_cap) and $h_cap>=0) { # what is the currently attainable maximum defined via hive_capacity?
my $hive_current_load = $self->hive_pipeline->get_cached_hive_current_load();
my $h_max = $self->num_running_workers + POSIX::floor( $h_cap * ( 1.0 - $hive_current_load ) );
......@@ -401,7 +395,7 @@ sub toString {
$self->num_running_workers,
$self->estimate_num_required_workers,
);
$output .= ' h.cap:' .( $self->hive_capacity // '-' )
$output .= ' h.cap:' .( $analysis->hive_capacity // '-' )
.' a.cap:' .( $analysis->analysis_capacity // '-')
." (sync'd " .($self->seconds_since_when_updated // 0)." sec ago)";
......
......@@ -88,7 +88,7 @@ sub get_hive_current_load {
my $sql = qq{
SELECT sum(1/hive_capacity)
FROM role
JOIN analysis_stats USING(analysis_id)
JOIN analysis_base USING(analysis_id)
WHERE when_finished IS NULL
AND hive_capacity IS NOT NULL
AND hive_capacity>0
......
......@@ -523,13 +523,13 @@ sub add_objects_from_config {
'priority' => $priority,
'meadow_type' => $meadow_type,
'analysis_capacity' => $analysis_capacity,
'hive_capacity' => $hive_capacity,
);
$analysis->get_compiled_module_name(); # check if it compiles and is named correctly
($stats) = $pipeline->add_new_or_update( 'AnalysisStats', # NB: add_new_or_update returns a list
'analysis' => $analysis,
'batch_size' => $batch_size,
'hive_capacity' => $hive_capacity,
'status' => $blocked ? 'BLOCKED' : 'EMPTY', # be careful, as this "soft" way of blocking may be accidentally unblocked by deep sync
'total_job_count' => 0,
'semaphored_job_count' => 0,
......
......@@ -628,7 +628,7 @@ sub synchronize_AnalysisStats {
if( $stats and $stats->analysis_id ) {
$stats->refresh() unless $has_refresh_just_been_done; ## Need to get the new hive_capacity for dynamic analyses
$stats->refresh() unless $has_refresh_just_been_done;
my $job_counts = $stats->hive_pipeline->hive_use_triggers() ? undef : $self->db->get_AnalysisJobAdaptor->fetch_job_counts_hashed_by_status( $stats->analysis_id );
......
......@@ -259,7 +259,7 @@ sub schedule_workers {
$total_extra_workers_required += $extra_workers_this_analysis; # also keep the total number required so far (if nothing required we may need a resync later)
# setting up all negotiating limiters:
$queen_capacity_limiter->multiplier( $analysis_stats->hive_capacity );
$queen_capacity_limiter->multiplier( $analysis->hive_capacity );
my @limiters = (
$submit_capacity_limiter,
$queen_capacity_limiter,
......
......@@ -565,7 +565,7 @@ sub run {
if ( $stats->refresh($self->refresh_tolerance_seconds) ) { # if we DID refresh
$self->adaptor->db->get_AnalysisAdaptor->refresh( $analysis );
$stats->hive_pipeline->invalidate_hive_current_load;
if( defined($stats->hive_capacity) && (0 <= $stats->hive_capacity) && ($stats->hive_pipeline->get_cached_hive_current_load >= 1.1)
if( defined($analysis->hive_capacity) && (0 <= $analysis->hive_capacity) && ($stats->hive_pipeline->get_cached_hive_current_load >= 1.1)
or defined($analysis->analysis_capacity) && (0 <= $analysis->analysis_capacity) && ($analysis->analysis_capacity < $stats->num_running_workers)
) {
$self->cause_of_death('HIVE_OVERLOAD');
......
-- ---------------------------------------------------------------------------------------------------
SET @expected_version = 86;
-- make MySQL stop immediately after it encounters division by zero:
SET SESSION sql_mode='TRADITIONAL';
-- warn that we detected the schema version mismatch:
SELECT CONCAT( 'The patch only applies to schema version ',
@expected_version,
', but the current schema version is ',
meta_value,
', so skipping the rest.') AS ''
FROM hive_meta WHERE meta_key='hive_sql_schema_version' AND meta_value<>@expected_version;
-- cause division by zero only if current version differs from the expected one:
INSERT INTO hive_meta (meta_key, meta_value)
SELECT 'this_should_never_be_inserted', 1 FROM hive_meta WHERE NOT 1/(meta_key<>'hive_sql_schema_version' OR meta_value=@expected_version);
SELECT CONCAT( 'The patch seems to be compatible with schema version ',
@expected_version,
', applying the patch...') AS '';
-- Now undo the change so that we could patch potentially non-TRADITIONAL schema:
SET SESSION sql_mode='';
-- ----------------------------------<actual_patch> -------------------------------------------------
ALTER TABLE analysis_base ADD COLUMN hive_capacity INTEGER DEFAULT NULL;
UPDATE analysis_base JOIN analysis_stats USING (analysis_id) SET analysis_base.hive_capacity = analysis_stats.hive_capacity;
ALTER TABLE analysis_stats DROP COLUMN hive_capacity;
ALTER TABLE analysis_stats_monitor DROP COLUMN hive_capacity;
-- ----------------------------------</actual_patch> -------------------------------------------------
-- increase the schema version by one and register the patch:
UPDATE hive_meta SET meta_value=meta_value+1 WHERE meta_key='hive_sql_schema_version';
-- ---------------------------------------------------------------------------------------------------
\set expected_version 86
\set ON_ERROR_STOP on
-- warn that we detected the schema version mismatch:
SELECT ('The patch only applies to schema version '
|| CAST(:expected_version AS VARCHAR)
|| ', but the current schema version is '
|| meta_value
|| ', so skipping the rest.') as incompatible_msg
FROM hive_meta WHERE meta_key='hive_sql_schema_version' AND meta_value!=CAST(:expected_version AS VARCHAR);
-- cause division by zero only if current version differs from the expected one:
INSERT INTO hive_meta (meta_key, meta_value)
SELECT 'this_should_never_be_inserted', 1 FROM hive_meta WHERE 1 != 1/CAST( (meta_key!='hive_sql_schema_version' OR meta_value=CAST(:expected_version AS VARCHAR)) AS INTEGER );
SELECT ('The patch seems to be compatible with schema version '
|| CAST(:expected_version AS VARCHAR)
|| ', applying the patch...') AS compatible_msg;
-- ----------------------------------<actual_patch> -------------------------------------------------
ALTER TABLE analysis_base ADD COLUMN hive_capacity INTEGER DEFAULT NULL;
UPDATE analysis_base SET hive_capacity = analysis_stats.hive_capacity FROM analysis_stats WHERE analysis_base.analysis_id = analysis_stats.analysis_id;
ALTER TABLE analysis_stats DROP COLUMN hive_capacity;
ALTER TABLE analysis_stats_monitor DROP COLUMN hive_capacity;
-- ----------------------------------</actual_patch> -------------------------------------------------
-- increase the schema version by one and register the patch:
UPDATE hive_meta SET meta_value= (CAST(meta_value AS INTEGER) + 1) WHERE meta_key='hive_sql_schema_version';
INSERT INTO hive_meta (meta_key, meta_value) SELECT 'patched_to_' || meta_value, CURRENT_TIMESTAMP FROM hive_meta WHERE meta_key = 'hive_sql_schema_version';
......@@ -98,6 +98,7 @@ CREATE TABLE pipeline_wide_parameters (
@column priority an Analysis with higher priority will be more likely chosen on Worker's specialization
@column meadow_type if defined, forces this Analysis to be run only on the given Meadow
@column analysis_capacity if defined, limits the number of Workers of this particular Analysis that are allowed to run in parallel
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
*/
CREATE TABLE analysis_base (
......@@ -113,6 +114,7 @@ CREATE TABLE analysis_base (
priority SMALLINT NOT NULL DEFAULT 0,
meadow_type VARCHAR(255) DEFAULT NULL,
analysis_capacity INTEGER DEFAULT NULL,
hive_capacity INTEGER DEFAULT NULL,
UNIQUE KEY logic_name_idx (logic_name)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......@@ -130,7 +132,6 @@ CREATE TABLE analysis_base (
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -154,7 +155,6 @@ CREATE TABLE analysis_base (
CREATE TABLE analysis_stats (
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
hive_capacity INTEGER DEFAULT NULL,
status ENUM('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL,
total_job_count INTEGER NOT NULL DEFAULT 0,
......@@ -653,7 +653,6 @@ CREATE TABLE log_message (
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -679,7 +678,6 @@ CREATE TABLE analysis_stats_monitor (
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
hive_capacity INTEGER DEFAULT NULL,
status ENUM('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL,
total_job_count INTEGER NOT NULL DEFAULT 0,
......
......@@ -100,6 +100,7 @@ CREATE INDEX ON pipeline_wide_parameters (param_value);
@column priority an Analysis with higher priority will be more likely chosen on Worker's specialization
@column meadow_type if defined, forces this Analysis to be run only on the given Meadow
@column analysis_capacity if defined, limits the number of Workers of this particular Analysis that are allowed to run in parallel
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
*/
CREATE TABLE analysis_base (
......@@ -115,6 +116,7 @@ CREATE TABLE analysis_base (
priority SMALLINT NOT NULL DEFAULT 0,
meadow_type VARCHAR(255) DEFAULT NULL,
analysis_capacity INTEGER DEFAULT NULL,
hive_capacity INTEGER DEFAULT NULL,
UNIQUE (logic_name)
);
......@@ -132,7 +134,6 @@ CREATE TABLE analysis_base (
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -157,7 +158,6 @@ CREATE TYPE analysis_status AS ENUM ('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY',
CREATE TABLE analysis_stats (
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
hive_capacity INTEGER DEFAULT NULL,
status analysis_status NOT NULL DEFAULT 'EMPTY',
total_job_count INTEGER NOT NULL DEFAULT 0,
......@@ -668,7 +668,6 @@ CREATE INDEX ON log_message (message_class);
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -694,7 +693,6 @@ CREATE TABLE analysis_stats_monitor (
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
hive_capacity INTEGER DEFAULT NULL,
status analysis_status NOT NULL DEFAULT 'EMPTY',
total_job_count INTEGER NOT NULL DEFAULT 0,
......
......@@ -100,6 +100,7 @@ CREATE INDEX pipeline_wide_parameters_value_idx ON pipeline_wide_para
@column priority an Analysis with higher priority will be more likely chosen on Worker's specialization
@column meadow_type if defined, forces this Analysis to be run only on the given Meadow
@column analysis_capacity if defined, limits the number of Workers of this particular Analysis that are allowed to run in parallel
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
*/
CREATE TABLE analysis_base (
......@@ -114,7 +115,8 @@ CREATE TABLE analysis_base (
can_be_empty SMALLINT NOT NULL DEFAULT 0,
priority SMALLINT NOT NULL DEFAULT 0,
meadow_type VARCHAR(255) DEFAULT NULL,
analysis_capacity INTEGER DEFAULT NULL
analysis_capacity INTEGER DEFAULT NULL,
hive_capacity INTEGER DEFAULT NULL
);
CREATE UNIQUE INDEX analysis_base_logic_name_idx ON analysis_base (logic_name);
......@@ -131,7 +133,6 @@ CREATE UNIQUE INDEX analysis_base_logic_name_idx ON analysis_base (logic_name);
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -155,7 +156,6 @@ CREATE UNIQUE INDEX analysis_base_logic_name_idx ON analysis_base (logic_name);
CREATE TABLE analysis_stats (
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
hive_capacity INTEGER DEFAULT NULL,
status TEXT NOT NULL DEFAULT 'EMPTY', /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, */
total_job_count INTEGER NOT NULL DEFAULT 0,
......@@ -646,7 +646,6 @@ CREATE INDEX message_class_idx ON log_message (message_class);
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -672,7 +671,6 @@ CREATE TABLE analysis_stats_monitor (
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
hive_capacity INTEGER DEFAULT NULL,
status TEXT NOT NULL DEFAULT 'EMPTY', /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, */
total_job_count INTEGER NOT NULL DEFAULT 0,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment