Commit 4ef1e705 authored by Leo Gordon's avatar Leo Gordon
Browse files

schema_change: re-attaching accumulators to semaphores

parent f6961178
......@@ -98,12 +98,11 @@ sub display_name {
sub dataflow {
my ( $self, $output_ids, $emitting_job ) = @_;
if(my $receiving_job = $emitting_job->controlled_semaphore->ultimate_dependent_job) { # we need to reach all the way to the actual job
if(my $receiving_semaphore = $emitting_job->controlled_semaphore) {
my $receiving_job_id = $receiving_job->dbID;
my $accu_adaptor = $receiving_job->adaptor->db->get_AccumulatorAdaptor;
my $local_accu = $receiving_job->adaptor == $emitting_job->adaptor;
my $sending_job_id = $local_accu ? $emitting_job->dbID : undef;
my $sending_job_id = $emitting_job->dbID;
my $receiving_semaphore_id = $receiving_semaphore->dbID;
my $accu_adaptor = $receiving_semaphore->adaptor->db->get_AccumulatorAdaptor;
my $accu_name = $self->accu_name;
my $accu_address = $self->accu_address;
......@@ -117,18 +116,18 @@ sub dataflow {
$key_signature=~s/(\w+)/$emitting_job->_param_possibly_overridden($1,$output_id)/eg;
push @rows, {
'sending_job_id' => $sending_job_id,
'receiving_job_id' => $receiving_job_id,
'struct_name' => $accu_name,
'key_signature' => $key_signature,
'value' => stringify( $emitting_job->_param_possibly_overridden($accu_input_variable, $output_id) ),
'sending_job_id' => $sending_job_id,
'receiving_semaphore_id' => $receiving_semaphore_id,
'struct_name' => $accu_name,
'key_signature' => $key_signature,
'value' => stringify( $emitting_job->_param_possibly_overridden($accu_input_variable, $output_id) ),
};
}
$accu_adaptor->store( \@rows );
} else {
die "No semaphored job, cannot perform accumulated dataflow";
die "No controlled semaphore, cannot perform accumulated dataflow";
}
}
......
......@@ -57,7 +57,7 @@ sub fetch_structures_for_job_ids {
if( $job_ids_csv ) {
my $sql = "SELECT receiving_job_id, struct_name, key_signature, value FROM accu WHERE receiving_job_id in ($job_ids_csv)";
my $sql = "SELECT s.dependent_job_id, a.struct_name, a.key_signature, a.value FROM accu a JOIN semaphore s ON (s.semaphore_id=a.receiving_semaphore_id) WHERE s.dependent_job_id in ($job_ids_csv)";
my $sth = $self->prepare( $sql );
$sth->execute();
......
......@@ -169,6 +169,13 @@ sub reblock_by {
}
sub fetch_my_raw_accu_data {
my $self = shift @_;
return $self->adaptor->db->get_AccumulatorAdaptor->fetch_all_by_receiving_semaphore_id( $self->dbID );
}
sub release_if_ripe {
my $self = shift @_;
......@@ -184,7 +191,19 @@ sub release_if_ripe {
} elsif(my $dependent_semaphore = $self->dependent_semaphore) {
$dependent_semaphore->adaptor->increment_column_by_inc_and_id( 'remote_jobs_counter', -1, $dependent_semaphore->dbID );
my $dependent_semaphore_adaptor = $dependent_semaphore->adaptor;
my $ocean_separated = $dependent_semaphore_adaptor->db ne $self->adaptor->db;
# pass the accumulated data here:
if(my $raw_accu_data = $self->fetch_my_raw_accu_data) {
foreach my $vector ( @$raw_accu_data ) {
$vector->{'receiving_semaphore_id'} = $dependent_semaphore->dbID; # set the new consumer
$vector->{'sending_job_id'} = undef if($ocean_separated); # dissociate from the sending job as it was local
}
$dependent_semaphore_adaptor->db->get_AccumulatorAdaptor->store( $raw_accu_data );
}
$dependent_semaphore_adaptor->increment_column_by_inc_and_id( 'remote_jobs_counter', -1, $dependent_semaphore->dbID );
$dependent_semaphore->release_if_ripe(); # recursion
......
......@@ -208,7 +208,7 @@ sub add_job_node {
my $job_node_name = 'job_'.$job_id.'__'.$job_pipeline_name;
unless($job_node_hash{$job_node_name}++) {
my $job_shape = 'record';
my $job_shape = 'box3d';
my $job_status = $job->status;
my $job_status_colour = {'DONE' => 'DeepSkyBlue', 'READY' => 'green', 'SEMAPHORED' => 'grey', 'FAILED' => 'red'}->{$job_status} // 'yellow';
my $analysis_status_colour = {
......@@ -238,13 +238,6 @@ sub add_job_node {
$job_label .= "<tr><td>$param_key:</td><td> $param_value</td></tr>";
}
my $accu_adaptor = $job->adaptor->db->get_AccumulatorAdaptor;
my $accu = $accu_adaptor->fetch_structures_for_job_ids( $job_id )->{ $job_id };
foreach my $accu_name (sort keys %$accu) {
$job_label .= qq{<tr><td><u><i>accumulated:</i></u></td><td><i>$accu_name</i></td></tr>};
}
$job_label .= "</table>>";
......@@ -302,68 +295,118 @@ sub add_job_node {
}
sub add_semaphore_node {
my $semaphore = shift @_;
sub draw_semaphore_and_accu {
my ($semaphore, $dependent_node_name) = @_;
my $semaphore_url = $semaphore->relative_url( 0 ); # request for an absolute URL
my $semaphore_id = $semaphore->dbID;
my $semaphore_pipeline_name = $semaphore->hive_pipeline->hive_pipeline_name;
my $semaphore_node_name = 'semaphore_'.$semaphore_id.'__'.$semaphore_pipeline_name;
unless($semaphore_url_hash{$semaphore_url}++) {
my $semaphore_blockers = $semaphore->local_jobs_counter + $semaphore->remote_jobs_counter;
my $semaphore_is_blocked = $semaphore_blockers > 0;
my $semaphore_blockers = $semaphore->local_jobs_counter + $semaphore->remote_jobs_counter;
my $semaphore_is_blocked = $semaphore_blockers > 0;
my ($semaphore_colour, $semaphore_shape, $dependent_blocking_arrow_colour, $dependent_blocking_arrow_shape ) = $semaphore_is_blocked
? ('red', 'triangle', 'red', 'tee')
: ('darkgreen', 'invtriangle', 'darkgreen', 'none');
my ($semaphore_colour, $semaphore_shape, $dependent_blocking_arrow_colour, $dependent_blocking_arrow_shape ) = $semaphore_is_blocked
? ('red', 'triangle', 'red', 'tee')
: ('darkgreen', 'invtriangle', 'darkgreen', 'none');
my @semaphore_label_parts = ();
if($semaphore_is_blocked) {
if(my $local=$semaphore->local_jobs_counter) { push @semaphore_label_parts, "local: $local" }
if(my $remote=$semaphore->remote_jobs_counter) { push @semaphore_label_parts, "remote: $remote" }
} else {
push @semaphore_label_parts, "open";
}
my $semaphore_label = join("\n", @semaphore_label_parts);
my @semaphore_label_parts = ();
if($semaphore_is_blocked) {
if(my $local=$semaphore->local_jobs_counter) { push @semaphore_label_parts, "local: $local" }
if(my $remote=$semaphore->remote_jobs_counter) { push @semaphore_label_parts, "remote: $remote" }
} else {
push @semaphore_label_parts, "open";
$self->{'graph'}->add_node( $semaphore_node_name,
shape => $semaphore_shape,
style => 'filled',
fillcolor => $semaphore_colour,
label => $semaphore_label,
);
my $raw_accu_data = $semaphore->fetch_my_raw_accu_data;
my $accu_node_name;
if(@$raw_accu_data) {
$accu_node_name = 'accu_'.$semaphore_id.'__'.$semaphore_pipeline_name;
my $accu_label = qq{<<table border="0" cellborder="0" cellspacing="0" cellpadding="1">};
my %struct_name_2_key_signature = ();
foreach my $accu_vector (@$raw_accu_data) {
push @{ $struct_name_2_key_signature{ $accu_vector->{'struct_name'} } }, $accu_vector->{'key_signature'};
}
foreach my $struct_name (sort keys %struct_name_2_key_signature) {
$accu_label .= qq{<tr><td><b>$struct_name</b></td><td></td></tr>};
foreach my $key_signature ( @{ $struct_name_2_key_signature{$struct_name} } ) {
$accu_label .= qq{<tr><td></td><td>$key_signature</td></tr>};
}
}
my $semaphore_label = join("\n", @semaphore_label_parts);
$accu_label .= "</table>>";
$self->{'graph'}->add_node( $semaphore_node_name,
shape => $semaphore_shape,
$self->{'graph'}->add_node( $accu_node_name,
shape => 'note',
style => 'filled',
fillcolor => $semaphore_colour,
label => $semaphore_label,
label => $accu_label,
);
$self->{'graph'}->add_edge( $semaphore_node_name => $accu_node_name,
color => $dependent_blocking_arrow_colour,
style => 'dashed',
arrowhead => $dependent_blocking_arrow_shape,
tailport => 's',
headport => 'n',
);
$self->{'graph'}->add_edge( $accu_node_name => $dependent_node_name,
color => $dependent_blocking_arrow_colour,
style => 'dashed',
arrowhead => $dependent_blocking_arrow_shape,
tailport => 's',
headport => 'n',
);
} else {
$self->{'graph'}->add_edge( $semaphore_node_name => $dependent_node_name,
color => $dependent_blocking_arrow_colour,
style => 'dashed',
arrowhead => $dependent_blocking_arrow_shape,
tailport => 's',
headport => 'n',
);
}
return $accu_node_name;
}
sub add_semaphore_node {
my $semaphore = shift @_;
my $semaphore_url = $semaphore->relative_url( 0 ); # request for an absolute URL
my $semaphore_id = $semaphore->dbID;
my $semaphore_pipeline_name = $semaphore->hive_pipeline->hive_pipeline_name;
my $semaphore_node_name = 'semaphore_'.$semaphore_id.'__'.$semaphore_pipeline_name;
unless($semaphore_url_hash{$semaphore_url}++) {
my ($accu_node_name, $target_cluster_name);
if(my $dependent_job = $semaphore->dependent_job) {
my $dependent_job_node_name = add_job_node( $dependent_job );
$self->{'graph'}->add_edge( $semaphore_node_name => $dependent_job_node_name,
color => $dependent_blocking_arrow_colour,
style => 'dashed',
arrowhead => $dependent_blocking_arrow_shape,
tailport => 's',
);
$accu_node_name = draw_semaphore_and_accu($semaphore, $dependent_job_node_name);
my $analysis_name = $dependent_job->analysis->relative_display_name($main_pipeline);
$analysis_name=~s{/}{___};
$target_cluster_name = $dependent_job->analysis->relative_display_name($main_pipeline);
$target_cluster_name =~s{/}{___};
# adding the semaphore node to the cluster of the dependent job's analysis:
push @{$self->{'graph'}->cluster_2_nodes->{ $analysis_name }}, $semaphore_node_name;
} elsif(my $dependent_semaphore = $semaphore->dependent_semaphore) {
my $dependent_semaphore_node_name = add_semaphore_node( $dependent_semaphore );
$self->{'graph'}->add_edge( $semaphore_node_name => $dependent_semaphore_node_name,
color => $dependent_blocking_arrow_colour,
style => 'dashed',
arrowhead => $dependent_blocking_arrow_shape,
tailport => 's',
headport => 'n',
);
$accu_node_name = draw_semaphore_and_accu($semaphore, $dependent_semaphore_node_name);
# adding the semaphore node to its pipeline's cluster:
push @{$self->{'graph'}->cluster_2_nodes->{ $semaphore->hive_pipeline->hive_pipeline_name }}, $semaphore_node_name;
$target_cluster_name = $semaphore->hive_pipeline->hive_pipeline_name;
# can we trace the local blocking jobs up to their roots?
my $local_blocker_jobs = $dependent_semaphore->adaptor->db->get_AnalysisJobAdaptor->fetch_all_by_controlled_semaphore_id( $dependent_semaphore->dbID );
......@@ -374,6 +417,13 @@ sub add_semaphore_node {
} else {
die "This semaphore is not blocking anything at all";
}
# adding the semaphore node to the cluster of the dependent job's analysis:
push @{$self->{'graph'}->cluster_2_nodes->{ $target_cluster_name }}, $semaphore_node_name;
if($accu_node_name) {
# adding the accu node to the cluster of the dependent job's analysis:
push @{$self->{'graph'}->cluster_2_nodes->{ $target_cluster_name }}, $accu_node_name;
}
}
return $semaphore_node_name;
......
......@@ -39,7 +39,6 @@ ALTER TABLE dataflow_rule ADD FOREIGN KEY (funnel_dataflow_rule_id)
ALTER TABLE dataflow_target ADD FOREIGN KEY (source_dataflow_rule_id) REFERENCES dataflow_rule(dataflow_rule_id);
ALTER TABLE accu ADD FOREIGN KEY (sending_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE accu ADD FOREIGN KEY (receiving_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE job ADD CONSTRAINT job_prev_job_id_fkey FOREIGN KEY (prev_job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE job_file ADD CONSTRAINT job_file_job_id_fkey FOREIGN KEY (job_id) REFERENCES job(job_id) ON DELETE CASCADE;
ALTER TABLE log_message ADD FOREIGN KEY (job_id) REFERENCES job(job_id) ON DELETE CASCADE;
......@@ -61,3 +60,4 @@ ALTER TABLE log_message ADD FOREIGN KEY (beekeeper_id)
ALTER TABLE worker ADD FOREIGN KEY (beekeeper_id) REFERENCES beekeeper(beekeeper_id) ON DELETE CASCADE;
ALTER TABLE job ADD CONSTRAINT job_controlled_semaphore_id_fkey FOREIGN KEY (controlled_semaphore_id) REFERENCES semaphore(semaphore_id) ON DELETE CASCADE;
ALTER TABLE accu ADD CONSTRAINT accu_receiving_semaphore_id_fkey FOREIGN KEY (receiving_semaphore_id) REFERENCES semaphore(semaphore_id) ON DELETE CASCADE;
-- ---------------------------------------------------------------------------------------------------
SET @expected_version = 89;
-- make MySQL stop immediately after it encounters division by zero:
SET SESSION sql_mode='TRADITIONAL';
-- warn that we detected the schema version mismatch:
SELECT CONCAT( 'The patch only applies to schema version ',
@expected_version,
', but the current schema version is ',
meta_value,
', so skipping the rest.') AS ''
FROM hive_meta WHERE meta_key='hive_sql_schema_version' AND meta_value<>@expected_version;
-- cause division by zero only if current version differs from the expected one:
INSERT INTO hive_meta (meta_key, meta_value)
SELECT 'this_should_never_be_inserted', 1 FROM hive_meta WHERE NOT 1/(meta_key<>'hive_sql_schema_version' OR meta_value=@expected_version);
SELECT CONCAT( 'The patch seems to be compatible with schema version ',
@expected_version,
', applying the patch...') AS '';
-- Now undo the change so that we could patch potentially non-TRADITIONAL schema:
SET SESSION sql_mode='';
-- ----------------------------------<actual_patch> -------------------------------------------------
--
-- ---- reattaching accumulated data from job entries to their controlling semaphore entries: -------
ALTER TABLE accu ADD COLUMN receiving_semaphore_id INTEGER NOT NULL AFTER receiving_job_id;
UPDATE accu a JOIN semaphore s ON (a.receiving_job_id=s.dependent_job_id) SET a.receiving_semaphore_id = s.semaphore_id;
ALTER TABLE accu DROP FOREIGN KEY accu_ibfk_2;
ALTER TABLE accu DROP COLUMN receiving_job_id;
ALTER TABLE accu ADD CONSTRAINT accu_receiving_semaphore_id_fkey FOREIGN KEY (receiving_semaphore_id) REFERENCES semaphore(semaphore_id) ON DELETE CASCADE;
-- ----------------------------------</actual_patch> -------------------------------------------------
-- increase the schema version by one and register the patch:
UPDATE hive_meta SET meta_value=meta_value+1 WHERE meta_key='hive_sql_schema_version';
INSERT INTO hive_meta (meta_key, meta_value) SELECT CONCAT("patched_to_", meta_value), CURRENT_TIMESTAMP FROM hive_meta WHERE meta_key = "hive_sql_schema_version";
-- ---------------------------------------------------------------------------------------------------
\set expected_version 89
\set ON_ERROR_STOP on
-- warn that we detected the schema version mismatch:
SELECT ('The patch only applies to schema version '
|| CAST(:expected_version AS VARCHAR)
|| ', but the current schema version is '
|| meta_value
|| ', so skipping the rest.') as incompatible_msg
FROM hive_meta WHERE meta_key='hive_sql_schema_version' AND meta_value!=CAST(:expected_version AS VARCHAR);
-- cause division by zero only if current version differs from the expected one:
INSERT INTO hive_meta (meta_key, meta_value)
SELECT 'this_should_never_be_inserted', 1 FROM hive_meta WHERE 1 != 1/CAST( (meta_key!='hive_sql_schema_version' OR meta_value=CAST(:expected_version AS VARCHAR)) AS INTEGER );
SELECT ('The patch seems to be compatible with schema version '
|| CAST(:expected_version AS VARCHAR)
|| ', applying the patch...') AS compatible_msg;
-- ----------------------------------<actual_patch> -------------------------------------------------
--
-- ---- reattaching accumulated data from job entries to their controlling semaphore entries: -------
ALTER TABLE accu ADD COLUMN receiving_semaphore_id INTEGER; -- can't add a "NOT NULL" column to a PostgreSQL table that lacks data
UPDATE accu SET receiving_semaphore_id=semaphore.semaphore_id FROM semaphore WHERE accu.receiving_job_id=semaphore.dependent_job_id;
ALTER TABLE accu ALTER COLUMN receiving_semaphore_id SET NOT NULL; -- so have to delay setting "NOT NULL" until we have valid data in the column
ALTER TABLE accu DROP CONSTRAINT accu_receiving_job_id_fkey;
ALTER TABLE accu DROP COLUMN receiving_job_id;
ALTER TABLE accu ADD CONSTRAINT accu_receiving_semaphore_id_fkey FOREIGN KEY (receiving_semaphore_id) REFERENCES semaphore(semaphore_id) ON DELETE CASCADE;
-- ----------------------------------</actual_patch> -------------------------------------------------
-- increase the schema version by one and register the patch:
UPDATE hive_meta SET meta_value= (CAST(meta_value AS INTEGER) + 1) WHERE meta_key='hive_sql_schema_version';
INSERT INTO hive_meta (meta_key, meta_value) SELECT 'patched_to_' || meta_value, CURRENT_TIMESTAMP FROM hive_meta WHERE meta_key = 'hive_sql_schema_version';
......@@ -421,22 +421,22 @@ CREATE TABLE job_file (
@desc Accumulator for funneled dataflow.
@column sending_job_id semaphoring job in the "box"
@column receiving_job_id semaphored job outside the "box"
@column struct_name name of the structured parameter
@column key_signature locates the part of the structured parameter
@column value value of the part
@column sending_job_id semaphoring job in the "box"
@column receiving_semaphore_id semaphore just outside the "box"
@column struct_name name of the structured parameter
@column key_signature locates the part of the structured parameter
@column value value of the part
*/
CREATE TABLE accu (
sending_job_id INTEGER,
receiving_job_id INTEGER NOT NULL,
receiving_semaphore_id INTEGER NOT NULL,
struct_name VARCHAR(255) NOT NULL,
key_signature VARCHAR(255) NOT NULL,
value MEDIUMTEXT,
KEY accu_sending_idx (sending_job_id),
KEY accu_receiving_idx (receiving_job_id)
KEY accu_receiving_idx (receiving_semaphore_id)
) COLLATE=latin1_swedish_ci ENGINE=InnoDB;
......
......@@ -436,22 +436,22 @@ CREATE INDEX ON job_file (role_id);
@desc Accumulator for funneled dataflow.
@column sending_job_id semaphoring job in the "box"
@column receiving_job_id semaphored job outside the "box"
@column struct_name name of the structured parameter
@column key_signature locates the part of the structured parameter
@column value value of the part
@column sending_job_id semaphoring job in the "box"
@column receiving_semaphore_id semaphore just outside the "box"
@column struct_name name of the structured parameter
@column key_signature locates the part of the structured parameter
@column value value of the part
*/
CREATE TABLE accu (
sending_job_id INTEGER,
receiving_job_id INTEGER NOT NULL,
receiving_semaphore_id INTEGER NOT NULL,
struct_name VARCHAR(255) NOT NULL,
key_signature VARCHAR(255) NOT NULL,
value TEXT
);
CREATE INDEX ON accu (sending_job_id);
CREATE INDEX ON accu (receiving_job_id);
CREATE INDEX ON accu (receiving_semaphore_id);
/**
......
......@@ -417,22 +417,22 @@ CREATE INDEX job_file_role_id_idx ON job_file (role_id);
@desc Accumulator for funneled dataflow.
@column sending_job_id semaphoring job in the "box"
@column receiving_job_id semaphored job outside the "box"
@column struct_name name of the structured parameter
@column key_signature locates the part of the structured parameter
@column value value of the part
@column sending_job_id semaphoring job in the "box"
@column receiving_semaphore_id semaphore just outside the "box"
@column struct_name name of the structured parameter
@column key_signature locates the part of the structured parameter
@column value value of the part
*/
CREATE TABLE accu (
sending_job_id INTEGER,
receiving_job_id INTEGER NOT NULL,
receiving_semaphore_id INTEGER NOT NULL,
struct_name VARCHAR(255) NOT NULL,
key_signature VARCHAR(255) NOT NULL,
value TEXT
);
CREATE INDEX accu_sending_idx ON accu (sending_job_id);
CREATE INDEX accu_receiving_idx ON accu (receiving_job_id);
CREATE INDEX accu_receiving_idx ON accu (receiving_semaphore_id);
/**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment