Commit ce6571c3 authored by Leo Gordon

analysis_topup and job_topup modes supported

parent 7eeef124
@@ -49,6 +49,7 @@ use Getopt::Long;

 use Bio::EnsEMBL::Utils::Argument;          # import 'rearrange()'
 use Bio::EnsEMBL::Hive::Utils 'stringify';  # import 'stringify()'
 use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
+use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
 use Bio::EnsEMBL::Hive::Extensions;

 # ---------------------------[the following methods will be overridden by specific pipelines]-------------------------
@@ -284,10 +285,11 @@ sub process_options {

 =cut

 sub run {
-    my $self = shift @_;
-
-    my $topup_flag = $self->{_cmdline_options}{topup};
+    my $self = shift @_;
+
+    my $analysis_topup = $self->{_cmdline_options}{'analysis_topup'};
+    my $job_topup      = $self->{_cmdline_options}{'job_topup'};

-    unless($topup_flag) {
+    unless($analysis_topup || $job_topup) {
         foreach my $cmd (@{$self->pipeline_create_commands}) {
             warn "Running the command:\n\t$cmd\n";
             if(my $retval = system($cmd)) {
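
For orientation, pipeline_create_commands() returns an arrayref of shell command strings that create and populate a fresh pipeline database; the unless() guard above is what lets both top-up modes skip this step on an existing database. A hedged sketch of such a return value (host, user, password and database names are all invented for illustration):

    # Hypothetical pipeline_create_commands() return value; each string gets
    # executed via system() in the loop shown above:
    return [
        'mysql -hmy_host -umy_user -pmy_pass -e "CREATE DATABASE my_pipeline_db"',
        'mysql -hmy_host -umy_user -pmy_pass my_pipeline_db < ensembl-hive/sql/tables.sql',
    ];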
@@ -300,35 +302,37 @@ sub run {

     my $hive_dba = new Bio::EnsEMBL::Hive::DBSQL::DBAdaptor(%{$self->o('pipeline_db')});

-    my $meta_container = $hive_dba->get_MetaContainer;
-    warn "Loading pipeline-wide parameters ...\n";
-
-    my $pipeline_wide_parameters = $self->pipeline_wide_parameters;
-    while( my($meta_key, $meta_value) = each %$pipeline_wide_parameters ) {
-        if($topup_flag) {
-            $meta_container->delete_key($meta_key);
+    unless($job_topup) {
+        my $meta_container = $hive_dba->get_MetaContainer;
+        warn "Loading pipeline-wide parameters ...\n";
+
+        my $pipeline_wide_parameters = $self->pipeline_wide_parameters;
+        while( my($meta_key, $meta_value) = each %$pipeline_wide_parameters ) {
+            if($analysis_topup) {
+                $meta_container->delete_key($meta_key);
+            }
+            $meta_container->store_key_value($meta_key, stringify($meta_value));
         }
-        $meta_container->store_key_value($meta_key, stringify($meta_value));
-    }
-    warn "Done.\n\n";
-
-        # pre-load the resource_description table
-    my $resource_description_adaptor = $hive_dba->get_ResourceDescriptionAdaptor;
-    warn "Loading the ResourceDescriptions ...\n";
-
-    my $resource_classes = $self->resource_classes;
-    while( my($rc_id, $mt2param) = each %$resource_classes ) {
-        my $description = delete $mt2param->{-desc};
-        while( my($meadow_type, $xparams) = each %$mt2param ) {
-            $resource_description_adaptor->create_new(
-                -RC_ID       => $rc_id,
-                -MEADOW_TYPE => $meadow_type,
-                -PARAMETERS  => $xparams,
-                -DESCRIPTION => $description,
-            );
+        warn "Done.\n\n";
+
+            # pre-load the resource_description table
+        my $resource_description_adaptor = $hive_dba->get_ResourceDescriptionAdaptor;
+        warn "Loading the ResourceDescriptions ...\n";
+
+        my $resource_classes = $self->resource_classes;
+        while( my($rc_id, $mt2param) = each %$resource_classes ) {
+            my $description = delete $mt2param->{-desc};
+            while( my($meadow_type, $xparams) = each %$mt2param ) {
+                $resource_description_adaptor->create_new(
+                    -RC_ID       => $rc_id,
+                    -MEADOW_TYPE => $meadow_type,
+                    -PARAMETERS  => $xparams,
+                    -DESCRIPTION => $description,
+                );
+            }
         }
+        warn "Done.\n\n";
     }
-    warn "Done.\n\n";

     my $analysis_adaptor = $hive_dba->get_AnalysisAdaptor;
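
For readers new to this config interface, resource_classes() returns a hashref keyed by numeric rc_id, where each value holds a -desc entry plus one parameter string per meadow type; the nested while() loops above unpack exactly that shape. A minimal hypothetical example (descriptions and LSF options invented):

    # Hypothetical resource_classes() return value, matching the loop above:
    my $resource_classes = {
        0 => { -desc => 'default',     'LSF' => '' },
        1 => { -desc => 'high_memory', 'LSF' => '-M4000000 -R"select[mem>4000] rusage[mem=4000]"' },
    };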
@@ -336,32 +340,41 @@ sub run {
         my ($logic_name, $module, $parameters_hash, $input_ids, $program_file, $blocked, $batch_size, $hive_capacity, $failed_job_tolerance, $rc_id) =
             rearrange([qw(logic_name module parameters input_ids program_file blocked batch_size hive_capacity failed_job_tolerance rc_id)], %$aha);

-        if($topup_flag and $analysis_adaptor->fetch_by_logic_name($logic_name)) {
+        if($analysis_topup and $analysis_adaptor->fetch_by_logic_name($logic_name)) {
             warn "Skipping already existing analysis '$logic_name'\n";
             next;
         }

-        warn "Creating '$logic_name'...\n";
-
-        my $analysis = Bio::EnsEMBL::Analysis->new (
-            -db              => '',
-            -db_file         => '',
-            -db_version      => '1',
-            -logic_name      => $logic_name,
-            -module          => $module,
-            -parameters      => stringify($parameters_hash),  # have to stringify it here, because Analysis code is external wrt Hive code
-            -program_file    => $program_file,
-        );
-
-        $analysis_adaptor->store($analysis);
-
-        my $stats = $analysis->stats();
-        $stats->batch_size( $batch_size )                     if(defined($batch_size));
-        $stats->hive_capacity( $hive_capacity )               if(defined($hive_capacity));
-        $stats->failed_job_tolerance( $failed_job_tolerance ) if(defined($failed_job_tolerance));
-        $stats->rc_id( $rc_id )                               if(defined($rc_id));
-        $stats->status($blocked ? 'BLOCKED' : 'READY');       # (some analyses will be waiting for human intervention in blocked state)
-        $stats->update();
+        my $analysis;
+
+        if($job_topup) {
+            $analysis = $analysis_adaptor->fetch_by_logic_name($logic_name) || die "Could not fetch analysis '$logic_name'";
+        } else {
+            warn "Creating '$logic_name'...\n";
+
+            $analysis = Bio::EnsEMBL::Analysis->new (
+                -db              => '',
+                -db_file         => '',
+                -db_version      => '1',
+                -logic_name      => $logic_name,
+                -module          => $module,
+                -parameters      => stringify($parameters_hash),  # have to stringify it here, because Analysis code is external wrt Hive code
+                -program_file    => $program_file,
+            );
+
+            $analysis_adaptor->store($analysis);
+
+            my $stats = $analysis->stats();
+            $stats->batch_size( $batch_size )                     if(defined($batch_size));
+            $stats->hive_capacity( $hive_capacity )               if(defined($hive_capacity));
+            $stats->failed_job_tolerance( $failed_job_tolerance ) if(defined($failed_job_tolerance));
+            $stats->rc_id( $rc_id )                               if(defined($rc_id));
+            $stats->status($blocked ? 'BLOCKED' : 'READY');       # (some analyses will be waiting for human intervention in blocked state)
+            $stats->update();
+        }

             # now create the corresponding jobs (if there are any):
         foreach my $input_id_hash (@$input_ids) {
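
The body of this job-creation loop is elided by the diff context, but the use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor line added at the top of the file suggests it delegates to that adaptor. A sketch only, assuming the adaptor's class-level CreateNewJob() interface; the exact argument list is an assumption, not taken from this commit:

    # Assumed shape of the elided loop body (CreateNewJob arguments are a guess):
    Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
        -input_id => stringify($input_id_hash),   # input_ids are stored stringified, like parameters above
        -analysis => $analysis,
    );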
@@ -374,47 +387,50 @@ sub run {
         }
     }

-        # Now, run separately through the already created analyses and link them together:
-        #
-    my $ctrl_rule_adaptor     = $hive_dba->get_AnalysisCtrlRuleAdaptor;
-    my $dataflow_rule_adaptor = $hive_dba->get_DataflowRuleAdaptor;
-
-    foreach my $aha (@{$self->pipeline_analyses}) {
-        my ($logic_name, $wait_for, $flow_into) =
-            rearrange([qw(logic_name wait_for flow_into)], %$aha);
-
-        my $analysis = $analysis_adaptor->fetch_by_logic_name($logic_name);
-
-        $wait_for ||= [];
-        $wait_for = [ $wait_for ] unless(ref($wait_for) eq 'ARRAY'); # force scalar into an arrayref
-
-            # create control rules:
-        foreach my $condition_logic_name (@$wait_for) {
-            if(my $condition_analysis = $analysis_adaptor->fetch_by_logic_name($condition_logic_name)) {
-                $ctrl_rule_adaptor->create_rule( $condition_analysis, $analysis);
-                warn "Created Control rule: $condition_logic_name -| $logic_name\n";
-            } else {
-                die "Could not fetch analysis '$condition_logic_name' to create a control rule";
-            }
-        }
-
-        $flow_into ||= {};
-        $flow_into = { 1 => $flow_into } unless(ref($flow_into) eq 'HASH'); # force non-hash into a hash
-
-        foreach my $branch_code (sort {$a <=> $b} keys %$flow_into) {
-            my $heirs = $flow_into->{$branch_code};
-
-            $heirs = [ $heirs ] unless(ref($heirs)); # force scalar into an arrayref first
-            $heirs = { map { ($_ => undef) } @$heirs } if(ref($heirs) eq 'ARRAY'); # now force it into a hash if it wasn't
-
-            while(my ($heir_logic_name, $input_id_template) = each %$heirs) {
-                if(my $heir_analysis = $analysis_adaptor->fetch_by_logic_name($heir_logic_name)) {
-                    $dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis, $branch_code, $input_id_template);
-                    warn "Created DataFlow rule: [$branch_code] $logic_name -> $heir_logic_name"
-                        .($input_id_template ? " WITH TEMPLATE: $input_id_template" : '')."\n";
-                } else {
-                    die "Could not fetch analysis '$heir_logic_name' to create a dataflow rule";
-                }
-            }
-        }
-    }
+    unless($job_topup) {
+            # Now, run separately through the already created analyses and link them together:
+            #
+        my $ctrl_rule_adaptor     = $hive_dba->get_AnalysisCtrlRuleAdaptor;
+        my $dataflow_rule_adaptor = $hive_dba->get_DataflowRuleAdaptor;
+
+        foreach my $aha (@{$self->pipeline_analyses}) {
+            my ($logic_name, $wait_for, $flow_into) =
+                rearrange([qw(logic_name wait_for flow_into)], %$aha);
+
+            my $analysis = $analysis_adaptor->fetch_by_logic_name($logic_name);
+
+            $wait_for ||= [];
+            $wait_for = [ $wait_for ] unless(ref($wait_for) eq 'ARRAY'); # force scalar into an arrayref
+
+                # create control rules:
+            foreach my $condition_logic_name (@$wait_for) {
+                if(my $condition_analysis = $analysis_adaptor->fetch_by_logic_name($condition_logic_name)) {
+                    $ctrl_rule_adaptor->create_rule( $condition_analysis, $analysis);
+                    warn "Created Control rule: $condition_logic_name -| $logic_name\n";
+                } else {
+                    die "Could not fetch analysis '$condition_logic_name' to create a control rule";
+                }
+            }
+
+            $flow_into ||= {};
+            $flow_into = { 1 => $flow_into } unless(ref($flow_into) eq 'HASH'); # force non-hash into a hash
+
+            foreach my $branch_code (sort {$a <=> $b} keys %$flow_into) {
+                my $heirs = $flow_into->{$branch_code};
+
+                $heirs = [ $heirs ] unless(ref($heirs)); # force scalar into an arrayref first
+                $heirs = { map { ($_ => undef) } @$heirs } if(ref($heirs) eq 'ARRAY'); # now force it into a hash if it wasn't
+
+                while(my ($heir_logic_name, $input_id_template) = each %$heirs) {
+                    if(my $heir_analysis = $analysis_adaptor->fetch_by_logic_name($heir_logic_name)) {
+                        $dataflow_rule_adaptor->create_rule( $analysis, $heir_analysis, $branch_code, $input_id_template);
+                        warn "Created DataFlow rule: [$branch_code] $logic_name -> $heir_logic_name"
+                            .($input_id_template ? " WITH TEMPLATE: $input_id_template" : '')."\n";
+                    } else {
+                        die "Could not fetch analysis '$heir_logic_name' to create a dataflow rule";
+                    }
+                }
+            }
+        }
+    }
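
The coercion cascade in the dataflow section above accepts -flow_into in several shorthand forms: a single logic_name, an arrayref of logic_names, or a hashref keyed by branch code. A self-contained sketch of the same normalization, using a hypothetical target analysis name:

    use strict;
    use warnings;

    my $flow_into = 'target_analysis';   # hypothetical shorthand: one logic_name

    $flow_into = { 1 => $flow_into } unless(ref($flow_into) eq 'HASH');         # default to branch code 1

    foreach my $branch_code (sort {$a <=> $b} keys %$flow_into) {
        my $heirs = $flow_into->{$branch_code};
        $heirs = [ $heirs ] unless(ref($heirs));                                # scalar -> arrayref
        $heirs = { map { ($_ => undef) } @$heirs } if(ref($heirs) eq 'ARRAY');  # arrayref -> hashref, undef = no input_id template
        # here $heirs is { 'target_analysis' => undef }
    }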
@@ -455,7 +471,8 @@ sub _load_cmdline_options {

     GetOptions( \%cmdline_options,
                 'help!',
-                'topup!',
+                'analysis_topup!',
+                'job_topup!',
                 map { "$_=s".((ref($self->o($_)) eq 'HASH') ? '%' : '') } keys %{$self->o}
     );

     return \%cmdline_options;
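
The trailing '!' in these option specifications makes each flag a negatable boolean, and passing a hashref as the first argument tells Getopt::Long to store parsed values there. A minimal standalone sketch of just this mechanism:

    use strict;
    use warnings;
    use Getopt::Long;

    my %cmdline_options;
    GetOptions( \%cmdline_options,
                'analysis_topup!',   # -analysis_topup sets the key to 1, -noanalysis_topup to 0
                'job_topup!',
    );
    warn "running in job_topup mode\n" if($cmdline_options{'job_topup'});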
@@ -81,7 +81,7 @@ __DATA__

 =head1 SYNOPSIS

-    init_pipeline.pl <config_module_or_filename> [-help | [ [-topup] <options_for_this_particular_pipeline>]
+    init_pipeline.pl <config_module_or_filename> [-help | [ [-analysis_topup | -job_topup] <options_for_this_particular_pipeline>]

 =head1 DESCRIPTION

@@ -105,10 +105,12 @@ __DATA__

 =head1 OPTIONS

-    -help            : get automatically generated list of options that can be set/changed when initializing a particular pipeline
+    -help            : get automatically generated list of options that can be set/changed when initializing a particular pipeline

-    -topup           : a special initialization mode when (1) pipeline_create_commands are switched off and (2) only newly defined analyses are added to the database
-                       This mode is only useful in the process of putting together a new pipeline.
+    -analysis_topup  : a special initialization mode when (1) pipeline_create_commands are switched off and (2) only newly defined analyses are added to the database
+                       This mode is only useful in the process of putting together a new pipeline.
+
+    -job_topup       : another special initialization mode when only jobs are created - no other structural changes to the pipeline are acted upon.

 =head1 CONTACT