Commit 9ddf43b2 authored by Leo Gordon's avatar Leo Gordon
Browse files

switched to using Limiter class for job_limit and made some related structural changes

parent bb6d6ca9
......@@ -147,8 +147,8 @@ sub create_new_worker {
$worker->init;
if($job_limit) {
$worker->job_limit($job_limit);
if(defined($job_limit)) {
$worker->job_limiter($job_limit);
$worker->life_span(0);
}
......
......@@ -77,6 +77,7 @@ use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Utils::RedirectStack;
use Bio::EnsEMBL::Hive::Limiter;
use base ( 'Bio::EnsEMBL::Storable', # inherit dbID(), adaptor() and new() methods
);
......@@ -328,24 +329,25 @@ sub life_span_limit_reached {
}
=head2 job_limit
=head2 job_limiter
Title : job_limit
Title : job_limiter
Arg [1] : (optional) integer $value
Usage : $value = $self->job_limit;
$self->job_limit($new_value);
Description: Defines the maximum number of jobs a worker can process
before it needs to die. A worker 'dies' when either the
'life_span' or 'job_limit' is exceeded.
Usage : $limiter_obj = $self->job_limiter;
$self->job_limiter($new_value);
Description: The maximum number of jobs to be done by the Worker can be limited by the given number.
A worker 'dies' when either the 'life_span' or 'job_limit' is exceeded.
DefaultValue : undef (relies on life_span to limit life of worker)
Returntype : integer scalar
Returntype : Hive::Limiter object
=cut
sub job_limit {
my $self=shift;
$self->{'_job_limit'}=shift if(@_);
return $self->{'_job_limit'};
sub job_limiter {
my $self=shift;
if( scalar(@_) or !defined($self->{'_job_limiter'}) ) {
$self->{'_job_limiter'} = Bio::EnsEMBL::Hive::Limiter->new(shift @_);
}
return $self->{'_job_limiter'};
}
......@@ -360,31 +362,6 @@ sub more_work_done {
}
sub next_batch_size {
my $self = shift @_;
my $batch_size = $self->analysis->stats->get_or_estimate_batch_size();
if(my $job_limit = $self->job_limit()) { # if job_limit is set, it may influence batch_size
my $jobs_to_do = $job_limit - $self->work_done();
if($jobs_to_do < $batch_size) {
return $jobs_to_do; # should return 0 when job_limit has been attained
}
}
return $batch_size;
}
sub job_limit_reached {
my $self = shift @_;
if($self->job_limit and $self->work_done >= $self->job_limit) {
return $self->work_done;
}
return 0;
}
# By maintaining this information we attempt to detect worker contamination without the user specifically telling us about it
#
# Ideally we should be doing an *ALIGNMENT* of error messages (allowing for some numerical IDs to differ),
......@@ -439,7 +416,8 @@ sub print_worker {
print $self->toString()."\n";
print("\tbatch_size = ", $self->analysis->stats->get_or_estimate_batch_size(),"\n");
print("\tjob_limit = ", $self->job_limit,"\n") if(defined($self->job_limit));
my $job_limit = $self->job_limiter->available_capacity();
print("\tjob_limit = $job_limit\n") if(defined($job_limit));
print("\tlife_span = ", $self->life_span,"\n") if(defined($self->life_span));
if(my $worker_log_dir = $self->log_dir) {
print("\tworker_log_dir = $worker_log_dir\n");
......@@ -558,17 +536,25 @@ sub run {
warn "$msg";
$self->cause_of_death('CONTAMINATED');
$job_adaptor->release_undone_jobs_from_worker($self, $msg);
} else {
if(my $how_many_this_batch = $self->next_batch_size()) {
$jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->grab_jobs_for_worker( $self, $how_many_this_batch ) );
}
if( my $jobs_completed = $self->job_limit_reached()) {
print "job_limit reached ($jobs_completed jobs completed)\n";
$self->cause_of_death('JOB_LIMIT');
} elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
print "life_span limit reached (alive for $alive_for_secs secs)\n";
$self->cause_of_death('LIFESPAN');
} elsif( $self->job_limiter->reached()) {
print "job_limit reached (".$self->work_done." jobs completed)\n";
$self->cause_of_death('JOB_LIMIT');
} elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
print "life_span limit reached (alive for $alive_for_secs secs)\n";
$self->cause_of_death('LIFESPAN');
} else {
my $desired_batch_size = $self->analysis->stats->get_or_estimate_batch_size();
$desired_batch_size = $self->job_limiter->preliminary_offer( $desired_batch_size );
$self->job_limiter->final_decision( $desired_batch_size );
my $actual_batch = $job_adaptor->grab_jobs_for_worker( $self, $desired_batch_size );
if(scalar(@$actual_batch)) {
$jobs_done_by_batches_loop += $self->run_one_batch( $actual_batch );
} else {
$self->cause_of_death('NO_WORK');
}
}
}
......@@ -609,17 +595,17 @@ sub run {
}
}
$self->adaptor->register_worker_death($self);
$self->adaptor->register_worker_death($self);
$self->analysis->stats->print_stats if($self->debug);
$self->analysis->stats->print_stats if($self->debug);
printf("dbc %d disconnect cycles\n", $self->adaptor->db->dbc->disconnect_count);
print("total jobs completed : ", $self->work_done, "\n");
printf("dbc %d disconnect cycles\n", $self->adaptor->db->dbc->disconnect_count);
print("total jobs completed : ", $self->work_done, "\n");
if( $self->log_dir ) {
$self->get_stdout_redirector->pop();
$self->get_stderr_redirector->pop();
}
if( $self->log_dir ) {
$self->get_stdout_redirector->pop();
$self->get_stderr_redirector->pop();
}
}
......@@ -633,8 +619,6 @@ sub run_one_batch {
$self->adaptor->check_in_worker( $self );
$self->adaptor->safe_synchronize_AnalysisStats($self->analysis->stats);
$self->cause_of_death('NO_WORK') unless(scalar @{$jobs});
if($self->debug) {
$self->analysis->stats->print_stats;
print "claimed ".scalar(@{$jobs})." jobs to process\n";
......@@ -642,7 +626,7 @@ sub run_one_batch {
my $job_partial_timing;
while(my $job = shift @$jobs) { # to make sure jobs go out of scope without undue delay
ONE_BATCH: while(my $job = shift @$jobs) { # to make sure jobs go out of scope without undue delay
$job->print_job if($self->debug);
my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
......@@ -692,15 +676,13 @@ sub run_one_batch {
$job->adaptor->release_and_age_job( $job->dbID, $max_retry_count, $may_retry );
if($self->status eq 'COMPILATION' # if it failed to compile, there is no point in continuing as the code WILL be broken
or $self->prev_job_error # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
or $job->lethal_for_worker ) { # trust the job's expert knowledge
my $reason = ($self->status eq 'COMPILATION') ? 'compilation error'
: $self->prev_job_error ? 'two failed jobs in a row'
if( $self->prev_job_error # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
or $job->lethal_for_worker ) { # trust the job's expert knowledge
my $reason = $self->prev_job_error ? 'two failed jobs in a row'
: 'suggested by job itself';
warn "Job's error has contaminated the Worker ($reason), so the Worker will now die\n";
$self->cause_of_death('CONTAMINATED');
return $jobs_done_here;
last ONE_BATCH;
}
} else { # job successfully completed:
$self->more_work_done( $job_partial_timing );
......@@ -714,7 +696,7 @@ sub run_one_batch {
if($job->lethal_for_worker) {
warn "The Job, although complete, wants the Worker to die\n";
$self->cause_of_death('CONTAMINATED');
return $jobs_done_here;
last ONE_BATCH;
}
}
......
......@@ -78,7 +78,7 @@ sub main {
'submission_options=s' => \$submission_options,
# worker control
'job_limit|jlimit=i' => \$self->{'job_limit'},
'job_limit=i' => \$self->{'job_limit'},
'life_span|lifespan=i' => \$self->{'life_span'},
'logic_name=s' => \$self->{'logic_name'},
'hive_log_dir|hive_output_dir=s' => \$self->{'hive_log_dir'},
......
......@@ -31,7 +31,7 @@ GetOptions(
'job_id=i' => \$job_id,
# Worker control parameters:
'job_limit|limit=i' => \$job_limit,
'job_limit=i' => \$job_limit,
'life_span|lifespan=i' => \$life_span,
'no_cleanup' => \$no_cleanup,
'no_write' => \$no_write,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment