my $self = shift @_;
my $worker = shift @_;
my %flags = @_;
my ($analysis_id, $logic_name, $job_id, $force)
= @flags{qw(-analysis_id -logic_name -job_id -force)};
if( scalar( grep {defined($_)} ($analysis_id, $logic_name, $job_id) ) > 1) {
die "At most one of the options {-analysis_id, -logic_name, -job_id} can be set to pre-specialize a Worker";
}
my ($analysis, $stats);
my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
if($job_id or $analysis_id or $logic_name) { # probably pre-specialized from command-line
if($job_id) {
warn "resetting and fetching job for job_id '$job_id'\n";
my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
my $job = $job_adaptor->fetch_by_dbID( $job_id )
or die "Could not fetch job with dbID='$job_id'";
my $job_status = $job->status();
if($job_status =~/(CLAIMED|PRE_CLEANUP|FETCH_INPUT|RUN|WRITE_OUTPUT|POST_CLEANUP)/ ) {
die "Job with dbID='$job_id' is already in progress, cannot run"; # FIXME: try GC first, then complain
} elsif($job_status =~/(DONE|SEMAPHORED)/ and !$force) {
die "Job with dbID='$job_id' is $job_status, please use -force 1 to override";
}
if(($job_status eq 'DONE') and $job->semaphored_job_id) {
warn "Increasing the semaphore count of the dependent job";
$job_adaptor->increase_semaphore_count_for_jobid( $job->semaphored_job_id );
}
$analysis_id = $job->analysis_id;
}
if($logic_name) {
$analysis = $self->db->get_AnalysisAdaptor->fetch_by_logic_name($logic_name)
or die "analysis with name='$logic_name' could not be fetched from the database";
$analysis_id = $analysis->dbID;
} elsif($analysis_id) {
$analysis = $self->db->get_AnalysisAdaptor->fetch_by_dbID($analysis_id)
or die "analysis with dbID='$analysis_id' could not be fetched from the database";
}
if( $worker->resource_class_id
and $worker->resource_class_id != $analysis->resource_class_id) {
die "resource_class of analysis ".$analysis->logic_name." is incompatible with this Worker's resource_class";
}
$stats = $analysis->stats;
$self->safe_synchronize_AnalysisStats($stats);
unless($job_id or $force) { # do we really need to run this analysis?
if($self->db->get_RoleAdaptor->get_hive_current_load() >= 1.1) {
$worker->cause_of_death('HIVE_OVERLOAD');
die "Hive is overloaded, can't specialize a worker";
}
if($stats->status eq 'BLOCKED') {
die "Analysis is BLOCKED, can't specialize a worker";
}
if($stats->num_required_workers <= 0) {
die "Analysis doesn't require extra workers at the moment";
}
if($stats->status eq 'DONE') {
die "Analysis is DONE, and doesn't require workers";
}
}
# probably scheduled by beekeeper.pl:
$analysis_id = $stats->analysis_id;
} else {
$worker->cause_of_death('NO_ROLE');
die "No analysis suitable for the worker was found\n";
}
my $role_adaptor = $self->db->get_RoleAdaptor;
if( my $old_role = $worker->current_role ) {
$role_adaptor->finalize_role( $old_role );
}
'worker' => $worker,
'analysis_id' => $analysis_id,
);
$role_adaptor->store( $new_role );
$worker->current_role( $new_role );
if($job_id) {
my $role_id = $new_role->
dbID;
if( my $job = $self->db->get_AnalysisJobAdaptor->reset_or_grab_job_by_dbID($job_id, $role_id) ) {
$worker->special_batch( [ $job ] );
} else {
die "Could not claim job with dbID='$job_id' for Role with dbID='$role_id'";
}
} else { # count it as autonomous worker sharing the load of that analysis:
$analysis_stats_adaptor->update_status($analysis_id, 'WORKING');
$analysis_stats_adaptor->decrease_required_workers( $new_role->analysis_id );
}
# The following increment used to be done only when no specific task was given to the worker,
# thereby excluding such "special task" workers from being counted in num_running_workers.
#
# However this may be tricky to emulate by triggers that know nothing about "special tasks",
# so I am (temporarily?) simplifying the accounting algorithm.
#
unless( $self->db->hive_use_triggers() ) {
$analysis_stats_adaptor->increase_running_workers( $new_role->analysis_id );
}
}