Commit 1602585e authored by Leo Gordon
Browse files

schema_change: removed analysis_stats[_monitor].num_required_workers,...

schema_change: removed analysis_stats[_monitor].num_required_workers, substituted with AS::estimate_num_required_workers()
parent f72897dc
...@@ -126,12 +126,6 @@ sub num_running_workers { ...@@ -126,12 +126,6 @@ sub num_running_workers {
return $self->{'_num_running_workers'}; return $self->{'_num_running_workers'};
} }
# Combined getter/setter for the '_num_required_workers' slot of the object.
# NB: the meaning of this field is, again, "how many extra workers we need to add".
#
# Called without arguments it returns the stored value; called with one
# argument it stores that argument first (even if it is undef) and then
# returns it.
sub num_required_workers {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_num_required_workers'} = $new_value[0];
    }

    return $self->{'_num_required_workers'};
}
## dynamic hive_capacity mode attributes: ## dynamic hive_capacity mode attributes:
...@@ -245,6 +239,13 @@ sub get_or_estimate_batch_size { ...@@ -245,6 +239,13 @@ sub get_or_estimate_batch_size {
} }
# Estimates how many workers are needed for the READY jobs of this analysis:
# the ready job count divided by the (stored or estimated) batch size,
# rounded up to a whole number.
#
# Nothing is cached or stored; the value is recomputed on every call from
# ready_job_count() and get_or_estimate_batch_size().
sub estimate_num_required_workers {
    my ($self) = @_;

    my $ready_jobs = $self->ready_job_count;
    my $batch_size = $self->get_or_estimate_batch_size;

    return POSIX::ceil( $ready_jobs / $batch_size );
}
sub inprogress_job_count { # includes CLAIMED sub inprogress_job_count { # includes CLAIMED
my $self = shift; my $self = shift;
return $self->total_job_count return $self->total_job_count
...@@ -336,7 +337,7 @@ sub toString { ...@@ -336,7 +337,7 @@ sub toString {
my $analysis = $self->analysis; my $analysis = $self->analysis;
my ($avg_runtime, $avg_runtime_unit) = $self->friendly_avg_job_runtime; my ($avg_runtime, $avg_runtime_unit) = $self->friendly_avg_job_runtime;
my $output .= sprintf("%-${max_logic_name_length}s(%3d) %s, jobs( %s ), avg:%5.1f %-3s, workers(Running:%d, Reqired:%d) ", my $output .= sprintf("%-${max_logic_name_length}s(%3d) %s, jobs( %s ), avg:%5.1f %-3s, workers(Running:%d, Est.Required:%d) ",
$analysis->logic_name, $analysis->logic_name,
$self->analysis_id // 0, $self->analysis_id // 0,
...@@ -347,7 +348,7 @@ sub toString { ...@@ -347,7 +348,7 @@ sub toString {
$avg_runtime, $avg_runtime_unit, $avg_runtime, $avg_runtime_unit,
$self->num_running_workers, $self->num_running_workers,
$self->num_required_workers, $self->estimate_num_required_workers,
); );
$output .= ' h.cap:' .( $self->hive_capacity // '-' ) $output .= ' h.cap:' .( $self->hive_capacity // '-' )
.' a.cap:' .( $analysis->analysis_capacity // '-') .' a.cap:' .( $analysis->analysis_capacity // '-')
...@@ -445,14 +446,6 @@ sub recalculate_from_job_counts { ...@@ -445,14 +446,6 @@ sub recalculate_from_job_counts {
$self->total_job_count( sum( values %$job_counts ) || 0 ); $self->total_job_count( sum( values %$job_counts ) || 0 );
} }
# compute the number of total required workers for this analysis (taking into account the jobs that are already running)
my $analysis = $self->analysis();
my $scheduling_allowed = ( !defined( $self->hive_capacity ) or $self->hive_capacity )
&& ( !defined( $analysis->analysis_capacity ) or $analysis->analysis_capacity );
my $required_workers = $scheduling_allowed
&& POSIX::ceil( $self->ready_job_count() / $self->get_or_estimate_batch_size() );
$self->num_required_workers( $required_workers );
$self->check_blocking_control_rules(); $self->check_blocking_control_rules();
$self->determine_status(); $self->determine_status();
......
...@@ -150,7 +150,6 @@ sub update { ...@@ -150,7 +150,6 @@ sub update {
$sql .= ",num_running_workers=" . $stats->num_running_workers(); $sql .= ",num_running_workers=" . $stats->num_running_workers();
} }
$sql .= ",num_required_workers=" . $stats->num_required_workers();
$sql .= ",when_updated=CURRENT_TIMESTAMP"; $sql .= ",when_updated=CURRENT_TIMESTAMP";
$sql .= ",sync_lock='0'"; $sql .= ",sync_lock='0'";
$sql .= " WHERE analysis_id='".$stats->analysis_id."' "; $sql .= " WHERE analysis_id='".$stats->analysis_id."' ";
...@@ -255,27 +254,5 @@ sub decrease_running_workers { ...@@ -255,27 +254,5 @@ sub decrease_running_workers {
} }
# Decrements analysis_stats.num_required_workers by one for the row whose
# analysis_id matches the given value.
#
# Arguments : $analysis_id - interpolated verbatim (single-quoted) into the SQL
# Returns   : whatever $self->dbc->do() returns for the UPDATE statement
sub decrease_required_workers {
    my ($self, $analysis_id) = @_;

    my $where_clause = "WHERE analysis_id='$analysis_id' ";
    my $sql          = 'UPDATE analysis_stats SET num_required_workers=num_required_workers-1 ' . $where_clause;

    return $self->dbc->do($sql);
}
# Increments analysis_stats.num_required_workers by one for the row whose
# analysis_id matches the given value.
#
# Arguments : $analysis_id - interpolated verbatim (single-quoted) into the SQL
# Returns   : whatever $self->dbc->do() returns for the UPDATE statement
sub increase_required_workers {
    my ($self, $analysis_id) = @_;

    my $where_clause = "WHERE analysis_id='$analysis_id' ";
    my $sql          = 'UPDATE analysis_stats SET num_required_workers=num_required_workers+1 ' . $where_clause;

    return $self->dbc->do($sql);
}
1; 1;
...@@ -78,7 +78,7 @@ sub finalize_role { ...@@ -78,7 +78,7 @@ sub finalize_role {
} }
# Re-sync the analysis_stats when a worker dies as part of dynamic sync system. # Re-sync the analysis_stats when a worker dies as part of dynamic sync system.
# It will also re-calculate num_running_workers (from active roles) and num_required_workers, # It will also re-calculate num_running_workers (from active roles)
# so no further adjustment should be necessary. # so no further adjustment should be necessary.
$self->db->get_WorkerAdaptor->safe_synchronize_AnalysisStats( $role->analysis->stats ); $self->db->get_WorkerAdaptor->safe_synchronize_AnalysisStats( $role->analysis->stats );
} }
......
...@@ -496,7 +496,6 @@ sub add_objects_from_config { ...@@ -496,7 +496,6 @@ sub add_objects_from_config {
'done_job_count' => 0, 'done_job_count' => 0,
'failed_job_count' => 0, 'failed_job_count' => 0,
'num_running_workers' => 0, 'num_running_workers' => 0,
'num_required_workers' => 0,
'behaviour' => 'STATIC', 'behaviour' => 'STATIC',
'input_capacity' => 4, 'input_capacity' => 4,
'output_capacity' => 4, 'output_capacity' => 4,
......
...@@ -271,8 +271,6 @@ sub specialize_worker { ...@@ -271,8 +271,6 @@ sub specialize_worker {
} else { # count it as autonomous worker sharing the load of that analysis: } else { # count it as autonomous worker sharing the load of that analysis:
$analysis_stats_adaptor->update_status($analysis->dbID, 'WORKING'); $analysis_stats_adaptor->update_status($analysis->dbID, 'WORKING');
$analysis_stats_adaptor->decrease_required_workers( $analysis->dbID );
} }
# The following increment used to be done only when no specific task was given to the worker, # The following increment used to be done only when no specific task was given to the worker,
......
...@@ -239,8 +239,8 @@ sub schedule_workers { ...@@ -239,8 +239,8 @@ sub schedule_workers {
next ANALYSIS; next ANALYSIS;
} }
# getting the initial worker requirement for this analysis (may be stale if not sync'ed recently) # getting the initial worker requirement for this analysis (may be off if $analysis_stats has not been sync'ed recently)
my $extra_workers_this_analysis = $analysis_stats->num_required_workers; my $extra_workers_this_analysis = $analysis_stats->estimate_num_required_workers;
if ($extra_workers_this_analysis <= 0) { if ($extra_workers_this_analysis <= 0) {
push @$log_buffer, "Analysis '$logic_name' doesn't require extra workers, skipping it."; push @$log_buffer, "Analysis '$logic_name' doesn't require extra workers, skipping it.";
...@@ -315,7 +315,7 @@ sub sort_stats_by_suitability { ...@@ -315,7 +315,7 @@ sub sort_stats_by_suitability {
foreach my $stats ( @sorted_stats ) { foreach my $stats ( @sorted_stats ) {
# assuming sync() is expensive, so first trying analyses that have already been sunk: # assuming sync() is expensive, so first trying analyses that have already been sunk:
if( ($stats->num_required_workers > 0) and ($stats->status =~/^(READY|WORKING)$/) ) { if( ($stats->estimate_num_required_workers > 0) and ($stats->status =~/^(READY|WORKING)$/) ) {
push @primary_candidates, $stats; push @primary_candidates, $stats;
......
-- ---------------------------------------------------------------------------------------------------
-- MySQL schema patch: drops the num_required_workers column from both analysis_stats and
-- analysis_stats_monitor, then increments hive_sql_schema_version in hive_meta by one.
-- The script guards itself so that the ALTERs are only reached when the stored schema
-- version equals @expected_version.
-- NOTE(review): the guard works by raising an SQL error; it presumably relies on the client
-- aborting the script on the first error (e.g. mysql batch mode without --force) -- confirm.
SET @expected_version = 66;
-- make MySQL stop immediately after it encounters division by zero:
SET SESSION sql_mode='TRADITIONAL';
-- warn that we detected the schema version mismatch
-- (this SELECT returns a row only when the stored version differs from the expected one):
SELECT CONCAT( 'The patch only applies to schema version ',
@expected_version,
', but the current schema version is ',
meta_value,
', so skipping the rest.') AS ''
FROM hive_meta WHERE meta_key='hive_sql_schema_version' AND meta_value<>@expected_version;
-- cause division by zero only if current version differs from the expected one:
-- the divisor is a boolean that is 0 only for the 'hive_sql_schema_version' row whose value is not
-- @expected_version. When the versions match the divisor is 1 for every row, NOT 1/1 is false,
-- and therefore no row is ever inserted.
INSERT INTO hive_meta (meta_key, meta_value)
SELECT 'this_should_never_be_inserted', 1 FROM hive_meta WHERE NOT 1/(meta_key<>'hive_sql_schema_version' OR meta_value=@expected_version);
-- reaching this point means the version check did not raise an error:
SELECT CONCAT( 'The patch seems to be compatible with schema version ',
@expected_version,
', applying the patch...') AS '';
-- Now undo the change so that we could patch potentially non-TRADITIONAL schema
-- (note: this sets the session sql_mode to empty rather than restoring its previous value):
SET SESSION sql_mode='';
-- ----------------------------------<actual_patch> -------------------------------------------------
ALTER TABLE analysis_stats DROP COLUMN num_required_workers;
ALTER TABLE analysis_stats_monitor DROP COLUMN num_required_workers;
-- ----------------------------------</actual_patch> -------------------------------------------------
-- increase the schema version by one:
UPDATE hive_meta SET meta_value=meta_value+1 WHERE meta_key='hive_sql_schema_version';
-- ---------------------------------------------------------------------------------------------------
-- PostgreSQL (psql) schema patch: drops the num_required_workers column from both analysis_stats
-- and analysis_stats_monitor, then increments hive_sql_schema_version in hive_meta by one.
-- The script guards itself so that the ALTERs are only reached when the stored schema
-- version equals :expected_version.
-- The two backslash commands below are psql meta-commands: the first defines the expected version,
-- the second makes psql abort the script on the first SQL error, which the guard relies on.
\set expected_version 66
\set ON_ERROR_STOP on
-- warn that we detected the schema version mismatch
-- (this SELECT returns a row only when the stored version differs from the expected one;
--  meta_value is compared as text, hence the CAST of the expected version to VARCHAR):
SELECT ('The patch only applies to schema version '
|| CAST(:expected_version AS VARCHAR)
|| ', but the current schema version is '
|| meta_value
|| ', so skipping the rest.') as incompatible_msg
FROM hive_meta WHERE meta_key='hive_sql_schema_version' AND meta_value!=CAST(:expected_version AS VARCHAR);
-- cause division by zero only if current version differs from the expected one:
-- the boolean cast to INTEGER is 0 only for the 'hive_sql_schema_version' row whose value is not
-- the expected one. When the versions match the divisor is 1 for every row, 1 != 1/1 is false,
-- and therefore no row is ever inserted.
INSERT INTO hive_meta (meta_key, meta_value)
SELECT 'this_should_never_be_inserted', 1 FROM hive_meta WHERE 1 != 1/CAST( (meta_key!='hive_sql_schema_version' OR meta_value=CAST(:expected_version AS VARCHAR)) AS INTEGER );
-- reaching this point means the version check did not raise an error:
SELECT ('The patch seems to be compatible with schema version '
|| CAST(:expected_version AS VARCHAR)
|| ', applying the patch...') AS compatible_msg;
-- ----------------------------------<actual_patch> -------------------------------------------------
ALTER TABLE analysis_stats DROP COLUMN num_required_workers;
ALTER TABLE analysis_stats_monitor DROP COLUMN num_required_workers;
-- ----------------------------------</actual_patch> -------------------------------------------------
-- increase the schema version by one (meta_value is cast to INTEGER for the arithmetic):
UPDATE hive_meta SET meta_value= (CAST(meta_value AS INTEGER) + 1) WHERE meta_key='hive_sql_schema_version';
...@@ -137,7 +137,6 @@ CREATE TABLE analysis_base ( ...@@ -137,7 +137,6 @@ CREATE TABLE analysis_base (
@column failed_job_count number of Jobs of this Analysis that are in FAILED state @column failed_job_count number of Jobs of this Analysis that are in FAILED state
@column num_running_workers number of running Workers of this Analysis @column num_running_workers number of running Workers of this Analysis
@column num_required_workers extra number of Workers of this Analysis needed to execute all READY jobs
@column behaviour whether hive_capacity is set or is dynamically calculated based on timers @column behaviour whether hive_capacity is set or is dynamically calculated based on timers
@column input_capacity used to compute hive_capacity in DYNAMIC mode @column input_capacity used to compute hive_capacity in DYNAMIC mode
...@@ -163,8 +162,8 @@ CREATE TABLE analysis_stats ( ...@@ -163,8 +162,8 @@ CREATE TABLE analysis_stats (
ready_job_count INTEGER NOT NULL DEFAULT 0, ready_job_count INTEGER NOT NULL DEFAULT 0,
done_job_count INTEGER NOT NULL DEFAULT 0, done_job_count INTEGER NOT NULL DEFAULT 0,
failed_job_count INTEGER NOT NULL DEFAULT 0, failed_job_count INTEGER NOT NULL DEFAULT 0,
num_running_workers INTEGER NOT NULL DEFAULT 0, num_running_workers INTEGER NOT NULL DEFAULT 0,
num_required_workers INTEGER NOT NULL DEFAULT 0,
behaviour ENUM('STATIC', 'DYNAMIC') NOT NULL DEFAULT 'STATIC', behaviour ENUM('STATIC', 'DYNAMIC') NOT NULL DEFAULT 'STATIC',
input_capacity INTEGER NOT NULL DEFAULT 4, input_capacity INTEGER NOT NULL DEFAULT 4,
...@@ -602,7 +601,6 @@ CREATE TABLE log_message ( ...@@ -602,7 +601,6 @@ CREATE TABLE log_message (
@column failed_job_count number of Jobs of this Analysis that are in FAILED state @column failed_job_count number of Jobs of this Analysis that are in FAILED state
@column num_running_workers number of running Workers of this Analysis @column num_running_workers number of running Workers of this Analysis
@column num_required_workers extra number of Workers of this Analysis needed to execute all READY jobs
@column behaviour whether hive_capacity is set or is dynamically calculated based on timers @column behaviour whether hive_capacity is set or is dynamically calculated based on timers
@column input_capacity used to compute hive_capacity in DYNAMIC mode @column input_capacity used to compute hive_capacity in DYNAMIC mode
...@@ -630,8 +628,8 @@ CREATE TABLE analysis_stats_monitor ( ...@@ -630,8 +628,8 @@ CREATE TABLE analysis_stats_monitor (
ready_job_count INTEGER NOT NULL DEFAULT 0, ready_job_count INTEGER NOT NULL DEFAULT 0,
done_job_count INTEGER NOT NULL DEFAULT 0, done_job_count INTEGER NOT NULL DEFAULT 0,
failed_job_count INTEGER NOT NULL DEFAULT 0, failed_job_count INTEGER NOT NULL DEFAULT 0,
num_running_workers INTEGER NOT NULL DEFAULT 0, num_running_workers INTEGER NOT NULL DEFAULT 0,
num_required_workers INTEGER NOT NULL DEFAULT 0,
behaviour ENUM('STATIC', 'DYNAMIC') NOT NULL DEFAULT 'STATIC', behaviour ENUM('STATIC', 'DYNAMIC') NOT NULL DEFAULT 'STATIC',
input_capacity INTEGER NOT NULL DEFAULT 4, input_capacity INTEGER NOT NULL DEFAULT 4,
......
...@@ -139,7 +139,6 @@ CREATE TABLE analysis_base ( ...@@ -139,7 +139,6 @@ CREATE TABLE analysis_base (
@column failed_job_count number of Jobs of this Analysis that are in FAILED state @column failed_job_count number of Jobs of this Analysis that are in FAILED state
@column num_running_workers number of running Workers of this Analysis @column num_running_workers number of running Workers of this Analysis
@column num_required_workers extra number of Workers of this Analysis needed to execute all READY jobs
@column behaviour whether hive_capacity is set or is dynamically calculated based on timers @column behaviour whether hive_capacity is set or is dynamically calculated based on timers
@column input_capacity used to compute hive_capacity in DYNAMIC mode @column input_capacity used to compute hive_capacity in DYNAMIC mode
...@@ -167,8 +166,8 @@ CREATE TABLE analysis_stats ( ...@@ -167,8 +166,8 @@ CREATE TABLE analysis_stats (
ready_job_count INTEGER NOT NULL DEFAULT 0, ready_job_count INTEGER NOT NULL DEFAULT 0,
done_job_count INTEGER NOT NULL DEFAULT 0, done_job_count INTEGER NOT NULL DEFAULT 0,
failed_job_count INTEGER NOT NULL DEFAULT 0, failed_job_count INTEGER NOT NULL DEFAULT 0,
num_running_workers INTEGER NOT NULL DEFAULT 0, num_running_workers INTEGER NOT NULL DEFAULT 0,
num_required_workers INTEGER NOT NULL DEFAULT 0,
behaviour analysis_behaviour NOT NULL DEFAULT 'STATIC', behaviour analysis_behaviour NOT NULL DEFAULT 'STATIC',
input_capacity INTEGER NOT NULL DEFAULT 4, input_capacity INTEGER NOT NULL DEFAULT 4,
...@@ -606,7 +605,6 @@ CREATE INDEX ON log_message (job_id); ...@@ -606,7 +605,6 @@ CREATE INDEX ON log_message (job_id);
@column failed_job_count number of Jobs of this Analysis that are in FAILED state @column failed_job_count number of Jobs of this Analysis that are in FAILED state
@column num_running_workers number of running Workers of this Analysis @column num_running_workers number of running Workers of this Analysis
@column num_required_workers extra number of Workers of this Analysis needed to execute all READY jobs
@column behaviour whether hive_capacity is set or is dynamically calculated based on timers @column behaviour whether hive_capacity is set or is dynamically calculated based on timers
@column input_capacity used to compute hive_capacity in DYNAMIC mode @column input_capacity used to compute hive_capacity in DYNAMIC mode
...@@ -634,8 +632,8 @@ CREATE TABLE analysis_stats_monitor ( ...@@ -634,8 +632,8 @@ CREATE TABLE analysis_stats_monitor (
ready_job_count INTEGER NOT NULL DEFAULT 0, ready_job_count INTEGER NOT NULL DEFAULT 0,
done_job_count INTEGER NOT NULL DEFAULT 0, done_job_count INTEGER NOT NULL DEFAULT 0,
failed_job_count INTEGER NOT NULL DEFAULT 0, failed_job_count INTEGER NOT NULL DEFAULT 0,
num_running_workers INTEGER NOT NULL DEFAULT 0, num_running_workers INTEGER NOT NULL DEFAULT 0,
num_required_workers INTEGER NOT NULL DEFAULT 0,
behaviour analysis_behaviour NOT NULL DEFAULT 'STATIC', behaviour analysis_behaviour NOT NULL DEFAULT 'STATIC',
input_capacity INTEGER NOT NULL DEFAULT 4, input_capacity INTEGER NOT NULL DEFAULT 4,
......
...@@ -138,7 +138,6 @@ CREATE UNIQUE INDEX analysis_base_logic_name_idx ON analysis_base (logic_name); ...@@ -138,7 +138,6 @@ CREATE UNIQUE INDEX analysis_base_logic_name_idx ON analysis_base (logic_name);
@column failed_job_count number of Jobs of this Analysis that are in FAILED state @column failed_job_count number of Jobs of this Analysis that are in FAILED state
@column num_running_workers number of running Workers of this Analysis @column num_running_workers number of running Workers of this Analysis
@column num_required_workers extra number of Workers of this Analysis needed to execute all READY jobs
@column behaviour whether hive_capacity is set or is dynamically calculated based on timers @column behaviour whether hive_capacity is set or is dynamically calculated based on timers
@column input_capacity used to compute hive_capacity in DYNAMIC mode @column input_capacity used to compute hive_capacity in DYNAMIC mode
...@@ -164,8 +163,8 @@ CREATE TABLE analysis_stats ( ...@@ -164,8 +163,8 @@ CREATE TABLE analysis_stats (
ready_job_count INTEGER NOT NULL DEFAULT 0, ready_job_count INTEGER NOT NULL DEFAULT 0,
done_job_count INTEGER NOT NULL DEFAULT 0, done_job_count INTEGER NOT NULL DEFAULT 0,
failed_job_count INTEGER NOT NULL DEFAULT 0, failed_job_count INTEGER NOT NULL DEFAULT 0,
num_running_workers INTEGER NOT NULL DEFAULT 0, num_running_workers INTEGER NOT NULL DEFAULT 0,
num_required_workers INTEGER NOT NULL DEFAULT 0,
behaviour TEXT NOT NULL DEFAULT 'STATIC', /* enum('STATIC', 'DYNAMIC') DEFAULT 'STATIC' NOT NULL, */ behaviour TEXT NOT NULL DEFAULT 'STATIC', /* enum('STATIC', 'DYNAMIC') DEFAULT 'STATIC' NOT NULL, */
input_capacity INTEGER NOT NULL DEFAULT 4, input_capacity INTEGER NOT NULL DEFAULT 4,
...@@ -595,7 +594,6 @@ CREATE INDEX log_message_job_idx ON log_message (job_id); ...@@ -595,7 +594,6 @@ CREATE INDEX log_message_job_idx ON log_message (job_id);
@column failed_job_count number of Jobs of this Analysis that are in FAILED state @column failed_job_count number of Jobs of this Analysis that are in FAILED state
@column num_running_workers number of running Workers of this Analysis @column num_running_workers number of running Workers of this Analysis
@column num_required_workers extra number of Workers of this Analysis needed to execute all READY jobs
@column behaviour whether hive_capacity is set or is dynamically calculated based on timers @column behaviour whether hive_capacity is set or is dynamically calculated based on timers
@column input_capacity used to compute hive_capacity in DYNAMIC mode @column input_capacity used to compute hive_capacity in DYNAMIC mode
...@@ -623,8 +621,8 @@ CREATE TABLE analysis_stats_monitor ( ...@@ -623,8 +621,8 @@ CREATE TABLE analysis_stats_monitor (
ready_job_count INTEGER NOT NULL DEFAULT 0, ready_job_count INTEGER NOT NULL DEFAULT 0,
done_job_count INTEGER NOT NULL DEFAULT 0, done_job_count INTEGER NOT NULL DEFAULT 0,
failed_job_count INTEGER NOT NULL DEFAULT 0, failed_job_count INTEGER NOT NULL DEFAULT 0,
num_running_workers INTEGER NOT NULL DEFAULT 0, num_running_workers INTEGER NOT NULL DEFAULT 0,
num_required_workers INTEGER NOT NULL DEFAULT 0,
behaviour TEXT NOT NULL DEFAULT 'STATIC', /* enum('STATIC', 'DYNAMIC') DEFAULT 'STATIC' NOT NULL, */ behaviour TEXT NOT NULL DEFAULT 'STATIC', /* enum('STATIC', 'DYNAMIC') DEFAULT 'STATIC' NOT NULL, */
input_capacity INTEGER NOT NULL DEFAULT 4, input_capacity INTEGER NOT NULL DEFAULT 4,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment