Commit 531a160f authored by Leo Gordon's avatar Leo Gordon
Browse files

common denominator for schedule_workers and specialize_new_worker

parent 9f8199c0
......@@ -78,37 +78,28 @@ sub fetch_by_analysis_id {
}
sub fetch_by_needed_workers_rc_id {
my ($self, $limit, $resource_class_id) = @_;
my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')";
sub fetch_all_by_suitability_rc_id {
my ($self, $resource_class_id) = @_;
my $join = [[['analysis_base', 'a'], " ast.analysis_id=a.analysis_id ".( $resource_class_id ? "AND a.resource_class_id=$resource_class_id " : '') ]];
my $final_clause = 'ORDER BY a.priority DESC, '
.( ($self->dbc->driver eq 'sqlite') ? 'RANDOM()' : 'RAND()' )
.($limit ? " LIMIT $limit" : '');
$self->_final_clause($final_clause);
my $results = $self->_generic_fetch($constraint, $join);
$self->_final_clause(''); # reset final clause for other fetches
return $results;
}
sub fetch_by_statuses_rc_id {
my ($self, $statuses, $resource_class_id) = @_;
my $constraint = 'ast.status in ('.join(', ', map { "'$_'" } @$statuses).')';
my $join = $resource_class_id ? [[['analysis_base', 'a'], " ast.analysis_id=a.analysis_id AND a.resource_class_id=$resource_class_id"]] : [];
$self->_final_clause('ORDER BY last_update');
my $results = $self->_generic_fetch($constraint, $join);
$self->_final_clause(''); #reset final clause for other fetches
return $results;
# the ones that clearly have work to do:
#
my $primary_results = $self->_generic_fetch(
"ast.num_required_workers>0 AND ast.status in ('READY', 'WORKING')" ,
$join ,
'ORDER BY a.priority DESC, ' . ( ($self->dbc->driver eq 'sqlite') ? 'RANDOM()' : 'RAND()' ),
);
# the ones that may have work to do after a sync:
#
my $secondary_results = $self->_generic_fetch(
"ast.status in ('LOADING', 'BLOCKED', 'ALL_CLAIMED')" ,
$join ,
'ORDER BY last_update', # FIXME: could mix in a.priority if sync is not too expensive?
);
return [ @$primary_results, @$secondary_results ];
}
......@@ -342,7 +333,7 @@ sub increase_required_workers {
=cut
sub _generic_fetch {
my ($self, $constraint, $join) = @_;
my ($self, $constraint, $join, $final_clause) = @_;
my @tables = $self->_tables;
my $columns = join(', ', $self->_columns());
......@@ -371,7 +362,6 @@ sub _generic_fetch {
my $sql = "SELECT $columns FROM $tablenames";
my $default_where = $self->_default_where_clause;
my $final_clause = $self->_final_clause;
#append a where clause if it was defined
if($constraint) {
......@@ -384,7 +374,7 @@ sub _generic_fetch {
}
#append additional clauses which may have been defined
$sql .= " $final_clause";
$sql .= " $final_clause" if($final_clause);
#rint STDOUT $sql,"\n";
my $sth = $self->prepare($sql);
......@@ -479,13 +469,5 @@ sub _objs_from_sth {
}
sub _final_clause {
my $self = shift;
$self->{'_final_clause'} = shift if(@_);
$self->{'_final_clause'} = "" unless($self->{'_final_clause'});
return $self->{'_final_clause'};
}
1;
......@@ -185,7 +185,7 @@ sub create_new_worker {
$self->safe_synchronize_AnalysisStats($analysisStats);
#return undef unless(($analysisStats->status ne 'BLOCKED') and ($analysisStats->num_required_workers > 0));
} else {
if( $analysisStats = $self->_pick_best_analysis_for_new_worker($rc_id) ) {
if( $analysisStats = $self->specialize_new_worker($rc_id) ) {
print "Scheduler picked analysis_id=".$analysisStats->analysis_id()." for the worker\n";
} else {
print "Scheduler failed to pick analysis_id for the worker\n";
......@@ -729,17 +729,16 @@ sub count_running_workers {
sub schedule_workers {
my ($self, $filter_analysis, $orig_pending_by_rc_name, $available_submit_limit) = @_;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
my $clearly_needed_analyses = $statsDBA->fetch_by_needed_workers_rc_id();
my $potentially_needed_analyses = $statsDBA->fetch_by_statuses_rc_id(['LOADING', 'BLOCKED', 'ALL_CLAIMED']);
my @all_analyses = (@$clearly_needed_analyses, @$potentially_needed_analyses);
my @suitable_analyses = $filter_analysis
? ( $filter_analysis->stats )
: @{ $self->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id() };
my $analysis_id2rc_id = $self->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
my $rc_id2name = $self->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
# combined mapping:
my %analysis_id2rc_name = map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id;
my $analysis_id2rc_id = $self->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
my $rc_id2name = $self->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
# combined mapping
my %analysis_id2rc_name = map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id;
return {} unless(@all_analyses);
return {} unless(@suitable_analyses);
my %pending_by_rc_name = %{ $orig_pending_by_rc_name || {} }; # NB: make our own copy to be able to modify it
my $total_workers_to_run = 0;
......@@ -747,10 +746,9 @@ sub schedule_workers {
my $available_load = 1.0 - $self->get_hive_current_load();
foreach my $analysis_stats (@all_analyses) {
foreach my $analysis_stats (@suitable_analyses) {
last if ($available_load <= 0.0);
last if (defined($available_submit_limit) and !$available_submit_limit);
next if (defined $filter_analysis && $filter_analysis->dbID != $analysis_stats->analysis_id);
#digging deeper under the surface so need to sync
if(($analysis_stats->status eq 'LOADING') or ($analysis_stats->status eq 'BLOCKED') or ($analysis_stats->status eq 'ALL_CLAIMED')) {
......@@ -941,38 +939,20 @@ sub register_all_workers_dead {
}
sub _pick_best_analysis_for_new_worker {
my $self = shift;
my $rc_id = shift; # this parameter will need to percolate very deep
sub specialize_new_worker {
my $self = shift;
my $rc_id = shift; # this parameter will percolate into fetch
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
return undef unless($statsDBA);
my @suitable_analyses = @{ $self->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id( $rc_id ) };
my ($stats) = @{$statsDBA->fetch_by_needed_workers_rc_id(1, $rc_id)};
if($stats) {
#synchronize and double check that it can be run
$self->safe_synchronize_AnalysisStats($stats);
return $stats if( ($stats->status ne 'BLOCKED') and ($stats->num_required_workers > 0) );
}
foreach my $analysis_stats (@suitable_analyses) {
# ok so no analyses 'need' workers with the given $rc_id.
#synchronize and double check that it can be run:
$self->safe_synchronize_AnalysisStats($analysis_stats);
return $analysis_stats if( ($analysis_stats->status ne 'BLOCKED') and ($analysis_stats->num_required_workers > 0) );
}
if ($self->get_num_failed_analyses()) {
return undef;
}
# see if any analysis needs an update, in case there are hidden jobs that haven't made it into the summary stats:
print("QUEEN: no obvious needed workers, need to dig deeper\n");
my $stats_list = $statsDBA->fetch_by_statuses_rc_id(['LOADING', 'BLOCKED', 'ALL_CLAIMED'], $rc_id);
foreach $stats (@$stats_list) {
$self->safe_synchronize_AnalysisStats($stats);
return $stats if( ($stats->status ne 'BLOCKED') and ($stats->num_required_workers > 0) );
}
# does the following really ever help?
($stats) = @{$statsDBA->fetch_by_needed_workers_rc_id(1, $rc_id)};
return $stats;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment