Commit 8071c155 authored by Matthieu Muffato

New way of dealing with duplicated jobs

We now allow storing them, but they are aborted at runtime. This is
achieved by computing a checksum of the parameters hash, which also lets
us detect cases where different parameter stacks lead to the same final hash!

There is an additional attempt status, PARAM_CHECK, used while eHive
checks whether the job has already been selected, and a new job status,
REDUNDANT, for jobs that are discarded.
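
As a rough sketch (assuming Bio::EnsEMBL::Hive::Utils::stringify produces a
canonical, key-sorted representation, which the checksum relies on; the
parameter values below are made up), the collision detection boils down to:

use Digest::MD5 qw(md5_hex);
use Bio::EnsEMBL::Hive::Utils ('stringify');

# Two jobs created through different dataflow stacks can end up with the
# same final parameter hash; their checksums then collide, and the later
# job is discarded as REDUNDANT at runtime.
my %params_via_stack_1 = ('a' => 3, 'b' => 4);
my %params_via_stack_2 = ('b' => 4, 'a' => 3);   # built differently, same final hash
print "duplicate detected\n"
    if md5_hex(stringify(\%params_via_stack_1)) eq md5_hex(stringify(\%params_via_stack_2));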
parent 06256908
......@@ -45,7 +45,7 @@ use Term::ANSIColor;
use base ( 'Bio::EnsEMBL::Hive::Storable' );
# How to map the job statuses to the counters
our %status2counter = ('FAILED' => 'failed_job_count', 'READY' => 'ready_job_count', 'DONE' => 'done_job_count', 'PASSED_ON' => 'done_job_count', 'SEMAPHORED' => 'semaphored_job_count');
our %status2counter = ('FAILED' => 'failed_job_count', 'READY' => 'ready_job_count', 'DONE' => 'done_job_count', 'PASSED_ON' => 'done_job_count', 'REDUNDANT' => 'done_job_count', 'SEMAPHORED' => 'semaphored_job_count');
sub unikey { # override the default from Cacheable parent
......
......@@ -75,6 +75,12 @@ sub when_ended {
return $self->{'_when_ended'};
}
sub param_checksum {
my $self = shift;
$self->{'_param_checksum'} = shift if(@_);
return $self->{'_param_checksum'};
}
sub runtime_msec {
my $self = shift;
$self->{'_runtime_msec'} = shift if(@_);
......
......@@ -48,7 +48,7 @@ use warnings;
use Bio::EnsEMBL::Hive::Cacheable;
use Bio::EnsEMBL::Hive::Semaphore;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Utils ('stringify', 'destringify');
use Bio::EnsEMBL::Hive::Utils ('destringify');
use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
......@@ -129,39 +129,6 @@ sub fetch_by_analysis_id_and_input_id { # It is a special case not covered b
}
sub class_specific_execute {
my ($self, $object, $sth, $values) = @_;
my $return_code;
eval {
$return_code = $self->SUPER::class_specific_execute($object, $sth, $values);
1;
} or do {
my $duplicate_regex = {
'mysql' => qr/Duplicate entry.+?for key/s,
'sqlite' => qr/columns.+?are not unique|UNIQUE constraint failed/s, # versions around 3.8 spit the first msg, versions around 3.15 - the second
'pgsql' => qr/duplicate key value violates unique constraint/s,
}->{$self->db->dbc->driver};
if( $@ =~ $duplicate_regex ) { # implementing 'INSERT IGNORE' of Jobs on the API side
my $emitting_job_id = $object->prev_job_id;
my $analysis_id = $object->analysis_id;
my $input_id = $object->input_id;
my $msg = "Attempt to insert a duplicate job (analysis_id=$analysis_id, input_id=$input_id) intercepted and ignored";
$self->db->get_LogMessageAdaptor->store_job_message( $emitting_job_id, $msg, 'INFO' );
$return_code = '0E0';
} else {
die $@;
}
};
return $return_code;
}
=head2 store_jobs_and_adjust_counters
Arg [1] : arrayref of Bio::EnsEMBL::Hive::AnalysisJob $jobs_to_store
......
......@@ -33,7 +33,11 @@ package Bio::EnsEMBL::Hive::DBSQL::AttemptAdaptor;
use strict;
use warnings;
use Digest::MD5 qw(md5_hex);
use Bio::EnsEMBL::Hive::Attempt;
use Bio::EnsEMBL::Hive::Utils ('stringify');
use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
......@@ -51,6 +55,68 @@ sub object_class {
# ------------------------------------ Attempt methods ------------------------------------------
sub check_job_uniqueness {
my ($self, $attempt) = @_;
my $job = $attempt->job;
# Assumes the parameters have already been loaded
my $checksum = md5_hex(stringify($job->{'_unsubstituted_param_hash'}));
$attempt->param_checksum($checksum);
$self->update_param_checksum($attempt);
# Check if this is a rerun (better not to trust retry_count)
my $exist_sql = 'SELECT 1 FROM unique_job WHERE representative_job_id = ? AND param_checksum = ?';
my $exist_job = $self->dbc->protected_select( [ $exist_sql, $job->dbID, $checksum ],
sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'finding if the job has already run'.$after, 'INFO' ); }
);
if ($exist_job && scalar(@$exist_job)) {
# reruns don't count
return 0;
}
my $sql = 'INSERT INTO unique_job (analysis_id, param_checksum, representative_job_id) VALUES (?,?,?)';
my $is_redundant = 0;
eval {
$self->dbc->protected_prepare_execute( [ $sql, $job->analysis_id, $checksum, $job->dbID ],
sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'checking the job uniqueness'.$after, 'INFO' ); }
);
1;
} or do {
my $duplicate_regex = {
'mysql' => qr/Duplicate entry.+?for key/s,
'sqlite' => qr/columns.+?are not unique|UNIQUE constraint failed/s, # versions around 3.8 spit the first msg, versions around 3.15 - the second
'pgsql' => qr/duplicate key value violates unique constraint/s,
}->{$self->db->dbc->driver};
if( $@ =~ $duplicate_regex ) { # implementing 'INSERT IGNORE' of Jobs on the API side
my $other_sql = 'SELECT representative_job_id FROM unique_job WHERE analysis_id = ? AND param_checksum = ?';
my $other_job = $self->dbc->protected_select( [ $other_sql, $job->analysis_id, $checksum ],
sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( 'finding the representative job'.$after, 'INFO' ); }
);
die "Cannot check the unicity of the job" unless $other_job;
my $other_job_id = $other_job->[0]->{'representative_job_id'};
my $this_job_id = $job->dbID;
my $analysis_id = $job->analysis_id;
my $msg = "Discarding this job because another job (job_id=$other_job_id) is already onto (analysis_id=$analysis_id, param_checksum=$checksum)";
$self->db->get_LogMessageAdaptor->store_job_message( $this_job_id, $msg, 'INFO' );
$is_redundant = 1;
} else {
die $@;
}
};
return $is_redundant;
}
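# A condensed, non-authoritative sketch of the caller's side (see the
# life_cycle() change below): the return value decides the job's fate.
#
#     my $is_redundant = $attempt_adaptor->check_job_uniqueness($attempt);
#     if ($is_redundant) {
#         $job->incomplete(0);    # discarded, but not a failure
#         # ... the Worker later records REDUNDANT instead of DONE
#     } else {
#         # proceed with PRE_CLEANUP / FETCH_INPUT / RUN / WRITE_OUTPUT as usual
#     }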
=head2 check_in_attempt
Arg [1] : Bio::EnsEMBL::Hive::Attempt $attempt
......
......@@ -125,6 +125,7 @@ sub life_cycle {
my $job = $self->input_job();
my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
my %job_partial_timing = ();
my $is_redundant;
$job->incomplete(1); # reinforce, in case the life_cycle is not run by a Worker
$job->autoflow(1);
......@@ -140,6 +141,17 @@ sub life_cycle {
$job->adaptor->db->get_AccumulatorAdaptor->remove_all_by_sending_job_id($job->dbID);
}
if (my $attempt_adaptor = $self->attempt->adaptor) {
$self->enter_status('PARAM_CHECK');
$is_redundant = $attempt_adaptor->check_job_uniqueness($self->attempt);
if ($is_redundant) {
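# being redundant is not an error: another job already claimed these parameters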
$job->incomplete(0);
return;
}
}
if( $self->can('pre_cleanup') and $job->attempt_count()>1 ) {
$self->enter_status('PRE_CLEANUP');
$self->pre_cleanup;
......@@ -174,6 +186,8 @@ sub life_cycle {
# Restore the default handler
#$SIG{__WARN__} = 'DEFAULT';
return 'is_redundant' if $is_redundant;
if(my $life_cycle_msg = $@) {
$job->died_somewhere( $job->incomplete ); # it will be OR'd inside
Bio::EnsEMBL::Hive::Process::warning($self, $life_cycle_msg, $job->incomplete?'WORKER_ERROR':'INFO'); # In case the Runnable has redefined warning()
......
......@@ -587,6 +587,7 @@ sub run {
#
if($jobs_done_by_batches_loop) {
$self->worker_say("Worker::run did $jobs_done_by_batches_loop");
$self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_work_done(
$self->current_role->analysis->dbID,
$jobs_done_by_batches_loop,
......@@ -761,11 +762,13 @@ sub run_one_batch {
# whether the job completed successfully or not:
$self->runnable_object->attempt( undef ); # release an extra reference to the job
$attempt->runtime_msec( $job_stopwatch->get_elapsed );
$attempt->query_count( $self->adaptor->db->dbc->query_count );
$attempt->adaptor->record_attempt_completion($attempt, $job->died_somewhere ? 0 : 1);
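# life_cycle() returns the string 'is_redundant' instead of the partial-timing hash when the job was discarded: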
my $job_completion_line = "Job $job_id : ". ($job->died_somewhere ? 'died' : 'complete' );
my $is_redundant = $job_partial_timing && ("$job_partial_timing" eq 'is_redundant');
my $job_completion_line = "Job $job_id : ". ($job->died_somewhere ? 'died' : ($is_redundant ? 'redundant' : 'complete') );
print "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->died_somewhere)); # one copy goes to the job's STDERR
$self->stop_job_output_redirection($attempt); # and then we switch back to worker's STDERR
......@@ -790,9 +793,9 @@ sub run_one_batch {
last ONE_BATCH;
}
} else { # job successfully completed:
$self->more_work_done( $job_partial_timing );
$self->more_work_done( $job_partial_timing ) unless $is_redundant;
$jobs_done_here++;
$job->set_and_update_status('DONE');
$job->set_and_update_status($is_redundant ? 'REDUNDANT' : 'DONE');
if( my $controlled_semaphore = $job->controlled_semaphore ) {
$controlled_semaphore->decrease_by( [ $job ] );
......@@ -832,6 +835,7 @@ sub run_one_batch {
} # /while(my $job = shift @$jobs)
$self->worker_say("Worker::run_one_batch did $jobs_done_here");
return $jobs_done_here;
}
......
......@@ -34,6 +34,7 @@ ALTER TABLE analysis_stats_monitor ADD FOREIGN KEY (analysis_id)
ALTER TABLE dataflow_rule ADD FOREIGN KEY (from_analysis_id) REFERENCES analysis_base(analysis_id);
ALTER TABLE job ADD CONSTRAINT job_analysis_id_fkey FOREIGN KEY (analysis_id) REFERENCES analysis_base(analysis_id);
ALTER TABLE role ADD FOREIGN KEY (analysis_id) REFERENCES analysis_base(analysis_id);
ALTER TABLE unique_job ADD FOREIGN KEY (analysis_id) REFERENCES analysis_base(analysis_id);
ALTER TABLE job ADD FOREIGN KEY (last_attempt_id) REFERENCES attempt(attempt_id);
ALTER TABLE log_message ADD FOREIGN KEY (attempt_id) REFERENCES attempt(attempt_id) ON DELETE CASCADE;
......@@ -46,6 +47,7 @@ ALTER TABLE attempt ADD CONSTRAINT attempt_job_id_fkey
ALTER TABLE job ADD CONSTRAINT job_prev_job_id_fkey FOREIGN KEY (prev_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE log_message ADD FOREIGN KEY (job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE semaphore ADD CONSTRAINT semaphore_dependent_job_id_fkey FOREIGN KEY (dependent_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE unique_job ADD FOREIGN KEY (representative_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE analysis_base ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
ALTER TABLE resource_description ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
......
......@@ -196,7 +196,7 @@ MODIFIES SQL DATA
BEGIN
SET FOREIGN_KEY_CHECKS=0;
DROP VIEW IF EXISTS msg, progress, resource_usage_stats, live_roles, beekeeper_activity, semaphore_job;
DROP TABLE IF EXISTS pipeline_wide_parameters, analysis_stats_monitor, worker_resource_usage, resource_description, analysis_data, dataflow_target, dataflow_rule, analysis_ctrl_rule, analysis_stats, log_message, accu, job, attempt, semaphore, role, worker, beekeeper, analysis_base, resource_class, hive_meta;
DROP TABLE IF EXISTS pipeline_wide_parameters, analysis_stats_monitor, worker_resource_usage, resource_description, analysis_data, dataflow_target, dataflow_rule, analysis_ctrl_rule, analysis_stats, log_message, accu, unique_job, job, attempt, semaphore, role, worker, beekeeper, analysis_base, resource_class, hive_meta;
SET FOREIGN_KEY_CHECKS=1;
END; //
DELIMITER ;
......@@ -343,12 +343,11 @@ CREATE TABLE job (
when_created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
role_id INTEGER DEFAULT NULL,
last_attempt_id INTEGER DEFAULT NULL, -- the last attempt registered for this job
status ENUM('SEMAPHORED','READY','CLAIMED','IN_PROGRESS','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL,
status ENUM('SEMAPHORED','READY','CLAIMED','IN_PROGRESS','DONE','FAILED','PASSED_ON','REDUNDANT') DEFAULT 'READY' NOT NULL,
controlled_semaphore_id INTEGER DEFAULT NULL, -- terminology: fan jobs CONTROL semaphores; funnel jobs or remote semaphores DEPEND ON (local) semaphores
UNIQUE KEY input_id_stacks_analysis (input_id, param_id_stack, accu_id_stack, analysis_id), -- to avoid repeating tasks
KEY analysis_status_attempted (analysis_id, status, last_attempt_id), -- for claiming jobs
UNIQUE KEY analysis_status_virgin (analysis_id, status, last_attempt_id), -- for claiming jobs
KEY role_status (role_id, status) -- for fetching and releasing claimed jobs
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......@@ -375,11 +374,12 @@ CREATE TABLE attempt (
attempt_id INTEGER NOT NULL PRIMARY KEY AUTO_INCREMENT,
role_id INTEGER NOT NULL,
job_id INTEGER NOT NULL, -- the job this attempt is about
status ENUM('INITIALIZATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_HEALTHCHECK','POST_CLEANUP','END') DEFAULT 'INITIALIZATION' NOT NULL,
status ENUM('INITIALIZATION','PARAM_CHECK','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_HEALTHCHECK','POST_CLEANUP','END') DEFAULT 'INITIALIZATION' NOT NULL,
when_initialized TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
when_updated TIMESTAMP NULL, -- mysql's special for "TIMESTAMP DEFAULT NULL"
when_ended TIMESTAMP NULL, -- mysql's special for "TIMESTAMP DEFAULT NULL"
is_success TINYINT(1),
param_checksum CHAR(32),
runtime_msec INTEGER DEFAULT NULL,
query_count INTEGER DEFAULT NULL,
stdout_file VARCHAR(255),
......@@ -388,6 +388,30 @@ CREATE TABLE attempt (
KEY job_id (job_id) -- for finding the attempts linked to a job
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
/**
@table unique_job
@colour #1D73DA
@desc The unique_job table ensures that, within each analysis, a given task (defined by its parameters) is only run once.
We record each (analysis_id, param_checksum) pair together with the matching job_id. When new jobs are attempted but
already found in this table, they are aborted.
@column analysis_id the analysis_id needed to accomplish this job.
@column param_checksum checksum over all the job parameters (input_id and accumulator, incl. inherited ones)
@column representative_job_id points at the job that has been selected to run this analysis with these parameters
*/
CREATE TABLE unique_job (
analysis_id INTEGER NOT NULL,
param_checksum CHAR(32) NOT NULL,
representative_job_id INTEGER NOT NULL,
PRIMARY KEY analysis_params (analysis_id, param_checksum), -- to avoid repeating tasks
KEY representative_job_id (representative_job_id)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
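-- A minimal illustration (not part of the schema) of the claim protocol that
-- AttemptAdaptor::check_job_uniqueness implements over this table; the ids
-- and checksum below are made up:
--
--   INSERT INTO unique_job (analysis_id, param_checksum, representative_job_id)
--   VALUES (42, 'd41d8cd98f00b204e9800998ecf8427e', 1001);
--   -- a second, identical INSERT fails with a duplicate-key error; the loser
--   -- then fetches the winner and marks its own job REDUNDANT:
--   SELECT representative_job_id FROM unique_job
--   WHERE analysis_id = 42 AND param_checksum = 'd41d8cd98f00b204e9800998ecf8427e';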
/**
@table semaphore
......
......@@ -334,7 +334,7 @@ CREATE TABLE resource_description (
@column controlled_semaphore_id the dbID of the semaphore that is controlled by this job (and whose counter it will decrement by 1 upon successful completion)
*/
CREATE TYPE job_status AS ENUM ('SEMAPHORED','READY','CLAIMED','IN_PROGRESS','DONE','FAILED','PASSED_ON');
CREATE TYPE job_status AS ENUM ('SEMAPHORED','READY','CLAIMED','IN_PROGRESS','DONE','FAILED','PASSED_ON','REDUNDANT');
CREATE TABLE job (
job_id SERIAL PRIMARY KEY,
prev_job_id INTEGER DEFAULT NULL, -- the job that created this one using a dataflow rule
......@@ -347,24 +347,21 @@ CREATE TABLE job (
last_attempt_id INTEGER DEFAULT NULL, -- the last attempt registered for this job
status job_status NOT NULL DEFAULT 'READY',
controlled_semaphore_id INTEGER DEFAULT NULL, -- terminology: fan jobs CONTROL semaphores; funnel jobs or remote semaphores DEPEND ON (local) semaphores
UNIQUE (input_id, param_id_stack, accu_id_stack, analysis_id) -- to avoid repeating tasks
controlled_semaphore_id INTEGER DEFAULT NULL -- terminology: fan jobs CONTROL semaphores; funnel jobs or remote semaphores DEPEND ON (local) semaphores
);
CREATE INDEX ON job (analysis_id, status, last_attempt_id); -- for claiming jobs
CREATE INDEX ON job (role_id, status); -- for fetching and releasing claimed jobs
/*
-- PostgreSQL is lacking INSERT IGNORE, so we need a RULE to silently
-- discard the insertion of duplicated entries in the job table
CREATE OR REPLACE RULE job_table_ignore_duplicate_inserts AS
ON INSERT TO job
WHERE EXISTS (
SELECT 1
FROM job
WHERE job.input_id=NEW.input_id AND job.param_id_stack=NEW.param_id_stack AND job.accu_id_stack=NEW.accu_id_stack AND job.analysis_id=NEW.analysis_id)
DO INSTEAD NOTHING;
*/
CREATE TABLE unique_job (
analysis_id INTEGER NOT NULL,
param_checksum CHAR(32) NOT NULL,
representative_job_id INTEGER NOT NULL,
PRIMARY KEY (analysis_id, param_checksum) -- to avoid repeating tasks
);
CREATE INDEX ON unique_job (representative_job_id);
/**
@table attempt
......@@ -385,7 +382,7 @@ CREATE OR REPLACE RULE job_table_ignore_duplicate_inserts AS
@column stderr_file path to the job's STDERR log
*/
CREATE TYPE attempt_status AS ENUM ('INITIALIZATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_HEALTHCHECK','POST_CLEANUP','END');
CREATE TYPE attempt_status AS ENUM ('INITIALIZATION','PARAM_CHECK','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_HEALTHCHECK','POST_CLEANUP','END');
CREATE TABLE attempt (
attempt_id SERIAL PRIMARY KEY,
role_id INTEGER NOT NULL,
......@@ -395,6 +392,7 @@ CREATE TABLE attempt (
when_updated TIMESTAMP NULL, -- mysql's special for "TIMESTAMP DEFAULT NULL"
when_ended TIMESTAMP NULL, -- mysql's special for "TIMESTAMP DEFAULT NULL"
is_success SMALLINT,
param_checksum CHAR(32),
runtime_msec INTEGER DEFAULT NULL,
query_count INTEGER DEFAULT NULL,
stdout_file VARCHAR(255),
......
......@@ -339,12 +339,11 @@ CREATE TABLE job (
when_created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
role_id INTEGER DEFAULT NULL,
last_attempt_id INTEGER DEFAULT NULL, -- the last attempt registered for this job
status TEXT NOT NULL DEFAULT 'READY', -- expected values: 'SEMAPHORED','READY','CLAIMED','IN_PROGRESS','DONE','FAILED','PASSED_ON'
status TEXT NOT NULL DEFAULT 'READY', -- expected values: 'SEMAPHORED','READY','CLAIMED','IN_PROGRESS','DONE','FAILED','PASSED_ON','REDUNDANT'
controlled_semaphore_id INTEGER DEFAULT NULL -- terminology: fan jobs CONTROL semaphores; funnel jobs or remote semaphores DEPEND ON (local) semaphores
);
CREATE UNIQUE INDEX job_input_id_stacks_analysis_idx ON job (input_id, param_id_stack, accu_id_stack, analysis_id);
CREATE INDEX job_analysis_status_attempted_idx ON job (analysis_id, status, last_attempt_id);
CREATE INDEX job_analysis_status_virgin_idx ON job (analysis_id, status, last_attempt_id);
CREATE INDEX job_role_id_status_id_idx ON job (role_id);
/**
......@@ -370,11 +369,12 @@ CREATE TABLE attempt (
attempt_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
role_id INTEGER NOT NULL,
job_id INTEGER NOT NULL, -- the job this attempt is about
status TEXT NOT NULL DEFAULT 'INITIALIZATION', -- expected values: 'INITIALIZATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_HEALTHCHECK','POST_CLEANUP','END'
status TEXT NOT NULL DEFAULT 'INITIALIZATION', -- expected values: 'INITIALIZATION','PARAM_CHECK','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_HEALTHCHECK','POST_CLEANUP','END'
when_initialized TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
when_updated TIMESTAMP NULL, -- mysql's special for "TIMESTAMP DEFAULT NULL"
when_ended TIMESTAMP NULL, -- mysql's special for "TIMESTAMP DEFAULT NULL"
is_success INTEGER,
param_checksum CHAR(32),
runtime_msec INTEGER DEFAULT NULL,
query_count INTEGER DEFAULT NULL,
stdout_file VARCHAR(255),
......@@ -383,6 +383,16 @@ CREATE TABLE attempt (
CREATE INDEX attempt_job_id ON attempt (job_id); -- for finding the attempts linked to a job
CREATE TABLE unique_job (
analysis_id INTEGER NOT NULL,
param_checksum CHAR(32) NOT NULL,
representative_job_id INTEGER NOT NULL,
PRIMARY KEY (analysis_id, param_checksum) -- to avoid repeating tasks
);
CREATE INDEX unique_job_representative_job_id ON unique_job (representative_job_id);
/**
@table semaphore
......
#!/usr/bin/env perl
# Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
# Copyright [2016-2019] EMBL-European Bioinformatics Institute
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
use strict;
use warnings;
use Data::Dumper;
use Test::More;
use Test::Exception;
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::Utils::Test qw(init_pipeline run_sql_on_db get_test_urls);
# eHive needs this to initialize the pipeline (and run db_cmd.pl)
use Cwd ();
use File::Basename ();
$ENV{'EHIVE_ROOT_DIR'} ||= File::Basename::dirname( File::Basename::dirname( File::Basename::dirname( Cwd::realpath($0) ) ) );
my $ehive_test_pipeline_urls = get_test_urls();
foreach my $pipeline_url (@$ehive_test_pipeline_urls) {
subtest 'Test on '.$pipeline_url, sub {
init_pipeline('Bio::EnsEMBL::Hive::Examples::LongMult::PipeConfig::LongMult_conf', $pipeline_url);
my $hive_dba = Bio::EnsEMBL::Hive::DBSQL::DBAdaptor->new( -url => $pipeline_url );
my $job_adaptor = $hive_dba->get_AnalysisJobAdaptor;
my $ref_job = $job_adaptor->fetch_all()->[0];
ok($ref_job, 'Could fetch a reference job');
my %job_param = (
'analysis' => $ref_job->analysis,
'input_id' => '{"a" => 3}',
);
my $new_job1 = Bio::EnsEMBL::Hive::AnalysisJob->new(%job_param);
my ($j1, $stored1) = $job_adaptor->store($new_job1);
ok($stored1, 'A new job could be stored');
lives_ok( sub {
my $new_job2 = Bio::EnsEMBL::Hive::AnalysisJob->new(%job_param);
my ($j2, $stored2) = $job_adaptor->store($new_job2);
ok(!$stored2, 'A copy of the job was *not* stored (as expected)');
}, 'Can survive the insertion of a duplicated job' );
$hive_dba->dbc->disconnect_if_idle();
run_sql_on_db($pipeline_url, 'DROP DATABASE');
};
}
done_testing();