Commit d5c63e99 authored by Leo Gordon's avatar Leo Gordon
Browse files

restructured parameters, so they are now a part of AnalysisJob and can be...

restructured parameters, so they are now a part of AnalysisJob and can be manipulated outside of a living worker/process
parent 6fdfc56b
...@@ -28,9 +28,11 @@ package Bio::EnsEMBL::Hive::AnalysisJob; ...@@ -28,9 +28,11 @@ package Bio::EnsEMBL::Hive::AnalysisJob;
use strict; use strict;
use Bio::EnsEMBL::Analysis; #use Bio::EnsEMBL::Analysis;
use Bio::EnsEMBL::DBSQL::DBAdaptor; #use Bio::EnsEMBL::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::Worker; #use Bio::EnsEMBL::Hive::Worker;
use base ('Bio::EnsEMBL::Hive::Params');
sub new { sub new {
my ($class,@args) = @_; my ($class,@args) = @_;
...@@ -191,6 +193,81 @@ sub incomplete { # Job should set this to 0 prior to throwing if the ...@@ -191,6 +193,81 @@ sub incomplete { # Job should set this to 0 prior to throwing if the
##-----------------[/indicators to the Worker]------------------------------- ##-----------------[/indicators to the Worker]-------------------------------
=head2 dataflow_output_id
Title : dataflow_output_id
Arg[1](req) : <string> $output_id
Arg[2](opt) : <int> $branch_code (optional, defaults to 1)
Arg[3](opt) : <hashref> $create_job_options (optional, defaults to {}, options added to the CreateNewJob method)
Usage : $self->dataflow_output_id($output_id, $branch_code);
Function:
If a RunnableDB(Process) needs to create jobs, this allows it to have jobs
created and flowed through the dataflow rules of the workflow graph.
This 'output_id' becomes the 'input_id' of the newly created job at
the ends of the dataflow pipes. The optional 'branch_code' determines
which dataflow pipe(s) to flow the job through.
=cut
sub dataflow_output_id {
    my ($self, $output_ids, $branch_code, $create_job_options) = @_;

    $output_ids         ||= [ $self->input_id() ];                              # replicate the input_id in the branch_code's output by default
    $output_ids           = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY'); # force previously used single values into an arrayref
    $branch_code        ||= 1;      # default branch_code is 1
    $create_job_options ||= {};     # { -block => 1 } or { -semaphore_count => scalar(@fan_job_ids) } or { -semaphored_job_id => $funnel_job_id }

        # this tricky code is responsible for correct propagation of semaphores down the dataflow pipes:
    my $propagate_semaphore = not exists ($create_job_options->{'-semaphored_job_id'});     # CONVENTION: if zero is explicitly supplied, it is a request not to propagate

        # However if nothing is supplied, semaphored_job_id will be propagated from the parent job:
        # NOTE(review): '||=' also replaces an explicitly supplied *false* value (e.g. 0) with the
        # parent's semaphored_job_id before it is passed on to CreateNewJob — confirm this is intended.
    my $semaphored_job_id = $create_job_options->{'-semaphored_job_id'} ||= $self->semaphored_job_id();

        # if branch_code is set to 1 (explicitly or implicitly), turn off automatic dataflow:
    $self->autoflow(0) if($branch_code==1);

    my @output_job_ids = ();
    my $rules       = $self->adaptor->db->get_DataflowRuleAdaptor->fetch_from_analysis_id_branch_code($self->analysis_id, $branch_code);
    foreach my $rule (@{$rules}) {

            # parameter-substitute the rule's input_id template (if any) against this job's params:
        my $substituted_template;
        if(my $template = $rule->input_id_template()) {
            $substituted_template = $self->param_substitute($template);
        }

        my $target_analysis_or_table = $rule->to_analysis();

            # a substituted template overrides the supplied output_ids for this rule:
        foreach my $output_id ($substituted_template ? ($substituted_template) : @$output_ids) {

            if($target_analysis_or_table->can('dataflow')) {

                    # flowing into a table; the returned insert_id was never used, so it is not captured:
                $target_analysis_or_table->dataflow( $output_id );

            } else {

                if(my $job_id = $self->adaptor->CreateNewJob(
                    -input_id       => $output_id,
                    -analysis       => $target_analysis_or_table,
                    -input_job_id   => $self->dbID,  # creator_job's id
                    %$create_job_options
                )) {
                    if($semaphored_job_id and $propagate_semaphore) {
                        $self->adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id );    # propagate the semaphore
                    }
                        # only add the ones that were indeed created:
                    push @output_job_ids, $job_id;

                } elsif($semaphored_job_id and !$propagate_semaphore) {
                    $self->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );        # if we didn't succeed in creating the job, fix the semaphore
                }
            }
        }
    }

    return \@output_job_ids;
}
sub print_job { sub print_job {
my $self = shift; my $self = shift;
my $logic_name = $self->adaptor() my $logic_name = $self->adaptor()
......
...@@ -40,7 +40,6 @@ package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor; ...@@ -40,7 +40,6 @@ package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use strict; use strict;
use Data::UUID; use Data::UUID;
use Sys::Hostname;
use Bio::EnsEMBL::DBSQL::BaseAdaptor; use Bio::EnsEMBL::DBSQL::BaseAdaptor;
use Bio::EnsEMBL::Utils::Argument; use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception; use Bio::EnsEMBL::Utils::Exception;
...@@ -50,8 +49,6 @@ use Bio::EnsEMBL::Hive::Utils 'stringify'; # import 'stringify()' ...@@ -50,8 +49,6 @@ use Bio::EnsEMBL::Hive::Utils 'stringify'; # import 'stringify()'
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor'); use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
# our $max_retry_count = 7;
############################################################################### ###############################################################################
# #
# CLASS methods # CLASS methods
......
=pod
=head1 NAME
Bio::EnsEMBL::Hive::Params
=head1 SYNOPSIS
By inheriting from this module you make your module able to deal with parameters:
1) parsing of parameters in the order of precedence, starting with the lowest:
#
## general usage:
# $self->param_init( $lowest_precedence_hashref, $middle_precedence_hashref, $highest_precedence_hashref );
#
## typical usage:
# $job->param_init(
# $runObj->param_defaults(), # module-wide built-in defaults have the lowest precedence (will always be the same for this module)
# $self->db->get_MetaContainer->get_param_hash(), # then come the pipeline-wide parameters from the 'meta' table (define things common to all modules in this pipeline)
#            $self->analysis->parameters(),                # analysis-wide 'parameters' are even more specific (can be defined differently for several occurrences of the same module)
# $job->input_id(), # job-specific 'input_id' parameters have the highest precedence
# );
2) reading a parameter's value
#
#    my $source = $self->param('source');
3) dynamically setting a parameter's value
#
# $self->param('binpath', '/software/ensembl/compara');
#
Note: It proved to be a convenient mechanism to exchange params
between fetch_input(), run(), write_output() and other methods.
=head1 DESCRIPTION
Most of Compara RunnableDB methods work under assumption
that both analysis.parameters and analysis_job.input_id fields contain a Perl-style parameter hashref as a string.
This module implements a generic param() method that allows to set parameters according to the following parameter precedence rules:
(1) Job-Specific parameters defined in analysis_job.input_id hash, they have the highest priority and override everything else.
(2) Analysis-Wide parameters defined in analysis.parameters hash. Can be overridden by (1).
(3) Pipeline-Wide parameters defined in the 'meta' table. Can be overridden by (1) and (2).
(4) Module_Defaults that are hard-coded into modules have the lowest precedence. Can be overridden by (1), (2) and (3).
=head1 CONTACT
Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
=cut
package Bio::EnsEMBL::Hive::Params;
use strict;
use Bio::EnsEMBL::Hive::Utils ('stringify'); # import stringify()
=head2 param_init
Description: Parses the parameters from all sources in the reverse precedence order (supply the lowest precedence hash first).
=cut
sub param_init {
    my $self          = shift @_;
    my $strict_format = shift @_;

    # Parse only once: an already-populated param hash means we were initialized before.
    return if $self->{'_param_hash'};

    $self->{'_param_hash'} = {};

    # Remaining arguments are parameter sources in increasing order of precedence;
    # each source is merged over the previous ones, so later keys win.
    foreach my $source (@_) {
        unless(ref($source) eq 'HASH') {
            if($strict_format or $source=~/^\{.*\}$/) {
                # The source is expected to be a stringified Perl hashref, so evaluate it:
                my $parsed = eval($source) || {};
                die "Expected a {'param'=>'value'} hashref, but got the following string instead: '$source'\n"
                    if($@ or ref($parsed) ne 'HASH');
                $source = $parsed;
            } else {
                # In lax mode a non-hash-looking string contributes no parameters:
                $source = {};
            }
        }

        # Merge this source into the accumulated parameter hash via a slice assignment:
        @{ $self->{'_param_hash'} }{ keys %$source } = values %$source;
    }
}
=head2 param
Arg [1] : string $param_name
Arg [2] : (optional) $param_value
Description: A getter/setter method for a job's parameters that are initialized through 4 levels of precedence (see param_init() )
Example 1 : my $source = $self->param('source'); # acting as a getter
Example 2 : $self->param('binpath', '/software/ensembl/compara'); # acting as a setter
Returntype : any Perl structure or object that you dared to store
=cut
sub param {
    my $self = shift @_;
    my $name = shift @_;

    # Any remaining argument (even an explicit undef) makes this call act as a setter:
    $self->{'_param_hash'}{$name} = shift @_ if(@_);

    return $self->{'_param_hash'}{$name};
}
=head2 param_substitute
Arg [1] : Perl structure $string_with_templates
Description: Performs parameter substitution on strings that contain templates like " #param_name# followed by #another_param_name# " .
Returntype : *another* Perl structure with matching topology (may be more complex as a result of substituting a substructure for a term)
=cut
sub param_substitute {
    my ($self, $structure) = @_;

    my $ref_type = ref($structure);

    if($ref_type eq 'ARRAY') {
        # Substitute each element, preserving order:
        return [ map { $self->param_substitute($_) } @$structure ];

    } elsif($ref_type eq 'HASH') {
        # Substitute both keys and values:
        my %substituted;
        foreach my $key (keys %$structure) {
            $substituted{ $self->param_substitute($key) } = $self->param_substitute( $structure->{$key} );
        }
        return \%substituted;

    } elsif(!$ref_type) {
        # A string that is exactly one '#...#' substitution keeps the substituted value's
        # native type instead of being forced into a string:
        if($structure=~/^#([^#]*)#$/) {
            return $self->_subst_one_hashpair($1);
        }
        # Otherwise substitute every embedded '#...#' template in place:
        $structure=~s/(?:#(.+?)#)/$self->_subst_one_hashpair($1)/eg;
        return $structure;
    }

    die "Could not substitute parameters in $structure";
}
sub mysql_conn {    # an example stringification formatter (others can be defined here or in a descendent of Params)
    my ($self, $db_conn) = @_;

    # Render the connection hash as mysql command-line options via a hash slice:
    return sprintf( "--host=%s --port=%s --user='%s' --pass='%s' %s",
                    @$db_conn{qw(-host -port -user -pass -dbname)} );
}
sub mysql_dbname {  # another example stringification formatter
    my ($self, $db_conn) = @_;

    # Extract just the database name from the connection hash:
    return $db_conn->{'-dbname'};
}
sub csvq {  # another example stringification formatter
    my ($self, $list) = @_;

    # Single-quote each element, then comma-join:
    my @quoted = map { "'" . $_ . "'" } @$list;
    return join(',', @quoted);
}
#--------------------------------------------[private methods]----------------------------------------------
=head2 _subst_one_hashpair
Description: this is a private method that performs one substitution. Called by param_substitute().
=cut
# Private: resolve the text found between one '#...#' pair. Called by param_substitute().
# Three syntaxes are recognized:
#   'name'            - plain parameter lookup via $self->param('name')
#   'func:name'       - call formatter method $self->func() on the value of param 'name'
#                       (symbolic method call; 'func' must be a method on $self, e.g. csvq/mysql_conn)
#   'expr(...)expr'   - evaluate the enclosed Perl expression after replacing each '$name'
#                       with the stringify()'d value of param 'name'
# Falls through returning undef if none of the syntaxes match.
# NOTE(review): uses string eval() on template content — templates must come from trusted
# pipeline configuration, never from untrusted input.
sub _subst_one_hashpair {
    my ($self, $inside_hashes) = @_;

    if($inside_hashes=~/^\w+$/) {

        return $self->param($inside_hashes);

    } elsif($inside_hashes=~/^(\w+):(\w+)$/) {

        # $1 is the formatter method name, $2 the parameter whose value it formats:
        return $self->$1($self->param($2));

    } elsif($inside_hashes=~/^expr\((.*)\)expr$/) {

        my $expression = $1;
        # substitute '$name' tokens with stringified parameter values before evaluating:
        $expression=~s/(?:\$(\w+))/stringify($self->param($1))/eg;

        return eval($expression);
    }
}
1;
...@@ -107,6 +107,31 @@ sub new { ...@@ -107,6 +107,31 @@ sub new {
# #
########################################## ##########################################
=head2 strict_hash_format
Title : strict_hash_format
Function: if a subclass wants more flexibility in parsing analysis_job.input_id and analysis.parameters,
it should redefine this method to return 0
=cut
sub strict_hash_format {
    my $self = shift @_;    # invocant is unused; accepted so method-style calls read naturally

    # Default is strict: analysis_job.input_id and analysis.parameters must be hashref strings.
    # Subclasses wanting laxer parsing override this to return 0.
    return 1;
}
=head2 param_defaults
Title : param_defaults
Function: subclass can define defaults for all params used by the RunnableDB/Process
=cut
sub param_defaults {
    my $self = shift @_;    # invocant is unused; accepted so method-style calls read naturally

    # No built-in defaults; subclasses override to supply a {'param' => default_value} hashref.
    return {};
}
=head2 fetch_input =head2 fetch_input
Title : fetch_input Title : fetch_input
...@@ -270,85 +295,31 @@ sub input_job { ...@@ -270,85 +295,31 @@ sub input_job {
return $self->{'_input_job'}; return $self->{'_input_job'};
} }
=head2 dataflow_output_id
Title : dataflow_output_id # ##################### subroutines that link through to Job's methods #########################
Arg[1](req) : <string> $output_id
Arg[2](opt) : <int> $branch_code (optional, defaults to 1)
Arg[3](opt) : <hashref> $create_job_options (optional, defaults to {}, options added to the CreateNewJob method)
Usage : $self->dataflow_output_id($output_id, $branch_code);
Function:
If Process needs to create jobs, this allows it to have jobs
created and flowed through the dataflow rules of the workflow graph.
This 'output_id' becomes the 'input_id' of the newly created job at
the ends of the dataflow pipes. The optional 'branch_code' determines
which dataflow pipe(s) to flow the job through.
=cut sub input_id {
my $self = shift;
return '' unless($self->input_job);
return $self->input_job->input_id;
}
sub param {
my $self = shift @_;
return $self->input_job->param(@_);
}
sub param_substitute {
my $self = shift @_;
return $self->input_job->param_substitute(@_);
}
sub dataflow_output_id { sub dataflow_output_id {
my ($self, $output_ids, $branch_code, $create_job_options) = @_; my $self = shift @_;
return unless($self->analysis); return $self->input_job->dataflow_output_id(@_);
return unless($self->input_job);
$output_ids ||= [ $self->input_id() ]; # replicate the input_id in the branch_code's output by default
$output_ids = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY'); # force previously used single values into an arrayref
$branch_code ||= 1; # default branch_code is 1
$create_job_options ||= {}; # { -block => 1 } or { -semaphore_count => scalar(@fan_job_ids) } or { -semaphored_job_id => $funnel_job_id }
# this tricky code is responsible for correct propagation of semaphores down the dataflow pipes:
my $propagate_semaphore = not exists ($create_job_options->{'-semaphored_job_id'}); # CONVENTION: if zero is explicitly supplied, it is a request not to propagate
# However if nothing is supplied, semaphored_job_id will be propagated from the parent job:
my $semaphored_job_id = $create_job_options->{'-semaphored_job_id'} ||= $self->input_job->semaphored_job_id();
# if branch_code is set to 1 (explicitly or impliticly), turn off automatic dataflow:
$self->input_job->autoflow(0) if($branch_code==1);
my @output_job_ids = ();
my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
my $rules = $self->db->get_DataflowRuleAdaptor->fetch_from_analysis_id_branch_code($self->analysis->dbID, $branch_code);
foreach my $rule (@{$rules}) {
my $substituted_template;
if(my $template = $rule->input_id_template()) {
if($self->can('param_substitute')) {
$substituted_template = $self->param_substitute($template);
} else {
die "In order to use input_id_template your RunnableDB has to be derived from Bio::EnsEMBL::Hive::ProcessWithParams\n";
}
}
my $target_analysis_or_table = $rule->to_analysis();
foreach my $output_id ($substituted_template ? ($substituted_template) : @$output_ids) {
if($target_analysis_or_table->can('dataflow')) {
my $insert_id = $target_analysis_or_table->dataflow( $output_id );
} else {
if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => $output_id,
-analysis => $target_analysis_or_table,
-input_job_id => $self->input_job->dbID, # creator_job's id
%$create_job_options
)) {
if($semaphored_job_id and $propagate_semaphore) {
$job_adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id ); # propagate the semaphore
}
# only add the ones that were indeed created:
push @output_job_ids, $job_id;
} elsif($semaphored_job_id and !$propagate_semaphore) {
$job_adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # if we didn't succeed in creating the job, fix the semaphore
}
}
}
}
return \@output_job_ids;
} }
...@@ -432,12 +403,6 @@ sub worker_temp_directory { ...@@ -432,12 +403,6 @@ sub worker_temp_directory {
# #
################################################# #################################################
sub input_id {
my $self = shift;
return '' unless($self->input_job);
return $self->input_job->input_id;
}
sub parameters { sub parameters {
my $self = shift; my $self = shift;
return '' unless($self->analysis); return '' unless($self->analysis);
...@@ -526,4 +491,3 @@ sub check_if_exit_cleanly { ...@@ -526,4 +491,3 @@ sub check_if_exit_cleanly {
1; 1;
=pod =pod
=head1 NAME =head1 OBSOLETE CLASS
Bio::EnsEMBL::Hive::ProcessWithParams Bio::EnsEMBL::Hive::ProcessWithParams
=head1 SYNOPSIS =head1 SYNOPSIS
This module extends Bio::EnsEMBL::Hive::Process by implementing the following capabilities: This module is scheduled for removal. Please use Bio::EnsEMBL::Hive::Process instead.
1) parsing of parameters in the right order of precedence (including built-in defaults, if supplied).
This parsing happens automatically and in most cases you should not need to activate it.
However you may want to provide built-in defaults for your RunnableDB
by defining your own param_defaults() method that returns a hashref.
2) reading a parameter's value
#
# my $source = $self->param('source'); )
3) dynamically setting a parameter's value
#
# $self->param('binpath', '/software/ensembl/compara');
#
Note: It proved to be a convenient mechanism to exchange params
between fetch_input(), run(), write_output() and other methods.
=head1 DESCRIPTION
Most of Compara RunnableDB methods work under assumption
that both analysis.parameters and analysis_job.input_id fields contain a Perl-style parameter hashref as a string.
This module extends Bio::EnsEMBL::Hive::Process by implementing a generic param() method that sets module parameters
accorting to the following parameter precedence rules:
(1) Job-Specific parameters defined in analysis_job.input_id hash, they have the highest priority and override everything else.
(2) Analysis-Wide parameters defined in analysis.parameters hash. Can be overridden by (1).
(3) Pipeline-Wide parameters defined in the 'meta' table. Can be overridden by (1) and (2).
(4) Module_Defaults that are hard-coded into modules have the lowest precedence. Can be overridden by (1), (2) and (3).
=head1 CONTACT
Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
=cut =cut
...@@ -51,205 +15,6 @@ accorting to the following parameter precedence rules: ...@@ -51,205 +15,6 @@ accorting to the following parameter precedence rules:
package Bio::EnsEMBL::Hive::ProcessWithParams; package Bio::EnsEMBL::Hive::ProcessWithParams;
use strict; use strict;
use Bio::EnsEMBL::Hive::Utils ('stringify'); # import stringify()
use base ('Bio::EnsEMBL::Hive::Process'); use base ('Bio::EnsEMBL::Hive::Process');
=head2 strict_hash_format
Description: This public virtual method should either return 1 or 0, depending on whether it is expected that input_id() and parameters() contain a hashref or not
Callers : Bio::EnsEMBL::Hive::RunnableDB::ProcessWithParams
=cut
sub strict_hash_format { # This public virtual must be redefined to "return 0;" in all inheriting classes
# that want more flexibility for the format of parameters() or input_id()
return 1;
}
=head2 param_defaults
Description: This public virtual method should return a hash of param defaults of the RunnableDB
Callers : Bio::EnsEMBL::Hive::RunnableDB::ProcessWithParams