Commit 9d0bbeef authored by Leo Gordon's avatar Leo Gordon
Browse files

schema_change: new implementation of Semaphores as separate objects that can...

schema_change: new implementation of Semaphores as separate objects that can cross pipeline database boundaries
parent 0213e931
......@@ -98,7 +98,7 @@ sub display_name {
sub dataflow {
my ( $self, $output_ids, $emitting_job ) = @_;
if(my $receiving_job = $emitting_job->semaphored_job) {
if(my $receiving_job = $emitting_job->controlled_semaphore->ultimate_dependent_job) { # we need to reach all the way to the actual job
my $receiving_job_id = $receiving_job->dbID;
my $accu_adaptor = $receiving_job->adaptor->db->get_AccumulatorAdaptor;
......
......@@ -331,7 +331,6 @@ sub dataflow {
my $param_id_stack = '';
my $accu_id_stack = '';
my $emitting_job_id = undef;
my $semaphored_job = $emitting_job->semaphored_job;
if($same_db_dataflow) {
$param_id_stack = $emitting_job->param_id_stack;
......@@ -367,8 +366,8 @@ sub dataflow {
my $fan_cache_this_branch = $emitting_job->fan_cache->{"$funnel_dataflow_rule"} ||= [];
push @$fan_cache_this_branch, map { Bio::EnsEMBL::Hive::AnalysisJob->new(
@$common_params,
'input_id' => $_,
# semaphored_job => to be set when the $funnel_job has been stored
'input_id' => $_,
# controlled_semaphore => to be set when the $controlled_semaphore has been stored
) } @$output_ids_for_this_rule;
} else { # either a semaphored funnel or a non-semaphored dataflow:
......@@ -386,8 +385,10 @@ sub dataflow {
my $funnel_job = Bio::EnsEMBL::Hive::AnalysisJob->new(
@$common_params,
'input_id' => $output_ids_for_this_rule->[0],
'status' => 'SEMAPHORED',
);
# NB: $job_adaptor happens to belong to the $funnel_job, but not necesarily to $fan_jobs or $emitting_job
my ($funnel_job_id, @fan_job_ids) = $job_adaptor->store_a_semaphored_group_of_jobs( $funnel_job, $fan_jobs, $emitting_job );
push @output_job_ids, $funnel_job_id, @fan_job_ids;
......@@ -395,10 +396,11 @@ sub dataflow {
} else { # non-semaphored dataflow (but potentially propagating any existing semaphores)
my @non_semaphored_jobs = map { Bio::EnsEMBL::Hive::AnalysisJob->new(
@$common_params,
'input_id' => $_,
'semaphored_job' => $semaphored_job, # propagate parent's semaphore if any
'input_id' => $_,
'controlled_semaphore' => $emitting_job->controlled_semaphore, # propagate parent's semaphore if any
) } @$output_ids_for_this_rule;
# NB: $job_adaptor happens to belong to the @non_semaphored_jobs, but not necessarily to the $emitting_job :
push @output_job_ids, @{ $job_adaptor->store_jobs_and_adjust_counters( \@non_semaphored_jobs, 0, $emitting_job_id) };
}
} # /if funnel
......
......@@ -40,7 +40,7 @@ package Bio::EnsEMBL::Hive::AnalysisJob;
use strict;
use warnings;
use Bio::EnsEMBL::Hive::Utils ('stringify', 'destringify');
use Bio::EnsEMBL::Hive::Utils ('stringify', 'destringify', 'throw');
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::TheApiary;
......@@ -56,7 +56,7 @@ use base ( 'Bio::EnsEMBL::Hive::Cacheable',# mainly to inherit hive_pipeline()
analysis_id / analysis
semaphored_job_id / semaphored_job
controlled_semaphore_id / controlled_semaphore
=cut
......@@ -94,8 +94,7 @@ sub role_id {
sub status {
my $self = shift;
$self->{'_status'} = shift if(@_);
$self->{'_status'} = ( ($self->semaphore_count>0) ? 'SEMAPHORED' : 'READY' ) unless(defined($self->{'_status'}));
return $self->{'_status'};
return $self->{'_status'} || 'READY';
}
sub retry_count {
......@@ -125,13 +124,6 @@ sub query_count {
return $self->{'_query_count'};
}
sub semaphore_count {
my $self = shift;
$self->{'_semaphore_count'} = shift if(@_);
$self->{'_semaphore_count'} = 0 unless(defined($self->{'_semaphore_count'}));
return $self->{'_semaphore_count'};
}
sub set_and_update_status {
my ($self, $status ) = @_;
......@@ -257,10 +249,6 @@ sub load_parameters {
);
$self->param_init( @params_precedence );
if(my $semaphored_job_url = $self->_param_silent('HIVE_semaphored_job_url')) {
$self->semaphored_job( Bio::EnsEMBL::Hive::TheApiary->find_by_url( $semaphored_job_url ) );
}
}
......
......@@ -52,6 +52,26 @@ sub is_local_to {
}
sub count_local_and_remote_objects {
my $self = shift @_;
my $objects = shift @_;
my $this_pipeline = $self->hive_pipeline;
my $local_count = 0;
my $remote_count = 0;
foreach my $object (@$objects) {
if($object->hive_pipeline == $this_pipeline) {
$local_count++;
} else {
$remote_count++;
}
}
return ($local_count, $remote_count);
}
sub relative_display_name {
my ($self, $ref_pipeline) = @_; # if 'reference' hive_pipeline is the same as 'my' hive_pipeline, a shorter display_name is generated
......
......@@ -206,6 +206,7 @@ our %adaptor_type_2_package_name = (
'ResourceClass' => 'Bio::EnsEMBL::Hive::DBSQL::ResourceClassAdaptor',
'ResourceDescription' => 'Bio::EnsEMBL::Hive::DBSQL::ResourceDescriptionAdaptor',
'Role' => 'Bio::EnsEMBL::Hive::DBSQL::RoleAdaptor',
'Semaphore' => 'Bio::EnsEMBL::Hive::DBSQL::SemaphoreAdaptor',
'Queen' => 'Bio::EnsEMBL::Hive::Queen',
# aliases:
......
=pod
=head1 NAME
Bio::EnsEMBL::Hive::DBSQL::SemaphoreAdaptor
=head1 SYNOPSIS
$semaphore_adaptor = $db_adaptor->get_SemaphoreAdaptor;
$semaphore_adaptor = $semaphore_object->adaptor;
=head1 DESCRIPTION
Module to encapsulate all db access for persistent class Semaphore.
There should be just one per application and database connection.
=head1 LICENSE
Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2017] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
=head1 CONTACT
Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
=cut
package Bio::EnsEMBL::Hive::DBSQL::SemaphoreAdaptor;
use strict;
use warnings;
use Bio::EnsEMBL::Hive::Semaphore;
use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
sub default_table_name {
return 'semaphore';
}
sub object_class {
return 'Bio::EnsEMBL::Hive::Semaphore';
}
sub increment_column_by_inc_and_id {
my ($self, $column_name, $inc, $semaphore_id) = @_;
my $sql = "UPDATE semaphore SET $column_name = $column_name + $inc WHERE semaphore_id = $semaphore_id";
$self->dbc->protected_prepare_execute( [ $sql ],
sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_hive_message( "performing $sql".$after, 'INFO' ); }
);
}
1;
......@@ -78,8 +78,9 @@ sub find_by_query {
$self->hive_dba ? ('adaptor' => $self->hive_dba->get_adaptor($object_type, @specific_adaptor_params)) : (),
);
}
} elsif($object_type eq 'AnalysisJob') {
my $dbID = $query_params->{'job_id'};
} elsif($object_type eq 'AnalysisJob' or $object_type eq 'Semaphore') {
my $id_name = { 'AnalysisJob' => 'job_id', 'Semaphore' => 'semaphore_id' }->{$object_type};
my $dbID = $query_params->{$id_name};
my $coll = $self->collection_of($object_type);
unless($object = $coll->find_one_by( 'dbID' => $dbID )) {
......
......@@ -232,9 +232,9 @@ sub specialize_worker {
die "Analysis is BLOCKED, can't specialize a worker. Please use -force 1 to override";
}
if(($job_status eq 'DONE') and my $semaphored_job = $job->semaphored_job) {
if(($job_status eq 'DONE') and my $controlled_semaphore = $job->controlled_semaphore) {
warn "Increasing the semaphore count of the dependent job";
$semaphored_job->adaptor->increase_semaphore_count_for_jobid( $semaphored_job->dbID );
$controlled_semaphore->increase_by( [ $job ] );
}
my %status2counter = ('FAILED' => 'failed_job_count', 'READY' => 'ready_job_count', 'DONE' => 'done_job_count', 'PASSED_ON' => 'done_job_count', 'SEMAPHORED' => 'semaphored_job_count');
......
=pod
=head1 NAME
Bio::EnsEMBL::Hive::Semaphore
=head1 DESCRIPTION
A Semaphore object is our main instrument of fine-grained job control.
It is controlled (blocked) by a group of "fan" jobs and remote semaphores and has
either a dependent local job or a dependent remote semaphore
that will be unblocked when both local_job_counter and remote_job_counter reach zeros.
=head1 LICENSE
Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
Copyright [2016-2017] EMBL-European Bioinformatics Institute
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
=head1 CONTACT
Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates
=head1 APPENDIX
The rest of the documentation details each of the object methods.
Internal methods are usually preceded with a _
=cut
package Bio::EnsEMBL::Hive::Semaphore;
use strict;
use warnings;
use Bio::EnsEMBL::Hive::TheApiary;
use Bio::EnsEMBL::Hive::DBSQL::SemaphoreAdaptor;
use base ( 'Bio::EnsEMBL::Hive::Storable' );
=head1 AUTOLOADED
dependent_job_id / dependent_job
=cut
# ---------------------------------------------------------------------------
sub local_jobs_counter {
my $self = shift;
$self->{'_local_jobs_counter'} = shift if(@_);
return $self->{'_local_jobs_counter'};
}
sub remote_jobs_counter {
my $self = shift;
$self->{'_remote_jobs_counter'} = shift if(@_);
return $self->{'_remote_jobs_counter'};
}
sub dependent_semaphore_url {
my $self = shift;
$self->{'_dependent_semaphore_url'} = shift if(@_);
return $self->{'_dependent_semaphore_url'};
}
# ---------------------------------------------------------------------------
sub dependent_semaphore {
my $self = shift @_;
if(my $dependent_semaphore_url = $self->dependent_semaphore_url) {
return Bio::EnsEMBL::Hive::TheApiary->find_by_url( $dependent_semaphore_url );
} else {
return undef;
}
}
sub ultimate_dependent_job {
my $self = shift @_;
return $self->dependent_job || $self->dependent_semaphore->ultimate_dependent_job;
}
sub url_query_params {
my ($self) = @_;
return {
'semaphore_id' => $self->dbID,
};
}
# ---------------------------------------------------------------------------
sub check_if_ripe {
my $self = shift @_;
$self->adaptor->refresh( $self );
return ($self->local_jobs_counter + $self->remote_jobs_counter <= 0);
}
sub increase_by {
my $self = shift @_;
my $blocking_objects_or_local_delta = shift @_;
my $sign = shift @_ || 1;
if(my $semaphore_adaptor = $self->adaptor) {
my ($local_delta, $remote_delta) = ref($blocking_objects_or_local_delta)
? $self->count_local_and_remote_objects( $blocking_objects_or_local_delta )
: ($blocking_objects_or_local_delta,0);
my $semaphore_id = $self->dbID;
if($local_delta) {
$semaphore_adaptor->increment_column_by_inc_and_id( 'local_jobs_counter', $sign * $local_delta, $semaphore_id );
}
if($remote_delta) {
$semaphore_adaptor->increment_column_by_inc_and_id( 'remote_jobs_counter', $sign * $remote_delta, $semaphore_id );
}
} else {
die "Local semaphore objects are not yet supported"; # but they could be, eventually!
}
}
sub reblock_by {
my $self = shift @_;
my $blocking_objects_or_local_delta = shift @_;
my $was_ripe = $self->check_if_ripe;
$self->increase_by( $blocking_objects_or_local_delta );
if( $was_ripe ) {
if(my $dependent_job = $self->dependent_job) {
if(my $dependent_job_adaptor = $dependent_job->adaptor) {
$dependent_job_adaptor->semaphore_job_by_id( $dependent_job->dbID );
} else {
die "Dependent job is expected to have a working JobAdaptor";
}
} elsif(my $dependent_semaphore = $self->dependent_semaphore) {
$dependent_semaphore->reblock( [ $self ] ); # recursion
} else {
die "The semaphore is not blocking anything!";
}
}
}
sub release_if_ripe {
my $self = shift @_;
if( $self->check_if_ripe ) {
if(my $dependent_job = $self->dependent_job) {
if(my $dependent_job_adaptor = $dependent_job->adaptor) {
$dependent_job_adaptor->unsemaphore_job_by_id( $dependent_job->dbID );
} else {
die "Dependent job is expected to have a working JobAdaptor";
}
} elsif(my $dependent_semaphore = $self->dependent_semaphore) {
$dependent_semaphore->adaptor->increment_column_by_inc_and_id( 'remote_jobs_counter', -1, $dependent_semaphore->dbID );
$dependent_semaphore->release_if_ripe(); # recursion
} else {
die "The semaphore is not blocking anything!";
}
}
}
sub decrease_by {
my $self = shift @_;
my $blocking_objects_or_local_delta = shift @_;
$self->increase_by( $blocking_objects_or_local_delta, -1);
$self->release_if_ripe();
}
1;
......@@ -124,6 +124,8 @@ sub parse {
}
} elsif($query_params->{'job_id'}) {
$object_type = 'AnalysisJob';
} elsif($query_params->{'semaphore_id'}) {
$object_type = 'Semaphore';
} elsif($query_params->{'accu_name'}) { # we don't require $query_params->{'accu_address'} to support scalar accu
$object_type = 'Accumulator';
} elsif($query_params->{'table_name'}) { # NB: the order is important here, in case table_name is reset for non-NakedTables
......
......@@ -745,8 +745,8 @@ sub run_one_batch {
$jobs_done_here++;
$job->set_and_update_status('DONE');
if(my $semaphored_job = $job->semaphored_job) {
$semaphored_job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job->dbID ); # step-unblock the semaphore
if( my $controlled_semaphore = $job->controlled_semaphore ) {
$controlled_semaphore->decrease_by( [ $job ] );
}
if($job->lethal_for_worker) {
......
......@@ -41,9 +41,9 @@ ALTER TABLE dataflow_target ADD FOREIGN KEY (source_dataflow_rule_id)
ALTER TABLE accu ADD FOREIGN KEY (sending_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE accu ADD FOREIGN KEY (receiving_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE job ADD CONSTRAINT job_prev_job_id_fkey FOREIGN KEY (prev_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE job ADD CONSTRAINT job_semaphored_job_id_fkey FOREIGN KEY (semaphored_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE job_file ADD CONSTRAINT job_file_job_id_fkey FOREIGN KEY (job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE log_message ADD FOREIGN KEY (job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE semaphore ADD CONSTRAINT semaphore_dependent_job_id_fkey FOREIGN KEY (dependent_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE analysis_base ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
ALTER TABLE resource_description ADD FOREIGN KEY (resource_class_id) REFERENCES resource_class(resource_class_id);
......@@ -59,3 +59,5 @@ ALTER TABLE worker_resource_usage ADD FOREIGN KEY (worker_id)
ALTER TABLE log_message ADD FOREIGN KEY (beekeeper_id) REFERENCES beekeeper(beekeeper_id) ON DELETE CASCADE;
ALTER TABLE worker ADD FOREIGN KEY (beekeeper_id) REFERENCES beekeeper(beekeeper_id) ON DELETE CASCADE;
ALTER TABLE job ADD CONSTRAINT job_controlled_semaphore_id_fkey FOREIGN KEY (controlled_semaphore_id) REFERENCES semaphore(semaphore_id) ON DELETE CASCADE;
-- ---------------------------------------------------------------------------------------------------
SET @expected_version = 88;
-- make MySQL stop immediately after it encounters division by zero:
SET SESSION sql_mode='TRADITIONAL';
-- warn that we detected the schema version mismatch:
SELECT CONCAT( 'The patch only applies to schema version ',
@expected_version,
', but the current schema version is ',
meta_value,
', so skipping the rest.') AS ''
FROM hive_meta WHERE meta_key='hive_sql_schema_version' AND meta_value<>@expected_version;
-- cause division by zero only if current version differs from the expected one:
INSERT INTO hive_meta (meta_key, meta_value)
SELECT 'this_should_never_be_inserted', 1 FROM hive_meta WHERE NOT 1/(meta_key<>'hive_sql_schema_version' OR meta_value=@expected_version);
SELECT CONCAT( 'The patch seems to be compatible with schema version ',
@expected_version,
', applying the patch...') AS '';
-- Now undo the change so that we could patch potentially non-TRADITIONAL schema:
SET SESSION sql_mode='';
-- ----------------------------------<actual_patch> -------------------------------------------------
CREATE TABLE semaphore (
semaphore_id INTEGER NOT NULL PRIMARY KEY AUTO_INCREMENT,
local_jobs_counter INTEGER DEFAULT 0,
remote_jobs_counter INTEGER DEFAULT 0,
dependent_job_id INTEGER DEFAULT NULL, -- Both should never be NULLs at the same time,
dependent_semaphore_url VARCHAR(255) DEFAULT NULL, -- we expect either one or the other to be set.
UNIQUE KEY unique_dependent_job_id (dependent_job_id) -- make sure two semaphores do not block the same job
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
ALTER TABLE job ADD COLUMN controlled_semaphore_id INTEGER DEFAULT NULL;
INSERT INTO semaphore (semaphore_id, local_jobs_counter, dependent_job_id)
( SELECT fan.semaphored_job_id, count(*), fan.semaphored_job_id
FROM job fan
JOIN job funnel
ON (fan.semaphored_job_id=funnel.job_id)
GROUP BY fan.semaphored_job_id
);
UPDATE job SET controlled_semaphore_id = semaphored_job_id;
ALTER TABLE job DROP FOREIGN KEY job_semaphored_job_id_fkey;
ALTER TABLE job DROP COLUMN semaphore_count;
ALTER TABLE job DROP COLUMN semaphored_job_id;
ALTER TABLE semaphore ADD CONSTRAINT semaphore_dependent_job_id_fkey FOREIGN KEY (dependent_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE job ADD CONSTRAINT job_controlled_semaphore_id_fkey FOREIGN KEY (controlled_semaphore_id) REFERENCES semaphore(semaphore_id) ON DELETE CASCADE;
-- ----------------------------------</actual_patch> -------------------------------------------------
-- increase the schema version by one and register the patch:
UPDATE hive_meta SET meta_value=meta_value+1 WHERE meta_key='hive_sql_schema_version';
INSERT INTO hive_meta (meta_key, meta_value) SELECT CONCAT("patched_to_", meta_value), CURRENT_TIMESTAMP FROM hive_meta WHERE meta_key = "hive_sql_schema_version";
-- ---------------------------------------------------------------------------------------------------
\set expected_version 88
\set ON_ERROR_STOP on
-- warn that we detected the schema version mismatch:
SELECT ('The patch only applies to schema version '
|| CAST(:expected_version AS VARCHAR)
|| ', but the current schema version is '
|| meta_value
|| ', so skipping the rest.') as incompatible_msg
FROM hive_meta WHERE meta_key='hive_sql_schema_version' AND meta_value!=CAST(:expected_version AS VARCHAR);
-- cause division by zero only if current version differs from the expected one:
INSERT INTO hive_meta (meta_key, meta_value)
SELECT 'this_should_never_be_inserted', 1 FROM hive_meta WHERE 1 != 1/CAST( (meta_key!='hive_sql_schema_version' OR meta_value=CAST(:expected_version AS VARCHAR)) AS INTEGER );
SELECT ('The patch seems to be compatible with schema version '
|| CAST(:expected_version AS VARCHAR)
|| ', applying the patch...') AS compatible_msg;
-- ----------------------------------<actual_patch> -------------------------------------------------
CREATE TABLE semaphore (
semaphore_id SERIAL PRIMARY KEY,
local_jobs_counter INTEGER DEFAULT 0,
remote_jobs_counter INTEGER DEFAULT 0,
dependent_job_id INTEGER DEFAULT NULL, -- Both should never be NULLs at the same time,
dependent_semaphore_url VARCHAR(255) DEFAULT NULL, -- we expect either one or the other to be set.
UNIQUE (dependent_job_id) -- make sure two semaphores do not block the same job
);
ALTER TABLE job ADD COLUMN controlled_semaphore_id INTEGER DEFAULT NULL;
INSERT INTO semaphore (semaphore_id, local_jobs_counter, dependent_job_id)
( SELECT fan.semaphored_job_id, count(*), fan.semaphored_job_id
FROM job fan
JOIN job funnel
ON (fan.semaphored_job_id=funnel.job_id)