Commit a89f3c41 authored by Leo Gordon

detect that strange behaviour pattern if we cannot prevent it
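
The "strange behaviour pattern" in question is a Worker that still has jobs in a non-final state (COMPILATION, GET_INPUT, RUN or WRITE_OUTPUT) registered against its worker_id, yet tries to claim another batch — something that should only happen if the process has somehow forked or gone multithreaded. Since the cause cannot be prevented, the Worker now checks for the condition before grabbing new jobs and dies if it is detected. A minimal sketch of the check, assuming $self and $job_adaptor are in scope as in the Worker code in the diff below:

    # refuse to claim more jobs while some are still marked incomplete for this worker
    my $incomplete_count = scalar @{ $job_adaptor->fetch_all_incomplete_jobs_by_worker_id( $self->worker_id ) };
    if( $incomplete_count ) {
        die "This worker is too greedy: not having completed $incomplete_count jobs it is trying to grab yet more jobs! Has it gone multithreaded?\n";
    }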

parent 6be1419c
@@ -200,6 +200,14 @@ sub fetch_all_failed_jobs {
}
sub fetch_all_incomplete_jobs_by_worker_id {
my ($self, $worker_id) = @_;
my $constraint = "a.status IN ('COMPILATION','GET_INPUT','RUN','WRITE_OUTPUT') AND a.worker_id='$worker_id'";
return $self->_generic_fetch($constraint);
}
sub fetch_by_url_query {
my ($self, $field_name, $field_value) = @_;
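
The new adaptor method can also be exercised on its own, e.g. from a small monitoring script. A hedged sketch, assuming the usual Ensembl-style constructor for Bio::EnsEMBL::Hive::DBSQL::DBAdaptor; the connection parameters and worker_id below are placeholders:

    use strict;
    use warnings;
    use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;

    # placeholder connection parameters -- substitute your own hive database
    my $hive_dba = Bio::EnsEMBL::Hive::DBSQL::DBAdaptor->new(
        -host   => 'localhost',
        -port   => 3306,
        -user   => 'ensro',
        -pass   => '',
        -dbname => 'my_hive_db',
    );

    my $job_adaptor = $hive_dba->get_AnalysisJobAdaptor;
    my $worker_id   = 1234;    # hypothetical worker_id to inspect

    my $incomplete_jobs = $job_adaptor->fetch_all_incomplete_jobs_by_worker_id( $worker_id );
    printf "Worker %d still owns %d incomplete job(s)\n", $worker_id, scalar @$incomplete_jobs;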
@@ -489,7 +489,6 @@ sub batch_size {
sub run {
my $self = shift;
my $specific_job = $self->_specific_job;
if( my $worker_output_dir = $self->worker_output_dir ) {
open OLDOUT, ">&STDOUT";
@@ -505,7 +504,7 @@ sub run {
$self->db->dbc->disconnect_when_inactive(0);
my $max_retry_count = $self->analysis->stats->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
do { # Worker's lifespan loop (ends only when the worker dies)
my $batches_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
@@ -514,24 +513,89 @@ sub run {
$self->{'writing_stopwatch'} = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop
BATCHES: { # note: in order to label a do{} loop you have to enclose it in an extra block
do { # Worker's "batches loop" exists to prevent logging the status too frequently.
# If a batch took less than $MIN_BATCH_TIME to run, the Worker keeps taking&running more batches.
if( my $specific_job = $self->_specific_job() ) {
$jobs_done_by_batches_loop += $self->run_one_batch( [ $self->queen->worker_reclaim_job($self, $specific_job) ] );
$self->cause_of_death('JOB_LIMIT');
} else { # a proper "BATCHES" loop
while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $MIN_BATCH_TIME) {
if(my $incompleted_count = @{ $job_adaptor->fetch_all_incomplete_jobs_by_worker_id( $self->worker_id ) }) {
die "This worker is too greedy: not having completed $incompleted_count jobs it is trying to grab yet more jobs! Has it gone multithreaded?\n";
} else {
$jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->grab_jobs_for_worker( $self ) );
if( my $jobs_completed = $self->job_limit_reached()) {
print "job_limit reached ($jobs_completed jobs completed)\n";
$self->cause_of_death('JOB_LIMIT');
} elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
print "life_span limit reached (alive for $alive_for_secs secs)\n";
$self->cause_of_death('LIFESPAN');
}
}
}
}
my $jobs = $specific_job
? [ $self->queen->worker_reclaim_job($self,$specific_job) ]
: $self->db->get_AnalysisJobAdaptor->grab_jobs_for_worker( $self );
# The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
# so they are not allowed to happen too frequently (not before $MIN_BATCH_TIME of work has been done)
#
$self->db->get_AnalysisStatsAdaptor->interval_update_work_done(
$self->analysis->dbID,
$jobs_done_by_batches_loop,
$batches_stopwatch->get_elapsed,
$self->{'fetching_stopwatch'}->get_elapsed,
$self->{'running_stopwatch'}->get_elapsed,
$self->{'writing_stopwatch'}->get_elapsed,
);
if (!$self->cause_of_death
and $self->analysis->stats->hive_capacity >= 0
and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity
and $self->analysis->stats->adaptor->decrease_running_workers_on_hive_overload( $self->analysis->dbID ) # careful with order, this operation has side-effect
) {
$self->cause_of_death('HIVE_OVERLOAD');
}
} while (!$self->cause_of_death); # /Worker's lifespan loop
if($self->perform_cleanup) {
#have runnable cleanup any global/process files/data it may have created
$self->cleanup_worker_process_temp_directory;
}
$self->queen->register_worker_death($self);
$self->analysis->stats->print_stats if($self->debug);
printf("dbc %d disconnect cycles\n", $self->db->dbc->disconnect_count);
print("total jobs completed : ", $self->work_done, "\n");
if( $self->worker_output_dir() ) {
close STDOUT;
close STDERR;
close WORKER_STDOUT;
close WORKER_STDERR;
open STDOUT, ">&", \*OLDOUT;
open STDERR, ">&", \*OLDERR;
}
}
$self->queen->worker_check_in($self); #will sync analysis_stats if needed
sub run_one_batch {
my ($self, $jobs) = @_;
$self->cause_of_death('NO_WORK') unless(scalar @{$jobs});
my $jobs_done_here = 0;
if($self->debug) {
my $max_retry_count = $self->analysis->stats->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs
$self->queen->worker_check_in($self); #will sync analysis_stats if needed
$self->cause_of_death('NO_WORK') unless(scalar @{$jobs});
if($self->debug) {
$self->analysis->stats->print_stats;
print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n");
}
}
foreach my $job (@{$jobs}) {
foreach my $job (@{$jobs}) {
$job->print_job if($self->debug);
$self->start_job_output_redirection($job);
@@ -564,7 +628,7 @@ sub run {
: 'suggested by job itself';
warn "Job's error has contaminated the Worker ($reason), so the Worker will now die\n";
$self->cause_of_death('CONTAMINATED');
last BATCHES;
return $jobs_done_here;
}
} else { # job successfully completed:
@@ -572,67 +636,15 @@ sub run {
$job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # step-unblock the semaphore
}
$self->more_work_done;
$jobs_done_by_batches_loop++;
$jobs_done_here++;
$job->update_status('DONE');
}
$self->prev_job_error( $job->incomplete );
$self->enter_status('READY');
}
if( $specific_job ) {
$self->cause_of_death('JOB_LIMIT');
} elsif( my $jobs_completed = $self->job_limit_reached()) {
print "job_limit reached (completed $jobs_completed jobs)\n";
$self->cause_of_death('JOB_LIMIT');
} elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
print "life_span limit reached (alive for $alive_for_secs secs)\n";
$self->cause_of_death('LIFESPAN');
}
} while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $MIN_BATCH_TIME);
} # this is the extra block enclosing a labelled do{} loop
# The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
# so they are not allowed to happen too frequently (not before $MIN_BATCH_TIME of work has been done)
#
$self->db->get_AnalysisStatsAdaptor->interval_update_work_done(
$self->analysis->dbID,
$jobs_done_by_batches_loop,
$batches_stopwatch->get_elapsed,
$self->{'fetching_stopwatch'}->get_elapsed,
$self->{'running_stopwatch'}->get_elapsed,
$self->{'writing_stopwatch'}->get_elapsed,
);
if (!$self->cause_of_death
and $self->analysis->stats->hive_capacity >= 0
and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity
and $self->analysis->stats->adaptor->decrease_running_workers_on_hive_overload( $self->analysis->dbID ) # careful with order, this operation has side-effect
) {
$self->cause_of_death('HIVE_OVERLOAD');
}
} while (!$self->cause_of_death); # /Worker's lifespan loop
if($self->perform_cleanup) {
#have runnable cleanup any global/process files/data it may have created
$self->cleanup_worker_process_temp_directory;
}
$self->queen->register_worker_death($self);
$self->analysis->stats->print_stats if($self->debug);
printf("dbc %d disconnect cycles\n", $self->db->dbc->disconnect_count);
print("total jobs completed : ", $self->work_done, "\n");
if( $self->worker_output_dir() ) {
close STDOUT;
close STDERR;
close WORKER_STDOUT;
close WORKER_STDERR;
open STDOUT, ">&", \*OLDOUT;
open STDERR, ">&", \*OLDERR;
}
return $jobs_done_here;
}
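
Taken together, the restructured run() in this commit follows roughly the control flow below (a condensed sketch, not the literal committed code; $job_adaptor, $batches_stopwatch and $jobs_done_by_batches_loop are set up earlier in run() as shown in the diff):

    do {    # Worker's lifespan loop: runs until a cause_of_death is set
        BATCHES: {
            do {    # batches loop: keep taking batches until $MIN_BATCH_TIME has elapsed
                if( my $specific_job = $self->_specific_job() ) {
                    $jobs_done_by_batches_loop += $self->run_one_batch( [ $self->queen->worker_reclaim_job($self, $specific_job) ] );
                    $self->cause_of_death('JOB_LIMIT');
                } else {
                    # the new safety check: never grab more jobs while some are still incomplete
                    if( my $incompleted_count = @{ $job_adaptor->fetch_all_incomplete_jobs_by_worker_id( $self->worker_id ) } ) {
                        die "This worker is too greedy: not having completed $incompleted_count jobs it is trying to grab yet more jobs!\n";
                    }
                    $jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->grab_jobs_for_worker( $self ) );
                    # JOB_LIMIT / LIFESPAN checks happen here (omitted in this sketch)
                }
            } while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $MIN_BATCH_TIME);
        }
        # interval_update_work_done() and the HIVE_OVERLOAD check run here, so the
        # expensive database updates happen at most once per $MIN_BATCH_TIME of work
    } while (!$self->cause_of_death);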