Commit 8db64bca authored by Leo Gordon's avatar Leo Gordon
Browse files

moved failed_job_tolerance, max_retry_count, can_be_empty and priority columns...

moved failed_job_tolerance, max_retry_count, can_be_empty and priority columns from analysis_stats to analysis_base
parent 158bd88c
......@@ -35,51 +35,79 @@ sub new {
my $self = $class->SUPER::new( @_ ); # deal with Storable stuff
my ($logic_name, $module, $parameters, $resource_class_id) =
rearrange([qw(logic_name module parameters resource_class_id) ], @_);
$self->logic_name($logic_name) if($logic_name);
$self->module($module) if($module);
$self->parameters($parameters) if($parameters);
$self->resource_class_id($resource_class_id) if($resource_class_id);
my ($logic_name, $module, $parameters, $resource_class_id, $failed_job_tolerance, $max_retry_count, $can_be_empty, $priority) =
rearrange([qw(logic_name module parameters resource_class_id failed_job_tolerance max_retry_count can_be_empty priority) ], @_);
$self->logic_name($logic_name) if($logic_name);
$self->module($module) if($module);
$self->parameters($parameters) if($parameters);
$self->resource_class_id($resource_class_id) if($resource_class_id);
$self->failed_job_tolerance($failed_job_tolerance) if($failed_job_tolerance);
$self->max_retry_count($max_retry_count) if($max_retry_count);
$self->can_be_empty($can_be_empty) if($can_be_empty);
$self->priority($priority) if($priority);
return $self;
}
sub logic_name {
    # Getter/setter for the Analysis' unique logic_name.
    # NOTE(fix): a diff-merge artifact had left both the old and the new
    # statement pairs in place; the duplicated "my $self = shift" consumed
    # the value argument and left $self undefined on return.
    my $self = shift;
    $self->{'_logic_name'} = shift if(@_);
    return $self->{'_logic_name'};
}
sub module {
    # Getter/setter for the name of the Perl module that runs this Analysis.
    # NOTE(fix): removed the duplicated old-side statements (diff-merge
    # artifact) that redeclared $self and double-shifted @_.
    my $self = shift;
    $self->{'_module'} = shift if(@_);
    return $self->{'_module'};
}
sub parameters {
    # Getter/setter for the stringified parameters hash shared by all jobs
    # of this Analysis.
    # NOTE(fix): removed the duplicated old-side statements (diff-merge
    # artifact) that redeclared $self and double-shifted @_.
    my $self = shift;
    $self->{'_parameters'} = shift if(@_);
    return $self->{'_parameters'};
}
sub resource_class_id {
    # Getter/setter for the link to the resource_class table.
    # NOTE(fix): removed a duplicated "my $self = shift @_" line and two
    # stray statements that had been left dangling OUTSIDE the sub body
    # (diff-merge artifact) — they referenced $self out of scope and would
    # not compile under strict.
    my $self = shift;
    $self->{'_resource_class_id'} = shift if(@_);
    return $self->{'_resource_class_id'};
}
sub failed_job_tolerance {
    # Getter/setter for the percentage of failed jobs this Analysis tolerates
    # before being declared FAILED. Defaults lazily to 0 (no failures tolerated).
    my $self = shift;

    if (@_) {
        $self->{'_failed_job_tolerance'} = shift;
    }
    defined($self->{'_failed_job_tolerance'})
        or $self->{'_failed_job_tolerance'} = 0;

    return $self->{'_failed_job_tolerance'};
}
sub max_retry_count {
    # Getter/setter for how many times a job of this Analysis will be retried.
    # Falls back to the default of 3 when it has never been set.
    my $self = shift;

    if (scalar(@_)) {
        $self->{'_max_retry_count'} = shift;
    }
    if (!defined($self->{'_max_retry_count'})) {
        $self->{'_max_retry_count'} = 3;
    }
    return $self->{'_max_retry_count'};
}
sub can_be_empty {
    # Getter/setter for the flag that allows this Analysis to be non-blocking
    # while it has no jobs. Defaults lazily to 0 (false).
    my $self = shift;

    $self->{'_can_be_empty'} = shift if scalar(@_);
    unless (defined $self->{'_can_be_empty'}) {
        $self->{'_can_be_empty'} = 0;
    }
    return $self->{'_can_be_empty'};
}
sub priority {
    # Getter/setter for the Analysis' scheduling priority (higher value means
    # more likely to be chosen at Worker specialization). Defaults lazily to 0.
    my $self = shift;

    if (@_) {
        my $new_value = shift;
        $self->{'_priority'} = $new_value;
    }
    $self->{'_priority'} = 0 if !defined($self->{'_priority'});

    return $self->{'_priority'};
}
......
......@@ -43,16 +43,12 @@ sub new {
my $class = shift;
my $self = bless {}, $class;
my ($analysis_id, $batch_size, $hive_capacity, $failed_job_tolerance, $max_retry_count, $can_be_empty, $priority, $status) =
rearrange([qw(analysis_id batch_size hive_capacity failed_job_tolerance max_retry_count can_be_empty priority status) ], @_);
my ($analysis_id, $batch_size, $hive_capacity, $status) =
rearrange([qw(analysis_id batch_size hive_capacity status) ], @_);
$self->analysis_id($analysis_id) if(defined($analysis_id));
$self->batch_size($batch_size) if(defined($batch_size));
$self->hive_capacity($hive_capacity) if(defined($hive_capacity));
$self->failed_job_tolerance($failed_job_tolerance) if(defined($failed_job_tolerance));
$self->max_retry_count($max_retry_count) if(defined($max_retry_count));
$self->can_be_empty($can_be_empty) if(defined($can_be_empty));
$self->priority($priority) if(defined($priority));
$self->status($status) if(defined($status));
return $self;
......@@ -92,34 +88,6 @@ sub hive_capacity {
return $self->{'_hive_capacity'};
}
sub failed_job_tolerance {
    # Accessor for the tolerated percentage of failed jobs (0 when unset).
    my $self = shift;

    $self->{'_failed_job_tolerance'} = $_[0] if scalar(@_);
    return defined($self->{'_failed_job_tolerance'})
        ? $self->{'_failed_job_tolerance'}
        : ($self->{'_failed_job_tolerance'} = 0);    # cache the default, like the original
}
sub max_retry_count {
    # Accessor for the per-job retry limit; lazily initialized to 3.
    my $self = shift;

    if (@_) {
        $self->{'_max_retry_count'} = shift;
    }
    defined($self->{'_max_retry_count'})
        or $self->{'_max_retry_count'} = 3;

    return $self->{'_max_retry_count'};
}
sub can_be_empty {
    # Accessor for the "may legitimately have no jobs" flag; lazily defaults to 0.
    my $self = shift;

    if (scalar(@_)) {
        my $flag = shift;
        $self->{'_can_be_empty'} = $flag;
    }
    $self->{'_can_be_empty'} = 0 unless defined($self->{'_can_be_empty'});

    return $self->{'_can_be_empty'};
}
sub priority {
    # Accessor for the scheduling priority; lazily defaults to 0.
    my $self = shift;

    $self->{'_priority'} = shift if @_;
    unless (defined $self->{'_priority'}) {
        $self->{'_priority'} = 0;
    }
    return $self->{'_priority'};
}
sub status {
my $self = shift;
$self->{'_status'} = shift if(@_);
......@@ -310,8 +278,6 @@ sub print_stats {
my $self = shift;
my $mode = shift;
return unless($self->get_analysis);
$mode=1 unless($mode);
if($mode == 1) {
......@@ -368,10 +334,10 @@ sub check_blocking_control_rules {
my $condition_analysis = $ctrl_rule->condition_analysis;
my $condition_analysis_stats = $condition_analysis && $condition_analysis->stats;
my $condition_analysis_stats_status = $condition_analysis_stats && $condition_analysis_stats->status;
my $condition_analysis_stats_cbe = $condition_analysis_stats && $condition_analysis_stats->can_be_empty;
my $condition_analysis_cbe = $condition_analysis && $condition_analysis->can_be_empty;
my $unblocked_condition = ($condition_analysis_stats_status eq 'DONE')
|| ($condition_analysis_stats_cbe && ($condition_analysis_stats_status eq 'READY'));
|| ($condition_analysis_cbe && ($condition_analysis_stats_status eq 'READY'));
unless( $unblocked_condition ) {
$all_ctrl_rules_done = 0;
......@@ -400,12 +366,13 @@ sub determine_status {
$self->status('READY');
} elsif( $self->total_job_count == $self->done_job_count + $self->failed_job_count ) { # all jobs of the analysis have been tried
my $absolute_tolerance = $self->failed_job_tolerance * $self->total_job_count / 100.0;
my $analysis = $self->get_analysis;
my $absolute_tolerance = $analysis->failed_job_tolerance * $self->total_job_count / 100.0;
if ($self->failed_job_count > $absolute_tolerance) {
$self->status('FAILED');
print "\n##################################################\n";
printf("## ERROR: %-35s ##\n", $self->get_analysis->logic_name." failed!");
printf("## %d jobs failed (tolerance: %d (%3d%%)) ##\n", $self->failed_job_count, $absolute_tolerance, $self->failed_job_tolerance);
printf("## ERROR: %-35s ##\n", $analysis->logic_name." failed!");
printf("## %d jobs failed (tolerance: %d (%3d%%)) ##\n", $self->failed_job_count, $absolute_tolerance, $analysis->failed_job_tolerance);
print "##################################################\n\n";
} else {
$self->status('DONE');
......
......@@ -553,7 +553,7 @@ sub reclaim_job_for_worker {
sub release_undone_jobs_from_worker {
my ($self, $worker, $msg) = @_;
my $max_retry_count = $worker->analysis->stats->max_retry_count();
my $max_retry_count = $worker->analysis->max_retry_count();
my $worker_id = $worker->dbID();
#first just reset the claimed jobs, these don't need a retry_count index increment:
......
......@@ -83,9 +83,9 @@ sub fetch_by_needed_workers_rc_id {
my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')";
my $join = $resource_class_id ? [[['analysis_base', 'a'], " ast.analysis_id=a.analysis_id AND a.resource_class_id=$resource_class_id"]] : [];
my $join = [[['analysis_base', 'a'], " ast.analysis_id=a.analysis_id ".( $resource_class_id ? "AND a.resource_class_id=$resource_class_id " : '') ]];
my $final_clause = 'ORDER BY priority DESC, '
my $final_clause = 'ORDER BY a.priority DESC, '
.( ($self->dbc->driver eq 'sqlite') ? 'RANDOM()' : 'RAND()' )
.($limit ? " LIMIT $limit" : '');
......@@ -142,11 +142,10 @@ sub refresh {
sub store {
my ($self, $stats) = @_;
my $sql = "INSERT INTO analysis_stats (analysis_id, batch_size, hive_capacity, failed_job_tolerance, max_retry_count, can_be_empty, priority, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
my $sql = "INSERT INTO analysis_stats (analysis_id, batch_size, hive_capacity, status) VALUES (?, ?, ?, ?)";
my $sth = $self->prepare($sql);
$sth->execute($stats->analysis_id, $stats->batch_size, $stats->hive_capacity, $stats->failed_job_tolerance,
$stats->max_retry_count, $stats->can_be_empty, $stats->priority, $stats->status);
$sth->execute($stats->analysis_id, $stats->batch_size, $stats->hive_capacity, $stats->status);
$sth->finish;
$stats->adaptor( $self );
......@@ -194,8 +193,6 @@ sub update {
$sql .= ",avg_run_msec_per_job=" . $stats->avg_run_msec_per_job();
$sql .= ",avg_output_msec_per_job=" . $stats->avg_output_msec_per_job();
$sql .= ",hive_capacity=" . $stats->hive_capacity();
# $sql .= ",max_retry_count=" . $stats->max_retry_count();
# $sql .= ",failed_job_tolerance=" . $stats->failed_job_tolerance();
unless( $self->db->hive_use_triggers() ) {
$sql .= ",total_job_count=" . $stats->total_job_count();
......@@ -210,8 +207,6 @@ sub update {
$sql .= ",num_required_workers=" . $stats->num_required_workers();
$sql .= ",last_update=CURRENT_TIMESTAMP";
$sql .= ",sync_lock='0'";
# $sql .= ",can_be_empty=". $stats->can_be_empty();
# $sql .= ",priority=". $stats->priority();
$sql .= " WHERE analysis_id='".$stats->analysis_id."' ";
my $sth = $self->prepare($sql);
......@@ -411,10 +406,6 @@ sub _columns {
my @columns = qw (ast.analysis_id
ast.batch_size
ast.hive_capacity
ast.failed_job_tolerance
ast.max_retry_count
ast.can_be_empty
ast.priority
ast.status
ast.total_job_count
......@@ -457,10 +448,6 @@ sub _objs_from_sth {
$analStats->analysis_id($column{'analysis_id'});
$analStats->batch_size($column{'batch_size'});
$analStats->hive_capacity($column{'hive_capacity'});
$analStats->failed_job_tolerance($column{'failed_job_tolerance'});
$analStats->max_retry_count($column{'max_retry_count'});
$analStats->can_be_empty($column{'can_be_empty'});
$analStats->priority($column{'priority'});
$analStats->status($column{'status'});
$analStats->total_job_count($column{'total_job_count'});
......
......@@ -416,10 +416,14 @@ sub run {
}
$analysis = Bio::EnsEMBL::Hive::Analysis->new(
-logic_name => $logic_name,
-module => $module,
-parameters => stringify($parameters_hash || {}), # have to stringify it here, because Analysis code is external wrt Hive code
-resource_class_id => $rc_id,
-logic_name => $logic_name,
-module => $module,
-parameters => stringify($parameters_hash || {}), # have to stringify it here, because Analysis code is external wrt Hive code
-resource_class_id => $rc_id,
-failed_job_tolerance => $failed_job_tolerance,
-max_retry_count => $max_retry_count,
-can_be_empty => $can_be_empty,
-priority => $priority,
);
$analysis_adaptor->store($analysis);
......@@ -427,10 +431,6 @@ sub run {
-analysis_id => $analysis->dbID,
-batch_size => $batch_size,
-hive_capacity => $hive_capacity,
-failed_job_tolerance => $failed_job_tolerance,
-max_retry_count => $max_retry_count,
-can_be_empty => $can_be_empty,
-priority => $priority,
-status => $blocked ? 'BLOCKED' : 'READY', # be careful, as this "soft" way of blocking may be accidentally unblocked by deep sync
);
$analysis_stats_adaptor->store($stats);
......
......@@ -28,7 +28,7 @@ $Author: lg4 $
=head1 VERSION
$Revision: 1.22 $
$Revision: 1.23 $
=cut
......@@ -284,8 +284,7 @@ sub _add_analysis_node {
my $graph = $self->graph();
#Check we can invoke it & then check if it was able to be empty
my $can_be_empty = $a->stats()->can('can_be_empty') && $a->stats()->can_be_empty();
my $shape = ($can_be_empty) ? 'doubleoctagon' : 'ellipse' ;
my $shape = $a->can_be_empty() ? 'doubleoctagon' : 'ellipse' ;
my $status_colour = $self->config->get('Graph', 'Node', $a->stats->status, 'Colour');
my $node_fontname = $self->config->get('Graph', 'Node', $a->stats->status, 'Font');
......
......@@ -602,7 +602,7 @@ sub run_one_batch {
my $jobs_done_here = 0;
my $max_retry_count = $self->analysis->stats->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
my $max_retry_count = $self->analysis->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
$self->adaptor->check_in_worker( $self );
$self->adaptor->safe_synchronize_AnalysisStats($self->analysis->stats);
......
# Move failed_job_tolerance, max_retry_count, can_be_empty and priority columns
# from analysis_stats and analysis_stats_monitor into analysis_base table.
#
# Do not bother deleting the original columns from analysis_stats and analysis_stats_monitor,
# just copy the data over.
# Add the four moved columns to analysis_base; the DEFAULTs mirror the
# pre-existing analysis_stats column definitions so rows get sane values
# even before the back-fill below runs.
ALTER TABLE analysis_base ADD COLUMN failed_job_tolerance int(10) DEFAULT 0 NOT NULL;
ALTER TABLE analysis_base ADD COLUMN max_retry_count int(10) DEFAULT 3 NOT NULL;
ALTER TABLE analysis_base ADD COLUMN can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL;
ALTER TABLE analysis_base ADD COLUMN priority TINYINT DEFAULT 0 NOT NULL;
# Back-fill the new columns from the per-analysis stats rows
# (MySQL multi-table UPDATE syntax; rows are matched on analysis_id).
UPDATE analysis_base a JOIN analysis_stats s USING(analysis_id)
SET a.failed_job_tolerance = s.failed_job_tolerance
, a.max_retry_count = s.max_retry_count
, a.can_be_empty = s.can_be_empty
, a.priority = s.priority;
......@@ -30,24 +30,29 @@ CREATE TABLE IF NOT EXISTS meta (
--
-- overview:
-- Each Analysis object contains
-- analysis_id - a unique ID that is also a foreign key to most of the other tables
-- logic_name - the name of the Analysis object
-- module - the Perl module name that runs this Analysis
-- parameters - a stringified hash of parameters common to all jobs of the Analysis
--
-- semantics:
--
-- analysis_id - a unique ID that is also a foreign key to most of the other tables
-- logic_name - the name of the Analysis object
-- module - the Perl module name that runs this Analysis
-- parameters - a stringified hash of parameters common to all jobs of the Analysis
-- resource_class_id - link to the resource_class table
-- failed_job_tolerance - % of tolerated failed jobs
-- max_retry_count - how many times a job of this Analysis will be retried (unless there is no point)
-- can_be_empty - if TRUE, this Analysis will not be blocking if/while it doesn't have any jobs
-- priority - an Analysis with higher priority will be more likely chosen on Worker's specialization
-- NOTE(fix): a diff-merge artifact had left BOTH the old and the new column
-- lists (and duplicated PRIMARY KEY / UNIQUE KEY lines) inside this CREATE
-- TABLE, making it invalid SQL. Only the post-migration definition is kept.
CREATE TABLE analysis_base (
  analysis_id           int(10) unsigned NOT NULL AUTO_INCREMENT,
  logic_name            VARCHAR(40) NOT NULL,
  module                VARCHAR(255),
  parameters            TEXT,
  resource_class_id     int(10) unsigned NOT NULL,
  failed_job_tolerance  int(10) DEFAULT 0 NOT NULL,       -- % of tolerated failed jobs
  max_retry_count       int(10) DEFAULT 3 NOT NULL,       -- retry limit per job
  can_be_empty          TINYINT UNSIGNED DEFAULT 0 NOT NULL,  -- non-blocking while jobless
  priority              TINYINT DEFAULT 0 NOT NULL,       -- Worker specialization preference

  PRIMARY KEY (analysis_id),
  UNIQUE KEY logic_name_idx (logic_name)

) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......@@ -322,16 +327,11 @@ CREATE TABLE resource_description (
-- semantics:
-- analysis_id - foreign key to analysis table
-- status - overview status of the jobs (cached state)
-- failed_job_tolerance - % of tolerated failed jobs
CREATE TABLE analysis_stats (
analysis_id int(10) unsigned NOT NULL,
batch_size int(10) DEFAULT 1 NOT NULL,
hive_capacity int(10) DEFAULT 1 NOT NULL,
failed_job_tolerance int(10) DEFAULT 0 NOT NULL,
max_retry_count int(10) DEFAULT 3 NOT NULL,
can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL,
priority TINYINT DEFAULT 0 NOT NULL,
status enum('BLOCKED', 'LOADING', 'SYNCHING', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'READY' NOT NULL,
total_job_count int(10) DEFAULT NULL,
......@@ -364,10 +364,6 @@ CREATE TABLE analysis_stats_monitor (
analysis_id int(10) unsigned NOT NULL,
batch_size int(10) DEFAULT 1 NOT NULL,
hive_capacity int(10) DEFAULT 1 NOT NULL,
failed_job_tolerance int(10) DEFAULT 0 NOT NULL,
max_retry_count int(10) DEFAULT 3 NOT NULL,
can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL,
priority TINYINT DEFAULT 0 NOT NULL,
status enum('BLOCKED', 'LOADING', 'SYNCHING', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'READY' NOT NULL,
total_job_count int(10) DEFAULT NULL,
......
......@@ -26,20 +26,25 @@ CREATE INDEX IF NOT EXISTS species_value_idx ON meta (species_id, met
--
-- overview:
-- Each Analysis object contains
-- analysis_id - a unique ID that is also a foreign key to most of the other tables
-- logic_name - the name of the Analysis object
-- module - the Perl module name that runs this Analysis
-- parameters - a stringified hash of parameters common to all jobs of the Analysis
--
-- semantics:
--
-- analysis_id - a unique ID that is also a foreign key to most of the other tables
-- logic_name - the name of the Analysis object
-- module - the Perl module name that runs this Analysis
-- parameters - a stringified hash of parameters common to all jobs of the Analysis
-- failed_job_tolerance - % of tolerated failed jobs
-- max_retry_count - how many times a job of this Analysis will be retried (unless there is no point)
-- can_be_empty - if TRUE, this Analysis will not be blocking if/while it doesn't have any jobs
-- priority - an Analysis with higher priority will be more likely chosen on Worker's specialization
-- NOTE(fix): a diff-merge artifact had left both the old and the new column
-- lists inside this CREATE TABLE (the old resource_class_id line even lacked
-- a trailing comma), making it invalid SQL. Only the post-migration
-- definition is kept.
CREATE TABLE IF NOT EXISTS analysis_base (
  analysis_id           INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
  logic_name            VARCHAR(40) NOT NULL,
  module                VARCHAR(255),
  parameters            TEXT,
  resource_class_id     INTEGER NOT NULL,
  failed_job_tolerance  int(10) NOT NULL DEFAULT 0,           -- % of tolerated failed jobs
  max_retry_count       int(10) default 3 NOT NULL,           -- retry limit per job
  can_be_empty          TINYINT UNSIGNED DEFAULT 0 NOT NULL,  -- non-blocking while jobless
  priority              TINYINT DEFAULT 0 NOT NULL            -- Worker specialization preference
);
CREATE UNIQUE INDEX IF NOT EXISTS logic_name_idx ON analysis_base (logic_name);
......@@ -300,17 +305,12 @@ CREATE TABLE resource_description (
-- semantics:
-- analysis_id - foreign key to analysis table
-- status - overview status of the jobs (cached state)
-- failed_job_tolerance - % of tolerated failed jobs
-- resource_class_id - resource class id (analyses are grouped into disjoint classes)
CREATE TABLE analysis_stats (
analysis_id INTEGER NOT NULL,
batch_size int(10) default 1 NOT NULL,
hive_capacity int(10) default 1 NOT NULL,
failed_job_tolerance int(10) NOT NULL DEFAULT 0,
max_retry_count int(10) NOT NULL DEFAULT 3,
can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL,
priority TINYINT DEFAULT 0 NOT NULL,
status TEXT DEFAULT 'READY' NOT NULL, /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'READY' NOT NULL, */
total_job_count int(10) NOT NULL DEFAULT 0,
......@@ -341,10 +341,6 @@ CREATE TABLE analysis_stats_monitor (
analysis_id INTEGER NOT NULL,
batch_size int(10) default 1 NOT NULL,
hive_capacity int(10) default 1 NOT NULL,
failed_job_tolerance int(10) default 0 NOT NULL,
max_retry_count int(10) default 3 NOT NULL,
can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL,
priority TINYINT DEFAULT 0 NOT NULL,
status TEXT DEFAULT 'READY' NOT NULL, /* enum('BLOCKED', 'LOADING', 'SYNCHING', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'READY' NOT NULL, */
total_job_count int(10) NOT NULL DEFAULT 0,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment