# Processes the claimed jobs in $jobs one at a time and returns how many of them
# completed successfully.
#
# Arguments:
#   $self - the Worker running the batch
#   $jobs - arrayref of job objects; it is consumed (shifted) as jobs are processed
#
# Returns:
#   the number of jobs that completed without dying during this call
#
# The loop stops early, with cause_of_death set to 'CONTAMINATED', when a failed job
# follows another failed job, or when a job (failed or not) reports lethal_for_worker.
# In that case any jobs still left in $jobs are not processed here.
#
# NOTE(review): $job_stopwatch is used below but is not declared in the visible code --
# presumably it is declared elsewhere in the file; confirm.
my ($self, $jobs) = @_;

my $jobs_done_here = 0;

my $current_role         = $self->current_role;
my $hive_use_param_stack = $self->adaptor->db->hive_use_param_stack();
my $accu_adaptor         = $self->adaptor->db->get_AccumulatorAdaptor;
my $max_retry_count      = $current_role->analysis->max_retry_count(); # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs

$self->adaptor->check_in_worker( $self );
$self->adaptor->safe_synchronize_AnalysisStats( $current_role->analysis->stats );

if($self->debug) {
    $self->worker_say( 'AnalysisStats : ' . $current_role->analysis->stats->toString );
    $self->worker_say( 'claimed '.scalar(@{$jobs}).' jobs to process' );
}

# Declared outside the loop because it is filled inside the eval and read after it.
my $job_partial_timing;

ONE_BATCH: while(my $job = shift @$jobs) { # to make sure jobs go out of scope without undue delay

    my $job_id = $job->dbID();
    $self->worker_say( $job->toString ) if($self->debug);

    $job_partial_timing = {};

    $self->start_job_output_redirection($job); # switch logging into job's STDERR
    eval { # capture any throw/die
        # Flag the job as incomplete up front; the flag is read back in the error handler below.
        $job->incomplete(1);

        $job->accu_hash( $accu_adaptor->fetch_structures_for_job_ids( $job_id )->{ $job_id } );

        my $runnable_object = $self->runnable_object();

        # Reset the per-connection query counter and the timer so that both measure this job only.
        $self->adaptor->db->dbc->query_count(0);
        $job_stopwatch->restart();

        # Parameter sources, listed in the order they are handed to param_init()
        # (presumably later sources override earlier ones -- confirm against param_init).
        my @params_precedence = (
            $runnable_object->param_defaults(),
            $self->adaptor->db->get_PipelineWideParametersAdaptor->fetch_param_hash(),
            $current_role->analysis->parameters(),
        );

        if( $hive_use_param_stack ) {
            my $input_ids_hash = $job->adaptor->fetch_input_ids_for_job_ids( $job->param_id_stack, 2, 0 ); # input_ids have lower precedence (FOR EACH ID)
            my $accu_hash = $accu_adaptor->fetch_structures_for_job_ids( $job->accu_id_stack, 2, 1 ); # accus have higher precedence (FOR EACH ID)
            # Merge both hashes, then append their values in ascending numeric key order.
            my %input_id_accu_hash = ( %$input_ids_hash, %$accu_hash );
            push @params_precedence, @input_id_accu_hash{ sort { $a <=> $b } keys %input_id_accu_hash }; # take a slice. Mmm...
        }

        # The job's own input_id and accumulated data go last.
        push @params_precedence, $job->input_id(), $job->accu_hash();

        $job->param_init( $runnable_object->strict_hash_format(), @params_precedence );

        $self->worker_say( "Job $job_id unsubstituted_params= ".stringify($job->{'_unsubstituted_param_hash'}) ) if($self->debug());

        $runnable_object->input_job( $job ); # "take" the job
        $job_partial_timing = $runnable_object->life_cycle();
    };
    if(my $msg = $@) {
        $job->died_somewhere( $job->incomplete ); # it will be OR'd inside

        # Log against $job itself rather than via $self->runnable_object->input_job.
        # input_job is only set near the end of the eval above and is reset to undef after
        # every job, so an exception thrown earlier (parameter loading, param_init, ...)
        # would make the indirect call die on undef outside the eval and take the whole
        # Worker down. When input_job was set, it is this same $job.
        $job->warning( $msg, $job->incomplete );
    }

    # whether the job completed successfully or not:
    $self->runnable_object->input_job( undef ); # release an extra reference to the job
    $job->runtime_msec( $job_stopwatch->get_elapsed );
    $job->query_count( $self->adaptor->db->dbc->query_count );

    my $job_completion_line = "Job $job_id : ". ($job->died_somewhere ? 'died' : 'complete' );

    print STDERR "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->died_somewhere)); # one copy goes to the job's STDERR
    $self->stop_job_output_redirection($job); # and then we switch back to worker's STDERR
    $self->worker_say( $job_completion_line ); # one copy goes to the worker's STDERR

    $self->current_role->register_attempt( ! $job->died_somewhere );

    if($job->died_somewhere) {
        # If the job specifically said what to do next, respect that last wish.
        # Otherwise follow the default behaviour set by the beekeeper in $worker:
        #
        my $may_retry = defined($job->transient_error) ? $job->transient_error : $self->retry_throwing_jobs;

        $job->adaptor->release_and_age_job( $job_id, $max_retry_count, $may_retry, $job->runtime_msec );

        if( $self->prev_job_error # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
         or $job->lethal_for_worker ) { # trust the job's expert knowledge
            my $reason = $self->prev_job_error ? 'two failed jobs in a row'
                                               : 'suggested by job itself';
            $self->worker_say( "Job's error has contaminated the Worker ($reason), so the Worker will now die" );
            $self->cause_of_death('CONTAMINATED');
            last ONE_BATCH;
        }
    } else { # job successfully completed:
        $self->more_work_done( $job_partial_timing );
        $jobs_done_here++;
        $job->set_and_update_status('DONE');

        if(my $semaphored_job_id = $job->semaphored_job_id) {
            my $dbc = $self->adaptor->db->dbc;
            # Take a row lock on the semaphored job before decrementing its counter
            # (skipped on the sqlite driver).
            $dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($dbc->driver ne 'sqlite');

            $job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # step-unblock the semaphore
        }

        if($job->lethal_for_worker) {
            $self->worker_say( "The Job, although complete, wants the Worker to die" );
            $self->cause_of_death('CONTAMINATED');
            last ONE_BATCH;
        }
    }

    # Remember this job's outcome so the next iteration can detect two failures in a row.
    $self->prev_job_error( $job->died_somewhere );
    $self->enter_status('READY');
} # /while(my $job = shift @$jobs)

return $jobs_done_here;
}