Commit e4adf393 authored by Leo Gordon
Browse files

some logic cleanup and hive_overload bugfix

parent 6e28fe7b
......@@ -88,7 +88,7 @@ use Bio::EnsEMBL::Hive::Process;
## Minimum amount of time in msec that a worker should run before reporting
## back to the hive. This is used when setting the batch_size automatically.
## 2 minutes * 60 sec * 1000 msec = 120000 msec
## Declared exactly once: a second "my" of the same name in the same scope
## would only mask this one and raise a "masks earlier declaration" warning.
my $MIN_BATCH_TIME = 2*60*1000;
sub new {
my ($class,@args) = @_;
......@@ -98,7 +98,7 @@ sub new {
# Initialises a worker object: stores the current time() as its start time
# and sets the debug level to 0.
# Returns the worker itself.
sub init {
    my $self = shift;

    # Record the start time through the accessor only. A direct write to
    # $self->{'start_time'} is redundant, because start_time() stores its
    # argument in that very hash slot.
    $self->start_time(time());
    $self->debug(0);

    return $self;
}
......@@ -171,14 +171,36 @@ sub analysis {
=cut
# Getter/setter for the worker's life span, in seconds.
# default life_span = 60 minutes, filled in lazily on first access.
# Arg [1] (optional): new life span; any defined value (including 0) is stored.
# Returns the life span currently stored on the object.
sub life_span {
    my ( $self, $value ) = @_;

    $self->{'_life_span'} = 60*60 unless defined $self->{'_life_span'};
    $self->{'_life_span'} = $value if defined $value;

    return $self->{'_life_span'};
}
# Getter/setter for the worker's start time.
# When called with an argument (even undef), that argument is stored first.
# Returns whatever is currently stored under 'start_time'.
sub start_time {
    my ( $self, @new_value ) = @_;

    $self->{'start_time'} = $new_value[0] if @new_value;

    return $self->{'start_time'};
}
# Tells whether the worker has outlived its life span.
# Returns the number of seconds the worker has been alive when that number
# exceeds life_span(); returns 0 otherwise, including when life_span() is
# false (no limit is enforced in that case).
sub life_span_limit_reached {
    my ($self) = @_;

    # A false life span means there is nothing to compare against.
    return 0 unless $self->life_span();

    my $elapsed_secs = time() - $self->start_time();

    return $elapsed_secs > $self->life_span() ? $elapsed_secs : 0;
}
=head2 job_limit
Title : job_limit
......@@ -199,6 +221,32 @@ sub job_limit {
return $self->{'_job_limit'};
}
# Getter/setter for the count of jobs this worker has completed.
# When called with an argument, that argument replaces the stored count.
# Returns the stored count, or 0 when nothing (or a false value) is stored.
sub jobs_done {
    my ( $self, @new_count ) = @_;

    $self->{'jobs_done'} = $new_count[0] if @new_count;

    return $self->{'jobs_done'} || 0;
}
# Increments the worker's completed-jobs counter by one.
# Returns the counter's value from before the increment (post-increment).
sub more_jobs_done {
    my ($self) = @_;

    my $count_before = $self->{'jobs_done'}++;

    return $count_before;
}
# Tells whether the worker has completed as many jobs as its job_limit allows.
# Returns jobs_done() when a true job_limit is set and jobs_done() has reached
# or passed it; returns 0 otherwise.
sub job_limit_reached {
    my ($self) = @_;

    # A false job_limit means no limit applies.
    return 0 unless $self->job_limit;
    return 0 if $self->jobs_done < $self->job_limit;

    return $self->jobs_done;
}
sub hive_id {
my( $self, $value ) = @_;
$self->{'_hive_id'} = $value if($value);
......@@ -217,21 +265,6 @@ sub process_id {
return $self->{'_ppid'};
}
# Getter/setter for the amount of work this worker has done.
# When called with an argument, that argument replaces the stored count.
# Returns the stored count, or 0 when nothing (or a false value) is stored.
sub work_done {
    my ( $self, @new_count ) = @_;

    $self->{'work_done'} = $new_count[0] if @new_count;

    return $self->{'work_done'} || 0;
}
# Increments the worker's work-done counter by one.
# Returns the counter's value from before the increment (post-increment).
sub more_work_done {
    my ($self) = @_;

    my $count_before = $self->{'work_done'}++;

    return $count_before;
}
sub cause_of_death {
my( $self, $value ) = @_;
$self->{'_cause_of_death'} = $value if($value);
......@@ -428,22 +461,20 @@ sub run
$self->db->dbc->disconnect_when_inactive(0);
do { # EXTERNAL_do_loop ends when the cause of death is set
my $batch_start = time() * 1000;
my $batch_end = $batch_start;
my $work_done_this_interval = 0;
my $jobs = [];
do { # Worker's lifespan loop (ends only when the worker dies)
my $batches_start = time() * 1000;
my $batches_end = $batches_start;
my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop
$self->{fetch_time} = 0;
$self->{run_time} = 0;
$self->{write_time} = 0;
do {
if($specific_job) {
$self->queen->worker_reclaim_job($self,$specific_job);
push @$jobs, $specific_job;
} else {
$jobs = $self->queen->worker_grab_jobs($self);
}
do { # Worker's "batches loop" exists to prevent logging the status too frequently.
# If a batch took less than $MIN_BATCH_TIME to run, the Worker keeps taking&running more batches.
my $jobs = $specific_job
? [ $self->queen->worker_reclaim_job($self,$specific_job) ]
: $self->queen->worker_grab_jobs($self);
$self->queen->worker_check_in($self); #will sync analysis_stats if needed
......@@ -463,27 +494,31 @@ sub run
$self->queen->worker_register_job_done($self, $job);
$self->more_work_done;
$self->more_jobs_done;
}
$batch_end = time() * 1000;
$work_done_this_interval += scalar(@$jobs);
if( $specific_job or
($self->job_limit and $self->work_done >= $self->job_limit)) {
$self->cause_of_death('JOB_LIMIT');
$batches_end = time() * 1000;
$jobs_done_by_batches_loop += scalar(@$jobs);
if( $specific_job ) {
$self->cause_of_death('JOB_LIMIT');
} elsif( my $jobs_completed = $self->job_limit_reached()) {
print "job_limit reached (completed $jobs_completed jobs)\n";
$self->cause_of_death('JOB_LIMIT');
} elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
print "life_span limit reached (alive for $alive_for_secs secs)\n";
$self->cause_of_death('LIFESPAN');
}
} while (!$self->cause_of_death and $batch_end-$batch_start < $MIN_BATCH_TIME); ## Run for $MIN_BATCH_TIME at least, but accept and honour own death
} while (!$self->cause_of_death and $batches_end-$batches_start < $MIN_BATCH_TIME);
#printf("batch start:%f end:%f\n", $batch_start, $batch_end);
# The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
# so they are not allowed to happen too frequently (not before $MIN_BATCH_TIME of work has been done)
#
$self->db->get_AnalysisStatsAdaptor->interval_update_work_done($self->analysis->dbID,
$work_done_this_interval, $batch_end-$batch_start, $self);
if(($self->life_span()>0) and ((time() - $self->{'start_time'}) > $self->life_span())) {
printf("life_span exhausted (alive for %d secs)\n", (time() - $self->{'start_time'}));
$self->cause_of_death('LIFESPAN');
}
$jobs_done_by_batches_loop, $batches_end-$batches_start, $self);
if (!$self->cause_of_death and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity) {
if (!$self->cause_of_death
and $self->analysis->stats->hive_capacity >= 0
and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity) {
my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
"WHERE num_running_workers > hive_capacity AND analysis_id = " . $self->analysis->stats->analysis_id;
my $row_count = $self->queen->dbc->do($sql);
......@@ -491,7 +526,7 @@ sub run
$self->cause_of_death('HIVE_OVERLOAD');
}
}
} while (!$self->cause_of_death); # /EXTERNAL_do_loop
} while (!$self->cause_of_death); # /Worker's lifespan loop
$self->queen->dbc->do("UPDATE hive SET status = 'DEAD' WHERE hive_id = ".$self->hive_id);
......@@ -505,7 +540,7 @@ sub run
$self->analysis->stats->print_stats if($self->debug);
printf("dbc %d disconnect cycles\n", $self->db->dbc->disconnect_count);
print("total jobs completes : ", $self->work_done, "\n");
print("total jobs completed : ", $self->jobs_done, "\n");
if($self->output_dir()) {
close STDOUT;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment