Commit d67bcffb authored by Leo Gordon's avatar Leo Gordon
Browse files

remove the per-worker batch_size method

parent 1ca856a8
......@@ -96,7 +96,7 @@ sub batch_size {
my $self = shift;
$self->{'_batch_size'} = shift if(@_);
$self->{'_batch_size'} = 1 unless(defined($self->{'_batch_size'})); # do we need to initialize it at all?
$self->{'_batch_size'} = 1 unless(defined($self->{'_batch_size'})); # only initialize when undefined, so if defined as 0 will stay 0
return $self->{'_batch_size'};
}
......
......@@ -481,7 +481,7 @@ sub store_out_files {
my $jobs = $job_adaptor->grab_jobs_for_worker( $worker );
Description:
For the specified worker, it will search available jobs,
and using the workers requested batch_size, claim/fetch that
and using the how_many_this_batch parameter, claim/fetch that
number of jobs, and then return them.
Returntype :
reference to array of Bio::EnsEMBL::Hive::AnalysisJob objects
......@@ -490,17 +490,16 @@ sub store_out_files {
=cut
sub grab_jobs_for_worker {
my ($self, $worker) = @_;
my ($self, $worker, $how_many_this_batch) = @_;
my $analysis_id = $worker->analysis->dbID();
my $worker_id = $worker->dbID();
my $batch_size = $worker->batch_size();
my $update_sql = "UPDATE job SET worker_id='$worker_id', status='CLAIMED'";
my $selection_start_sql = " WHERE analysis_id='$analysis_id' AND status='READY' AND semaphore_count<=0";
my $virgin_selection_sql = $selection_start_sql . " AND retry_count=0 LIMIT $batch_size";
my $any_selection_sql = $selection_start_sql . " LIMIT $batch_size";
my $virgin_selection_sql = $selection_start_sql . " AND retry_count=0 LIMIT $how_many_this_batch";
my $any_selection_sql = $selection_start_sql . " LIMIT $how_many_this_batch";
if($self->dbc->driver eq 'sqlite') {
# we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
......
......@@ -25,8 +25,8 @@
Each worker is linked to an analysis_id, registers itself on creation
into the Hive, creates a RunnableDB instance of the Analysis->module,
gets $worker->batch_size() jobs from the job table, does its
work, creates the next layer of job entries by interfacing to
gets $analysis->stats->batch_size jobs from the job table, does its work,
creates the next layer of job entries by interfacing to
the DataflowRuleAdaptor to determine the analyses it needs to pass its
output data to and creates jobs on the next analysis database.
It repeats this cycle until it has lived its lifetime or until there are no
......@@ -99,11 +99,11 @@ sub create_new_worker {
my ( $meadow_type, $process_id, $exec_host,
$rc_id, $logic_name, $analysis_id, $input_id, $job_id,
$no_write, $debug, $worker_output_dir, $hive_output_dir, $batch_size, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs) =
$no_write, $debug, $worker_output_dir, $hive_output_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs) =
rearrange([qw(meadow_type process_id exec_host
rc_id logic_name analysis_id input_id job_id
no_write debug worker_output_dir hive_output_dir batch_size job_limit life_span no_cleanup retry_throwing_jobs) ], @args);
no_write debug worker_output_dir hive_output_dir job_limit life_span no_cleanup retry_throwing_jobs) ], @args);
if($logic_name) {
if($analysis_id) {
......@@ -223,15 +223,9 @@ sub create_new_worker {
}
$worker->hive_output_dir($hive_output_dir);
if($batch_size) {
$worker->batch_size($batch_size);
}
if($job_limit) {
$worker->job_limit($job_limit);
$worker->life_span(0);
if($job_limit < $worker->batch_size()) {
$worker->batch_size( $job_limit );
}
}
if($life_span) {
$worker->life_span($life_span * 60);
......@@ -546,19 +540,19 @@ sub synchronize_AnalysisStats {
$analysisStats->failed_job_count(0);
$analysisStats->num_required_workers(0);
my $sql = "SELECT status, count(*), semaphore_count FROM job ".
my $sql = "SELECT status, semaphore_count, count(*) FROM job ".
"WHERE analysis_id=? GROUP BY status, semaphore_count";
my $sth = $self->prepare($sql);
$sth->execute($analysisStats->analysis_id);
my $done_here = 0;
my $done_elsewhere = 0;
while (my ($status, $job_count, $semaphore_count)=$sth->fetchrow_array()) {
my $done_here = 0;
my $done_elsewhere = 0;
my $total_job_count = 0;
while (my ($status, $semaphore_count, $job_count)=$sth->fetchrow_array()) {
# print STDERR "$status: $job_count\n";
my $curr_total = $analysisStats->total_job_count();
$analysisStats->total_job_count($curr_total + $job_count);
my $total_job_count += $job_count;
if(($status eq 'READY') and ($semaphore_count<=0)) {
$analysisStats->unclaimed_job_count($job_count);
......@@ -585,7 +579,8 @@ sub synchronize_AnalysisStats {
}
$sth->finish;
$analysisStats->done_job_count($done_here + $done_elsewhere);
$analysisStats->total_job_count( $total_job_count );
$analysisStats->done_job_count( $done_here + $done_elsewhere );
} # /unless $self->{'_hive_use_triggers'}
$analysisStats->check_blocking_control_rules();
......
......@@ -235,6 +235,22 @@ sub more_work_done {
$self->{'work_done'}++;
}
sub next_batch_size {
    my $self = shift @_;

        # Start from the scheduler's estimate for this worker's analysis:
    my $batch_size = $self->analysis->stats->get_or_estimate_batch_size();

    if(my $job_limit = $self->job_limit()) {    # if job_limit is set, it may influence batch_size
        my $jobs_to_do = $job_limit - $self->work_done();
        if($jobs_to_do < $batch_size) {
                # Clamp at zero: if work_done has reached (or overshot) job_limit,
                # returning a negative number would be passed to the job adaptor
                # as a LIMIT value; 0 correctly means "no more jobs this batch".
            return ($jobs_to_do > 0) ? $jobs_to_do : 0;
        }
    }
    return $batch_size;
}
sub job_limit_reached {
my $self = shift @_;
......@@ -378,7 +394,7 @@ sub print_worker {
" host=",$self->host,
" pid=",$self->process_id,
"\n");
print(" batch_size = ", $self->batch_size,"\n");
print(" batch_size = ", $self->analysis->stats->get_or_estimate_batch_size(),"\n");
print(" job_limit = ", $self->job_limit,"\n") if(defined($self->job_limit));
print(" life_span = ", $self->life_span,"\n") if(defined($self->life_span));
if(my $worker_output_dir = $self->worker_output_dir) {
......@@ -418,28 +434,6 @@ sub cleanup_worker_process_temp_directory {
#
###############################
=head2 batch_size
Args : none
Title : batch_size
Usage : $value = $self->batch_size;
$self->batch_size($new_value);
Description: Defines the number of jobs that should run in batch
before querying the database for the next job batch. Used by the
Hive system to manage the number of workers needed to complete a
particular job type.
DefaultValue : batch_size of analysis
Returntype : integer scalar
=cut
sub batch_size {
    my $self = shift;

        # getter/setter: an explicit value (including 0) takes precedence,
        # otherwise fall back to the analysis-stats estimate.
    $self->{'_batch_size'} = shift if(@_);

        # NOTE: use a defined() test rather than '||' so that a batch_size
        # explicitly set to 0 stays 0 instead of silently falling through
        # to the estimate (consistent with the AnalysisStats accessor's
        # "if defined as 0 will stay 0" convention).
    return defined($self->{'_batch_size'})
        ? $self->{'_batch_size'}
        : $self->analysis->stats->get_or_estimate_batch_size();
}
=head2 run
......@@ -503,7 +497,9 @@ sub run {
$self->cause_of_death('CONTAMINATED');
$job_adaptor->release_undone_jobs_from_worker($self, $msg);
} else {
$jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->grab_jobs_for_worker( $self ) );
if(my $how_many_this_batch = $self->next_batch_size()) {
$jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->grab_jobs_for_worker( $self, $how_many_this_batch ) );
}
if( my $jobs_completed = $self->job_limit_reached()) {
print "job_limit reached ($jobs_completed jobs completed)\n";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment