Commit 74244f5e authored by Leo Gordon

experimental feature: re-specialization of workers instead of dying from NO_WORK

parent 9ddf43b2
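
In outline: a Worker created with the new can_respecialize flag and then run() without an explicit specialization list no longer dies from NO_WORK; it marks its analysis ALL_CLAIMED and re-specializes into another analysis of the same resource class. A minimal sketch of the calling pattern, assuming a Queen object ($queen) obtained from the hive database adaptor and meadow details already at hand (variable values are illustrative, not from this commit):

    my $worker = $queen->create_new_worker(
        -meadow_type      => $meadow_type,
        -meadow_name      => $meadow_name,
        -process_id       => $process_id,
        -exec_host        => $exec_host,
        -can_respecialize => 1,    # new in this commit: re-specialize instead of dying from NO_WORK
    );
    $worker->run();                # no $specialization_arglist, so re-specialization is permitted
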
@@ -109,10 +109,10 @@ sub create_new_worker {
my ($self, @args) = @_;
my ($meadow_type, $meadow_name, $process_id, $exec_host, $resource_class_id, $resource_class_name,
$no_write, $debug, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs) =
$no_write, $debug, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $can_respecialize) =
rearrange([qw(meadow_type meadow_name process_id exec_host resource_class_id resource_class_name
no_write debug worker_log_dir hive_log_dir job_limit life_span no_cleanup retry_throwing_jobs) ], @args);
no_write debug worker_log_dir hive_log_dir job_limit life_span no_cleanup retry_throwing_jobs can_respecialize) ], @args);
if( defined($resource_class_name) ) {
my $rc = $self->db->get_ResourceClassAdaptor->fetch_by_name($resource_class_name)
@@ -162,6 +162,8 @@ sub create_new_worker {
$worker->retry_throwing_jobs($retry_throwing_jobs) if(defined $retry_throwing_jobs);
$worker->can_respecialize($can_respecialize) if(defined $can_respecialize);
return $worker;
}
@@ -257,6 +259,7 @@ sub specialize_new_worker {
print "Queen picked analysis with dbID=".$stats->analysis_id." for the worker\n";
$worker->analysis( undef ); # make sure we reset anything that was there before
$analysis_id = $stats->analysis_id;
} else {
$worker->cause_of_death('NO_ROLE');
@@ -314,15 +317,11 @@ sub register_worker_death {
$analysis_stats_adaptor->decrease_running_workers($worker->analysis_id);
}
if($cod eq 'NO_WORK') {
$analysis_stats_adaptor->update_status($worker->analysis_id, 'ALL_CLAIMED');
} elsif($cod eq 'UNKNOWN'
or $cod eq 'MEMLIMIT'
or $cod eq 'RUNLIMIT'
or $cod eq 'KILLED_BY_USER'
or $cod eq 'SEE_MSG'
or $cod eq 'NO_ROLE'
or $cod eq 'CONTAMINATED') {
unless( $cod eq 'NO_WORK'
or $cod eq 'JOB_LIMIT'
or $cod eq 'HIVE_OVERLOAD'
or $cod eq 'LIFESPAN'
) {
$self->db->get_AnalysisJobAdaptor->release_undone_jobs_from_worker($worker);
}
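
Read the rewritten check above as a small set of "clean" exit causes: undone jobs are released back to the pool for every cause of death except NO_WORK, JOB_LIMIT, HIVE_OVERLOAD and LIFESPAN, and the NO_WORK-to-ALL_CLAIMED status update moves from here into Worker::run(). An equivalent formulation, for illustration only (not part of the commit):

    my %graceful_death = map { ($_ => 1) } qw(NO_WORK JOB_LIMIT HIVE_OVERLOAD LIFESPAN);
    $self->db->get_AnalysisJobAdaptor->release_undone_jobs_from_worker($worker)
        unless $graceful_death{$cod};
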
......
@@ -65,19 +65,20 @@ package Bio::EnsEMBL::Hive::Worker;
use strict;
use POSIX;
use Bio::EnsEMBL::Hive::Analysis;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Utils::Argument; # import 'rearrange()'
use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::Analysis;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Extensions;
use Bio::EnsEMBL::Hive::Limiter;
use Bio::EnsEMBL::Hive::Process;
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Utils::RedirectStack;
use Bio::EnsEMBL::Hive::Limiter;
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
use base ( 'Bio::EnsEMBL::Storable', # inherit dbID(), adaptor() and new() methods
);
@@ -252,6 +253,32 @@ sub special_batch {
return $self->{'_special_batch'};
}
sub perform_cleanup {
my $self = shift;
$self->{'_perform_cleanup'} = shift if(@_);
$self->{'_perform_cleanup'} = 1 unless(defined($self->{'_perform_cleanup'}));
return $self->{'_perform_cleanup'};
}
# this is a setter/getter that defines default behaviour when a job throws: should it be retried or not?
sub retry_throwing_jobs {
my $self = shift @_;
$self->{'_retry_throwing_jobs'} = shift @_ if(@_);
return defined($self->{'_retry_throwing_jobs'}) ? $self->{'_retry_throwing_jobs'} : 1;
}
sub can_respecialize {
my $self = shift;
$self->{'_can_respecialize'} = shift if(@_);
return $self->{'_can_respecialize'};
}
=head2 analysis
Arg [1] : (optional) Bio::EnsEMBL::Hive::Analysis $value
@@ -381,15 +408,6 @@ sub runnable_object {
return $self->{'_runnable_object'};
}
# this is a setter/getter that defines default behaviour when a job throws: should it be retried or not?
sub retry_throwing_jobs {
my $self = shift @_;
$self->{'_retry_throwing_jobs'} = shift @_ if(@_);
return defined($self->{'_retry_throwing_jobs'}) ? $self->{'_retry_throwing_jobs'} : 1;
}
sub get_stdout_redirector {
my $self = shift;
@@ -404,13 +422,6 @@ sub get_stderr_redirector {
}
sub perform_cleanup {
my $self = shift;
$self->{'_perform_cleanup'} = shift if(@_);
$self->{'_perform_cleanup'} = 1 unless(defined($self->{'_perform_cleanup'}));
return $self->{'_perform_cleanup'};
}
sub print_worker {
my $self = shift;
@@ -469,55 +480,18 @@ sub toString {
=cut
sub run {
my $self = shift;
my @spec_args = @_;
my ($self, $specialization_arglist) = @_;
if( my $worker_log_dir = $self->log_dir ) {
$self->get_stdout_redirector->push( $worker_log_dir.'/worker.out' );
$self->get_stderr_redirector->push( $worker_log_dir.'/worker.err' );
}
my $min_batch_time;
eval {
$self->enter_status('SPECIALIZATION');
$self->adaptor->specialize_new_worker( $self, @spec_args );
$self->print_worker();
$min_batch_time = $self->analysis->stats->min_batch_time();
1;
} or do {
my $msg = $@;
warn "Could not specialize worker:\n\t$msg\n";
$self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self->dbID, $msg, 1 );
$self->cause_of_death('SEE_MSG') unless($self->cause_of_death()); # some specific causes could have been set prior to die "...";
};
if( !$self->cause_of_death() ) {
eval {
$self->enter_status('COMPILATION');
my $runnable_object = $self->analysis->process or die "Unknown compilation error";
$runnable_object->db( $self->adaptor->db );
$runnable_object->worker( $self );
$runnable_object->debug( $self->debug );
$runnable_object->execute_writes( $self->execute_writes );
$self->runnable_object( $runnable_object );
$self->enter_status('READY');
$self->adaptor->db->dbc->disconnect_when_inactive(0);
1;
} or do {
my $msg = "Could not compile Runnable '".$self->analysis->module."' :\n\t".$@;
warn "$msg\n";
$self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self->dbID, $msg, 1 );
$self->cause_of_death('SEE_MSG');
};
}
my $min_batch_time = Bio::EnsEMBL::Hive::AnalysisStats::min_batch_time();
my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;
$self->specialize_and_compile_wrapper( $specialization_arglist );
while (!$self->cause_of_death) { # Worker's lifespan loop (ends only when the worker dies for any reason)
my $batches_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
@@ -578,13 +552,24 @@ sub run {
# A mechanism whereby workers can be caused to exit even if they were doing fine:
#
# FIXME: The following check is not *completely* correct, as it assumes hive_capacity is "local" to the analysis:
if (!$self->cause_of_death
and defined($self->analysis->stats->hive_capacity)
and 0 <= $self->analysis->stats->hive_capacity
and $self->analysis->stats->hive_capacity < $self->analysis->stats->num_running_workers
if (!$self->cause_of_death) {
my $stats = $self->analysis->stats;
if( defined($stats->hive_capacity)
and 0 <= $stats->hive_capacity
and $stats->hive_capacity < $stats->num_running_workers
) {
$self->cause_of_death('HIVE_OVERLOAD');
}
}
if( $self->cause_of_death() eq 'NO_WORK') {
$self->adaptor->db->get_AnalysisStatsAdaptor->update_status($self->analysis_id, 'ALL_CLAIMED');
if( $self->can_respecialize and !$specialization_arglist ) {
$self->cause_of_death(undef);
$self->specialize_and_compile_wrapper();
}
}
} # /Worker's lifespan loop
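
Note the guard on re-specialization: it requires can_respecialize to be set and no $specialization_arglist to have been passed to run(), so a worker pinned to a particular analysis or job keeps the old behaviour. A short illustration (the logic_name is hypothetical):

    $worker->can_respecialize(1);

    # Pinned: run() is given an explicit specialization list, so once 'my_analysis'
    # is exhausted the worker still dies from NO_WORK (after marking it ALL_CLAIMED):
    $worker->run( [ -logic_name => 'my_analysis' ] );

    # Unpinned alternative: run() without a list lets the worker clear its
    # cause_of_death and call specialize_and_compile_wrapper() again:
    # $worker->run();
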
@@ -609,6 +594,54 @@ sub run {
}
sub specialize_and_compile_wrapper {
my ($self, $specialization_arglist) = @_;
eval {
$self->enter_status('SPECIALIZATION');
my $respecialization_from = $self->analysis_id && $self->analysis->logic_name.'('.$self->analysis_id.')';
$self->adaptor->specialize_new_worker( $self, $specialization_arglist ? @$specialization_arglist : () );
if($respecialization_from) {
my $respecialization_to = $self->analysis->logic_name.'('.$self->analysis_id.')';
my $msg = "Respecialization from $respecialization_from to $respecialization_to";
warn "\n$msg\n";
$self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self->dbID, $msg, 0 );
}
$self->print_worker();
1;
} or do {
my $msg = $@;
warn "Could not specialize worker:\n\t$msg\n";
$self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self->dbID, $msg, 1 );
$self->cause_of_death('SEE_MSG') unless($self->cause_of_death()); # some specific causes could have been set prior to die "...";
};
if( !$self->cause_of_death() ) {
eval {
$self->enter_status('COMPILATION');
my $runnable_object = $self->analysis->process or die "Unknown compilation error";
$runnable_object->db( $self->adaptor->db );
$runnable_object->worker( $self );
$runnable_object->debug( $self->debug );
$runnable_object->execute_writes( $self->execute_writes );
$self->runnable_object( $runnable_object );
$self->enter_status('READY');
$self->adaptor->db->dbc->disconnect_when_inactive(0);
1;
} or do {
my $msg = $@;
warn "Could not compile Runnable '".$self->analysis->module."' :\n\t$msg\n";
$self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self->dbID, $msg, 1 );
$self->cause_of_death('SEE_MSG') unless($self->cause_of_death()); # some specific causes could have been set prior to die "...";
};
}
}
sub run_one_batch {
my ($self, $jobs) = @_;
......
@@ -53,6 +53,7 @@ sub main {
$self->{'sleep_minutes'} = 1;
$self->{'retry_throwing_jobs'} = undef;
$self->{'can_respecialize'} = undef;
$self->{'hive_log_dir'} = undef;
GetOptions(
@@ -83,6 +84,7 @@ sub main {
'logic_name=s' => \$self->{'logic_name'},
'hive_log_dir|hive_output_dir=s' => \$self->{'hive_log_dir'},
'retry_throwing_jobs=i' => \$self->{'retry_throwing_jobs'},
'can_respecialize=i' => \$self->{'can_respecialize'},
'debug=i' => \$self->{'debug'},
# other commands/options
@@ -277,7 +279,7 @@ sub generate_worker_cmd {
$worker_cmd .= " -url '". $self->{'safe_url'} ."'";
}
foreach my $worker_option ('job_limit', 'life_span', 'retry_throwing_jobs', 'hive_log_dir', 'debug') {
foreach my $worker_option ('job_limit', 'life_span', 'retry_throwing_jobs', 'can_respecialize', 'hive_log_dir', 'debug') {
if(defined(my $value = $self->{$worker_option})) {
$worker_cmd .= " -${worker_option} $value";
}
@@ -436,6 +438,7 @@ __DATA__
-life_span <num> : life_span limit for each worker
-logic_name <string> : restrict the pipeline stat/runs to this analysis logic_name
-retry_throwing_jobs 0|1 : if a job dies *knowingly*, should we retry it by default?
-can_respecialize <0|1> : allow workers to re-specialize into another analysis (within resource_class) after their previous analysis was exhausted
-hive_log_dir <path> : directory where stdout/stderr of the hive is redirected
-debug <debug_level> : set debug level of the workers
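
For reference, a possible beekeeper.pl invocation with the new option; the URL is a placeholder, and -loop is beekeeper's standard looping mode (not shown in this diff). Thanks to the generate_worker_cmd() change above, the flag is forwarded to every worker that beekeeper submits:

    beekeeper.pl -url mysql://user:password@host:port/my_hive_db -can_respecialize 1 -loop
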
......
@@ -12,8 +12,8 @@ use Bio::EnsEMBL::Hive::Valley;
Bio::EnsEMBL::Registry->no_version_check(1);
my ($reg_conf, $reg_alias, $url); # Connection parameters
my ($resource_class_id, $resource_class_name, $analysis_id, $logic_name, $job_id); # Task specification parameters
my ($job_limit, $life_span, $no_cleanup, $no_write, $hive_log_dir, $worker_log_dir, $retry_throwing_jobs, $force); # Worker control parameters
my ($resource_class_id, $resource_class_name, $analysis_id, $logic_name, $job_id, $force); # Task specification parameters
my ($job_limit, $life_span, $no_cleanup, $no_write, $hive_log_dir, $worker_log_dir, $retry_throwing_jobs, $can_respecialize); # Worker control parameters
my ($help, $debug);
GetOptions(
@@ -29,6 +29,7 @@ GetOptions(
'analysis_id=i' => \$analysis_id,
'logic_name=s' => \$logic_name,
'job_id=i' => \$job_id,
'force=i' => \$force,
# Worker control parameters:
'job_limit=i' => \$job_limit,
@@ -38,7 +39,7 @@ GetOptions(
'hive_log_dir|hive_output_dir=s' => \$hive_log_dir, # keep compatibility with the old name
'worker_log_dir|worker_output_dir=s' => \$worker_log_dir, # will take precedence over hive_log_dir if set
'retry_throwing_jobs=i' => \$retry_throwing_jobs,
'force=i' => \$force,
'can_respecialize=i' => \$can_respecialize,
# Other commands
'h|help' => \$help,
@@ -100,6 +101,7 @@ eval {
-worker_log_dir => $worker_log_dir,
-hive_log_dir => $hive_log_dir,
-retry_throwing_jobs => $retry_throwing_jobs,
-can_respecialize => $can_respecialize,
# Other parameters:
-debug => $debug,
@@ -109,12 +111,14 @@ my $msg_thrown = $@;
if($worker) {
$worker->run(
my $specialization_arglist = ($analysis_id || $logic_name || $job_id) && [
-analysis_id => $analysis_id,
-logic_name => $logic_name,
-job_id => $job_id,
-force => $force,
);
];
$worker->run( $specialization_arglist );
} else {
@@ -173,6 +177,7 @@ __DATA__
-analysis_id <id> : pre-specify this worker in a particular analysis defined by database id
-logic_name <string> : pre-specify this worker in a particular analysis defined by name
-job_id <id> : run a specific job defined by its database id
-force 0|1 : set to 1 if you want to force running a Worker over a BLOCKED analysis or to run a specific DONE/SEMAPHORED job_id
=head2 Worker control parameters:
@@ -183,7 +188,7 @@ __DATA__
-hive_log_dir <path> : directory where stdout/stderr of the whole hive of workers is redirected
-worker_log_dir <path> : directory where stdout/stderr of this particular worker is redirected
-retry_throwing_jobs <0|1> : if a job dies *knowingly*, should we retry it by default?
-force 0|1 : set to 1 if you want to force running a Worker over a BLOCKED analysis or to run a specific DONE/SEMAPHORED job_id
-can_respecialize <0|1> : allow this worker to re-specialize into another analysis (within resource_class) after it has exhausted all jobs of the current one
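
A possible direct invocation (the URL is a placeholder); note that giving -analysis_id, -logic_name or -job_id builds a specialization list, which suppresses re-specialization even when -can_respecialize 1 is set:

    runWorker.pl -url mysql://user:password@host:port/my_hive_db -can_respecialize 1
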
=head2 Other options:
......