Commit 6f8ef59b authored by Leo Gordon's avatar Leo Gordon
Browse files

a more straightforward storing method for an AnalysisJob

parent e45199af
......@@ -100,12 +100,14 @@ sub input_id {
sub param_id_stack {
my $self = shift;
$self->{'_param_id_stack'} = shift if(@_);
$self->{'_param_id_stack'} = '' unless(defined($self->{'_param_id_stack'}));
return $self->{'_param_id_stack'};
}
sub accu_id_stack {
my $self = shift;
$self->{'_accu_id_stack'} = shift if(@_);
$self->{'_accu_id_stack'} = '' unless(defined($self->{'_accu_id_stack'}));
return $self->{'_accu_id_stack'};
}
......@@ -118,6 +120,7 @@ sub worker_id {
sub status {
my $self = shift;
$self->{'_status'} = shift if(@_);
$self->{'_status'} = ( ($self->semaphore_count>0) ? 'SEMAPHORED' : 'READY' ) unless(defined($self->{'_status'}));
return $self->{'_status'};
}
......
......@@ -76,7 +76,7 @@ sub default_overflow_limit {
###############################################################################
#
# CLASS methods
# CLASS method
#
###############################################################################
......@@ -115,58 +115,64 @@ sub CreateNewJob {
unless($analysis->adaptor and $analysis->adaptor->db);
throw("Please specify prev_job object instead of prev_job_id if available") if ($prev_job_id); # 'obsolete' message
$prev_job_id = $prev_job && $prev_job->dbID();
$semaphore_count ||= 0;
my $job_status = ($semaphore_count>0) ? 'SEMAPHORED' : 'READY';
my $analysis_id = $analysis->dbID();
my $job = Bio::EnsEMBL::Hive::AnalysisJob->new(
-prev_job_id => $prev_job_id,
-analysis_id => $analysis_id,
-prev_job_id => $prev_job && $prev_job->dbID,
-analysis_id => $analysis->dbID,
-input_id => $input_id,
-param_id_stack => $param_id_stack || '',
-accu_id_stack => $accu_id_stack || '',
-status => $job_status,
-param_id_stack => $param_id_stack,
-accu_id_stack => $accu_id_stack,
-semaphore_count => $semaphore_count,
-semaphored_job_id => $semaphored_job_id,
);
return $analysis->adaptor->db->get_AnalysisJobAdaptor->store_job_and_adjust_counters( $job, $push_new_semaphore );
}
###############################################################################
#
# INSTANCE methods
#
###############################################################################
my $dba = $analysis->adaptor->db;
my $dbc = $dba->dbc;
$dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($semaphored_job_id and ($dbc->driver ne 'sqlite'));
my ($job, $stored_this_time) = $dba->get_AnalysisJobAdaptor->store( $job, 0 );
sub store_job_and_adjust_counters {
my ($self, $job, $push_new_semaphore) = @_;
my $semaphored_job_id = $job->semaphored_job_id();
my $need_to_increase_semaphore_count = ($semaphored_job_id && !$push_new_semaphore);
my $dbc = $self->dbc;
# avoid deadlocks when dataflowing under transactional mode (used in Ortheus Runnable for example):
$dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($need_to_increase_semaphore_count and ($dbc->driver ne 'sqlite'));
my ($job, $stored_this_time) = $self->store( $job, 0 );
if($stored_this_time) {
if($semaphored_job_id and !$push_new_semaphore) { # if we are not creating a new semaphore (where dependent jobs have already been counted),
# but rather propagating an existing one (same or other level), we have to up-adjust the counter
$prev_job->adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id );
if($need_to_increase_semaphore_count) { # if we are not creating a new semaphore (where dependent jobs have already been counted),
# but rather propagating an existing one (same or other level), we have to up-adjust the counter
$self->increase_semaphore_count_for_jobid( $semaphored_job_id );
}
unless($dba->hive_use_triggers()) {
unless($self->db->hive_use_triggers()) {
$dbc->do(qq{
UPDATE analysis_stats
SET total_job_count=total_job_count+1
}
.(($job_status eq 'READY')
? " ,ready_job_count=ready_job_count+1 "
: " ,semaphored_job_count=semaphored_job_count+1 "
).(($dbc->driver eq 'pgsql')
? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
: " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
)." WHERE analysis_id=$analysis_id ");
UPDATE analysis_stats
SET total_job_count=total_job_count+1
}
.(($job->status eq 'READY')
? " ,ready_job_count=ready_job_count+1 "
: " ,semaphored_job_count=semaphored_job_count+1 "
).(($dbc->driver eq 'pgsql')
? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
: " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
)." WHERE analysis_id=".$job->analysis_id
);
}
return $job->dbID();
}
}
###############################################################################
#
# INSTANCE methods
#
###############################################################################
=head2 fetch_all_by_analysis_id_status
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment