Commit 8a07e8f7 authored by Leo Gordon's avatar Leo Gordon
Browse files

put back the lifespan option that I forgot to copy when merging

parent dbaad5bf
......@@ -56,10 +56,8 @@ sub main {
my $reset_all_jobs_for_analysis = 0;
$self->{'sleep_minutes'} = 2;
$self->{'overdue_minutes'} = 60;
$self->{'overdue_minutes'} = 60; # which means one hour
$self->{'verbose_stats'} = 1;
$self->{'monitor'} = undef;
$self->{'reg_file'} = undef;
$self->{'reg_name'} = 'hive';
$self->{'maximise_concurrency'} = 0;
......@@ -93,6 +91,7 @@ sub main {
# worker control
'jlimit=i' => \$self->{'job_limit'},
'batch_size=i' => \$self->{'batch_size'},
'lifespan=i' => \$self->{'lifespan'},
'logic_name=s' => \$self->{'logic_name'},
'maximise_concurrency' => \$self->{'maximise_concurrency'},
......@@ -115,16 +114,6 @@ sub main {
if ($help) { usage(); }
if($local) {
$self->{'meadow'} = Bio::EnsEMBL::Hive::Meadow::LOCAL->new();
$self->{'meadow'} -> total_running_workers_limit($local_cpus);
} else {
$self->{'meadow'} = Bio::EnsEMBL::Hive::Meadow::LSF->new();
$self->{'meadow'} -> lsf_options($lsf_options);
}
$self->{'meadow'} -> pending_adjust(not $no_pend_adjust);
$self->{'meadow'} -> submitted_workers_limit($worker_limit);
parse_conf($self, $conf_file);
if($run or $self->{'run_job_id'}) {
......@@ -157,7 +146,18 @@ sub main {
$queen->{'maximise_concurrency'} = 1 if ($self->{'maximise_concurrency'});
$queen->{'verbose_stats'} = $self->{'verbose_stats'};
$self->{'name'} = $self->{'dba'}->get_MetaContainer->list_value_by_key("name")->[0];
my $pipeline_name = $self->{'dba'}->get_MetaContainer->list_value_by_key("name")->[0];
if($local) {
$self->{'meadow'} = Bio::EnsEMBL::Hive::Meadow::LOCAL->new();
$self->{'meadow'} -> total_running_workers_limit($local_cpus);
} else {
$self->{'meadow'} = Bio::EnsEMBL::Hive::Meadow::LSF->new();
$self->{'meadow'} -> lsf_options($lsf_options);
}
$self->{'meadow'} -> pending_adjust(not $no_pend_adjust);
$self->{'meadow'} -> submitted_workers_limit($worker_limit);
$self->{'meadow'} -> pipeline_name($pipeline_name);
if($reset_job_id) { $queen->reset_and_fetch_job_by_dbID($reset_job_id); }
......@@ -173,7 +173,7 @@ sub main {
if($remove_analysis_id) { remove_analysis_id($self, $remove_analysis_id); }
if($all_dead) { $queen->register_all_workers_dead(); }
if($check_for_dead) { check_for_dead_workers($self, $queen, $self->{'overdue_minutes'}); }
if($check_for_dead) { check_for_dead_workers($self, $queen, 1); }
if ($kill_worker_id) {
my $worker = $queen->_fetch_by_hive_id($kill_worker_id);
......@@ -265,6 +265,7 @@ sub usage {
print "\n===============[worker control]==========================\n";
print " -jlimit <num> : #jobs to run before worker can die naturally\n";
print " -batch_size <num> : #jobs a worker can claim at once\n";
print " -lifespan <num> : lifespan limit for each worker\n";
print " -logic_name <string> : restrict the pipeline stat/runs to this analysis logic_name\n";
print " -maximise_concurrency 1 : try to run more different analyses at the same time\n";
......@@ -299,26 +300,68 @@ sub parse_conf {
}
sub check_for_dead_workers {
my ($self, $queen, $overdue_minutes) = @_;
my ($self, $queen, $check_buried_in_haste) = @_;
print("===== check for dead workers\n");
my $overdueWorkers = $queen->fetch_overdue_workers($overdue_minutes*60);
print(scalar(@{$overdueWorkers}), " overdue workers\n");
foreach my $worker (@{$overdueWorkers}) {
next unless($self->{'meadow'}->responsible_for_worker($worker));
unless($self->{'meadow'}->can('status_of_all_my_workers')) {
return check_for_dead_workers_slow($queen);
}
printf("%10d %35s %15s %20s(%d) : ",
$worker->hive_id, $worker->host, $worker->process_id,
$worker->analysis->logic_name, $worker->analysis->dbID);
my $worker_status_hash = $self->{'meadow'}->status_of_all_my_workers();
my %worker_status_summary = ();
my $queen_worker_list = $queen->fetch_overdue_workers(0);
if( $self->{'meadow'}->check_worker_is_alive($worker) ) {
print("ALIVE and running\n");
} else {
print("worker is missing => it DIED!!\n");
$queen->register_worker_death($worker);
print "====== Live workers according to Queen:".scalar(@$queen_worker_list).", Meadow:".scalar(keys %$worker_status_hash)."\n";
foreach my $worker (@$queen_worker_list) {
my $worker_pid = $worker->process_id();
if(my $status = $worker_status_hash->{$worker_pid}) { # can be RUN|PEND|xSUSP
$worker_status_summary{$status}++;
} else {
$worker_status_summary{'AWOL'}++;
$queen->register_worker_death($worker);
}
}
print "\t".join(', ', map { "$_:$worker_status_summary{$_}" } keys %worker_status_summary)."\n\n";
if($check_buried_in_haste) {
print "====== Checking for workers buried in haste... ";
my $buried_in_haste_list = $queen->fetch_dead_workers_with_jobs();
if(my $bih_number = scalar(@$buried_in_haste_list)) {
print "$bih_number, reclaiming jobs.\n\n";
if($bih_number) {
my $job_adaptor = $queen->db->get_AnalysisJobAdaptor();
foreach my $worker (@$buried_in_haste_list) {
$job_adaptor->reset_dead_jobs_for_worker($worker);
}
}
} else {
print "none\n";
}
}
}
}
sub check_for_dead_workers_slow {
my ($self, $queen) = @_;
my $overdue_minutes = $self->{'overdue_minutes'};
print("===== check for dead workers\n");
my $overdueWorkers = $queen->fetch_overdue_workers($overdue_minutes*60);
print(scalar(@{$overdueWorkers}), " overdue workers\n");
foreach my $worker (@{$overdueWorkers}) {
next unless($self->{'meadow'}->responsible_for_worker($worker));
printf("%10d %35s %15s %20s(%d) : ",
$worker->hive_id, $worker->host, $worker->process_id,
$worker->analysis->logic_name, $worker->analysis->dbID);
if( $self->{'meadow'}->check_worker_is_alive($worker) ) {
print("ALIVE and running\n");
} else {
print("worker is missing => it DIED!!\n");
$queen->register_worker_death($worker);
}
}
}
......@@ -347,14 +390,14 @@ sub show_running_workers {
show_given_workers($self, $queen->fetch_overdue_workers(0), $queen->{'verbose_stats'});
}
sub show_overdue_workers {
sub show_overdue_workers { # does not seem to be used
my ($self, $queen, $overdue_minutes) = @_;
print("===== overdue workers\n");
show_given_workers($self, $queen->fetch_overdue_workers($overdue_minutes*60), $queen->{'verbose_stats'});
}
sub show_failed_workers {
sub show_failed_workers { # does not seem to be used
my ($self, $queen) = @_;
print("===== CRASHED workers\n");
......@@ -370,6 +413,7 @@ sub generate_worker_cmd {
} else {
$worker_cmd .= ((defined $self->{'job_limit'}) ? (' -limit ' .$self->{'job_limit'}) : '')
. ((defined $self->{'batch_size'}) ? (' -batch_size '.$self->{'batch_size'}) : '')
. ((defined $self->{'lifespan'}) ? (' -lifespan '.$self->{'lifespan'}) : '')
. ((defined $self->{'logic_name'}) ? (' -logic_name '.$self->{'logic_name'}) : '')
. ((defined $self->{'maximise_concurrency'}) ? ' -maximise_concurrency 1' : '');
}
......@@ -406,7 +450,7 @@ sub run_autonomously {
print("\n======= beekeeper loop ** $iteration **==========\n");
check_for_dead_workers($self, $queen, $self->{'overdue_minutes'});
check_for_dead_workers($self, $queen, 0);
$queen->print_analysis_status unless($self->{'no_analysis_stats'});
$queen->print_running_worker_status;
......@@ -419,14 +463,13 @@ sub run_autonomously {
# (and we probably do not care about the limits)
$worker_count = 1;
} else { # apply different technical and self-imposed limits:
$worker_count = $self->{'meadow'}->limit_workers($worker_count, $self->{'name'});
$worker_count = $self->{'meadow'}->limit_workers($worker_count);
}
if($worker_count) {
print "Submitting $worker_count '".$self->{'meadow'}->type()."' workers\n";
my $job_name = ($self->{'name'} ? ($self->{'name'}.'-') : '') .'HL'. $iteration;
$self->{'meadow'}->submit_workers($worker_cmd, $worker_count, $job_name);
$self->{'meadow'}->submit_workers($worker_cmd, $worker_count, $iteration);
} else {
print "Not submitting any workers this iteration\n";
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment