Commit 830a7229 authored by Leo Gordon
Browse files

refactoring of the Q::create_new_worker() and introduction of -force flag

parent f65e2fd5
......@@ -109,111 +109,104 @@ sub object_class {
=cut
sub create_new_worker {
my ($self, @args) = @_;
my ($self, @args) = @_;
my ( $meadow_type, $meadow_name, $process_id, $exec_host,
my ( $meadow_type, $meadow_name, $process_id, $exec_host,
$rc_id, $rc_name, $analysis_id, $logic_name, $job_id,
$no_write, $debug, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $compile_module_once) =
$no_write, $debug, $worker_log_dir, $hive_log_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $compile_module_once, $force) =
rearrange([qw(meadow_type meadow_name process_id exec_host
rc_id rc_name analysis_id logic_name job_id
no_write debug worker_log_dir hive_log_dir job_limit life_span no_cleanup retry_throwing_jobs compile_module_once) ], @args);
rearrange([qw(meadow_type meadow_name process_id exec_host
rc_id rc_name analysis_id logic_name job_id
no_write debug worker_log_dir hive_log_dir job_limit life_span no_cleanup retry_throwing_jobs compile_module_once force) ], @args);
if($rc_name) {
if($rc_id) {
die "You should either define -rc_id or -rc_name, but not both\n";
}
if(my $rc = $self->db->get_ResourceClassAdaptor->fetch_by_name($rc_name)) {
$rc_id = $rc->dbID;
} else {
die "rc_name '$rc_name' could not be fetched from the database\n";
}
if( scalar( grep {defined($_)} ($rc_id, $rc_name, $analysis_id, $logic_name, $job_id) ) > 1) {
die "At most one of the options {-rc_id, -rc_name, -analysis_id, -logic_name, -job_id} can be set to pre-specialize a Worker";
}
if($logic_name) {
if($analysis_id) {
die "You should either define -analysis_id or -logic_name, but not both\n";
}
if(my $analysis = $self->db->get_AnalysisAdaptor->fetch_by_logic_name($logic_name)) {
$analysis_id = $analysis->dbID;
} else {
die "logic_name '$logic_name' could not be fetched from the database\n";
}
}
my ($job, $analysis, $stats);
my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
my $job;
if($job_id or $analysis_id or $logic_name) { # probably pre-specialized by running a runWorker.pl by hand
if($job_id) {
if($analysis_id) {
die "When you specify -job_id, please omit both -logic_name and -analysis_id to avoid confusion\n";
} else {
if($job_id) {
print "resetting and fetching job for job_id '$job_id'\n";
my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
$job_adaptor->reset_job_by_dbID($job_id);
if($job = $job_adaptor->fetch_by_dbID($job_id)) {
$analysis_id = $job->analysis_id;
} else {
die "job_id '$job_id' could not be fetched from the database\n";
$job = $job_adaptor->fetch_by_dbID($job_id)
or die "job with dbID='$job_id' could not be fetched from the database";
$analysis_id = $job->analysis_id;
}
if($logic_name) {
$analysis = $self->db->get_AnalysisAdaptor->fetch_by_logic_name($logic_name)
or die "analysis with name='$logic_name' could not be fetched from the database";
$analysis_id = $analysis->dbID;
} elsif($analysis_id) {
$analysis = $self->db->get_AnalysisAdaptor->fetch_by_dbID($analysis_id)
or die "analysis with dbID='$analysis_id' could not be fetched from the database";
}
$stats = $analysis_stats_adaptor->fetch_by_analysis_id($analysis_id);
$self->safe_synchronize_AnalysisStats($stats);
unless($job or $force) { # do we really need to run this analysis?
if($self->get_hive_current_load() >= 1.1) {
die "Hive is overloaded, can't create a worker";
}
if($stats->status eq 'BLOCKED') {
die "Analysis is BLOCKED, can't create workers";
}
if($stats->num_required_workers <= 0) {
die "Analysis does not require workers at the moment";
}
if($stats->status eq 'DONE') {
die "Analysis is DONE, don't need to create workers";
}
}
}
my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor or return undef;
my $analysisStats;
if($analysis_id) {
$analysisStats = $analysis_stats_adaptor->fetch_by_analysis_id($analysis_id);
$self->safe_synchronize_AnalysisStats($analysisStats);
#return undef unless(($analysisStats->status ne 'BLOCKED') and ($analysisStats->num_required_workers > 0));
} else {
if( $analysisStats = $self->specialize_new_worker($rc_id) ) {
print "Scheduler picked analysis_id=".$analysisStats->analysis_id()." for the worker\n";
} else {
print "Scheduler failed to pick analysis_id for the worker\n";
}
}
return undef unless($analysisStats);
} else { # probably scheduled by beekeeper.pl
unless($job) {
#go into autonomous mode
if($rc_name) {
my $rc = $self->db->get_ResourceClassAdaptor->fetch_by_name($rc_name)
or die "resource_class with name='$rc_name' could not be fetched from the database";
if($self->get_hive_current_load() >= 1.1) {
print "Hive is overloaded, can't create a worker\n";
return;
}
if($analysisStats->status eq 'BLOCKED') {
print "Analysis is BLOCKED, can't create workers\n";
return;
}
if($analysisStats->status eq 'DONE') {
print "Analysis is DONE, don't need to create workers\n";
return;
}
$rc_id = $rc->dbID;
}
$analysis_stats_adaptor->decrease_required_workers($analysisStats->analysis_id);
$stats = $self->specialize_new_worker($rc_id)
or die "Scheduler failed to pick an analysis for the worker";
$analysisStats->print_stats;
}
# The following increment used to be done only when no specific task was given to the worker,
# thereby excluding such "special task" workers from being counted in num_running_workers.
#
# However this may be tricky to emulate by triggers that know nothing about "special tasks",
# so I am (temporarily?) simplifying the accounting algorithm.
#
unless( $self->db->hive_use_triggers() ) {
$analysis_stats_adaptor->increase_running_workers($analysisStats->analysis_id);
}
print "Scheduler picked analysis with dbID=".$stats->analysis_id." for the worker\n";
}
my $sql = q{INSERT INTO worker (born, last_check_in, meadow_type, meadow_name, process_id, host, analysis_id)
VALUES (CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, ?,?,?,?,?)};
my $sth = $self->prepare($sql);
$sth->execute($meadow_type, $meadow_name, $process_id, $exec_host, $analysisStats->analysis_id);
my $worker_id = $self->dbc->db_handle->last_insert_id(undef, undef, 'worker', 'worker_id');
$sth->execute($meadow_type, $meadow_name, $process_id, $exec_host, $stats->analysis_id);
my $worker_id = $self->dbc->db_handle->last_insert_id(undef, undef, 'worker', 'worker_id')
or die "Could not store worker";
$sth->finish;
unless($job) { # count it as autonomous worker sharing the load of that analysis:
$analysis_stats_adaptor->decrease_required_workers($stats->analysis_id);
}
# The following increment used to be done only when no specific task was given to the worker,
# thereby excluding such "special task" workers from being counted in num_running_workers.
#
# However this may be tricky to emulate by triggers that know nothing about "special tasks",
# so I am (temporarily?) simplifying the accounting algorithm.
#
unless( $self->db->hive_use_triggers() ) {
$analysis_stats_adaptor->increase_running_workers($stats->analysis_id);
}
if($hive_log_dir or $worker_log_dir) {
my $dir_revhash = dir_revhash($worker_id);
$worker_log_dir ||= $hive_log_dir .'/'. ($dir_revhash ? "$dir_revhash/" : '') .'worker_id_'.$worker_id;
......@@ -227,7 +220,8 @@ sub create_new_worker {
$sth_add_log->finish;
}
my $worker = $self->fetch_by_dbID($worker_id);
my $worker = $self->fetch_by_dbID($worker_id)
or die "Could not fetch worker with dbID=$worker_id";
$worker->init;
......@@ -254,7 +248,7 @@ sub create_new_worker {
$worker->compile_module_once($compile_module_once);
}
$analysisStats->update_status('WORKING');
$stats->update_status('WORKING');
return $worker;
}
......@@ -481,7 +475,7 @@ sub synchronize_hive {
=head2 safe_synchronize_AnalysisStats
Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
Example : $self->synchronize($analysisStats);
Example : $self->safe_synchronize_AnalysisStats($stats);
Description: Prewrapper around synchronize_AnalysisStats that does
checks and grabs sync_lock before proceeding with sync.
Used by distributed worker sync system to avoid contention.
......@@ -494,7 +488,6 @@ sub safe_synchronize_AnalysisStats {
my $self = shift;
my $stats = shift;
return $stats unless($stats);
return $stats unless($stats->analysis_id);
return $stats if($stats->status eq 'SYNCHING');
return $stats if($stats->status eq 'DONE');
......@@ -924,11 +917,11 @@ sub specialize_new_worker {
my @suitable_analyses = @{ $self->db->get_AnalysisStatsAdaptor->fetch_all_by_suitability_rc_id( $rc_id ) };
foreach my $analysis_stats (@suitable_analyses) {
foreach my $stats (@suitable_analyses) {
#synchronize and double check that it can be run:
$self->safe_synchronize_AnalysisStats($analysis_stats);
return $analysis_stats if( ($analysis_stats->status ne 'BLOCKED') and ($analysis_stats->num_required_workers > 0) );
$self->safe_synchronize_AnalysisStats($stats);
return $stats if( ($stats->status ne 'BLOCKED') and ($stats->num_required_workers > 0) );
}
return undef;
......
......@@ -21,7 +21,7 @@ my $db_conf = {
my ($reg_conf, $reg_alias, $url); # Connection parameters
my ($rc_id, $rc_name, $analysis_id, $logic_name, $job_id); # Task specification parameters
my ($job_limit, $life_span, $no_cleanup, $no_write, $hive_log_dir, $worker_log_dir, $retry_throwing_jobs, $compile_module_once); # Worker control parameters
my ($job_limit, $life_span, $no_cleanup, $no_write, $hive_log_dir, $worker_log_dir, $retry_throwing_jobs, $compile_module_once, $force); # Worker control parameters
my ($help, $debug);
GetOptions(
......@@ -52,6 +52,7 @@ GetOptions(
'worker_log_dir|worker_output_dir=s' => \$worker_log_dir, # will take precedence over hive_log_dir if set
'retry_throwing_jobs=i' => \$retry_throwing_jobs,
'compile_module_once=i' => \$compile_module_once,
'force=i' => \$force,
# Other commands
'h|help' => \$help,
......@@ -116,6 +117,7 @@ eval {
-hive_log_dir => $hive_log_dir,
-retry_throwing_jobs => $retry_throwing_jobs,
-compile_module_once => $compile_module_once,
-force => $force,
# Other parameters:
-debug => $debug,
......@@ -126,6 +128,7 @@ my $msg_thrown = $@;
if($worker) {
$worker->run();
# warn "Not really running the worker (debug)";
} else {
......@@ -201,6 +204,7 @@ __DATA__
-worker_log_dir <path> : directory where stdout/stderr of this particular worker is redirected
-retry_throwing_jobs <0|1> : if a job dies *knowingly*, should we retry it by default?
-compile_module_once 0|1 : should we compile the module only once (desired future behaviour), or pretend to do it before every job (current behaviour)?
-force 0|1 : set to 1 to force running a Worker on an explicitly specified analysis even if the Hive is overloaded or the analysis is BLOCKED, DONE, or currently requires no workers
=head2 Other options:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment