my $self = shift @_;
warn "Adding hive_meta table entries ...\n";
my $new_meta_entries = $self->hive_meta_table();
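# For reference: hive_meta_table() returns a simple { meta_key => meta_value } hashref
# (typically including keys such as 'hive_sql_schema_version' and 'hive_use_param_stack';
# the exact set depends on the PipeConfig). The loop below just copies those pairs in.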
while( my ($meta_key, $meta_value) = each %$new_meta_entries ) {
Bio::EnsEMBL::Hive::MetaParameters->add_new_or_update(
'meta_key' => $meta_key,
'meta_value' => $meta_value,
);
}
warn "Done.\n\n";
warn "Adding pipeline-wide parameters ...\n";
my $new_pwp_entries = $self->pipeline_wide_parameters();
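# For reference: pipeline_wide_parameters() returns a { param_name => value } hashref
# (it typically contains at least 'pipeline_name'); values may be refs to complex structures,
# which is why they are passed through stringify() (from Bio::EnsEMBL::Hive::Utils) below.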
while( my ($param_name, $param_value) = each %$new_pwp_entries ) {
Bio::EnsEMBL::Hive::PipelineWideParameters->add_new_or_update(
'param_name' => $param_name,
'param_value' => stringify($param_value),
);
}
warn "Done.\n\n";
warn "Adding Resources ...\n";
my $resource_classes_hash = $self->resource_classes;
unless( exists $resource_classes_hash->{'default'} ) {
warn "\tNB:'default' resource class is not in the database (did you forget to inherit from SUPER::resource_classes ?) - creating it for you\n";
$resource_classes_hash->{'default'} = {};
}
my @resource_classes_order = sort { ($b eq 'default') or -($a eq 'default') or ($a cmp $b) } keys %$resource_classes_hash; # put 'default' to the front
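# A purely illustrative resource_classes() return value (the class names 'highmem'/'urgent' and
# the LSF strings are hypothetical, not part of this module): each per-meadow value may be either
# a plain string of submission arguments or a 2-element arrayref of
# [ submission_cmd_args, worker_cmd_args ], which is what the scalar-to-arrayref coercion below relies on:
#
#     return {
#         'default' => {},
#         'highmem' => { 'LSF' => '-M16000 -R"select[mem>16000] rusage[mem=16000]"' },
#         'urgent'  => { 'LSF' => [ '-q urgent', '-debug 1' ] },
#     };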
foreach my $rc_name (@resource_classes_order) {
if($rc_name=~/^\d+$/) {
die "-rc_id syntax is no longer supported, please use the new resource notation (-rc_name)";
}
my $resource_class = Bio::EnsEMBL::Hive::ResourceClass->add_new_or_update(
'name' => $rc_name,
);
while( my($meadow_type, $resource_param_list) = each %{ $resource_classes_hash->{$rc_name} } ) {
$resource_param_list = [ $resource_param_list ] unless(ref($resource_param_list)); # expecting either a scalar or a 2-element array
Bio::EnsEMBL::Hive::ResourceDescription->add_new_or_update(
'resource_class' => $resource_class,
'meadow_type' => $meadow_type,
'submission_cmd_args' => $resource_param_list->[0],
'worker_cmd_args' => $resource_param_list->[1],
);
}
}
warn "Done.\n\n";
my %seen_logic_name = ();
warn "Adding Analyses ...\n";
foreach my $aha (@{$self->pipeline_analyses}) {
my ($logic_name, $module, $parameters_hash, $input_ids, $blocked, $batch_size, $hive_capacity, $failed_job_tolerance,
$max_retry_count, $can_be_empty, $rc_id, $rc_name, $priority, $meadow_type, $analysis_capacity)
= @{$aha}{qw(-logic_name -module -parameters -input_ids -blocked -batch_size -hive_capacity -failed_job_tolerance
-max_retry_count -can_be_empty -rc_id -rc_name -priority -meadow_type -analysis_capacity)}; # slicing a hash reference
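# A purely illustrative pipeline_analyses() entry (the logic_name and parameter values are
# hypothetical; SystemCmd is a standard eHive runnable), showing the dashed keys sliced above:
#
#     {   -logic_name      => 'say_hello',
#         -module          => 'Bio::EnsEMBL::Hive::RunnableDB::SystemCmd',
#         -parameters      => { 'cmd' => 'echo "hello world"' },
#         -input_ids       => [ {} ],
#         -rc_name         => 'default',
#         -hive_capacity   => 10,
#     },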
unless($logic_name) {
die "logic_name' must be defined in every analysis";
}
if($seen_logic_name{$logic_name}++) {
die "an entry with logic_name '$logic_name' appears at least twice in the same configuration file, probably a typo";
}
if($rc_id) {
die "(-rc_id => $rc_id) syntax is deprecated, please use (-rc_name => 'your_resource_class_name')";
}
my $analysis = Bio::EnsEMBL::Hive::Analysis->collection()->find_one_by('logic_name', $logic_name); # an analysis with this logic_name may already exist
my $stats;
if( $analysis ) {
warn "Skipping creation of already existing analysis '$logic_name'.\n";
next;
} else {
$rc_name ||= 'default';
my $resource_class = Bio::EnsEMBL::Hive::ResourceClass->collection()->find_one_by('name', $rc_name)
or die "Could not find local resource with name '$rc_name', please check that the resource_classes() method of your PipeConfig either contains it or inherits it from the parent class";
if ($meadow_type and not exists $valley->available_meadow_hash()->{$meadow_type}) {
die "The meadow '$meadow_type' is currently not registered (analysis '$logic_name')\n";
}
$parameters_hash ||= {}; # in case nothing was given
die "'-parameters' has to be a hash" unless(ref($parameters_hash) eq 'HASH');
$analysis = Bio::EnsEMBL::Hive::Analysis->add_new_or_update(
'logic_name' => $logic_name,
'module' => $module,
'parameters' => $parameters_hash,
'resource_class' => $resource_class,
'failed_job_tolerance' => $failed_job_tolerance,
'max_retry_count' => $max_retry_count,
'can_be_empty' => $can_be_empty,
'priority' => $priority,
'meadow_type' => $meadow_type,
'analysis_capacity' => $analysis_capacity,
);
$analysis->get_compiled_module_name(); # check if it compiles and is named correctly
$stats = Bio::EnsEMBL::Hive::AnalysisStats->add_new_or_update(
'analysis' => $analysis,
'batch_size' => $batch_size,
'hive_capacity' => $hive_capacity,
'status' => $blocked ? 'BLOCKED' : 'EMPTY', # be careful, as this "soft" way of blocking may be accidentally unblocked by deep sync
'total_job_count' => 0,
'semaphored_job_count' => 0,
'ready_job_count' => 0,
'done_job_count' => 0,
'failed_job_count' => 0,
'num_running_workers' => 0,
'num_required_workers' => 0,
'behaviour' => 'STATIC',
'input_capacity' => 4,
'output_capacity' => 4,
'sync_lock' => 0,
);
}
# now create the corresponding jobs (if there are any):
if($input_ids) {
map { Bio::EnsEMBL::Hive::AnalysisJob->add_new_or_update(
'prev_job' => undef, # these jobs are created by the initialization script, not by another job
'analysis' => $analysis,
'input_id' => $_, # input_ids are now centrally stringified in the AnalysisJob itself
) } @$input_ids;
$stats->recalculate_from_job_counts( { 'READY' => scalar(@$input_ids) } );
}
}
warn "Done.\n\n";
warn "Adding Control and Dataflow Rules ...\n";
foreach my $aha (@{$self->pipeline_analyses}) {
my ($logic_name, $wait_for, $flow_into)
= @{$aha}{qw(-logic_name -wait_for -flow_into)}; # slicing a hash reference
my $analysis = Bio::EnsEMBL::Hive::Analysis->collection()->find_one_by('logic_name', $logic_name);
$wait_for ||= [];
$wait_for = [ $wait_for ] unless(ref($wait_for) eq 'ARRAY'); # force scalar into an arrayref
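# e.g. (hypothetical logic_names):  -wait_for => 'say_hello'  and  -wait_for => [ 'say_hello', 'load_data' ]
# are both accepted; each element becomes one control rule below.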
# create control rules:
foreach my $condition_url (@$wait_for) {
unless ($condition_url =~ m{^\w*://}) {
my $condition_analysis = Bio::EnsEMBL::Hive::Analysis->collection()->find_one_by('logic_name', $condition_url)
or die "Could not find a local analysis '$condition_url' to create a control rule (in '".($analysis->logic_name)."')\n";
}
Bio::EnsEMBL::Hive::AnalysisCtrlRule->add_new_or_update(
'condition_analysis_url' => $condition_url,
'ctrled_analysis' => $analysis,
);
}
$flow_into ||= {};
$flow_into = { 1 => $flow_into } unless(ref($flow_into) eq 'HASH'); # force non-hash into a hash
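# Illustrative -flow_into forms (hypothetical logic_names), all normalised to a
# { branch_tag => heirs } hash by this point:
#
#     -flow_into => 'report';                                                    # same as { 1 => 'report' }
#     -flow_into => [ 'report', 'cleanup' ];                                     # same as { 1 => [ 'report', 'cleanup' ] }
#     -flow_into => { 2 => { 'align_chunk' => { 'chunk_id' => '#chunk_id#' } },  # branch 2, with an input_id template
#                     1 => 'report' };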
my %group_tag_to_funnel_dataflow_rule = ();
my $semaphore_sign = '->';
my @all_branch_tags = keys %$flow_into;
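# Funnel tags ('A->1') are processed first, then fan tags ('2->A'), then plain branch tags,
# so that by the time a fan is reached its funnel dataflow rule has already been created and
# registered in %group_tag_to_funnel_dataflow_rule.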
foreach my $branch_tag ((grep {/^[A-Z]$semaphore_sign/} @all_branch_tags), (grep {/$semaphore_sign[A-Z]$/} @all_branch_tags), (grep {!/$semaphore_sign/} @all_branch_tags)) {
my ($branch_name_or_code, $group_role, $group_tag);
if($branch_tag=~/^([A-Z])$semaphore_sign(-?\w+)$/) {
($branch_name_or_code, $group_role, $group_tag) = ($2, 'funnel', $1);
} elsif($branch_tag=~/^(-?\w+)$semaphore_sign([A-Z])$/) {
($branch_name_or_code, $group_role, $group_tag) = ($1, 'fan', $2);
} elsif($branch_tag=~/^(-?\w+)$/) {
($branch_name_or_code, $group_role, $group_tag) = ($1, '');
} elsif($branch_tag=~/:/) {
die "Please use newer '2${semaphore_sign}A' and 'A${semaphore_sign}1' notation instead of '2:1' and '1'\n";
} else {
die "Error parsing the group tag '$branch_tag'\n";
}
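# Illustrative semaphored group (hypothetical logic_names): fan jobs on branch #2 are grouped
# under 'A', and the funnel on branch #1 is only released once the whole group has completed:
#
#     -flow_into => { '2->A' => 'align_chunk',
#                     'A->1' => 'merge_results' },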
my $funnel_dataflow_rule = undef; # NULL by default
if($group_role eq 'fan') {
unless($funnel_dataflow_rule = $group_tag_to_funnel_dataflow_rule{$group_tag}) {
die "No funnel dataflow_rule defined for group '$group_tag'\n";
}
}
my $heirs = $flow_into->{$branch_tag};
$heirs = [ $heirs ] unless(ref($heirs)); # force scalar into an arrayref first
$heirs = { map { ($_ => undef) } @$heirs } if(ref($heirs) eq 'ARRAY'); # now force it into a hash if it wasn't
while(my ($heir_url, $input_id_template_list) = each %$heirs) {
unless ($heir_url =~ m{^\w*://}) {
my $heir_analysis = Bio::EnsEMBL::Hive::Analysis->collection()->find_one_by('logic_name', $heir_url)
or die "Could not find a local analysis named '$heir_url' (dataflow from analysis '".($analysis->logic_name)."')\n";
}
$input_id_template_list = [ $input_id_template_list ] unless(ref($input_id_template_list) eq 'ARRAY'); # allow for more than one template per analysis
foreach my $input_id_template (@$input_id_template_list) {
my $df_rule = Bio::EnsEMBL::Hive::DataflowRule->add_new_or_update(
'from_analysis' => $analysis,
'to_analysis_url' => $heir_url,
'branch_code' => $branch_name_or_code,
'funnel_dataflow_rule' => $funnel_dataflow_rule,
'input_id_template' => $input_id_template,
);
if($group_role eq 'funnel') {
if($group_tag_to_funnel_dataflow_rule{$group_tag}) {
die "More than one funnel dataflow_rule defined for group '$group_tag'\n";
} else {
$group_tag_to_funnel_dataflow_rule{$group_tag} = $df_rule;
}
}
} # /for all templates
} # /for all heirs
} # /for all branch_tags
} # /for all pipeline_analyses
warn "Done.\n\n";
}