Skip to content
Snippets Groups Projects
Commit d3ab594d authored by Jessica Severin's avatar Jessica Severin
Browse files

new calculation of avg_msec_per_job

better debug system to allow worker to control analysis debug level
new batch_size method will use avg_msec_per_job to calculate batch_size on the fly
  if analysis_stat.batch_size==0
parent ec68e371
No related branches found
No related tags found
No related merge requests found
......@@ -73,6 +73,7 @@ use strict;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
use Sys::Hostname;
use Time::HiRes qw(time);
use Bio::EnsEMBL::Analysis;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
......@@ -91,6 +92,7 @@ sub new {
sub init {
my $self = shift;
$self->{'start_time'} = time();
$self->debug(0);
return $self;
}
......@@ -109,6 +111,12 @@ sub beekeeper {
$self->{'_beekeeper'} = shift if(@_);
return $self->{'_beekeeper'};
}
sub debug {
my $self = shift;
$self->{'_debug'} = shift if(@_);
$self->{'_debug'}=0 unless(defined($self->{'_debug'}));
return $self->{'_debug'};
}
=head2 analysis
......@@ -250,6 +258,13 @@ sub output_dir {
return $self->{'_output_dir'};
}
sub perform_global_cleanup {
my $self = shift;
$self->{'_perform_global_cleanup'} = shift if(@_);
$self->{'_perform_global_cleanup'} = 1 unless(defined($self->{'_perform_global_cleanup'}));
return $self->{'_perform_global_cleanup'};
}
sub print_worker {
my $self = shift;
print("WORKER: hive_id=",$self->hive_id,
......@@ -271,7 +286,7 @@ sub print_worker {
=head2 batch_size
Arg [1] : (optional) string $value
Args : none
Title : batch_size
Usage : $value = $self->batch_size;
$self->batch_size($new_value);
......@@ -287,13 +302,19 @@ sub print_worker {
sub batch_size {
my $self = shift;
my $stats = $self->analysis->stats;
my $batch_size = $stats->batch_size;
if(@_) {
my $batch_size = shift;
my $stats = $self->analysis->stats;
$batch_size = shift;
$stats->batch_size($batch_size);
$stats->update;
}
if(($batch_size == 0) and ($stats->avg_msec_per_job)) {
$batch_size = int(15000 / $stats->avg_msec_per_job); # num jobs in 15 secs
}
my $batch_size = $self->analysis->stats->batch_size;
$batch_size = 1 if($batch_size < 1); # make sure we grab at least one job
if($self->job_limit and ($self->job_limit < $batch_size)) {
$batch_size = $self->job_limit;
}
......@@ -342,6 +363,8 @@ sub run
$self->db->dbc->disconnect_when_inactive(0);
my $jobDBA = $self->db->get_AnalysisJobAdaptor;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
my $alive=1;
while($alive) {
my $jobs = [];
......@@ -353,11 +376,14 @@ sub run
$jobs = $jobDBA->fetch_by_claim_analysis($claim, $self->analysis->dbID);
}
$self->queen->worker_check_in($self);
$self->queen->worker_check_in($self); #will sync analysis_stats if >60sec overdue
$self->cause_of_death('NO_WORK') unless(scalar @{$jobs});
print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n");
$self->analysis->stats->print_stats if($self->debug);
print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n") if($self->debug);
my $batch_start = time() * 1000;
foreach my $job (@{$jobs}) {
$self->redirect_job_output($job);
$self->run_module_with_job($job);
......@@ -366,6 +392,11 @@ sub run
$self->close_and_update_job_output($job);
$self->{'_work_done'}++;
}
my $batch_end = time() * 1000;
#printf("batch start:%f end:%f\n", $batch_start, $batch_end);
$statsDBA->interval_update_work_done($self->analysis->dbID,
scalar(@$jobs),
($batch_end - $batch_start));
$self->cause_of_death('JOB_LIMIT') if($specific_job);
......@@ -381,8 +412,11 @@ sub run
#}
if($self->cause_of_death) { $alive=undef; }
}
#have runnable cleanup any global/process files/data it may have created
$self->analysis->runnableDB->global_cleanup();
if($self->perform_global_cleanup) {
#have runnable cleanup any global/process files/data it may have created
$self->analysis->runnableDB->global_cleanup();
}
$self->queen->register_worker_death($self);
......@@ -412,6 +446,7 @@ sub run_module_with_job
#pass the input_id from the job into the runnableDB object
$runObj->input_id($job->input_id);
$runObj->analysis_job_id($job->dbID);
$runObj->debug($self->debug);
$job->status('GET_INPUT');
$runObj->fetch_input;
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment