Commit 7d55c42a authored by Leo Gordon
Browse files

testing resource requirements

parent 5bdd25c8
......@@ -245,6 +245,13 @@ sub sync_lock {
return $self->{'_sync_lock'};
}
# Combined getter/setter for the resource class id kept in $self->{'_rc_id'}.
#
#   Arg [1]    : (optional) new value; when any argument is supplied (even
#                undef) it overwrites the stored value
#   Returntype : the value currently stored under '_rc_id'
sub rc_id {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_rc_id'} = $new_value[0];
    }

    return $self->{'_rc_id'};
}
sub determine_status {
my $self = shift;
......
......@@ -91,9 +91,10 @@ sub fetch_all {
sub fetch_by_needed_workers {
my ($self, $limit, $maximise_concurrency) = @_;
my ($self, $limit, $maximise_concurrency, $rc_id) = @_;
my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')";
my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')"
.(defined($rc_id) ? " AND ast.rc_id = $rc_id" : '');
my $final_clause = 'ORDER BY num_running_workers'
.($maximise_concurrency ? '' : ' DESC')
......@@ -108,22 +109,15 @@ sub fetch_by_needed_workers {
}
sub fetch_by_status {
my $self = shift;
sub fetch_by_statuses {
my ($self, $statuses, $rc_id) = @_;
my $constraint = "ast.status in (";
my $addComma;
while(@_) {
my $status = shift;
$constraint .= ',' if($addComma);
$constraint .= "'$status' ";
$addComma = 1;
}
$constraint .= ")";
my $constraint = 'ast.status in ('.join(', ', map { "'$_'" } @$statuses).')'
.(defined($rc_id) ? " AND ast.rc_id = $rc_id" : '');
$self->_final_clause("ORDER BY last_update");
$self->_final_clause('ORDER BY last_update');
my $results = $self->_generic_fetch($constraint);
$self->_final_clause(""); #reset final clause for other fetches
$self->_final_clause(''); #reset final clause for other fetches
return $results;
}
......@@ -216,6 +210,7 @@ sub update {
$sql .= ",num_required_workers=" . $stats->num_required_workers();
$sql .= ",last_update=NOW()";
$sql .= ",sync_lock='0'";
$sql .= ",rc_id=". $stats->rc_id();
$sql .= " WHERE analysis_id='".$stats->analysis_id."' ";
my $sth = $self->prepare($sql);
......@@ -457,6 +452,7 @@ sub _columns {
ast.num_required_workers
ast.last_update
ast.sync_lock
ast.rc_id
);
push @columns , "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ";
return @columns;
......@@ -476,6 +472,7 @@ sub _objs_from_sth {
$analStats->analysis_id($column{'analysis_id'});
$analStats->status($column{'status'});
$analStats->sync_lock($column{'sync_lock'});
$analStats->rc_id($column{'rc_id'});
$analStats->batch_size($column{'batch_size'});
$analStats->avg_msec_per_job($column{'avg_msec_per_job'});
$analStats->avg_input_msec_per_job($column{'avg_input_msec_per_job'});
......@@ -530,4 +527,3 @@ sub _create_new_for_analysis_id {
1;
......@@ -50,18 +50,20 @@ sub get_Queen {
return $self->get_QueenAdaptor();
}
sub get_available_adaptors{
sub get_available_adaptors {
my %pairs = (
"MetaContainer" => 'Bio::EnsEMBL::DBSQL::MetaContainer',
"Analysis" => "Bio::EnsEMBL::DBSQL::AnalysisAdaptor",
"Queen" => "Bio::EnsEMBL::Hive::Queen",
"AnalysisJob" => "Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor",
"AnalysisData" => "Bio::EnsEMBL::Hive::DBSQL::AnalysisDataAdaptor",
"AnalysisStats" => "Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor",
"AnalysisCtrlRule" => "Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor",
"DataflowRule" => "Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor");
return (\%pairs);
my %pairs = (
'MetaContainer' => 'Bio::EnsEMBL::DBSQL::MetaContainer',
'Analysis' => 'Bio::EnsEMBL::DBSQL::AnalysisAdaptor',
'Queen' => 'Bio::EnsEMBL::Hive::Queen',
'AnalysisJob' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor',
'AnalysisData' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisDataAdaptor',
'AnalysisStats' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor',
'AnalysisCtrlRule' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor',
'DataflowRule' => 'Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor',
'ResourceDescription' => 'Bio::EnsEMBL::Hive::DBSQL::ResourceDescriptionAdaptor',
);
return (\%pairs);
}
1;
......
=pod

=head1 NAME

Bio::EnsEMBL::Hive::DBSQL::ResourceDescriptionAdaptor

=head1 SYNOPSIS

    $resource_description_adaptor = $db_adaptor->get_ResourceDescriptionAdaptor;

    $resource_description_adaptor = $resource_description_object->adaptor;

=head1 DESCRIPTION

Module to encapsulate all db access for persistent class ResourceDescription.
There should be just one per application and database connection.

=head1 CONTACT

Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=cut

package Bio::EnsEMBL::Hive::DBSQL::ResourceDescriptionAdaptor;

use strict;
use warnings;

use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::ResourceDescription;

use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');


# Fetch the resource description stored for one (rc_id, meadow_type) pair.
#
#   Arg [1]    : $rc_id       - value matched against rd.rc_id
#   Arg [2]    : $meadow_type - value matched against rd.meadow_type
#   Returntype : the first matching ResourceDescription object, or undef
#
# Both values are passed as bind parameters, so quotes or other SQL
# metacharacters in them cannot alter the statement.
sub fetch_by_rcid_meadowtype {
    my ($self, $rc_id, $meadow_type) = @_;

    my $constraint = 'rd.rc_id = ? AND rd.meadow_type = ?';
    my ($rd) = @{ $self->_generic_fetch($constraint, undef, [$rc_id, $meadow_type]) };

    return $rd;     # returns one object or undef
}


# Fetch every resource description stored for the given meadow type.
#
#   Arg [1]    : $meadow_type - value matched against rd.meadow_type
#   Returntype : arrayref of ResourceDescription objects (possibly empty)
sub fetch_all_by_meadowtype {
    my ($self, $meadow_type) = @_;

    my $constraint = 'rd.meadow_type = ?';

    return $self->_generic_fetch($constraint, undef, [$meadow_type]);     # returns an arrayref
}


# Write a ResourceDescription into the resource_description table.
# REPLACE INTO is used, so the statement inserts or overwrites in one step.
#
#   Arg [1]    : $rd - object providing rc_id, meadow_type, parameters and
#                description accessors
#   Returntype : the same $rd object
sub store {
    my ($self, $rd) = @_;

    my $sth = $self->prepare( q{REPLACE INTO resource_description (rc_id, meadow_type, parameters, description) VALUES (?,?,?,?) } );
    $sth->execute($rd->rc_id, $rd->meadow_type, $rd->parameters, $rd->description);
    $sth->finish();

    return $rd;
}


# Delete the row identified by the object's (rc_id, meadow_type) pair.
#
#   Arg [1]    : $rd - object providing rc_id and meadow_type accessors
sub remove {
    my ($self, $rd) = @_;

    my $sth = $self->prepare( q{DELETE FROM resource_description WHERE rc_id = ? AND meadow_type = ?} );
    $sth->execute($rd->rc_id, $rd->meadow_type);
    $sth->finish();
}


# Build a ResourceDescription from the given constructor arguments, attach this
# adaptor to it and store it straight away.
#
#   Args       : constructor arguments forwarded to
#                Bio::EnsEMBL::Hive::ResourceDescription->new()
#   Returntype : the newly created (and stored) object
sub create_new {
    my $self = shift @_;

    my $rd = Bio::EnsEMBL::Hive::ResourceDescription->new(@_, -ADAPTOR => $self);

    return $self->store($rd);
}


############################
#
# INTERNAL METHODS
# (pseudo subclass methods)
#
############################

# Table (and alias) list used by _generic_fetch to build the FROM clause.
sub _tables {
    my $self = shift;

    return (['resource_description', 'rd']);
}


# Column list used by _generic_fetch to build the SELECT clause.
# The order must match the unpacking order in _objs_from_sth.
sub _columns {
    my $self = shift;

    return qw (rd.rc_id
               rd.meadow_type
               rd.parameters
               rd.description
    );
}


# Turn every row of an executed statement handle into a ResourceDescription
# object that has this adaptor attached.
#
#   Arg [1]    : $sth - executed statement handle yielding the _columns() fields
#   Returntype : arrayref of ResourceDescription objects (possibly empty)
sub _objs_from_sth {
    my ($self, $sth) = @_;

    my @rds = ();

    while(my ($rc_id, $meadow_type, $parameters, $description) = $sth->fetchrow_array) {
        my $rd = Bio::EnsEMBL::Hive::ResourceDescription->new(
            -ADAPTOR     => $self,
            -RC_ID       => $rc_id,
            -MEADOW_TYPE => $meadow_type,
            -PARAMETERS  => $parameters,
            -DESCRIPTION => $description,
        );
        push @rds, $rd;
    }

    return \@rds;
}


# No implicit WHERE condition is added to queries of this adaptor.
sub _default_where_clause {
    my $self = shift;

    return '';
}


# No trailing clause (ORDER BY, LIMIT, ...) is added to queries of this adaptor.
sub _final_clause {
    my $self = shift;

    return '';
}


###############################################################################
#
#  General access methods that could be moved
#  into a superclass
#
###############################################################################

# Fetch every row of the table as objects.
#
#   Returntype : arrayref of ResourceDescription objects (possibly empty)
sub fetch_all {
    my $self = shift;

    return $self->_generic_fetch();
}


# Build and run a SELECT over _tables()/_columns() and return the objects
# produced by _objs_from_sth().
#
#   Arg [1]    : (optional) $constraint  - SQL text for the WHERE clause; may
#                contain '?' placeholders when Arg [3] is supplied
#   Arg [2]    : (optional) $join        - arrayref of
#                [$table_spec, $condition, $extra_columns] triples; $table_spec
#                is itself an arrayref (it is joined with ' ' just like the
#                entries of _tables()), $extra_columns an arrayref of columns
#   Arg [3]    : (optional) $bind_values - arrayref of values bound, in order,
#                to the '?' placeholders of the final statement
#   Returntype : whatever _objs_from_sth() returns (an arrayref of objects)
sub _generic_fetch {
    my ($self, $constraint, $join, $bind_values) = @_;

    my @tables  = $self->_tables;
    my $columns = join(', ', $self->_columns());

    foreach my $single_join (@{ $join || [] }) {
        my ($tablename, $condition, $extra_columns) = @{$single_join};

        # a join only takes effect when both the table and its condition are given
        if ($tablename && $condition) {
            push @tables, $tablename;

            if($constraint) {
                $constraint .= " AND $condition";
            } else {
                $constraint = " $condition";
            }
        }

        if ($extra_columns) {
            $columns .= ", " . join(', ', @{$extra_columns});
        }
    }

    #construct a nice table string like 'table1 t1, table2 t2'
    my $tablenames = join(', ', map({ join(' ', @$_) } @tables));

    my $sql = "SELECT $columns FROM $tablenames";

    my $default_where = $self->_default_where_clause;
    my $final_clause  = $self->_final_clause;

    #append a where clause if it was defined
    if($constraint) {
        $sql .= " WHERE $constraint ";
        if($default_where) {
            $sql .= " AND $default_where ";
        }
    } elsif($default_where) {
        $sql .= " WHERE $default_where ";
    }

    #append additional clauses which may have been defined
    $sql .= " $final_clause";

    my $sth = $self->prepare($sql);
    $sth->execute(@{ $bind_values || [] });     # no bind values => plain execute, as before

    # print STDERR $sql,"\n";

    return $self->_objs_from_sth($sth);
}

1;
......@@ -26,10 +26,12 @@ sub pipeline_name { # if set, provides a filter for job-related queries
}
sub generate_job_name {
my ($self, $worker_count, $iteration) = @_;
my ($self, $worker_count, $iteration, $rc_id) = @_;
$rc_id ||= 0;
return ($self->pipeline_name() ? $self->pipeline_name().'-' : '').'HL'.$iteration
. (($worker_count > 1) ? "[1-${worker_count}]" : '');
return ($self->pipeline_name() ? $self->pipeline_name().'-' : '')
."HL${iteration}_${rc_id}"
. (($worker_count > 1) ? "[1-${worker_count}]" : '');
}
sub responsible_for_worker {
......
......@@ -77,9 +77,9 @@ sub kill_worker {
}
sub submit_workers {
my ($self, $worker_cmd, $worker_count, $iteration) = @_;
my ($self, $iteration, $worker_cmd, $worker_count, $rc_id, $rc_parameters) = @_;
my $cmd = "$worker_cmd &";
my $cmd = "$worker_cmd -rc_id $rc_id &"; # $rc_parameters are ignored for the time being
foreach (1..$worker_count) {
print "SUBMITTING_CMD:\t\t$cmd\n";
......
......@@ -89,12 +89,12 @@ sub lsf_options {
}
sub submit_workers {
my ($self, $worker_cmd, $worker_count, $iteration) = @_;
my ($self, $iteration, $worker_cmd, $worker_count, $rc_id, $rc_parameters) = @_;
my $job_name = $self->generate_job_name($worker_count, $iteration);
my $job_name = $self->generate_job_name($worker_count, $iteration, $rc_id);
my $lsf_options = $self->lsf_options();
my $cmd = "bsub -o /dev/null -J\"${job_name}\" $lsf_options $worker_cmd";
my $cmd = "bsub -o /dev/null -J\"${job_name}\" $rc_parameters $lsf_options $worker_cmd -rc_id $rc_id";
print "SUBMITTING_CMD:\t\t$cmd\n";
system($cmd);
......
......@@ -97,8 +97,8 @@ our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
sub create_new_worker {
my ($self, @args) = @_;
my ($analysis_id, $beekeeper ,$pid, $job, $no_write) =
rearrange([qw(analysis_id beekeeper process_id job no_write) ], @args);
my ($rc_id, $analysis_id, $beekeeper ,$pid, $job, $no_write) =
rearrange([qw(rc_id analysis_id beekeeper process_id job no_write) ], @args);
my $analStatsDBA = $self->db->get_AnalysisStatsAdaptor;
return undef unless($analStatsDBA);
......@@ -111,7 +111,7 @@ sub create_new_worker {
$self->safe_synchronize_AnalysisStats($analysisStats);
#return undef unless(($analysisStats->status ne 'BLOCKED') and ($analysisStats->num_required_workers > 0));
} else {
$analysisStats = $self->_pick_best_analysis_for_new_worker;
$analysisStats = $self->_pick_best_analysis_for_new_worker($rc_id);
}
return undef unless($analysisStats);
......@@ -159,7 +159,6 @@ sub create_new_worker {
return $worker;
}
sub register_worker_death {
my ($self, $worker) = @_;
......@@ -199,6 +198,44 @@ sub register_worker_death {
}
# Compare the workers the Queen lists (via fetch_overdue_workers(0)) with the
# statuses reported by the meadow, and register the death of every worker the
# meadow is responsible for but reports no status for ('AWOL').
# Optionally also reclaims jobs still attached to already-dead workers.
#
#   Arg [1]    : $meadow - object providing status_of_all_my_workers()
#                (hashref process_id => status) and responsible_for_worker($worker)
#   Arg [2]    : $check_buried_in_haste - when true, jobs of the workers returned
#                by fetch_dead_workers_with_jobs() are reset through the
#                AnalysisJobAdaptor
#   Output     : prints a per-status summary (keys sorted, so the line is
#                stable between runs) and the reclaiming report to the
#                currently selected filehandle
sub check_for_dead_workers {
    my ($self, $meadow, $check_buried_in_haste) = @_;

    my $worker_status_hash    = $meadow->status_of_all_my_workers();
    my %worker_status_summary = ();
    my $queen_worker_list     = $self->fetch_overdue_workers(0);

    print "====== Live workers according to    Queen:".scalar(@$queen_worker_list).", Meadow:".scalar(keys %$worker_status_hash)."\n";

    foreach my $worker (@$queen_worker_list) {
        next unless($meadow->responsible_for_worker($worker));

        my $worker_pid = $worker->process_id();
        if(my $status = $worker_status_hash->{$worker_pid}) { # can be RUN|PEND|xSUSP
            $worker_status_summary{$status}++;
        } else {
            # the meadow knows nothing about this worker any more
            $worker_status_summary{'AWOL'}++;
            $self->register_worker_death($worker);
        }
    }
    print "\t".join(', ', map { "$_:$worker_status_summary{$_}" } sort keys %worker_status_summary)."\n\n";

    if($check_buried_in_haste) {
        print "====== Checking for workers buried in haste... ";
        my $buried_in_haste_list = $self->fetch_dead_workers_with_jobs();
        if(my $bih_number = scalar(@$buried_in_haste_list)) {
            print "$bih_number, reclaiming jobs.\n\n";
            my $job_adaptor = $self->db->get_AnalysisJobAdaptor();
            foreach my $worker (@$buried_in_haste_list) {
                $job_adaptor->reset_dead_jobs_for_worker($worker);
            }
        } else {
            print "none\n";
        }
    }
}
sub worker_check_in {
my ($self, $worker) = @_;
......@@ -363,7 +400,7 @@ sub fetch_dead_workers_with_jobs {
=head2 synchronize_hive
Arg [1] : $this_analysis (optional)
Arg [1] : $filter_analysis (optional)
Example : $queen->synchronize_hive();
Description: Runs through all analyses in the system and synchronizes
the analysis_stats summary with the states in the analysis_job
......@@ -376,11 +413,11 @@ sub fetch_dead_workers_with_jobs {
sub synchronize_hive {
my $self = shift;
my $this_analysis = shift; # optional parameter
my $filter_analysis = shift; # optional parameter
my $start_time = time();
my $list_of_analyses = $this_analysis ? [$this_analysis] : $self->db->get_AnalysisAdaptor->fetch_all;
my $list_of_analyses = $filter_analysis ? [$filter_analysis] : $self->db->get_AnalysisAdaptor->fetch_all;
print STDERR "\nSynchronizing the hive (".scalar(@$list_of_analyses)." analyses this time):\n";
foreach my $analysis (@$list_of_analyses) {
......@@ -566,13 +603,11 @@ sub check_blocking_control_rules_for_AnalysisStats
}
sub get_num_failed_analyses
{
my $self = shift;
my $analysis = shift;
sub get_num_failed_analyses {
my ($self, $analysis) = @_;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
my $failed_analyses = $statsDBA->fetch_by_status('FAILED');
my $failed_analyses = $statsDBA->fetch_by_statuses(['FAILED']);
if ($analysis) {
foreach my $this_failed_analysis (@$failed_analyses) {
if ($this_failed_analysis->analysis_id == $analysis->dbID) {
......@@ -642,26 +677,25 @@ sub enter_status {
=cut
sub get_num_needed_workers {
my $self = shift;
my $analysis = shift;
my ($self, $filter_analysis) = @_;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
my $neededAnals = $statsDBA->fetch_by_needed_workers(undef,$self->{maximise_concurrency});
my $deeper_stats_list = $statsDBA->fetch_by_status('LOADING', 'BLOCKED');
push @$neededAnals, @$deeper_stats_list;
my $clearly_needed_analyses = $statsDBA->fetch_by_needed_workers(undef,$self->{maximise_concurrency});
my $potentially_needed_analyses = $statsDBA->fetch_by_statuses(['LOADING', 'BLOCKED']);
my @all_analyses = (@$clearly_needed_analyses, @$potentially_needed_analyses);
return 0 unless($neededAnals);
return 0 unless(@all_analyses);
my $availableLoad = 1.0 - $self->get_hive_current_load();
return 0 if($availableLoad <0.0);
my $available_load = 1.0 - $self->get_hive_current_load();
return 0 if($available_load <=0.0);
my $numWorkers = 0;
foreach my $analysis_stats (@{$neededAnals}) {
next if (defined $analysis && $analysis->dbID != $analysis_stats->analysis_id);
my $total_workers = 0;
my %rc2workers = ();
#$analysis_stats->print_stats();
foreach my $analysis_stats (@all_analyses) {
next if (defined $filter_analysis && $filter_analysis->dbID != $analysis_stats->analysis_id);
#digging deeper under the surface so need to sync
#digging deeper under the surface so need to sync
if(($analysis_stats->status eq 'LOADING') or ($analysis_stats->status eq 'BLOCKED')) {
$self->synchronize_AnalysisStats($analysis_stats);
}
......@@ -669,32 +703,56 @@ sub get_num_needed_workers {
next if($analysis_stats->status eq 'BLOCKED');
next if($analysis_stats->num_required_workers == 0);
my $thisLoad = 0.0;
if($analysis_stats->hive_capacity>0) {
$thisLoad = $analysis_stats->num_required_workers * (1/$analysis_stats->hive_capacity);
}
my $workers_this_analysis = $analysis_stats->num_required_workers;
if(($analysis_stats->hive_capacity<=0) or ($thisLoad < $availableLoad)) {
$numWorkers += $analysis_stats->num_required_workers;
$availableLoad -= $thisLoad;
$analysis_stats->print_stats();
printf(" %5d workers (%1.3f remaining-hive-load)\n", $numWorkers, $availableLoad);
} else {
my $workerCount = POSIX::ceil($availableLoad * $analysis_stats->hive_capacity);
$numWorkers += $workerCount;
$availableLoad -= $workerCount * (1/$analysis_stats->hive_capacity);
$analysis_stats->print_stats();
printf(" %5d workers (%1.3f remaining-hive-load) use only %3d workers\n", $numWorkers, $availableLoad, $workerCount);
last;
if($analysis_stats->hive_capacity > 0) { # if there is a limit, use it for cut-off
my $limit_workers_this_analysis = int($available_load * $analysis_stats->hive_capacity);
if($workers_this_analysis > $limit_workers_this_analysis) {
$workers_this_analysis = $limit_workers_this_analysis;
}
$available_load -= 1.0*$workers_this_analysis/$analysis_stats->hive_capacity;
}
last if($availableLoad <= 0.0);
$total_workers += $workers_this_analysis;
$rc2workers{$analysis_stats->rc_id} += $workers_this_analysis;
$analysis_stats->print_stats();
printf(" (%1.3f remaining-hive-load) use %3d workers of analysis_id=%d\n", $available_load, $workers_this_analysis, $analysis_stats->analysis_id);
last if($available_load <= 0.0);
}
printf("need $numWorkers workers (availLoad=%1.5f)\n", $availableLoad);
return $numWorkers;
printf("need a total of $total_workers workers (availLoad=%1.5f)\n", $available_load);
return ($total_workers, \%rc2workers);
}
sub get_hive_progress {
# Work out how many workers are needed and what get_num_failed_analyses()
# reports, forcing a hard resync first when the hive looks completely idle.
#
#   Arg [1]    : $meadow   - passed on to check_for_dead_workers() during resync
#   Arg [2]    : $analysis - (optional) analysis object used to filter every query
#   Returntype : list ($needed_count, $failed_analyses, $rc_hash), where
#                $needed_count and $rc_hash are the two values returned by
#                get_num_needed_workers() and $failed_analyses is the value
#                returned by get_num_failed_analyses()
sub get_needed_workers_failed_analyses_resync_if_necessary {
    my ($self, $meadow, $analysis) = @_;

    my $current_load    = $self->get_hive_current_load();
    my $workers_running = $self->get_num_running_workers();
    my ($workers_needed, $rc2workers) = $self->get_num_needed_workers($analysis);

    # analysis_stats claims total idleness: distrust it, resync and ask again
    my $looks_idle = ($current_load == 0 and $workers_needed == 0 and $workers_running == 0);
    if ($looks_idle) {
        print "*** nothing is running and nothing to do (according to analysis_stats) => perform a hard resync\n";

        $self->synchronize_hive($analysis);
        $self->check_for_dead_workers($meadow, 1);

        ($workers_needed, $rc2workers) = $self->get_num_needed_workers($analysis);
    }

    my $failed = $self->get_num_failed_analyses($analysis);

    if ($workers_needed == 0 and $failed == 0) {
        my $scope = $analysis ? (' for analysis '.$analysis->logic_name) : '';
        print "Nothing left to do".$scope.". DONE!!\n\n";
    }

    return ($workers_needed, $failed, $rc2workers);
}
sub get_remaining_jobs_show_hive_progress {
my $self = shift;
my $sql = "SELECT sum(done_job_count), sum(failed_job_count), sum(total_job_count), ".
"sum(unclaimed_job_count * analysis_stats.avg_msec_per_job)/1000/60/60 ".
......@@ -717,17 +775,17 @@ sub get_hive_progress {
}
sub print_hive_status {
my ($self, $this_analysis) = @_;
my ($self, $filter_analysis) = @_;