Commit 95b1f35f authored by Leo Gordon's avatar Leo Gordon
Browse files

store a Job using BaseAdaptor's store() method

parent 412da5f6
......@@ -104,8 +104,8 @@ sub default_overflow_limit {
sub CreateNewJob {
my ($class, @args) = @_;
my ($input_id, $param_id_stack, $accu_id_stack, $analysis, $prev_job, $prev_job_id, $semaphore_count, $semaphored_job_id, $push_new_semaphore) =
rearrange([qw(input_id param_id_stack accu_id_stack analysis prev_job prev_job_id semaphore_count semaphored_job_id push_new_semaphore)], @args);
my ($prev_job, $prev_job_id, $analysis, $input_id, $param_id_stack, $accu_id_stack, $semaphore_count, $semaphored_job_id, $push_new_semaphore) =
rearrange([qw(prev_job prev_job_id analysis input_id param_id_stack accu_id_stack semaphore_count semaphored_job_id push_new_semaphore)], @args);
throw("must define input_id") unless($input_id);
throw("must define analysis") unless($analysis);
......@@ -115,73 +115,50 @@ sub CreateNewJob {
unless($analysis->adaptor and $analysis->adaptor->db);
throw("Please specify prev_job object instead of prev_job_id if available") if ($prev_job_id); # 'obsolete' message
$prev_job_id = $prev_job && $prev_job->dbID();
if(ref($input_id)) { # let's do the Perl hash stringification centrally rather than in many places:
$input_id = stringify($input_id);
}
if(length($input_id) >= 255) {
$input_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($input_id);
}
if(length($param_id_stack) >= 64) {
$param_id_stack = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($param_id_stack);
}
if(length($accu_id_stack) >= 64) {
$accu_id_stack = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($accu_id_stack);
}
$semaphore_count ||= 0;
my $dba = $analysis->adaptor->db;
my $dbc = $dba->dbc;
my $insertion_method = { 'mysql' => 'INSERT IGNORE', 'sqlite' => 'INSERT OR IGNORE', 'pgsql' => 'INSERT' }->{ $dbc->driver };
my $job_status = ($semaphore_count>0) ? 'SEMAPHORED' : 'READY';
my $analysis_id = $analysis->dbID();
$prev_job_id = $prev_job && $prev_job->dbID();
$semaphore_count ||= 0;
my $job_status = ($semaphore_count>0) ? 'SEMAPHORED' : 'READY';
my $analysis_id = $analysis->dbID();
my $job = Bio::EnsEMBL::Hive::AnalysisJob->new(
-prev_job_id => $prev_job_id,
-analysis_id => $analysis_id,
-input_id => $input_id,
-param_id_stack => $param_id_stack,
-accu_id_stack => $accu_id_stack,
-status => $job_status,
-semaphore_count => $semaphore_count,
-semaphored_job_id => $semaphored_job_id,
);
my $dba = $analysis->adaptor->db;
my $dbc = $dba->dbc;
$dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($semaphored_job_id and ($dbc->driver ne 'sqlite'));
my $sql = qq{$insertion_method INTO job
(input_id, param_id_stack, accu_id_stack, prev_job_id,analysis_id,status,semaphore_count,semaphored_job_id)
VALUES (?,?,?,?,?,?,?,?)};
my $sth = $dbc->prepare($sql);
my @values = ($input_id, $param_id_stack || '', $accu_id_stack || '', $prev_job_id, $analysis_id, $job_status, $semaphore_count, $semaphored_job_id);
my $return_code = $sth->execute(@values)
# using $return_code in boolean context allows to skip the value '0E0' ('no rows affected') that Perl treats as zero but regards as true:
or die "Could not run\n\t$sql\nwith data:\n\t(".join(',', @values).')';
my ($job, $stored_this_time) = $dba->get_AnalysisJobAdaptor->store( $job, 0 );
my $job_id;
if($return_code > 0) { # <--- for the same reason we have to be explicitly numeric here:
$job_id = $dbc->db_handle->last_insert_id(undef, undef, 'job', 'job_id');
$sth->finish;
if($semaphored_job_id and !$push_new_semaphore) { # if we are not creating a new semaphore (where dependent jobs have already been counted),
if($stored_this_time) {
if($semaphored_job_id and !$push_new_semaphore) { # if we are not creating a new semaphore (where dependent jobs have already been counted),
# but rather propagating an existing one (same or other level), we have to up-adjust the counter
$prev_job->adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id );
}
unless($dba->hive_use_triggers()) {
$dbc->do(qq{
UPDATE analysis_stats
SET total_job_count=total_job_count+1
}
.(($job_status eq 'READY')
? " ,ready_job_count=ready_job_count+1 "
: " ,semaphored_job_count=semaphored_job_count+1 "
).(($dbc->driver eq 'pgsql')
? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
: " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
)." WHERE analysis_id=$analysis_id ");
}
} else { # if we got 0E0, it means "ignored insert collision" (job created previously), so we simply return an undef and deal with it outside
}
return $job_id;
}
unless($dba->hive_use_triggers()) {
$dbc->do(qq{
UPDATE analysis_stats
SET total_job_count=total_job_count+1
}
.(($job_status eq 'READY')
? " ,ready_job_count=ready_job_count+1 "
: " ,semaphored_job_count=semaphored_job_count+1 "
).(($dbc->driver eq 'pgsql')
? " ,status = CAST(CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END AS analysis_status) "
: " ,status = CASE WHEN status!='BLOCKED' THEN 'LOADING' ELSE 'BLOCKED' END "
)." WHERE analysis_id=$analysis_id ");
}
return $job->dbID();
}
}
###############################################################################
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment