Commit 9e3508cc authored by Leo Gordon
Browse files

moved resource_class_id from analysis_stats and analysis_stats_monitor to analysis_base

parent 0f4b0911
No preview for this file type
docs/hive_schema.png

227 KB | W: | H:

docs/hive_schema.png

225 KB | W: | H:

docs/hive_schema.png
docs/hive_schema.png
docs/hive_schema.png
docs/hive_schema.png
  • 2-up
  • Swipe
  • Onion skin
...@@ -35,12 +35,13 @@ sub new { ...@@ -35,12 +35,13 @@ sub new {
my $self = $class->SUPER::new( @_ ); # deal with Storable stuff my $self = $class->SUPER::new( @_ ); # deal with Storable stuff
my ($logic_name, $module, $parameters) = my ($logic_name, $module, $parameters, $resource_class_id) =
rearrange([qw(logic_name module parameters) ], @_); rearrange([qw(logic_name module parameters resource_class_id) ], @_);
$self->logic_name($logic_name) if($logic_name); $self->logic_name($logic_name) if($logic_name);
$self->module($module) if($module); $self->module($module) if($module);
$self->parameters($parameters) if($parameters); $self->parameters($parameters) if($parameters);
$self->resource_class_id($resource_class_id) if($resource_class_id);
return $self; return $self;
} }
...@@ -49,9 +50,8 @@ sub new { ...@@ -49,9 +50,8 @@ sub new {
sub logic_name { sub logic_name {
my $self = shift @_; my $self = shift @_;
if(@_) { $self->{'_logic_name'} = shift @_ if(@_);
$self->{'_logic_name'} = shift @_;
}
return $self->{'_logic_name'}; return $self->{'_logic_name'};
} }
...@@ -59,9 +59,8 @@ sub logic_name { ...@@ -59,9 +59,8 @@ sub logic_name {
sub module { sub module {
my $self = shift @_; my $self = shift @_;
if(@_) { $self->{'_module'} = shift @_ if(@_);
$self->{'_module'} = shift @_;
}
return $self->{'_module'}; return $self->{'_module'};
} }
...@@ -69,13 +68,21 @@ sub module { ...@@ -69,13 +68,21 @@ sub module {
sub parameters { sub parameters {
my $self = shift @_; my $self = shift @_;
if(@_) { $self->{'_parameters'} = shift @_ if(@_);
$self->{'_parameters'} = shift @_;
}
return $self->{'_parameters'}; return $self->{'_parameters'};
} }
sub resource_class_id {
my $self = shift @_;
$self->{'_resource_class_id'} = shift @_ if(@_);
return $self->{'_resource_class_id'};
}
=head2 process =head2 process
Arg [1] : none Arg [1] : none
......
...@@ -260,13 +260,6 @@ sub sync_lock { ...@@ -260,13 +260,6 @@ sub sync_lock {
return $self->{'_sync_lock'}; return $self->{'_sync_lock'};
} }
sub resource_class_id {
my $self = shift;
$self->{'_resource_class_id'} = shift if(@_);
return $self->{'_resource_class_id'};
}
sub can_be_empty { sub can_be_empty {
my $self = shift; my $self = shift;
......
...@@ -46,11 +46,11 @@ use Bio::EnsEMBL::Utils::Exception; ...@@ -46,11 +46,11 @@ use Bio::EnsEMBL::Utils::Exception;
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor'); use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
sub create_new_for_analysis_id_resource_class_id { sub create_new_for_analysis_id{
my ($self, $analysis_id, $resource_class_id) = @_; my ($self, $analysis_id) = @_;
my $insertion_method = ($self->dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE'; my $insertion_method = ($self->dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
my $sql = "$insertion_method INTO analysis_stats (analysis_id, resource_class_id) VALUES ($analysis_id, $resource_class_id)"; my $sql = "$insertion_method INTO analysis_stats (analysis_id) VALUES ($analysis_id)";
my $sth = $self->prepare($sql); my $sth = $self->prepare($sql);
$sth->execute(); $sth->execute();
$sth->finish; $sth->finish;
...@@ -89,35 +89,37 @@ sub fetch_by_analysis_id { ...@@ -89,35 +89,37 @@ sub fetch_by_analysis_id {
} }
sub fetch_by_needed_workers { sub fetch_by_needed_workers_rc_id {
my ($self, $limit, $resource_class_id) = @_; my ($self, $limit, $resource_class_id) = @_;
my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')" my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')";
.($resource_class_id ? " AND ast.resource_class_id = $resource_class_id" : '');
my $join = $resource_class_id ? [[['analysis_base', 'a'], " ast.analysis_id=a.analysis_id AND a.resource_class_id=$resource_class_id"]] : [];
my $final_clause = 'ORDER BY priority DESC, ' my $final_clause = 'ORDER BY priority DESC, '
.( ($self->dbc->driver eq 'sqlite') ? 'RANDOM()' : 'RAND()' ) .( ($self->dbc->driver eq 'sqlite') ? 'RANDOM()' : 'RAND()' )
.($limit ? " LIMIT $limit" : ''); .($limit ? " LIMIT $limit" : '');
$self->_final_clause($final_clause); $self->_final_clause($final_clause);
my $results = $self->_generic_fetch($constraint); my $results = $self->_generic_fetch($constraint, $join);
$self->_final_clause(''); # reset final clause for other fetches $self->_final_clause(''); # reset final clause for other fetches
return $results; return $results;
} }
sub fetch_by_statuses { sub fetch_by_statuses_rc_id {
my ($self, $statuses, $resource_class_id) = @_; my ($self, $statuses, $resource_class_id) = @_;
my $constraint = 'ast.status in ('.join(', ', map { "'$_'" } @$statuses).')';
my $constraint = 'ast.status in ('.join(', ', map { "'$_'" } @$statuses).')' my $join = $resource_class_id ? [[['analysis_base', 'a'], " ast.analysis_id=a.analysis_id AND a.resource_class_id=$resource_class_id"]] : [];
.($resource_class_id ? " AND ast.resource_class_id = $resource_class_id" : '');
$self->_final_clause('ORDER BY last_update'); $self->_final_clause('ORDER BY last_update');
my $results = $self->_generic_fetch($constraint); my $results = $self->_generic_fetch($constraint, $join);
$self->_final_clause(''); #reset final clause for other fetches $self->_final_clause(''); #reset final clause for other fetches
return $results; return $results;
} }
...@@ -208,7 +210,6 @@ sub update { ...@@ -208,7 +210,6 @@ sub update {
$sql .= ",num_required_workers=" . $stats->num_required_workers(); $sql .= ",num_required_workers=" . $stats->num_required_workers();
$sql .= ",last_update=CURRENT_TIMESTAMP"; $sql .= ",last_update=CURRENT_TIMESTAMP";
$sql .= ",sync_lock='0'"; $sql .= ",sync_lock='0'";
$sql .= ",resource_class_id=". $stats->resource_class_id();
$sql .= ",can_be_empty=". $stats->can_be_empty(); $sql .= ",can_be_empty=". $stats->can_be_empty();
$sql .= ",priority=". $stats->priority(); $sql .= ",priority=". $stats->priority();
$sql .= " WHERE analysis_id='".$stats->analysis_id."' "; $sql .= " WHERE analysis_id='".$stats->analysis_id."' ";
...@@ -433,7 +434,6 @@ sub _columns { ...@@ -433,7 +434,6 @@ sub _columns {
ast.num_required_workers ast.num_required_workers
ast.last_update ast.last_update
ast.sync_lock ast.sync_lock
ast.resource_class_id
ast.can_be_empty ast.can_be_empty
ast.priority ast.priority
); );
...@@ -458,7 +458,6 @@ sub _objs_from_sth { ...@@ -458,7 +458,6 @@ sub _objs_from_sth {
$analStats->analysis_id($column{'analysis_id'}); $analStats->analysis_id($column{'analysis_id'});
$analStats->status($column{'status'}); $analStats->status($column{'status'});
$analStats->sync_lock($column{'sync_lock'}); $analStats->sync_lock($column{'sync_lock'});
$analStats->resource_class_id($column{'resource_class_id'});
$analStats->can_be_empty($column{'can_be_empty'}); $analStats->can_be_empty($column{'can_be_empty'});
$analStats->priority($column{'priority'}); $analStats->priority($column{'priority'});
$analStats->batch_size($column{'batch_size'}); $analStats->batch_size($column{'batch_size'});
......
...@@ -415,13 +415,14 @@ sub run { ...@@ -415,13 +415,14 @@ sub run {
} }
$analysis = Bio::EnsEMBL::Hive::Analysis->new( $analysis = Bio::EnsEMBL::Hive::Analysis->new(
-logic_name => $logic_name, -logic_name => $logic_name,
-module => $module, -module => $module,
-parameters => stringify($parameters_hash || {}), # have to stringify it here, because Analysis code is external wrt Hive code -parameters => stringify($parameters_hash || {}), # have to stringify it here, because Analysis code is external wrt Hive code
-resource_class_id => $rc_id,
); );
$analysis_adaptor->store($analysis); $analysis_adaptor->store($analysis);
$analysis_stats_adaptor->create_new_for_analysis_id_resource_class_id($analysis->dbID, $rc_id); $analysis_stats_adaptor->create_new_for_analysis_id($analysis->dbID);
my $stats = $analysis->stats(); my $stats = $analysis->stats();
$stats->batch_size( $batch_size ) if(defined($batch_size)); $stats->batch_size( $batch_size ) if(defined($batch_size));
......
...@@ -640,7 +640,7 @@ sub get_num_failed_analyses { ...@@ -640,7 +640,7 @@ sub get_num_failed_analyses {
my ($self, $analysis) = @_; my ($self, $analysis) = @_;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor; my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
my $failed_analyses = $statsDBA->fetch_by_statuses(['FAILED']); my $failed_analyses = $statsDBA->fetch_by_statuses_rc_id(['FAILED']);
if ($analysis) { if ($analysis) {
foreach my $this_failed_analysis (@$failed_analyses) { foreach my $this_failed_analysis (@$failed_analyses) {
if ($this_failed_analysis->analysis_id == $analysis->dbID) { if ($this_failed_analysis->analysis_id == $analysis->dbID) {
...@@ -710,10 +710,14 @@ sub schedule_workers { ...@@ -710,10 +710,14 @@ sub schedule_workers {
my ($self, $filter_analysis, $orig_pending_by_rc_name, $available_submit_limit) = @_; my ($self, $filter_analysis, $orig_pending_by_rc_name, $available_submit_limit) = @_;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor; my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
my $clearly_needed_analyses = $statsDBA->fetch_by_needed_workers(); my $clearly_needed_analyses = $statsDBA->fetch_by_needed_workers_rc_id();
my $potentially_needed_analyses = $statsDBA->fetch_by_statuses(['LOADING', 'BLOCKED', 'ALL_CLAIMED']); my $potentially_needed_analyses = $statsDBA->fetch_by_statuses_rc_id(['LOADING', 'BLOCKED', 'ALL_CLAIMED']);
my @all_analyses = (@$clearly_needed_analyses, @$potentially_needed_analyses); my @all_analyses = (@$clearly_needed_analyses, @$potentially_needed_analyses);
my $analysis_id2rc_id = $self->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
my $rc_id2name = $self->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name(); my $rc_id2name = $self->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
# combined mapping:
my %analysis_id2rc_name = map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id;
return {} unless(@all_analyses); return {} unless(@all_analyses);
...@@ -756,7 +760,7 @@ sub schedule_workers { ...@@ -756,7 +760,7 @@ sub schedule_workers {
$available_load -= 1.0*$workers_this_analysis/$hive_capacity; $available_load -= 1.0*$workers_this_analysis/$hive_capacity;
} }
my $curr_rc_name = $rc_id2name->{ $analysis_stats->resource_class_id }; my $curr_rc_name = $analysis_id2rc_name{ $analysis_stats->analysis_id };
if($pending_by_rc_name{ $curr_rc_name }) { # per-rc_name capping by pending processes, if available if($pending_by_rc_name{ $curr_rc_name }) { # per-rc_name capping by pending processes, if available
my $pending_this_analysis = ($pending_by_rc_name{ $curr_rc_name } < $workers_this_analysis) ? $pending_by_rc_name{ $curr_rc_name } : $workers_this_analysis; my $pending_this_analysis = ($pending_by_rc_name{ $curr_rc_name } < $workers_this_analysis) ? $pending_by_rc_name{ $curr_rc_name } : $workers_this_analysis;
...@@ -927,11 +931,11 @@ sub _pick_best_analysis_for_new_worker { ...@@ -927,11 +931,11 @@ sub _pick_best_analysis_for_new_worker {
my $statsDBA = $self->db->get_AnalysisStatsAdaptor; my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
return undef unless($statsDBA); return undef unless($statsDBA);
my ($stats) = @{$statsDBA->fetch_by_needed_workers(1, $rc_id)}; my ($stats) = @{$statsDBA->fetch_by_needed_workers_rc_id(1, $rc_id)};
if($stats) { if($stats) {
#synchronize and double check that it can be run #synchronize and double check that it can be run
$self->safe_synchronize_AnalysisStats($stats); $self->safe_synchronize_AnalysisStats($stats);
return $stats if(($stats->status ne 'BLOCKED') and ($stats->num_required_workers > 0) and (!defined($rc_id) or ($stats->resource_class_id == $rc_id))); return $stats if( ($stats->status ne 'BLOCKED') and ($stats->num_required_workers > 0) );
} }
# ok so no analyses 'need' workers with the given $rc_id. # ok so no analyses 'need' workers with the given $rc_id.
...@@ -942,15 +946,15 @@ sub _pick_best_analysis_for_new_worker { ...@@ -942,15 +946,15 @@ sub _pick_best_analysis_for_new_worker {
# see if any analysis needs an update, in case there are hidden jobs that haven't made it into the summary stats: # see if any analysis needs an update, in case there are hidden jobs that haven't made it into the summary stats:
print("QUEEN: no obvious needed workers, need to dig deeper\n"); print("QUEEN: no obvious needed workers, need to dig deeper\n");
my $stats_list = $statsDBA->fetch_by_statuses(['LOADING', 'BLOCKED', 'ALL_CLAIMED'], $rc_id); my $stats_list = $statsDBA->fetch_by_statuses_rc_id(['LOADING', 'BLOCKED', 'ALL_CLAIMED'], $rc_id);
foreach $stats (@$stats_list) { foreach $stats (@$stats_list) {
$self->safe_synchronize_AnalysisStats($stats); $self->safe_synchronize_AnalysisStats($stats);
return $stats if(($stats->status ne 'BLOCKED') and ($stats->num_required_workers > 0) and (!defined($rc_id) or ($stats->resource_class_id == $rc_id))); return $stats if( ($stats->status ne 'BLOCKED') and ($stats->num_required_workers > 0) );
} }
# does the following really ever help? # does the following really ever help?
($stats) = @{$statsDBA->fetch_by_needed_workers(1, $rc_id)}; ($stats) = @{$statsDBA->fetch_by_needed_workers_rc_id(1, $rc_id)};
return $stats; return $stats;
} }
......
...@@ -59,7 +59,6 @@ sub main { ...@@ -59,7 +59,6 @@ sub main {
min(mem), avg(mem), max(mem), min(mem), avg(mem), max(mem),
min(swap), avg(swap), max(swap) min(swap), avg(swap), max(swap)
FROM analysis_base FROM analysis_base
JOIN analysis_stats USING(analysis_id)
JOIN resource_class rc USING(resource_class_id) JOIN resource_class rc USING(resource_class_id)
LEFT JOIN worker USING(analysis_id) LEFT JOIN worker USING(analysis_id)
LEFT JOIN lsf_report USING (process_id) LEFT JOIN lsf_report USING (process_id)
......
...@@ -17,6 +17,5 @@ ALTER TABLE job_message ADD FOREIGN KEY (job_id) REFE ...@@ -17,6 +17,5 @@ ALTER TABLE job_message ADD FOREIGN KEY (job_id) REFE
ALTER TABLE job_file ADD FOREIGN KEY (job_id) REFERENCES job(job_id); ALTER TABLE job_file ADD FOREIGN KEY (job_id) REFERENCES job(job_id);
ALTER TABLE resource_description ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id); ALTER TABLE resource_description ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
ALTER TABLE analysis_stats ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id); ALTER TABLE analysis_base ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
ALTER TABLE analysis_stats_monitor ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
# Move resource_class_id from analysis_stats and analysis_stats_monitor into analysis_base table.
# Do not bother deleting the original column from analysis_stats and analysis_stats_monitor
# because of the already established foreign key relationships.
ALTER TABLE analysis_base ADD COLUMN resource_class_id int(10) unsigned NOT NULL;
UPDATE analysis_base a JOIN analysis_stats s USING(analysis_id) SET a.resource_class_id=s.resource_class_id;
...@@ -44,6 +44,7 @@ CREATE TABLE analysis_base ( ...@@ -44,6 +44,7 @@ CREATE TABLE analysis_base (
logic_name VARCHAR(40) NOT NULL, logic_name VARCHAR(40) NOT NULL,
module VARCHAR(255), module VARCHAR(255),
parameters TEXT, parameters TEXT,
resource_class_id int(10) unsigned NOT NULL,
PRIMARY KEY (analysis_id), PRIMARY KEY (analysis_id),
UNIQUE KEY logic_name_idx (logic_name) UNIQUE KEY logic_name_idx (logic_name)
...@@ -346,7 +347,6 @@ CREATE TABLE analysis_stats ( ...@@ -346,7 +347,6 @@ CREATE TABLE analysis_stats (
num_required_workers int(10) NOT NULL, num_required_workers int(10) NOT NULL,
last_update datetime NOT NULL, last_update datetime NOT NULL,
sync_lock int(10) default 0 NOT NULL, sync_lock int(10) default 0 NOT NULL,
resource_class_id int(10) unsigned NOT NULL,
can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL, can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL,
priority TINYINT DEFAULT 0 NOT NULL, priority TINYINT DEFAULT 0 NOT NULL,
...@@ -379,7 +379,6 @@ CREATE TABLE analysis_stats_monitor ( ...@@ -379,7 +379,6 @@ CREATE TABLE analysis_stats_monitor (
num_required_workers int(10) NOT NULL, num_required_workers int(10) NOT NULL,
last_update datetime NOT NULL, last_update datetime NOT NULL,
sync_lock int(10) default 0 NOT NULL, sync_lock int(10) default 0 NOT NULL,
resource_class_id int(10) unsigned NOT NULL,
can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL, can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL,
priority TINYINT DEFAULT 0 NOT NULL priority TINYINT DEFAULT 0 NOT NULL
......
...@@ -38,7 +38,8 @@ CREATE TABLE IF NOT EXISTS analysis_base ( ...@@ -38,7 +38,8 @@ CREATE TABLE IF NOT EXISTS analysis_base (
analysis_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, analysis_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
logic_name VARCHAR(40) NOT NULL, logic_name VARCHAR(40) NOT NULL,
module VARCHAR(255), module VARCHAR(255),
parameters TEXT parameters TEXT,
resource_class_id INTEGER NOT NULL
); );
CREATE UNIQUE INDEX IF NOT EXISTS logic_name_idx ON analysis_base (logic_name); CREATE UNIQUE INDEX IF NOT EXISTS logic_name_idx ON analysis_base (logic_name);
...@@ -322,7 +323,6 @@ CREATE TABLE analysis_stats ( ...@@ -322,7 +323,6 @@ CREATE TABLE analysis_stats (
num_required_workers int(10) NOT NULL DEFAULT 0, num_required_workers int(10) NOT NULL DEFAULT 0,
last_update datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, last_update datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
sync_lock int(10) NOT NULL DEFAULT 0, sync_lock int(10) NOT NULL DEFAULT 0,
resource_class_id INTEGER NOT NULL,
can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL, can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL,
priority TINYINT DEFAULT 0 NOT NULL priority TINYINT DEFAULT 0 NOT NULL
); );
...@@ -352,7 +352,6 @@ CREATE TABLE analysis_stats_monitor ( ...@@ -352,7 +352,6 @@ CREATE TABLE analysis_stats_monitor (
num_required_workers int(10) NOT NULL DEFAULT 0, num_required_workers int(10) NOT NULL DEFAULT 0,
last_update datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, last_update datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
sync_lock int(10) default 0 NOT NULL, sync_lock int(10) default 0 NOT NULL,
resource_class_id INTEGER NOT NULL,
can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL, can_be_empty TINYINT UNSIGNED DEFAULT 0 NOT NULL,
priority TINYINT DEFAULT 0 NOT NULL priority TINYINT DEFAULT 0 NOT NULL
); );
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment