Commit 63375cdb authored by Leo Gordon's avatar Leo Gordon
Browse files

aggregate meadow stats collection in the Valley

parent 98a33c67
......@@ -21,6 +21,13 @@ sub get_current_worker_process_id {
}
sub count_pending_workers_by_rc_name {
my ($self) = @_;
return ({}, 0); # LOCAL has no concept of pending workers
}
sub count_running_workers {
my $self = shift @_;
......
......@@ -39,17 +39,19 @@ sub count_pending_workers_by_rc_name {
my ($self) = @_;
my $jnp = $self->job_name_prefix();
my $cmd = qq{bjobs -w -J '${jnp}*' -u all 2>/dev/null | grep PEND};
my $cmd = "bjobs -w -J '${jnp}*' -u all 2>/dev/null | grep PEND";
my %pending_by_rc_name = ();
my %pending_this_meadow_by_rc_name = ();
my $total_pending_this_meadow = 0;
foreach my $line (qx/$cmd/) {
if($line=~/\b\Q$jnp\E(\S+)\-\d+(\[\d+\])?\b/) {
$pending_by_rc_name{$1}++;
$pending_this_meadow_by_rc_name{$1}++;
$total_pending_this_meadow++;
}
}
return \%pending_by_rc_name;
return (\%pending_this_meadow_by_rc_name, $total_pending_this_meadow);
}
......
......@@ -738,7 +738,7 @@ sub count_running_workers {
=cut
sub schedule_workers {
my ($self, $filter_analysis, $orig_pending_by_rc_name, $available_submit_limit) = @_;
my ($self, $filter_analysis, $orig_pending_this_meadow_by_rc_name, $available_submit_limit) = @_;
my @suitable_analyses = $filter_analysis
? ( $filter_analysis->stats )
......@@ -751,7 +751,7 @@ sub schedule_workers {
return {} unless(@suitable_analyses);
my %pending_by_rc_name = %{ $orig_pending_by_rc_name || {} }; # NB: make our own copy to be able to modify it
my %pending_this_meadow_by_rc_name = %{ $orig_pending_this_meadow_by_rc_name || {} }; # NB: make our own copy to be able to modify it
my $total_workers_to_run = 0;
my %workers_to_run_by_rc_name = ();
my $available_load = 1.0 - $self->get_hive_current_load();
......@@ -791,12 +791,12 @@ sub schedule_workers {
my $curr_rc_name = $analysis_id2rc_name{ $analysis_stats->analysis_id };
if($pending_by_rc_name{ $curr_rc_name }) { # per-rc_name capping by pending processes, if available
my $pending_this_analysis = ($pending_by_rc_name{ $curr_rc_name } < $workers_this_analysis) ? $pending_by_rc_name{ $curr_rc_name } : $workers_this_analysis;
if($pending_this_meadow_by_rc_name{ $curr_rc_name }) { # per-rc_name capping by pending processes, if available
my $pending_this_analysis = ($pending_this_meadow_by_rc_name{ $curr_rc_name } < $workers_this_analysis) ? $pending_this_meadow_by_rc_name{ $curr_rc_name } : $workers_this_analysis;
print "Scheduler detected $pending_this_analysis pending workers with resource_class_name=$curr_rc_name\n";
$workers_this_analysis -= $pending_this_analysis;
$pending_by_rc_name{ $curr_rc_name } -= $pending_this_analysis;
$workers_this_analysis -= $pending_this_analysis;
$pending_this_meadow_by_rc_name{ $curr_rc_name } -= $pending_this_analysis;
}
next unless($workers_this_analysis); # do not autovivify the hash by a zero
......@@ -818,14 +818,17 @@ sub schedule_workers_resync_if_necessary {
my $submit_limit = $valley->config_get('SubmitWorkersMax');
my $meadow = $valley->get_current_meadow();
my $pending_by_rc_name = $meadow->can('count_pending_workers_by_rc_name') ? $meadow->count_pending_workers_by_rc_name() : {};
my $meadow_limit = ($meadow->can('count_running_workers') and defined($meadow->config_get('TotalRunningWorkersMax'))) ? $meadow->config_get('TotalRunningWorkersMax') - $meadow->count_running_workers : undef;
my $available_worker_slots_by_meadow_type = $valley->get_available_worker_slots_by_meadow_type();
my $meadow_limit = $available_worker_slots_by_meadow_type->{ $meadow->type };
my ($pending_worker_counts_by_meadow_type_rc_name, $total_pending_all_meadows) = $valley->get_pending_worker_counts_by_meadow_type_rc_name();
my $pending_this_meadow_by_rc_name = $pending_worker_counts_by_meadow_type_rc_name->{ $meadow->type };
my $available_submit_limit = ($submit_limit and $meadow_limit)
? (($submit_limit<$meadow_limit) ? $submit_limit : $meadow_limit)
: (defined($submit_limit) ? $submit_limit : $meadow_limit);
my $workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_by_rc_name, $available_submit_limit);
my $workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_this_meadow_by_rc_name, $available_submit_limit);
unless( keys %$workers_to_run_by_rc_name or $self->get_hive_current_load() or $self->count_running_workers() ) {
print "\nScheduler: nothing is running and nothing to do (according to analysis_stats) => executing garbage collection and sync\n" ;
......@@ -833,7 +836,7 @@ sub schedule_workers_resync_if_necessary {
$self->check_for_dead_workers($valley, 1);
$self->synchronize_hive($analysis);
$workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_by_rc_name, $available_submit_limit);
$workers_to_run_by_rc_name = $self->schedule_workers($analysis, $pending_this_meadow_by_rc_name, $available_submit_limit);
}
return $workers_to_run_by_rc_name;
......
......@@ -140,5 +140,36 @@ sub whereami {
}
sub get_pending_worker_counts_by_meadow_type_rc_name {
my $self = shift @_;
my %pending_counts = ();
my $total_pending_all_meadows = 0;
foreach my $meadow (@{ $self->get_available_meadow_list }) {
my ($pending_this_meadow_by_rc_name, $total_pending_this_meadow) = ($meadow->count_pending_workers_by_rc_name());
$pending_counts{ $meadow->type } = $pending_this_meadow_by_rc_name;
$total_pending_all_meadows += $total_pending_this_meadow;
}
return (\%pending_counts, $total_pending_all_meadows);
}
sub get_available_worker_slots_by_meadow_type {
my $self = shift @_;
my %available_worker_slots = ();
foreach my $meadow (@{ $self->get_available_meadow_list }) {
if( $meadow->can('count_running_workers') and defined($meadow->config_get('TotalRunningWorkersMax'))) {
$available_worker_slots{ $meadow->type } = $meadow->config_get('TotalRunningWorkersMax') - $meadow->count_running_workers;
}
}
return \%available_worker_slots;
}
1;
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment