Commit 2c97e3cd authored by Leo Gordon
Browse files

more robust checking for dead workers/jobs

parent a966931c
...@@ -38,13 +38,6 @@ sub responsible_for_worker { ...@@ -38,13 +38,6 @@ sub responsible_for_worker {
return $worker->beekeeper() eq $self->type(); return $worker->beekeeper() eq $self->type();
} }
# Default implementation for meadows that cannot list the states of their workers.
# A derived class that can do so overrides this method and returns a hashref.
#
# Returns: undef in scalar context (an empty list in list context).
# You should check the return value and use other means if it is not defined.
sub status_of_all_my_workers {
    my ($self) = @_;

    # Bare return: scalar callers still get undef, but list-context callers get
    # an empty list instead of a one-element (undef) list that tests as true.
    return;
}
sub check_worker_is_alive { sub check_worker_is_alive {
my ($self, $worker) = @_; my ($self, $worker) = @_;
......
...@@ -23,6 +23,30 @@ sub responsible_for_worker { ...@@ -23,6 +23,30 @@ sub responsible_for_worker {
return ($worker->beekeeper() eq $self->type()) and ($worker->host eq hostname()); return ($worker->beekeeper() eq $self->type()) and ($worker->host eq hostname());
} }
# Takes a snapshot of the runWorker.pl processes that 'ps' reports and maps
# each of them to a scheduler-like status.
#
# Returns: a hashref { process_id => status }, where status is 'RUN' or 'SSUSP'.
#          Processes whose 'ps' state has no mapping below are left out, so a
#          lookup for them yields undef (same as for a process that is gone).
sub status_of_all_my_workers {
    my ($self) = @_;

    # The listing is filtered in Perl rather than by piping through
    # 'grep runWorker.pl': the grep process and the shell running the pipeline
    # both have 'runWorker.pl' on their own command lines, so they used to be
    # reported (and counted) as workers.
    my $cmd = 'ps -o state,pid,cmd -w -w --no-header x';

    # FIXME: if we want to incorporate Meadow->pipeline_name() filtering here,
    #        a dummy parameter to the runWorker.pl should probably be introduced
    #        for 'ps' to be able to externally differentiate between local workers
    #        working for different hives
    #        (but at the moment such a feature is unlikely to be in demand).

    # Note: you can locally 'kill -19' a worker to suspend it and 'kill -18' a worker to resume it
    my %status_of_state = ( 'R' => 'RUN', 'S' => 'RUN', 'D' => 'RUN', 'T' => 'SSUSP' );

    my %status_hash = ();
    foreach my $line (`$cmd`) {
        # split(' ', ...) ignores leading whitespace, so the columns cannot shift
        # if the first one is padded; the limit keeps the command line in one piece.
        my ($pre_status, $worker_pid, $job_name) = split(' ', $line, 3);
        next unless defined($job_name) && $job_name =~ /runWorker\.pl/;

        # Do not store undef for states that are not in the map: such entries
        # would still be counted by anyone looking at the number of keys.
        my $status = $status_of_state{$pre_status};
        next unless defined($status);

        $status_hash{$worker_pid} = $status;
    }
    return \%status_hash;
}
sub check_worker_is_alive { sub check_worker_is_alive {
my ($self, $worker) = @_; my ($self, $worker) = @_;
......
...@@ -333,6 +333,9 @@ sub flow_output_job { ...@@ -333,6 +333,9 @@ sub flow_output_job {
# #
###################################### ######################################
# Note: asking for Queen->fetch_overdue_workers(0) essentially means
# "fetch all workers known to the Queen not to be officially dead"
#
sub fetch_overdue_workers { sub fetch_overdue_workers {
my ($self,$overdue_secs) = @_; my ($self,$overdue_secs) = @_;
...@@ -693,34 +696,7 @@ sub get_num_needed_workers { ...@@ -693,34 +696,7 @@ sub get_num_needed_workers {
return $numWorkers; return $numWorkers;
} }
sub get_needed_workers_failed_analyses_resync_if_necessary { sub get_hive_progress {
my ($self, $this_analysis) = @_;
my $runCount = $self->get_num_running_workers();
my $load = $self->get_hive_current_load();
my $worker_count = $self->get_num_needed_workers($this_analysis);
my $failed_analyses = $self->get_num_failed_analyses($this_analysis);
if($load==0 and $worker_count==0 and $runCount==0) {
print "*** nothing is running and nothing to do => perform a hard resync\n" ;
$self->synchronize_hive($this_analysis);
$worker_count = $self->get_num_needed_workers($this_analysis);
$failed_analyses = $self->get_num_failed_analyses($this_analysis);
if($worker_count==0) {
if($failed_analyses==0) {
print "Nothing left to do".($this_analysis ? (' for analysis '.$this_analysis->logic_name) : '').". DONE!!\n\n";
}
}
}
return ($worker_count, $failed_analyses);
}
sub get_hive_progress
{
my $self = shift; my $self = shift;
my $sql = "SELECT sum(done_job_count), sum(failed_job_count), sum(total_job_count), ". my $sql = "SELECT sum(done_job_count), sum(failed_job_count), sum(total_job_count), ".
"sum(unclaimed_job_count * analysis_stats.avg_msec_per_job)/1000/60/60 ". "sum(unclaimed_job_count * analysis_stats.avg_msec_per_job)/1000/60/60 ".
...@@ -729,11 +705,13 @@ sub get_hive_progress ...@@ -729,11 +705,13 @@ sub get_hive_progress
$sth->execute(); $sth->execute();
my ($done, $failed, $total, $cpuhrs) = $sth->fetchrow_array(); my ($done, $failed, $total, $cpuhrs) = $sth->fetchrow_array();
$sth->finish; $sth->finish;
$done=0 unless($done);
$failed=0 unless($failed); $done ||= 0;
$total=0 unless($total); $failed ||= 0;
my $completed=0.0; $total ||= 0;
$completed = ((100.0 * ($done+$failed))/$total) if($total>0); my $completed = $total
? ((100.0 * ($done+$failed))/$total)
: 0.0;
my $remaining = $total - $done - $failed; my $remaining = $total - $done - $failed;
printf("hive %1.3f%% complete (< %1.3f CPU_hrs) (%d todo + %d done + %d failed = %d total)\n", printf("hive %1.3f%% complete (< %1.3f CPU_hrs) (%d todo + %d done + %d failed = %d total)\n",
$completed, $cpuhrs, $remaining, $done, $failed, $total); $completed, $cpuhrs, $remaining, $done, $failed, $total);
......
...@@ -56,7 +56,7 @@ sub main { ...@@ -56,7 +56,7 @@ sub main {
my $reset_all_jobs_for_analysis = 0; my $reset_all_jobs_for_analysis = 0;
$self->{'sleep_minutes'} = 2; $self->{'sleep_minutes'} = 2;
$self->{'overdue_minutes'} = 60; # which means one hour # $self->{'overdue_minutes'} = 60; # which means one hour
$self->{'verbose_stats'} = 1; $self->{'verbose_stats'} = 1;
$self->{'reg_name'} = 'hive'; $self->{'reg_name'} = 'hive';
$self->{'maximise_concurrency'} = 0; $self->{'maximise_concurrency'} = 0;
...@@ -99,7 +99,7 @@ sub main { ...@@ -99,7 +99,7 @@ sub main {
'sync' => \$sync, 'sync' => \$sync,
'dead' => \$check_for_dead, 'dead' => \$check_for_dead,
'killworker=i' => \$kill_worker_id, 'killworker=i' => \$kill_worker_id,
'overdue' => \$self->{'overdue_minutes'}, # 'overdue' => \$self->{'overdue_minutes'},
'alldead' => \$all_dead, 'alldead' => \$all_dead,
'no_analysis_stats' => \$self->{'no_analysis_stats'}, 'no_analysis_stats' => \$self->{'no_analysis_stats'},
'verbose_stats=i' => \$self->{'verbose_stats'}, 'verbose_stats=i' => \$self->{'verbose_stats'},
...@@ -270,8 +270,8 @@ sub usage { ...@@ -270,8 +270,8 @@ sub usage {
print " -maximise_concurrency 1 : try to run more different analyses at the same time\n"; print " -maximise_concurrency 1 : try to run more different analyses at the same time\n";
print "\n===============[other commands/options]==================\n"; print "\n===============[other commands/options]==================\n";
print " -dead : clean overdue jobs for resubmission\n"; print " -dead : clean dead jobs for resubmission\n";
print " -overdue <min> : worker overdue minutes checking if dead\n"; # print " -overdue <min> : worker overdue minutes checking if dead\n";
print " -alldead : all outstanding workers\n"; print " -alldead : all outstanding workers\n";
print " -no_analysis_stats : don't show status of each analysis\n"; print " -no_analysis_stats : don't show status of each analysis\n";
print " -worker_stats : show status of each running worker\n"; print " -worker_stats : show status of each running worker\n";
...@@ -302,10 +302,6 @@ sub parse_conf { ...@@ -302,10 +302,6 @@ sub parse_conf {
sub check_for_dead_workers { sub check_for_dead_workers {
my ($self, $queen, $check_buried_in_haste) = @_; my ($self, $queen, $check_buried_in_haste) = @_;
unless($self->{'meadow'}->can('status_of_all_my_workers')) {
return check_for_dead_workers_slow($queen);
}
my $worker_status_hash = $self->{'meadow'}->status_of_all_my_workers(); my $worker_status_hash = $self->{'meadow'}->status_of_all_my_workers();
my %worker_status_summary = (); my %worker_status_summary = ();
my $queen_worker_list = $queen->fetch_overdue_workers(0); my $queen_worker_list = $queen->fetch_overdue_workers(0);
...@@ -313,6 +309,8 @@ sub check_for_dead_workers { ...@@ -313,6 +309,8 @@ sub check_for_dead_workers {
print "====== Live workers according to Queen:".scalar(@$queen_worker_list).", Meadow:".scalar(keys %$worker_status_hash)."\n"; print "====== Live workers according to Queen:".scalar(@$queen_worker_list).", Meadow:".scalar(keys %$worker_status_hash)."\n";
foreach my $worker (@$queen_worker_list) { foreach my $worker (@$queen_worker_list) {
next unless($self->{'meadow'}->responsible_for_worker($worker));
my $worker_pid = $worker->process_id(); my $worker_pid = $worker->process_id();
if(my $status = $worker_status_hash->{$worker_pid}) { # can be RUN|PEND|xSUSP if(my $status = $worker_status_hash->{$worker_pid}) { # can be RUN|PEND|xSUSP
$worker_status_summary{$status}++; $worker_status_summary{$status}++;
...@@ -340,31 +338,6 @@ sub check_for_dead_workers { ...@@ -340,31 +338,6 @@ sub check_for_dead_workers {
} }
} }
# Walks through the workers the Queen reports as overdue (by more than
# $self->{'overdue_minutes'} minutes), skips those this meadow is not responsible
# for, prints one report line per remaining worker and registers with the Queen
# the death of every worker the meadow does not find alive.
sub check_for_dead_workers_slow {
    my ($self, $queen) = @_;

    my $minutes_allowed = $self->{'overdue_minutes'};

    print "===== check for dead workers\n";

    my $late_workers = $queen->fetch_overdue_workers($minutes_allowed * 60);
    print scalar(@$late_workers), " overdue workers\n";

    WORKER: for my $worker (@$late_workers) {
        my $meadow = $self->{'meadow'};
        next WORKER unless $meadow->responsible_for_worker($worker);

        printf(
            "%10d %35s %15s %20s(%d) : ",
            $worker->hive_id,
            $worker->host,
            $worker->process_id,
            $worker->analysis->logic_name,
            $worker->analysis->dbID,
        );

        # Guard clause for the healthy case; whatever falls through is dead.
        if ($meadow->check_worker_is_alive($worker)) {
            print "ALIVE and running\n";
            next WORKER;
        }

        print "worker is missing => it DIED!!\n";
        $queen->register_worker_death($worker);
    }
}
# --------------[worker reports]-------------------- # --------------[worker reports]--------------------
sub show_given_workers { sub show_given_workers {
...@@ -390,13 +363,6 @@ sub show_running_workers { ...@@ -390,13 +363,6 @@ sub show_running_workers {
show_given_workers($self, $queen->fetch_overdue_workers(0), $queen->{'verbose_stats'}); show_given_workers($self, $queen->fetch_overdue_workers(0), $queen->{'verbose_stats'});
} }
# Prints a header and then hands the workers that the Queen reports as overdue
# by more than $overdue_minutes minutes to show_given_workers(), together with
# the Queen's 'verbose_stats' setting.
# (does not seem to be used)
sub show_overdue_workers {
    my ($self, $queen, $overdue_minutes) = @_;

    print "===== overdue workers\n";

    my $overdue_secs = $overdue_minutes * 60;
    return show_given_workers(
        $self,
        $queen->fetch_overdue_workers($overdue_secs),
        $queen->{'verbose_stats'},
    );
}
sub show_failed_workers { # does not seem to be used sub show_failed_workers { # does not seem to be used
my ($self, $queen) = @_; my ($self, $queen) = @_;
...@@ -427,6 +393,34 @@ sub generate_worker_cmd { ...@@ -427,6 +393,34 @@ sub generate_worker_cmd {
return $worker_cmd; return $worker_cmd;
} }
# Asks the Queen how many workers are needed and how many analyses have failed
# (optionally restricted to $this_analysis).  If the Queen reports no load, no
# needed workers and no running workers, a hard resync is performed, dead workers
# are checked for, and both numbers are asked for again.
#
# Returns: the list ($worker_count, $failed_analyses).
sub get_needed_workers_failed_analyses_resync_if_necessary {
    my ($self, $queen, $this_analysis) = @_;

    my $running_workers = $queen->get_num_running_workers();
    my $current_load    = $queen->get_hive_current_load();
    my $worker_count    = $queen->get_num_needed_workers($this_analysis);
    my $failed_analyses = $queen->get_num_failed_analyses($this_analysis);

    # Something is still going on => report the numbers as they are.
    unless ($current_load == 0 and $worker_count == 0 and $running_workers == 0) {
        return ($worker_count, $failed_analyses);
    }

    print "*** nothing is running and nothing to do (according to analysis_stats) => perform a hard resync\n";

    $queen->synchronize_hive($this_analysis);
    check_for_dead_workers($self, $queen, 1);

    # The resync may have changed the picture, so ask again.
    $worker_count    = $queen->get_num_needed_workers($this_analysis);
    $failed_analyses = $queen->get_num_failed_analyses($this_analysis);

    if ($worker_count == 0 and $failed_analyses == 0) {
        my $scope = $this_analysis
            ? ' for analysis '.$this_analysis->logic_name
            : '';
        print "Nothing left to do".$scope.". DONE!!\n\n";
    }

    return ($worker_count, $failed_analyses);
}
sub run_autonomously { sub run_autonomously {
my ($self, $max_loops, $queen, $this_analysis) = @_; my ($self, $max_loops, $queen, $this_analysis) = @_;
...@@ -457,7 +451,7 @@ sub run_autonomously { ...@@ -457,7 +451,7 @@ sub run_autonomously {
#show_failed_workers($self, $queen); #show_failed_workers($self, $queen);
my $worker_count; my $worker_count;
($worker_count, $failed_analyses) = $queen->get_needed_workers_failed_analyses_resync_if_necessary($this_analysis); ($worker_count, $failed_analyses) = get_needed_workers_failed_analyses_resync_if_necessary($self, $queen, $this_analysis);
if($self->{'run_job_id'}) { # If it's just one job, we don't require more than one worker if($self->{'run_job_id'}) { # If it's just one job, we don't require more than one worker
# (and we probably do not care about the limits) # (and we probably do not care about the limits)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment