Commit 4abd6675 authored by Leo Gordon
Browse files

start using meadow_user for querying the LSF

parent 16738fe3
......@@ -93,41 +93,54 @@ sub count_pending_workers_by_rc_name {
# Counts the Workers of this pipeline that LSF currently reports as running.
#
# Arguments:
#   $self                     - the meadow object; must provide job_name_prefix()
#   $meadow_users_of_interest - optional arrayref of LSF user names to query one by one;
#                               when omitted (or false) it defaults to [ 'all' ]
#
# Returns: the sum, over all requested users, of the number of `bjobs` output lines that contain 'RUN'.
#
# One `bjobs` call is made per user, and the total is only returned after every user has been queried.
sub count_running_workers {
    my $self                        = shift @_;
    my $meadow_users_of_interest    = shift @_ || [ 'all' ];

    my $jnp = $self->job_name_prefix();

    my $total_running_worker_count = 0;

    foreach my $meadow_user (@$meadow_users_of_interest) {
        my $cmd = "bjobs -w -J '${jnp}*' -u $meadow_user 2>/dev/null | grep RUN | wc -l";

#        warn "LSF::count_running_workers() running cmd:\n\t$cmd\n";

        my $meadow_user_worker_count = qx/$cmd/;
        $meadow_user_worker_count = '' unless defined($meadow_user_worker_count);   # qx// yields undef if the shell could not be started
        chomp($meadow_user_worker_count);

        $total_running_worker_count += ($meadow_user_worker_count || 0);           # empty output counts as zero
    }

    return $total_running_worker_count;
}
# Collects the LSF status of every job whose name starts with this pipeline's job name prefix.
#
# Arguments:
#   $self                     - the meadow object; must provide job_name_prefix()
#   $meadow_users_of_interest - optional arrayref of LSF user names to query one by one;
#                               when omitted (or false) it defaults to [ 'all' ]
#
# Returns: a hashref mapping worker process id => status string taken from the third `bjobs` column.
#          The process id is the job id, with the "[index]" suffix appended when the job name contains one.
#          The header line and jobs in DONE or EXIT state are left out.
sub status_of_all_our_workers { # returns a hashref
    my $self                        = shift @_;
    my $meadow_users_of_interest    = shift @_ || [ 'all' ];

    my $jnp = $self->job_name_prefix();

    my %status_hash = ();

    foreach my $meadow_user (@$meadow_users_of_interest) {
        my $cmd = "bjobs -w -J '${jnp}*' -u $meadow_user 2>/dev/null";

#        warn "LSF::status_of_all_our_workers() running cmd:\n\t$cmd\n";

        foreach my $line (`$cmd`) {
            my ($group_pid, $user, $status, $queue, $submission_host, $running_host, $job_name) = split(/\s+/, $line);

            next unless(defined($status));  # blank or truncated line: nothing to record

            next if(($group_pid eq 'JOBID') or ($status eq 'DONE') or ($status eq 'EXIT'));

            my $worker_pid = $group_pid;
            if(defined($job_name) and $job_name=~/(\[\d+\])/) {     # job array element: make the pid unique per element
                $worker_pid .= $1;
            }
            $status_hash{$worker_pid} = $status;
        }
    }

    return \%status_hash;
}
......
......@@ -319,6 +319,13 @@ sub register_worker_death {
}
# Lists the distinct meadow_user values among the Workers that are not marked 'DEAD'.
#
# Asks count_all() for counts filtered by "status!='DEAD'" and keyed by 'meadow_user',
# then keeps only the keys of the resulting hash.
#
# Returns: an arrayref of meadow user names, in no particular order.
sub meadow_users_of_running_workers {
    my ($self) = @_;

    my $count_by_meadow_user = $self->count_all("status!='DEAD'", ['meadow_user']);
    my @meadow_users         = keys %$count_by_meadow_user;

    return \@meadow_users;
}
sub check_for_dead_workers { # scans the whole Valley for lost Workers (but ignores unreachable ones)
my ($self, $valley, $check_buried_in_haste) = @_;
......@@ -326,6 +333,7 @@ sub check_for_dead_workers { # scans the whole Valley for lost Workers (but i
my $last_few_seconds = 5; # FIXME: It is probably a good idea to expose this parameter for easier tuning.
my $queen_overdue_workers = $self->fetch_overdue_workers( $last_few_seconds ); # check the workers we have not seen active during the $last_few_seconds
my $meadow_users_of_interest = $self->meadow_users_of_running_workers(); # FIXME: we could make it very granular ( meadow_type->meadow_name->meadow_user->count )
my %mt_and_pid_to_worker_status = ();
my %worker_status_counts = ();
my %mt_and_pid_to_lost_worker = ();
......@@ -337,7 +345,7 @@ sub check_for_dead_workers { # scans the whole Valley for lost Workers (but i
my $meadow_type = $worker->meadow_type;
if(my $meadow = $valley->find_available_meadow_responsible_for_worker($worker)) {
$mt_and_pid_to_worker_status{$meadow_type} ||= $meadow->status_of_all_our_workers; # only run this once per reachable Meadow
$mt_and_pid_to_worker_status{$meadow_type} ||= $meadow->status_of_all_our_workers( $meadow_users_of_interest ); # only run this once per reachable Meadow
my $process_id = $worker->process_id;
if(my $status = $mt_and_pid_to_worker_status{$meadow_type}{$process_id}) { # can be RUN|PEND|xSUSP
......
......@@ -58,15 +58,16 @@ sub scheduler_say {
sub schedule_workers_resync_if_necessary {
my ($queen, $valley, $list_of_analyses) = @_;
my $submit_capacity = $valley->config_get('SubmitWorkersMax');
my $default_meadow_type = $valley->get_default_meadow()->type;
my $meadow_capacity_limiter_hashed_by_type = $valley->get_meadow_capacity_hash_by_meadow_type();
my $analysis_id2rc_id = $queen->db->get_AnalysisAdaptor->fetch_HASHED_FROM_analysis_id_TO_resource_class_id();
my $rc_id2name = $queen->db->get_ResourceClassAdaptor->fetch_HASHED_FROM_resource_class_id_TO_name();
my $meadow_users_of_interest = $queen->meadow_users_of_running_workers(); # FIXME: we could make it very granular ( meadow_type->meadow_name->meadow_user->count )
# combined mapping:
my $analysis_id2rc_name = { map { $_ => $rc_id2name->{ $analysis_id2rc_id->{ $_ }} } keys %$analysis_id2rc_id };
my $submit_capacity = $valley->config_get('SubmitWorkersMax');
my $default_meadow_type = $valley->get_default_meadow()->type;
my $meadow_capacity_limiter_hashed_by_type = $valley->get_meadow_capacity_hash_by_meadow_type( $meadow_users_of_interest );
my ($workers_to_submit_by_analysis, $workers_to_submit_by_meadow_type_rc_name, $total_extra_workers_required, $log_buffer)
= schedule_workers($queen, $submit_capacity, $default_meadow_type, $list_of_analyses, $meadow_capacity_limiter_hashed_by_type, $analysis_id2rc_name);
......@@ -76,7 +77,7 @@ sub schedule_workers_resync_if_necessary {
scheduler_say( "According to analysis_stats no workers are required... let's see if anything went out of sync." );
# FIXME: here is an (optimistic) assumption all Workers the DB knows about are reachable from the Valley:
if( $queen->db->get_RoleAdaptor->count_active_roles() != $valley->count_running_workers ) {
if( $queen->db->get_RoleAdaptor->count_active_roles() != $valley->aggregated_count_running_workers( $meadow_users_of_interest ) ) {
scheduler_say( "Mismatch between DB's active Roles and Valley's running Workers detected, checking for dead workers..." );
$queen->check_for_dead_workers($valley, 1);
}
......
......@@ -171,14 +171,14 @@ sub get_pending_worker_counts_by_meadow_type_rc_name {
sub get_meadow_capacity_hash_by_meadow_type {
my $self = shift @_;
my ($self, $meadow_users_of_interest) = @_;
my %meadow_capacity_hash = ();
foreach my $meadow (@{ $self->get_available_meadow_list }) {
my $available_worker_slots = defined($meadow->config_get('TotalRunningWorkersMax'))
? $meadow->config_get('TotalRunningWorkersMax') - $meadow->count_running_workers
? $meadow->config_get('TotalRunningWorkersMax') - $meadow->count_running_workers( $meadow_users_of_interest )
: undef;
# so the hash will contain limiters for every meadow_type, but not all of them active:
......@@ -189,13 +189,14 @@ sub get_meadow_capacity_hash_by_meadow_type {
}
sub count_running_workers { # just an aggregator
my $self = shift @_;
sub aggregated_count_running_workers { # just an aggregator
my ($self, $meadow_users_of_interest) = @_;
my $valley_running_workers = 0;
foreach my $meadow (@{ $self->get_available_meadow_list }) {
$valley_running_workers += $meadow->count_running_workers;
$valley_running_workers += $meadow->count_running_workers( $meadow_users_of_interest );
}
return $valley_running_workers;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment