Commit 4638d391 authored by Matthieu Muffato's avatar Matthieu Muffato
Browse files

Moved batch_size to analysis_base

parent 6a568379
......@@ -151,6 +151,12 @@ sub hive_capacity {
return $self->{'_hive_capacity'};
}
sub batch_size {
my $self = shift;
$self->{'_batch_size'} = shift if(@_);
$self->{'_batch_size'} = 1 unless(defined($self->{'_batch_size'})); # only initialize when undefined, so if defined as 0 will stay 0
return $self->{'_batch_size'};
}
sub get_compiled_module_name {
my $self = shift;
......@@ -219,12 +225,6 @@ sub status {
return $self->stats->status(@_);
}
sub batch_size {
my $self = shift @_;
return $self->stats->batch_size(@_);
}
# ------------------------------------------------------------------------------------------------------------------------------
......
......@@ -71,13 +71,6 @@ sub dbID {
}
sub batch_size {
my $self = shift;
$self->{'_batch_size'} = shift if(@_);
$self->{'_batch_size'} = 1 unless(defined($self->{'_batch_size'})); # only initialize when undefined, so if defined as 0 will stay 0
return $self->{'_batch_size'};
}
sub status {
my $self = shift;
$self->{'_status'} = shift if(@_);
......@@ -217,7 +210,7 @@ sub get_or_estimate_batch_size {
my $self = shift @_;
my $remaining_job_count = shift @_ || 0; # FIXME: a better estimate would be $self->claimed_job_count when it is introduced
my $batch_size = $self->batch_size;
my $batch_size = $self->analysis->batch_size;
if( $batch_size > 0 ) { # set to positive or not set (and auto-initialized within $self->batch_size)
......
......@@ -524,12 +524,12 @@ sub add_objects_from_config {
'meadow_type' => $meadow_type,
'analysis_capacity' => $analysis_capacity,
'hive_capacity' => $hive_capacity,
'batch_size' => $batch_size,
);
$analysis->get_compiled_module_name(); # check if it compiles and is named correctly
($stats) = $pipeline->add_new_or_update( 'AnalysisStats', # NB: add_new_or_update returns a list
'analysis' => $analysis,
'batch_size' => $batch_size,
'status' => $blocked ? 'BLOCKED' : 'EMPTY', # be careful, as this "soft" way of blocking may be accidentally unblocked by deep sync
'total_job_count' => 0,
'semaphored_job_count' => 0,
......
......@@ -18,7 +18,7 @@
Each worker is linked to an analysis_id, registers its self on creation
into the Hive, creates a RunnableDB instance of the Analysis->module,
gets $analysis->stats->batch_size jobs from the job table, does its work,
gets $analysis->batch_size jobs from the job table, does its work,
creates the next layer of job entries by interfacing to
the DataflowRuleAdaptor to determine the analyses it needs to pass its
output data to and creates jobs on the next analysis database.
......
-- ---------------------------------------------------------------------------------------------------
SET @expected_version = 87;
-- make MySQL stop immediately after it encounters division by zero:
SET SESSION sql_mode='TRADITIONAL';
-- warn that we detected the schema version mismatch:
SELECT CONCAT( 'The patch only applies to schema version ',
@expected_version,
', but the current schema version is ',
meta_value,
', so skipping the rest.') AS ''
FROM hive_meta WHERE meta_key='hive_sql_schema_version' AND meta_value<>@expected_version;
-- cause division by zero only if current version differs from the expected one:
INSERT INTO hive_meta (meta_key, meta_value)
SELECT 'this_should_never_be_inserted', 1 FROM hive_meta WHERE NOT 1/(meta_key<>'hive_sql_schema_version' OR meta_value=@expected_version);
SELECT CONCAT( 'The patch seems to be compatible with schema version ',
@expected_version,
', applying the patch...') AS '';
-- Now undo the change so that we could patch potentially non-TRADITIONAL schema:
SET SESSION sql_mode='';
-- ----------------------------------<actual_patch> -------------------------------------------------
ALTER TABLE analysis_base ADD COLUMN batch_size INTEGER NOT NULL DEFAULT 1;
UPDATE analysis_base JOIN analysis_stats USING (analysis_id) SET analysis_base.batch_size = analysis_stats.batch_size;
ALTER TABLE analysis_stats DROP COLUMN batch_size;
ALTER TABLE analysis_stats_monitor DROP COLUMN batch_size;
-- ----------------------------------</actual_patch> -------------------------------------------------
-- increase the schema version by one and register the patch:
UPDATE hive_meta SET meta_value=meta_value+1 WHERE meta_key='hive_sql_schema_version';
-- ---------------------------------------------------------------------------------------------------
\set expected_version 87
\set ON_ERROR_STOP on
-- warn that we detected the schema version mismatch:
SELECT ('The patch only applies to schema version '
|| CAST(:expected_version AS VARCHAR)
|| ', but the current schema version is '
|| meta_value
|| ', so skipping the rest.') as incompatible_msg
FROM hive_meta WHERE meta_key='hive_sql_schema_version' AND meta_value!=CAST(:expected_version AS VARCHAR);
-- cause division by zero only if current version differs from the expected one:
INSERT INTO hive_meta (meta_key, meta_value)
SELECT 'this_should_never_be_inserted', 1 FROM hive_meta WHERE 1 != 1/CAST( (meta_key!='hive_sql_schema_version' OR meta_value=CAST(:expected_version AS VARCHAR)) AS INTEGER );
SELECT ('The patch seems to be compatible with schema version '
|| CAST(:expected_version AS VARCHAR)
|| ', applying the patch...') AS compatible_msg;
-- ----------------------------------<actual_patch> -------------------------------------------------
ALTER TABLE analysis_base ADD COLUMN batch_size INTEGER NOT NULL DEFAULT 1;
UPDATE analysis_base SET batch_size = analysis_stats.batch_size FROM analysis_stats WHERE analysis_base.analysis_id = analysis_stats.analysis_id;
ALTER TABLE analysis_stats DROP COLUMN batch_size;
ALTER TABLE analysis_stats_monitor DROP COLUMN batch_size;
-- ----------------------------------</actual_patch> -------------------------------------------------
-- increase the schema version by one and register the patch:
UPDATE hive_meta SET meta_value= (CAST(meta_value AS INTEGER) + 1) WHERE meta_key='hive_sql_schema_version';
INSERT INTO hive_meta (meta_key, meta_value) SELECT 'patched_to_' || meta_value, CURRENT_TIMESTAMP FROM hive_meta WHERE meta_key = 'hive_sql_schema_version';
......@@ -99,6 +99,7 @@ CREATE TABLE pipeline_wide_parameters (
@column meadow_type if defined, forces this Analysis to be run only on the given Meadow
@column analysis_capacity if defined, limits the number of Workers of this particular Analysis that are allowed to run in parallel
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
*/
CREATE TABLE analysis_base (
......@@ -115,6 +116,7 @@ CREATE TABLE analysis_base (
meadow_type VARCHAR(255) DEFAULT NULL,
analysis_capacity INTEGER DEFAULT NULL,
hive_capacity INTEGER DEFAULT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
UNIQUE KEY logic_name_idx (logic_name)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......@@ -131,7 +133,6 @@ CREATE TABLE analysis_base (
when to unblock other analyses. Also provides
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -154,7 +155,6 @@ CREATE TABLE analysis_base (
CREATE TABLE analysis_stats (
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
status ENUM('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL,
total_job_count INTEGER NOT NULL DEFAULT 0,
......@@ -652,7 +652,6 @@ CREATE TABLE log_message (
@column when_logged when this snapshot was taken
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -677,7 +676,6 @@ CREATE TABLE analysis_stats_monitor (
when_logged TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
status ENUM('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL,
total_job_count INTEGER NOT NULL DEFAULT 0,
......
......@@ -101,6 +101,7 @@ CREATE INDEX ON pipeline_wide_parameters (param_value);
@column meadow_type if defined, forces this Analysis to be run only on the given Meadow
@column analysis_capacity if defined, limits the number of Workers of this particular Analysis that are allowed to run in parallel
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
*/
CREATE TABLE analysis_base (
......@@ -117,6 +118,7 @@ CREATE TABLE analysis_base (
meadow_type VARCHAR(255) DEFAULT NULL,
analysis_capacity INTEGER DEFAULT NULL,
hive_capacity INTEGER DEFAULT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
UNIQUE (logic_name)
);
......@@ -133,7 +135,6 @@ CREATE TABLE analysis_base (
when to unblock other analyses. Also provides
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -157,7 +158,6 @@ CREATE TABLE analysis_base (
CREATE TYPE analysis_status AS ENUM ('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED');
CREATE TABLE analysis_stats (
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
status analysis_status NOT NULL DEFAULT 'EMPTY',
total_job_count INTEGER NOT NULL DEFAULT 0,
......@@ -667,7 +667,6 @@ CREATE INDEX ON log_message (message_class);
@column when_logged when this snapshot was taken
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -692,7 +691,6 @@ CREATE TABLE analysis_stats_monitor (
when_logged TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
status analysis_status NOT NULL DEFAULT 'EMPTY',
total_job_count INTEGER NOT NULL DEFAULT 0,
......
......@@ -101,6 +101,7 @@ CREATE INDEX pipeline_wide_parameters_value_idx ON pipeline_wide_para
@column meadow_type if defined, forces this Analysis to be run only on the given Meadow
@column analysis_capacity if defined, limits the number of Workers of this particular Analysis that are allowed to run in parallel
@column hive_capacity a reciprocal limiter on the number of Workers running at the same time (dependent on Workers of other Analyses)
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
*/
CREATE TABLE analysis_base (
......@@ -116,7 +117,8 @@ CREATE TABLE analysis_base (
priority SMALLINT NOT NULL DEFAULT 0,
meadow_type VARCHAR(255) DEFAULT NULL,
analysis_capacity INTEGER DEFAULT NULL,
hive_capacity INTEGER DEFAULT NULL
hive_capacity INTEGER DEFAULT NULL,
batch_size INTEGER NOT NULL DEFAULT 1
);
CREATE UNIQUE INDEX analysis_base_logic_name_idx ON analysis_base (logic_name);
......@@ -132,7 +134,6 @@ CREATE UNIQUE INDEX analysis_base_logic_name_idx ON analysis_base (logic_name);
when to unblock other analyses. Also provides
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -155,7 +156,6 @@ CREATE UNIQUE INDEX analysis_base_logic_name_idx ON analysis_base (logic_name);
CREATE TABLE analysis_stats (
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
status TEXT NOT NULL DEFAULT 'EMPTY', /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, */
total_job_count INTEGER NOT NULL DEFAULT 0,
......@@ -645,7 +645,6 @@ CREATE INDEX message_class_idx ON log_message (message_class);
@column when_logged when this snapshot was taken
@column analysis_id foreign-keyed to the corresponding analysis_base entry
@column batch_size how many jobs are claimed in one claiming operation before Worker starts executing them
@column status cached state of the Analysis
@column total_job_count total number of Jobs of this Analysis
......@@ -670,7 +669,6 @@ CREATE TABLE analysis_stats_monitor (
when_logged TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
analysis_id INTEGER NOT NULL,
batch_size INTEGER NOT NULL DEFAULT 1,
status TEXT NOT NULL DEFAULT 'EMPTY', /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'EMPTY', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'EMPTY' NOT NULL, */
total_job_count INTEGER NOT NULL DEFAULT 0,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment