Skip to content
Snippets Groups Projects
Commit f971d01f authored by Jessica Severin's avatar Jessica Severin
Browse files

Improved system for running specific jobs. System now properly runs

jobs that were created outside the database.  Also added ability to not
write (doesn't execute write_output or dataflow jobs).  Improved logic so
that if a job is specified, but something is wrong that the worker fails
rather than reverting to autonomous behaviour mode.
Fully tested by running a complete homology production test.
parent d09dc214
No related branches found
No related tags found
No related merge requests found
......@@ -99,13 +99,13 @@ our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
sub create_new_worker {
my ($self, @args) = @_;
my ($analysis_id, $beekeeper ,$pid, $job) =
rearrange([qw(analysis_id beekeeper process_id) ], @args);
my ($analysis_id, $beekeeper ,$pid, $job, $no_write) =
rearrange([qw(analysis_id beekeeper process_id job no_write) ], @args);
my $analStatsDBA = $self->db->get_AnalysisStatsAdaptor;
return undef unless($analStatsDBA);
return undef if($self->get_hive_current_load() >= 1.5);
$analysis_id = $job->analysis_id if(defined($job));
my $analysisStats;
if($analysis_id) {
......@@ -117,19 +117,23 @@ sub create_new_worker {
}
return undef unless($analysisStats);
$analStatsDBA->decrement_needed_workers($analysisStats->analysis_id);
$analysisStats->print_stats;
if($analysisStats->status eq 'BLOCKED') {
print("Analysis is BLOCKED, can't create workers\n");
return undef;
}
if($analysisStats->status eq 'DONE') {
print("Analysis is DONE, don't need to create workers\n");
return undef;
unless($job) {
#go into autonomous mode
return undef if($self->get_hive_current_load() >= 1.5);
$analStatsDBA->decrement_needed_workers($analysisStats->analysis_id);
$analysisStats->print_stats;
if($analysisStats->status eq 'BLOCKED') {
print("Analysis is BLOCKED, can't create workers\n");
return undef;
}
if($analysisStats->status eq 'DONE') {
print("Analysis is DONE, don't need to create workers\n");
return undef;
}
}
my $host = hostname;
$pid = $$ unless($pid);
$beekeeper = '' unless($beekeeper);
......@@ -149,6 +153,10 @@ sub create_new_worker {
if($worker and $analysisStats) {
$analysisStats->update_status('WORKING');
}
$worker->_specific_job($job) if(defined($job));
$worker->execute_writes(0) if($no_write);
return $worker;
}
......
......@@ -118,6 +118,12 @@ sub debug {
$self->{'_debug'}=0 unless(defined($self->{'_debug'}));
return $self->{'_debug'};
}
sub execute_writes {
my $self = shift;
$self->{'_execute_writes'} = shift if(@_);
$self->{'_execute_writes'}=1 unless(defined($self->{'_execute_writes'}));
return $self->{'_execute_writes'};
}
=head2 analysis
......@@ -380,7 +386,7 @@ sub batch_size {
sub run
{
my $self = shift;
my $specific_job = shift;
my $specific_job = $self->_specific_job;
if($self->output_dir()) {
open OLDOUT, ">&STDOUT";
......@@ -392,7 +398,6 @@ sub run
open STDOUT, ">&WORKER_STDOUT";
open STDERR, ">&WORKER_STDERR";
}
$self->print_worker();
$self->db->dbc->disconnect_when_inactive(0);
......@@ -404,6 +409,7 @@ sub run
if($specific_job) {
$self->queen->worker_reclaim_job($self,$specific_job);
push @$jobs, $specific_job;
$alive=undef;
} else {
$jobs = $self->queen->worker_grab_jobs($self);
}
......@@ -494,22 +500,26 @@ sub run_module_with_job
}
$job->update_status('GET_INPUT');
print("GET_INPUT\n") if($self->debug);
print("\nGET_INPUT\n") if($self->debug);
$runObj->fetch_input;
$job->update_status('RUN');
print("RUN\n") if($self->debug);
print("\nRUN\n") if($self->debug);
$runObj->run;
$job->update_status('WRITE_OUTPUT');
print("WRITE_OUTPUT\n") if($self->debug);
$runObj->write_output;
if($self->execute_writes) {
$job->update_status('WRITE_OUTPUT');
print("\nWRITE_OUTPUT\n") if($self->debug);
$runObj->write_output;
} else {
print("\n\n!!!! NOT write_output\n\n\n") if($self->debug);
}
$job->query_count($self->queen->dbc->query_count);
$job->runtime_msec(time()*1000 - $start_time);
unless($runObj->isa("Bio::EnsEMBL::Hive::Process") and
!($runObj->autoflow_inputjob)) {
unless(($runObj->isa("Bio::EnsEMBL::Hive::Process") and !($runObj->autoflow_inputjob))
or !($self->execute_writes)) {
printf("AUTOFLOW input->output\n") if($self->debug);
$self->queen->flow_output_job($job);
}
......@@ -526,6 +536,7 @@ sub redirect_job_output
my $outdir = $self->output_dir();
return unless($outdir);
return unless($job);
return unless($job->adaptor);
$job->stdout_file($outdir . "/job_".$job->dbID.".out");
$job->stderr_file($outdir . "/job_".$job->dbID.".err");
......@@ -547,6 +558,7 @@ sub close_and_update_job_output
return unless($job);
return unless($self->output_dir);
return unless($job->adaptor);
if(-z $job->stdout_file) {
#print("unlink zero size ", $job->stdout_file, "\n");
......@@ -578,5 +590,10 @@ sub check_system_load {
return 1; #everything ok
}
sub _specific_job {
my $self = shift;
$self->{'_specific_job'} = shift if(@_);
return $self->{'_specific_job'};
}
1;
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment