Commit a744dfb2 authored by Leo Gordon's avatar Leo Gordon
Browse files

replaced internal rc_id by rc_name in the Meadow code and in most of the Scheduler; needs testing

parent b972daaf
......@@ -72,16 +72,15 @@ sub pipeline_name { # if set, provides a filter for job-related queries
sub job_name_prefix {
my $self = shift @_;
return ($self->pipeline_name() ? $self->pipeline_name().'-' : '') . 'Hive';
return ($self->pipeline_name() ? $self->pipeline_name().'-' : '') . 'Hive-';
}
sub generate_job_name {
my ($self, $worker_count, $iteration, $rc_id) = @_;
$rc_id ||= 0;
my ($self, $worker_count, $iteration, $rc_name) = @_;
return $self->job_name_prefix()
."${rc_id}_${iteration}"
."${rc_name}-${iteration}"
. (($worker_count > 1) ? "[1-${worker_count}]" : '');
}
......
......@@ -87,7 +87,7 @@ sub kill_worker {
sub submit_workers {
my ($self, $worker_cmd, $worker_count, $iteration, $rc_id, $rc_parameters) = @_;
my ($self, $worker_cmd, $worker_count, $iteration, $rc_name, $rc_parameters) = @_;
my $cmd = "$worker_cmd &";
......
......@@ -35,21 +35,21 @@ sub get_current_worker_process_id {
}
sub count_pending_workers_by_rc_id {
sub count_pending_workers_by_rc_name {
my ($self) = @_;
my $jnp = $self->job_name_prefix();
my $cmd = qq{bjobs -w -J '${jnp}*' -u all 2>/dev/null | grep PEND};
my %pending_by_rc_id = ();
my %pending_by_rc_name = ();
foreach my $line (qx/$cmd/) {
if($line=~/Hive(\d+)/) { # FIXME: should be safer to match against $jnp instead of 'Hive'
$pending_by_rc_id{$1}++;
if($line=~/\b\Q$jnp\E(.+)\-\d+\b/) {
$pending_by_rc_name{$1}++;
}
}
return \%pending_by_rc_id;
return \%pending_by_rc_name;
}
......@@ -120,9 +120,9 @@ sub find_out_causes {
sub submit_workers {
my ($self, $worker_cmd, $worker_count, $iteration, $rc_id, $rc_parameters) = @_;
my ($self, $worker_cmd, $required_worker_count, $iteration, $rc_name, $rc_parameters) = @_;
my $job_name = $self->generate_job_name($worker_count, $iteration, $rc_id);
my $job_name = $self->generate_job_name($required_worker_count, $iteration, $rc_name);
my $submission_options = $self->config_get('SubmissionOptions');
$ENV{'LSB_STDOUT_DIRECT'} = 'y'; # unbuffer the output of the bsub command
......
......@@ -707,18 +707,19 @@ sub count_running_workers {
=cut
sub schedule_workers {
my ($self, $filter_analysis, $orig_pending_by_rc_id, $available_submit_limit) = @_;
my ($self, $filter_analysis, $orig_pending_by_rc_name, $available_submit_limit) = @_;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
my $clearly_needed_analyses = $statsDBA->fetch_by_needed_workers();
my $potentially_needed_analyses = $statsDBA->fetch_by_statuses(['LOADING', 'BLOCKED', 'ALL_CLAIMED']);
my @all_analyses = (@$clearly_needed_analyses, @$potentially_needed_analyses);
my $rc_id2name = $self->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
return {} unless(@all_analyses);
my %pending_by_rc_id = %{ $orig_pending_by_rc_id || {} };
my %pending_by_rc_name = %{ $orig_pending_by_rc_name || {} }; # NB: make our own copy to be able to modify it
my $total_workers_to_run = 0;
my %workers_to_run_by_rc_id = ();
my %workers_to_run_by_rc_name = ();
my $available_load = 1.0 - $self->get_hive_current_load();
......@@ -755,25 +756,26 @@ sub schedule_workers {
$available_load -= 1.0*$workers_this_analysis/$hive_capacity;
}
my $curr_rc_id = $analysis_stats->resource_class_id;
if($pending_by_rc_id{ $curr_rc_id }) { # per-rc_id capping by pending processes, if available
my $pending_this_analysis = ($pending_by_rc_id{ $curr_rc_id } < $workers_this_analysis) ? $pending_by_rc_id{ $curr_rc_id } : $workers_this_analysis;
my $curr_rc_name = $rc_id2name->{ $analysis_stats->resource_class_id };
print "Scheduler detected $pending_this_analysis pending workers with rc_id=$curr_rc_id\n";
$workers_this_analysis -= $pending_this_analysis;
$pending_by_rc_id{ $curr_rc_id } -= $pending_this_analysis;
if($pending_by_rc_name{ $curr_rc_name }) { # per-rc_name capping by pending processes, if available
my $pending_this_analysis = ($pending_by_rc_name{ $curr_rc_name } < $workers_this_analysis) ? $pending_by_rc_name{ $curr_rc_name } : $workers_this_analysis;
print "Scheduler detected $pending_this_analysis pending workers with resource_class_name=$curr_rc_name\n";
$workers_this_analysis -= $pending_this_analysis;
$pending_by_rc_name{ $curr_rc_name } -= $pending_this_analysis;
}
next unless($workers_this_analysis); # do not autovivify the hash by a zero
$total_workers_to_run += $workers_this_analysis;
$workers_to_run_by_rc_id{ $curr_rc_id } += $workers_this_analysis;
$workers_to_run_by_rc_name{ $curr_rc_name } += $workers_this_analysis;
$analysis_stats->print_stats();
printf("Scheduler suggests adding %d more workers of resource_class_id=%d for analysis_id=%d [%1.3f hive_load remaining]\n", $workers_this_analysis, $curr_rc_id, $analysis_stats->analysis_id, $available_load);
printf("Scheduler suggests adding %d more workers of resource_class_name=%s for analysis_id=%d [%1.3f hive_load remaining]\n", $workers_this_analysis, $curr_rc_name, $analysis_stats->analysis_id, $available_load);
}
printf("Scheduler suggests adding a total of %d workers [%1.5f hive_load remaining]\n", $total_workers_to_run, $available_load);
return \%workers_to_run_by_rc_id;
return \%workers_to_run_by_rc_name;
}
......@@ -782,7 +784,7 @@ sub schedule_workers_resync_if_necessary {
my $meadow = $valley->get_current_meadow();
my $pending_by_rc_id = ($meadow->can('count_pending_workers_by_rc_id') and $meadow->config_get('PendingAdjust')) ? $meadow->count_pending_workers_by_rc_id() : {};
my $pending_by_rc_name = ($meadow->can('count_pending_workers_by_rc_name') and $meadow->config_get('PendingAdjust')) ? $meadow->count_pending_workers_by_rc_name() : {};
my $submit_limit = $meadow->config_get('SubmitWorkersMax');
my $meadow_limit = ($meadow->can('count_running_workers') and defined($meadow->config_get('TotalRunningWorkersMax'))) ? $meadow->config_get('TotalRunningWorkersMax') - $meadow->count_running_workers : undef;
......@@ -790,18 +792,18 @@ sub schedule_workers_resync_if_necessary {
? (($submit_limit<$meadow_limit) ? $submit_limit : $meadow_limit)
: (defined($submit_limit) ? $submit_limit : $meadow_limit);
my $workers_to_run_by_rc_id = $self->schedule_workers($analysis, $pending_by_rc_id, $available_submit_limit);
my $workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_by_rc_name, $available_submit_limit);
unless( keys %$workers_to_run_by_rc_id or $self->get_hive_current_load() or $self->count_running_workers() ) {
unless( keys %$workers_to_run_by_rc_name or $self->get_hive_current_load() or $self->count_running_workers() ) {
print "\nScheduler: nothing is running and nothing to do (according to analysis_stats) => executing garbage collection and sync\n" ;
$self->check_for_dead_workers($valley, 1);
$self->synchronize_hive($analysis);
$workers_to_run_by_rc_id = $self->schedule_workers($analysis, $pending_by_rc_id, $available_submit_limit);
$workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_by_rc_name, $available_submit_limit);
}
return $workers_to_run_by_rc_id;
return $workers_to_run_by_rc_name;
}
......
......@@ -255,7 +255,6 @@ sub main {
}
$queen->print_running_worker_counts;
# $queen->schedule_workers($analysis); # show what would be submitted, but do not actually submit
$queen->schedule_workers_resync_if_necessary($valley, $analysis); # show what would be submitted, but do not actually submit
$queen->get_remaining_jobs_show_hive_progress();
......@@ -322,10 +321,14 @@ sub run_autonomously {
my $current_meadow = $valley->get_current_meadow();
my $worker_cmd = generate_worker_cmd($self, $run_job_id);
my $rc_id2name = $self->{'dba'}->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
# pre-hash the resource_class xparams for future use:
my $rc_xparams = $self->{'dba'}->get_ResourceDescriptionAdaptor->fetch_by_meadow_type_HASHED_FROM_resource_class_id_TO_parameters($current_meadow->type());
# first, fetch two resource-related mappings from the database:
my $rc_name2id = $self->{'dba'}->get_ResourceClassAdaptor->fetch_HASHED_FROM_name_TO_resource_class_id();
my $rc_id2xparams = $self->{'dba'}->get_ResourceDescriptionAdaptor->fetch_by_meadow_type_HASHED_FROM_resource_class_id_TO_parameters($current_meadow->type());
# now, chain these two mappings together:
# FIXME: in future this mapping should be obtainable from the adaptor in one go.
my %rc_name2xparams = map { $_ => $rc_id2xparams->{ $rc_name2id->{ $_ }} } keys %$rc_name2id;
my $iteration=0;
my $num_of_remaining_jobs=0;
......@@ -345,17 +348,15 @@ sub run_autonomously {
$queen->print_analysis_status unless($self->{'no_analysis_stats'});
$queen->print_running_worker_counts;
my $workers_to_run_by_rc_id = $queen->schedule_workers_resync_if_necessary($valley, $this_analysis);
if(keys %$workers_to_run_by_rc_id) {
foreach my $rc_id ( sort { $workers_to_run_by_rc_id->{$a}<=>$workers_to_run_by_rc_id->{$b} } keys %$workers_to_run_by_rc_id) {
my $this_rc_worker_count = $workers_to_run_by_rc_id->{$rc_id};
my $workers_to_run_by_rc_name = $queen->schedule_workers_resync_if_necessary($valley, $this_analysis);
my $rc_name = $rc_id2name->{$rc_id};
if(keys %$workers_to_run_by_rc_name) {
foreach my $rc_name ( sort { $workers_to_run_by_rc_name->{$a}<=>$workers_to_run_by_rc_name->{$b} } keys %$workers_to_run_by_rc_name) {
my $this_rc_worker_count = $workers_to_run_by_rc_name->{$rc_name};
print "Submitting $this_rc_worker_count workers (rc_id=$rc_id, rc_name=$rc_name) to ".$current_meadow->toString()."\n";
print "Submitting $this_rc_worker_count workers (rc_name=$rc_name) to ".$current_meadow->toString()."\n";
$current_meadow->submit_workers("$worker_cmd -rc_name $rc_name", $this_rc_worker_count, $iteration, $rc_id, $rc_xparams->{$rc_id} || '');
$current_meadow->submit_workers("$worker_cmd -rc_name $rc_name", $this_rc_worker_count, $iteration, $rc_name, $rc_name2xparams{ $rc_name } || '');
}
} else {
print "Not submitting any workers this iteration\n";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment