Commit 519c6e33 authored by Leo Gordon's avatar Leo Gordon
Browse files

a Valley-wide (potentially multi-meadow) garbage collector

parent 3f1b4576
......@@ -285,55 +285,73 @@ sub register_worker_death {
}
sub check_for_dead_workers { # a bit counter-intuitively only looks for current meadow's workers, not all of the dead workers.
sub check_for_dead_workers { # scans the whole Valley for lost Workers (but ignores unreachagle ones)
my ($self, $valley, $check_buried_in_haste) = @_;
my $meadow = $valley->get_current_meadow();
warn "GarbageCollector:\tChecking for lost Workers...\n";
my $worker_status_hash = $meadow->status_of_all_our_workers();
my %worker_status_summary = ();
my $queen_worker_list = $self->fetch_overdue_workers(0); # maybe it should return a {meadow->worker_count} hash instead?
my $queen_worker_list = $self->fetch_overdue_workers(0);
my %mt_and_pid_to_worker_status = ();
my %worker_status_counts = ();
my %mt_and_pid_to_lost_worker = ();
print "====== Live workers according to Queen:".scalar(@$queen_worker_list).", Meadow:".scalar(keys %$worker_status_hash)."\n";
my %gc_wpid_to_worker = ();
warn "GarbageCollector:\t[Queen:] we have ".scalar(@$queen_worker_list)." Workers alive.\n";
foreach my $worker (@$queen_worker_list) {
next unless($meadow->responsible_for_worker($worker));
my $process_id = $worker->process_id();
if(my $status = $worker_status_hash->{$process_id}) { # can be RUN|PEND|xSUSP
$worker_status_summary{$status}++;
my $meadow_type = $worker->meadow_type;
if(my $meadow = $valley->find_available_meadow_responsible_for_worker($worker)) {
$mt_and_pid_to_worker_status{$meadow_type} ||= $meadow->status_of_all_our_workers;
} else {
$worker_status_summary{'AWOL'}++;
$worker_status_counts{$meadow_type}{'UNREACHABLE'}++;
$gc_wpid_to_worker{$process_id} = $worker;
next; # Worker is unreachable from this Valley
}
}
print "\t".join(', ', map { "$_:$worker_status_summary{$_}" } keys %worker_status_summary)."\n\n";
if(my $total_lost = scalar(keys %gc_wpid_to_worker)) {
warn "GarbageCollector: Discovered $total_lost lost workers\n";
my $process_id = $worker->process_id;
if(my $status = $mt_and_pid_to_worker_status{$meadow_type}{$process_id}) { # can be RUN|PEND|xSUSP
$worker_status_counts{$meadow_type}{$status}++;
} else {
$worker_status_counts{$meadow_type}{'LOST'}++;
my $wpid_to_cod = {};
if(UNIVERSAL::can($meadow, 'find_out_causes')) {
$wpid_to_cod = $meadow->find_out_causes( keys %gc_wpid_to_worker );
my $lost_with_known_cod = scalar(keys %$wpid_to_cod);
warn "GarbageCollector: Found why $lost_with_known_cod of them died\n";
$mt_and_pid_to_lost_worker{$meadow_type}{$process_id} = $worker;
}
}
# just a quick summary report:
foreach my $meadow_type (keys %worker_status_counts) {
warn "GarbageCollector:\t[$meadow_type Meadow:]\t".join(', ', map { "$_:$worker_status_counts{$meadow_type}{$_}" } keys %{$worker_status_counts{$meadow_type}})."\n\n";
}
warn "GarbageCollector: Releasing the jobs\n";
while(my ($process_id, $worker) = each %gc_wpid_to_worker) {
$worker->cause_of_death( $wpid_to_cod->{$process_id} || 'FATALITY');
$self->register_worker_death($worker);
while(my ($meadow_type, $pid_to_lost_worker) = each %mt_and_pid_to_lost_worker) {
my $this_meadow = $valley->available_meadow_hash->{$meadow_type};
if(my $lost_this_meadow = scalar(keys %$pid_to_lost_worker) ) {
warn "GarbageCollector:\tDiscovered $lost_this_meadow lost $meadow_type Workers\n";
my $wpid_to_cod = {};
if($this_meadow->can('find_out_causes')) {
$wpid_to_cod = $this_meadow->find_out_causes( keys %$pid_to_lost_worker );
my $lost_with_known_cod = scalar(keys %$wpid_to_cod);
warn "GarbageCollector:\tFound why $lost_with_known_cod of $meadow_type Workers died\n";
} else {
warn "GarbageCollector:\t$meadow_type meadow does not support post-mortem examination\n";
}
warn "GarbageCollector:\tReleasing the jobs\n";
while(my ($process_id, $worker) = each %$pid_to_lost_worker) {
$worker->cause_of_death( $wpid_to_cod->{$process_id} || 'FATALITY');
$self->register_worker_death($worker);
}
}
}
# the following bit is completely Meadow-agnostic and only restores database integrity:
if($check_buried_in_haste) {
print "====== Checking for workers buried in haste... ";
warn "GarbageCollector:\tChecking for Workers buried in haste...\n";
my $buried_in_haste_list = $self->fetch_all_dead_workers_with_jobs();
if(my $bih_number = scalar(@$buried_in_haste_list)) {
print "$bih_number, reclaiming jobs.\n\n";
warn "GarbageCollector:\tfound $bih_number jobs, reclaiming.\n\n";
if($bih_number) {
my $job_adaptor = $self->db->get_AnalysisJobAdaptor();
foreach my $worker (@$buried_in_haste_list) {
......@@ -341,7 +359,7 @@ sub check_for_dead_workers { # a bit counter-intuitively only looks for curre
}
}
} else {
print "none\n";
warn "GarbageCollector:\tfound none\n";
}
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment