Commit 48d7de63 authored by Matthieu Muffato's avatar Matthieu Muffato
Browse files

Work in progress 2

parent be49c2f8
......@@ -128,19 +128,22 @@ sub is_redundant {
}
=head2 fetch_by_analysis_id_and_input_id
=head2 fetch_semaphore_by_analysis_id_and_input_id
Arg [1] : Integer $analysis_id
Arg [2] : String $input_id
Example : $funnel_job = $job_adaptor->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id);
Example : $funnel_job = $job_adaptor->fetch_semaphore_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id);
Description: Attempts to find the job by contents, then makes another attempt if the input_id is expected to have overflown into analysis_data
Returntype : AnalysisJob object
=cut
sub fetch_by_analysis_id_and_input_id { # It is a special case not covered by AUTOLOAD; note the lowercase _and_
sub fetch_semaphore_by_analysis_id_and_input_id {
my ($self, $analysis_id, $input_id) = @_;
# checksum input_id
# look into unique_semaphore table
# if found return job fetched
my $job = $self->fetch_by_analysis_id_AND_input_id( $analysis_id, $input_id);
if(!$job and length($input_id)>$self->default_overflow_limit->{input_id}) {
......@@ -152,39 +155,6 @@ sub fetch_by_analysis_id_and_input_id { # It is a special case not covered b
}
=head2 class_specific_execute

Arg [1]    : Object $object - the object being stored (a Job, judging by the
             prev_job_id / analysis_id / input_id accessors used below)
Arg [2]    : DBI statement handle $sth
Arg [3]    : Arrayref $values - bind values for the statement
Description: Wrapper around the parent class' class_specific_execute() that
             intercepts "duplicate key" errors raised by the database and turns
             them into a no-op, implementing 'INSERT IGNORE' of Jobs on the API
             side. The ignored duplicate is recorded via the LogMessageAdaptor;
             any other database error is re-thrown unchanged.
Returntype : Whatever the parent's class_specific_execute() returns, or '0E0'
             (Perl's "zero but true", DBI's convention for zero affected rows)
             when a duplicate insertion was intercepted.

=cut

sub class_specific_execute {
my ($self, $object, $sth, $values) = @_;
my $return_code;
# The "eval { ...; 1 } or do { ... }" idiom: the trailing 1 makes the eval
# block true on success, so the do-block runs only when the parent call died.
eval {
$return_code = $self->SUPER::class_specific_execute($object, $sth, $values);
1;
} or do {
# Each driver words its unique-constraint violation differently;
# pick the pattern that matches the current connection's driver.
my $duplicate_regex = {
'mysql' => qr/Duplicate entry.+?for key/s,
'sqlite' => qr/columns.+?are not unique|UNIQUE constraint failed/s, # versions around 3.8 spit the first msg, versions around 3.15 - the second
'pgsql' => qr/duplicate key value violates unique constraint/s,
}->{$self->db->dbc->driver};
# NOTE(review): for any driver not listed above $duplicate_regex is undef,
# and "$@ =~ undef" both warns and degenerates to the empty/last-successful
# pattern — confirm these three are the only supported drivers.
if( $@ =~ $duplicate_regex ) { # implementing 'INSERT IGNORE' of Jobs on the API side
my $emitting_job_id = $object->prev_job_id;
my $analysis_id = $object->analysis_id;
my $input_id = $object->input_id;
my $msg = "Attempt to insert a duplicate job (analysis_id=$analysis_id, input_id=$input_id) intercepted and ignored";
$self->db->get_LogMessageAdaptor->store_job_message( $emitting_job_id, $msg, 'INFO' );
$return_code = '0E0';   # "zero but true": no row inserted, but not an error
} else {
die $@;     # not a duplicate-key error: propagate to the caller
}
};
return $return_code;
}
=head2 store_jobs_and_adjust_counters
Arg [1] : arrayref of Bio::EnsEMBL::Hive::AnalysisJob $jobs_to_store
......@@ -306,8 +276,16 @@ sub store_a_semaphored_group_of_jobs {
$emitting_job_id = $emitting_job->dbID;
}
my $funnel_semaphore;
# FIXME: don't know if it is a string or a hash
#my $funnel_checksum = stringify($funnel_input_id);
my $funnel_semaphore_adaptor = $self->db->get_SemaphoreAdaptor; # assuming $self was $funnel_job_adaptor
my $funnel_semaphore = $funnel_semaphore_adaptor->fetch_by;
# INSERT IGNORE in semaphore table
# If already exists:
# - leech
# Else:
# -
my ($funnel_job_id) = $funnel_job ? @{ $self->store_jobs_and_adjust_counters( [ $funnel_job ], 0, $emitting_job_id) } : ();
......@@ -316,7 +294,8 @@ sub store_a_semaphored_group_of_jobs {
if($no_leeching) {
die "The funnel job could not be stored, but leeching was not allowed, so bailing out";
} elsif( $funnel_job = $self->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id) ) {
# FIXME
} elsif( $funnel_job = $self->fetch_semaphore_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id) ) {
$funnel_job_id = $funnel_job->dbID;
# If the job hasn't run yet, we can still block it
......
......@@ -221,6 +221,7 @@ our %adaptor_type_2_package_name = (
'Semaphore' => 'Bio::EnsEMBL::Hive::DBSQL::SemaphoreAdaptor',
'Queen' => 'Bio::EnsEMBL::Hive::Queen',
'UniqueJob' => 'Bio::EnsEMBL::Hive::DBSQL::UniqueJobAdaptor',
'UniqueSemaphore' => 'Bio::EnsEMBL::Hive::DBSQL::UniqueSemaphoreAdaptor',   # fixed typo: was 'UniqueSempahore' => '...UniqueSempahoreAdaptor', but the package is named UniqueSemaphoreAdaptor
# aliases:
'Job' => 'Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor',
......
=pod
=head1 NAME
Bio::EnsEMBL::Hive::DBSQL::UniqueSemaphoreAdaptor
=head1 SYNOPSIS
$unique_semaphore_adaptor = $db_adaptor->get_UniqueSemaphoreAdaptor;
=head1 DESCRIPTION
This module is the adaptor for the 'representative_semaphore' table, which maps (analysis_id, param_checksum) pairs to a single representative semaphore so that equivalent semaphores are only created once.
=head1 LICENSE
Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2019] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
=head1 CONTACT
Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
=cut
package Bio::EnsEMBL::Hive::DBSQL::UniqueSemaphoreAdaptor;

use strict;
use warnings;

use base ('Bio::EnsEMBL::Hive::DBSQL::UniqueAdaptor');


# This adaptor specialises UniqueAdaptor for the 'representative_semaphore'
# table by overriding the four configuration hooks below (their exact use is
# defined by the base class — see UniqueAdaptor).

# The table this adaptor operates on.
sub default_table_name      { return 'representative_semaphore' }

# Column(s) that, together with the checksum, identify a unique entry.
sub category_fields         { return [qw(analysis_id)] }

# Column holding the checksum computed over the parameters.
sub checksum_field          { return 'param_checksum' }

# Column pointing at the representative (first-stored) semaphore.
sub representative_field    { return 'representative_semaphore_id' }

1;
......@@ -79,13 +79,15 @@ sub fetch_input {
my $b_multiplier = $self->param_required('b_multiplier');
my %digit_hash = ();
my @sub_tasks;
foreach my $digit (split(//,$b_multiplier)) {
# next if (($digit eq '0') or ($digit eq '1'));
$digit_hash{$digit}++;
push @sub_tasks, { 'digit' => $digit };
}
# parameter hashes of partial multiplications to be computed:
my @sub_tasks = map { { 'digit' => $_ } } sort { $a <=> $b } keys %digit_hash;
#my @sub_tasks = map { { 'digit' => $_ } } sort { $a <=> $b } keys %digit_hash;
# store them for future use:
$self->param('sub_tasks', \@sub_tasks);
......
......@@ -34,6 +34,7 @@ ALTER TABLE analysis_stats_monitor ADD FOREIGN KEY (analysis_id)
ALTER TABLE dataflow_rule ADD FOREIGN KEY (from_analysis_id) REFERENCES analysis_base(analysis_id);
ALTER TABLE job ADD CONSTRAINT job_analysis_id_fkey FOREIGN KEY (analysis_id) REFERENCES analysis_base(analysis_id);
ALTER TABLE role ADD FOREIGN KEY (analysis_id) REFERENCES analysis_base(analysis_id);
ALTER TABLE unique_job ADD FOREIGN KEY (analysis_id) REFERENCES analysis_base(analysis_id);
ALTER TABLE job ADD FOREIGN KEY (last_attempt_id) REFERENCES attempt(attempt_id);
ALTER TABLE log_message ADD FOREIGN KEY (attempt_id) REFERENCES attempt(attempt_id) ON DELETE CASCADE;
......@@ -46,6 +47,7 @@ ALTER TABLE attempt ADD CONSTRAINT attempt_job_id_fkey
ALTER TABLE job ADD CONSTRAINT job_prev_job_id_fkey FOREIGN KEY (prev_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE log_message ADD FOREIGN KEY (job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE semaphore ADD CONSTRAINT semaphore_dependent_job_id_fkey FOREIGN KEY (dependent_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
-- NOTE(review): the MySQL schema in this commit creates this table as 'representative_job'
-- (which is where the representative_job_id column lives), not 'unique_job' — confirm the
-- table name used in this schema file matches its CREATE TABLE, or this FK will fail.
ALTER TABLE unique_job ADD FOREIGN KEY (representative_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE analysis_base ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
ALTER TABLE resource_description ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
......
......@@ -196,7 +196,7 @@ MODIFIES SQL DATA
BEGIN
SET FOREIGN_KEY_CHECKS=0;
DROP VIEW IF EXISTS msg, progress, resource_usage_stats, live_roles, beekeeper_activity, semaphore_job;
DROP TABLE IF EXISTS pipeline_wide_parameters, analysis_stats_monitor, worker_resource_usage, resource_description, analysis_data, dataflow_target, dataflow_rule, analysis_ctrl_rule, analysis_stats, log_message, accu, job, attempt, semaphore, role, worker, beekeeper, analysis_base, resource_class, hive_meta;
-- The drop list must mirror the CREATE TABLE statements of this schema: it creates
-- representative_job, representative_semaphore and funnel_template (there is no
-- 'unique_job' table in this file), so those are the names dropped here.
DROP TABLE IF EXISTS pipeline_wide_parameters, analysis_stats_monitor, worker_resource_usage, resource_description, analysis_data, dataflow_target, dataflow_rule, analysis_ctrl_rule, analysis_stats, log_message, accu, representative_job, representative_semaphore, funnel_template, job, attempt, semaphore, role, worker, beekeeper, analysis_base, resource_class, hive_meta;
SET FOREIGN_KEY_CHECKS=1;
END; //
DELIMITER ;
......@@ -387,6 +387,41 @@ CREATE TABLE attempt (
KEY job_id (job_id) -- for finding the attempts linked to a job
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
/**
@table representative_job
@colour #1D73DA
@desc The representative_job table ensures that, in each analysis, a given task (defined by its parameters) is only run once.
We record the pairs of (analysis_id, param_checksum) and the matching job_id. When new jobs are submitted but their
(analysis_id, param_checksum) pair is already found in this table, they are aborted.
@column analysis_id the analysis_id needed to accomplish this job.
@column param_checksum checksum over all the job parameters (input_id and accumulator, incl. inherited ones)
@column representative_job_id points at the job that has been selected to run this analysis with these parameters
*/
-- Maps (analysis_id, param_checksum) to the single job chosen to run that task,
-- so that duplicate tasks can be detected and aborted.
CREATE TABLE representative_job (
analysis_id INTEGER NOT NULL,           -- analysis this task belongs to
param_checksum CHAR(32) NOT NULL,       -- checksum over the job parameters (32 chars, presumably an MD5 hex digest — confirm)
representative_job_id INTEGER NOT NULL, -- the job selected to run this (analysis, parameters) combination
PRIMARY KEY analysis_params (analysis_id, param_checksum), -- to avoid repeating tasks
-- NOTE(review): MySQL ignores a name given to a PRIMARY KEY (its index is always named 'PRIMARY')
KEY representative_job_id (representative_job_id) -- index for lookups by job id
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
/**
@table representative_semaphore
@colour #1D73DA
@desc Semaphore counterpart of representative_job: maps (analysis_id, param_checksum)
      to a single representative semaphore so that equivalent semaphores are not
      created twice (presumably pointing at semaphore.semaphore_id — no FK is
      visible here, confirm).

@column analysis_id                  the analysis the semaphored task belongs to
@column param_checksum               checksum over the job parameters
@column representative_semaphore_id  the semaphore selected to represent this (analysis, parameters) combination
*/

CREATE TABLE representative_semaphore (
analysis_id INTEGER NOT NULL,
param_checksum CHAR(32) NOT NULL,
representative_semaphore_id INTEGER NOT NULL,
PRIMARY KEY analysis_params (analysis_id, param_checksum), -- to avoid repeating tasks
KEY representative_semaphore_id (representative_semaphore_id)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
/**
@table semaphore
......@@ -407,6 +442,8 @@ CREATE TABLE attempt (
CREATE TABLE semaphore (
semaphore_id INTEGER NOT NULL PRIMARY KEY AUTO_INCREMENT,
analysis_id INTEGER NOT NULL,
param_checksum CHAR(32) NOT NULL,
local_jobs_counter INTEGER DEFAULT 0,
remote_jobs_counter INTEGER DEFAULT 0,
dependent_job_id INTEGER DEFAULT NULL, -- Both should never be NULLs at the same time,
......@@ -416,6 +453,18 @@ CREATE TABLE semaphore (
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
/**
@table funnel_template
@colour #1D73DA
@desc Work-in-progress table (per the commit message): appears to hold, per
      semaphore, the definition of the funnel job to be created when the
      semaphore opens — the columns mirror a job's identity (analysis_id,
      input_id, parameter stacks) plus the emitting job. TODO confirm intent.

@column semaphore_id     the semaphore this funnel definition belongs to
@column emitting_job_id  the job that created this semaphore using a dataflow rule
@column analysis_id      analysis of the funnel job
@column input_id         input_id of the funnel job
@column param_id_stack   stack of inherited parameter ids (empty by default)
@column accu_id_stack    stack of inherited accumulator ids (empty by default)
@column when_created     timestamp of the row's creation
*/

CREATE TABLE funnel_template (
semaphore_id INTEGER NOT NULL PRIMARY KEY,
emitting_job_id INTEGER DEFAULT NULL, -- the job that created this semaphore using a dataflow rule
analysis_id INTEGER NOT NULL,
input_id CHAR(255) NOT NULL,
param_id_stack CHAR(64) NOT NULL DEFAULT '',
accu_id_stack CHAR(64) NOT NULL DEFAULT '',
when_created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY input_id_stacks_analysis (input_id, param_id_stack, accu_id_stack, analysis_id) -- to avoid repeating tasks
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
/**
@table accu
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment