Commit 55cd98de authored by Jessica Severin's avatar Jessica Severin
Browse files

created Bio::EnsEMBL::Hive::Process as a processing module superclass alternative

to RunnableDB to allow full benefit of dataflow graph capabilities.
- Removed from Extension.pm branch_code, analysis_job_id, reset_job extensions to
  RunnableDB (no longer trying to shoe-horn hive 'extra' functions into them)
- Bio::EnsEMBL::Hive::Process mirrors some of the RunnableDB interface
  (new, analysis, fetch_input, run, write_output)
  but uses a new job interface (input_job, dataflow_output_id) instead of
  input_id (but provides convenience method $self->input_id which redirects to
  $self->input_job->input_id to simplify porting)
- Changed Worker to only use hive 'extended' function if the processing module
  isa(Bio::EnsEMBL::Hive::Process).  Also allows all RunnableDB modules to
  still be used (or any object which implements a minimal 'RunnableDB interface')
  (new, input_id, db, fetch_input, run, write_output)
parent d6e4f9f9
......@@ -45,18 +45,18 @@ use Bio::EnsEMBL::Pipeline::RunnableDB;
#use Bio::EnsEMBL::Analysis::RunnableDB;
=head2 Bio::EnsEMBL::Analysis::runnableDB
=head2 Bio::EnsEMBL::Analysis::process
Arg [1] : none
Example : $runnable_db = $analysis->runnableDB;
Description: from the $analysis->module construct a runnableDB object
Returntype : Bio::EnsEMBL::Pipeline::RunnableDB
Example : $process = $analysis->process;
Description: from the $analysis->module construct a Process object
Returntype : Bio::EnsEMBL::Hive::Process subclass
Exceptions : none
Caller : general
=cut
sub Bio::EnsEMBL::Analysis::runnableDB
sub Bio::EnsEMBL::Analysis::process
{
my $self = shift; #self is an Analysis object
......@@ -64,25 +64,26 @@ sub Bio::EnsEMBL::Analysis::runnableDB
die("self must be a [Bio::EnsEMBL::Analysis] not a [$self]")
unless($self->isa('Bio::EnsEMBL::Analysis'));
my $runnable;
if($self->module =~ "Bio::") { $runnable = $self->module; }
else { $runnable = "Bio::EnsEMBL::Pipeline::RunnableDB::".$self->module; }
(my $file = $runnable) =~ s/::/\//g;
my $process_class;
if($self->module =~ /::/) { $process_class = $self->module; }
else { $process_class = "Bio::EnsEMBL::Pipeline::RunnableDB::".$self->module; }
(my $file = $process_class) =~ s/::/\//g;
require "$file.pm";
print STDERR "creating runnable ".$file."\n" if($self->{'verbose'});
#make copy of analysis ($self) to pass into the runnableDB
#to insulate the infrastructure from any modification the runnableDB may
#make copy of analysis ($self) to pass into the Process
#to insulate the infrastructure from any modification the Process may
#do to the analysis object
my $copy_self = new Bio::EnsEMBL::Analysis;
%$copy_self = %$self;
$runnable =~ s/\//::/g;
my $runobj = "$runnable"->new(-db => $self->adaptor->db,
$process_class =~ s/\//::/g;
my $runobj = "$process_class"->new(
-db => $self->adaptor->db,
-input_id => '1',
-analysis => $self,
);
print STDERR "Instantiated ".$runnable." runnabledb\n" if($self->{'verbose'});
print STDERR "Instantiated ". $process_class. " runnabledb\n" if($self->{'verbose'});
return $runobj
}
......@@ -194,12 +195,6 @@ sub Bio::EnsEMBL::Analysis::stats
# Bio::EnsEMBL::Pipeline::RunnableDB
#######################################
sub Bio::EnsEMBL::Pipeline::RunnableDB::reset_job
{
my $self = shift;
return 1;
}
=head2 Bio::EnsEMBL::Pipeline::RunnableDB::global_cleanup
Arg [1] : none
......@@ -217,34 +212,6 @@ sub Bio::EnsEMBL::Pipeline::RunnableDB::global_cleanup
return 1;
}
=head2 Bio::EnsEMBL::Pipeline::RunnableDB::branch_code
Arg [1] : none
Description : method which user RunnableDB can override if it needs to return
a specific branch code. Used by the dataflow rules to determine which
job to create/run next
Returntype : int (default 1)
Exceptions : none
Caller : Bio::EnsEMBL::Hive::Worker
=cut
sub Bio::EnsEMBL::Pipeline::RunnableDB::branch_code
{
my $self = shift;
$self->{'_branch_code'} = shift if(@_);
$self->{'_branch_code'}=1 unless($self->{'_branch_code'});
return $self->{'_branch_code'};
}
sub Bio::EnsEMBL::Pipeline::RunnableDB::analysis_job_id
{
my $self = shift;
$self->{'_analysis_job_id'} = shift if(@_);
$self->{'_analysis_job_id'}=0 unless($self->{'_analysis_job_id'});
return $self->{'_analysis_job_id'};
}
sub Bio::EnsEMBL::Pipeline::RunnableDB::debug {
my $self = shift;
$self->{'_debug'} = shift if(@_);
......@@ -257,34 +224,12 @@ sub Bio::EnsEMBL::Pipeline::RunnableDB::debug {
# Bio::EnsEMBL::Analysis::RunnableDB
#######################################
sub Bio::EnsEMBL::Analysis::RunnableDB::reset_job
{
my $self = shift;
return 1;
}
sub Bio::EnsEMBL::Analysis::RunnableDB::global_cleanup
{
my $self = shift;
return 1;
}
sub Bio::EnsEMBL::Analysis::RunnableDB::branch_code
{
my $self = shift;
$self->{'_branch_code'} = shift if(@_);
$self->{'_branch_code'}=1 unless($self->{'_branch_code'});
return $self->{'_branch_code'};
}
sub Bio::EnsEMBL::Analysis::RunnableDB::analysis_job_id
{
my $self = shift;
$self->{'_analysis_job_id'} = shift if(@_);
$self->{'_analysis_job_id'}=0 unless($self->{'_analysis_job_id'});
return $self->{'_analysis_job_id'};
}
sub Bio::EnsEMBL::Analysis::RunnableDB::debug {
my $self = shift;
$self->{'_debug'} = shift if(@_);
......
#
# You may distribute this module under the same terms as perl itself
#
# POD documentation - main docs before the code
=pod
=head1 NAME
Bio::EnsEMBL::Hive::Process
=cut
=head1 SYNOPSIS
Object categories to extend the functionality of existing classes
=cut
=head1 DESCRIPTION
=cut
=head1 CONTACT
Contact Jessica Severin on EnsEMBL::Hive implementation/design detail: jessica@ebi.ac.uk
Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk
=cut
=head1 APPENDIX
The rest of the documentation details each of the object methods.
Internal methods are usually preceded with a _
=cut
my $g_hive_process_workdir; # a global directory location for the process using this module
package Bio::EnsEMBL::Hive::Process;
use strict;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::AnalysisJob;
# Constructor.  Accepts an optional -analysis parameter (a
# Bio::EnsEMBL::Analysis object); everything else (queen, input_job)
# is injected later by the Worker.
sub new {
    my $class = shift;
    my @args  = @_;

    my $self = bless {}, $class;

    # rearrange() comes from Bio::EnsEMBL::Utils::Argument.
    my ($analysis) = rearrange([qw( ANALYSIS )], @args);
    $self->analysis($analysis) if $analysis;

    return $self;
}
=head2 queen
Title : queen
Usage : my $queen = $self->queen;
Function: getter/setter for 'Queen' this Process was created by
Returns : Bio::EnsEMBL::Hive::Queen
=cut
# Getter/setter for the Queen that created this Process.  The Queen is
# the Process's gateway to the hive database (see db/dbc below).
sub queen {
    my $self = shift;
    if (@_) {
        $self->{'_queen'} = shift;
    }
    return $self->{'_queen'};
}
=head2 db
Title : db
Usage : my $hiveDBA = $self->db;
Function: returns DBAdaptor to Hive database
Returns : Bio::EnsEMBL::Hive::DBSQL::DBAdaptor
=cut
# Returns the Hive DBAdaptor, delegating to the Queen.
# Returns undef when no Queen has been assigned yet.
sub db {
    my $self = shift;
    my $queen = $self->queen;
    return $queen ? $queen->db : undef;
}
# Returns the database connection, delegating to the Queen.
# Returns undef when no Queen has been assigned yet.
sub dbc {
    my $self = shift;
    my $queen = $self->queen;
    return $queen ? $queen->dbc : undef;
}
=head2 analysis
Title : analysis
Usage : $self->analysis;
Function: Gets or sets the stored Analysis object
Set by Worker, available to get by the process.
Returns : Bio::EnsEMBL::Analysis object
Args : Bio::EnsEMBL::Analysis object
=cut
# Getter/setter for the Analysis this Process is executing.  Set by the
# Worker before the fetch_input/run/write_output cycle.  Throws when the
# supplied value is not a Bio::EnsEMBL::Analysis.
sub analysis {
    my $self = shift;
    if (my $new_analysis = shift) {
        throw("Not a Bio::EnsEMBL::Analysis object")
            unless $new_analysis->isa("Bio::EnsEMBL::Analysis");
        $self->{'_analysis'} = $new_analysis;
    }
    return $self->{'_analysis'};
}
=head2 input_job
Title : input_job
Function: Gets or sets the AnalysisJob to be run by this process
Set by Worker, available to get by the process.
Returns : Bio::EnsEMBL::Hive::AnalysisJob object
=cut
# Getter/setter for the AnalysisJob this Process is running.  Set by the
# Worker; the Process reads it (e.g. via the input_id convenience method).
# Throws when the supplied value is not an AnalysisJob.
sub input_job {
    my $self = shift;
    if (my $job = shift) {
        throw("Not a Bio::EnsEMBL::Hive::AnalysisJob object")
            unless $job->isa("Bio::EnsEMBL::Hive::AnalysisJob");
        $self->{'_input_job'} = $job;
    }
    return $self->{'_input_job'};
}
=head2 dataflow_output_id
Title : dataflow_output_id
Arg[1](req) : <string> $output_id
Arg[2](opt) : <int> $branch_code (optional, defaults to 1)
Usage : $self->dataflow_output_id($output_id, $branch_code);
Function:
If Process needs to create jobs, this allows it to have 'extra' jobs
created and flowed through the dataflow rules of the analysis graph.
The 'output_id' becomes the 'input_id' of the newly created job and
the ends of the dataflow pipes. The optional 'branch_code' determines
which pipe(s) to flow the job through.
=cut
# Creates an 'extra' job and flows it through the dataflow rules of this
# analysis.  The $output_id becomes the input_id of the new job; the
# optional $branch_code selects which dataflow pipe(s) it travels down
# (the AnalysisJob's own default applies when omitted).
# Silently does nothing without an output_id or an analysis context.
sub dataflow_output_id {
    my ($self, $output_id, $branch_code) = @_;

    return unless $output_id;
    return unless $self->analysis;

    # Direct method-call syntax instead of the original indirect-object
    # form ("new Class") which Perl can misparse as a bareword call.
    my $job = Bio::EnsEMBL::Hive::AnalysisJob->new();
    $job->input_id($output_id);
    $job->analysis_id($self->analysis->dbID);
    $job->branch_code($branch_code) if defined $branch_code;

    # The Queen performs the actual job creation through the dataflow graph.
    $self->queen->flow_output_job($job);
}
# Getter/setter for the debug level (propagated from the Worker).
# Defaults to 0 when never set.
sub debug {
    my $self = shift;
    if (@_) {
        $self->{'_debug'} = shift;
    }
    $self->{'_debug'} = 0 unless defined $self->{'_debug'};
    return $self->{'_debug'};
}
# Serializes a flat hashref into the hive's eval-able string form, e.g.
#   {'key1'=>'val1','key2'=>'val2',}
# Keys are emitted in sorted order; keys with undef values are skipped.
# A false/undef $hash_ref yields the empty string.
sub encode_hash {
    my ($self, $hash_ref) = @_;

    return "" unless $hash_ref;

    my $body = join '',
        map  { "'$_'=>'" . $hash_ref->{$_} . "'," }
        grep { defined $hash_ref->{$_} }
        sort keys %$hash_ref;

    return '{' . $body . '}';
}
# Returns a scratch directory under /tmp for this worker process,
# creating it on first use.  The path is cached in the file-level global
# $g_hive_process_workdir so all Processes in this worker share one
# directory; global_cleanup() removes it at shutdown.
# NOTE(review): mkdir's return value is unchecked and mode 0777 is wide
# open -- a failed mkdir only surfaces later as I/O errors; consider
# checking the result and tightening permissions.
sub worker_temp_directory {
unless(defined($g_hive_process_workdir) and (-e $g_hive_process_workdir)) {
#create temp directory to hold fasta databases
$g_hive_process_workdir = "/tmp/worker.$$/";
mkdir($g_hive_process_workdir, 0777);
}
return $g_hive_process_workdir;
}
#################################################
#
# methods to make porting from RunnableDB easier
#
#################################################
# Convenience method to ease porting from RunnableDB: redirects to
# $self->input_job->input_id.  Returns the empty string when no job
# has been assigned yet.
sub input_id {
    my $self = shift;
    my $job = $self->input_job;
    return $job ? $job->input_id : '';
}
##########################################
#
# methods subclasses should override
# in order to give this process function
#
##########################################
# Override point: subclasses fetch and prepare their input data here
# before run() is called.  Default implementation is a no-op returning 1.
sub fetch_input {
my $self = shift;
return 1;
}
# Override point: subclasses perform the actual analysis work here,
# between fetch_input() and write_output().  Default is a no-op.
sub run {
my $self = shift;
return 1;
}
# Override point: subclasses store their results here after run().
# Default implementation is a no-op returning 1.
sub write_output {
my $self = shift;
return 1;
}
# Removes the shared worker temp directory (see worker_temp_directory)
# if one was created.  Called by the Worker at shutdown.
# NOTE(review): unlink(<$dir/*>) only deletes top-level plain files; any
# subdirectory left behind makes the rmdir fail silently.
sub global_cleanup {
if($g_hive_process_workdir) {
unlink(<$g_hive_process_workdir/*>);
rmdir($g_hive_process_workdir);
}
return 1;
}
1;
......@@ -51,8 +51,8 @@ package Bio::EnsEMBL::Hive::RunnableDB::Dummy;
use strict;
use Bio::EnsEMBL::Pipeline::RunnableDB;
our @ISA = qw(Bio::EnsEMBL::Pipeline::RunnableDB);
use Bio::EnsEMBL::Hive::Process;
our @ISA = qw(Bio::EnsEMBL::Hive::Process);
##############################################################
......
......@@ -50,8 +50,8 @@ package Bio::EnsEMBL::Hive::RunnableDB::SystemCmd;
use strict;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisDataAdaptor;
use Bio::EnsEMBL::Pipeline::RunnableDB;
our @ISA = qw(Bio::EnsEMBL::Pipeline::RunnableDB);
use Bio::EnsEMBL::Hive::Process;
our @ISA = qw(Bio::EnsEMBL::Hive::Process);
##############################################################
......
......@@ -77,11 +77,11 @@ use Time::HiRes qw(time);
use Bio::EnsEMBL::Analysis;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Pipeline::RunnableDB;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Extensions;
use Bio::EnsEMBL::Hive::Process;
sub new {
my ($class,@args) = @_;
......@@ -298,7 +298,7 @@ sub print_worker {
before querying the database for the next job batch. Used by the
Hive system to manage the number of workers needed to complete a
particular job type.
DefaultValue : batch_size of runnableDB in analysis
DefaultValue : batch_size of analysis
Returntype : integer scalar
=cut
......@@ -335,8 +335,8 @@ sub batch_size {
First all STDOUT/STDERR is redirected, then looping commences.
Looping consists of
1) claiming jobs,
2) processing those jobs through the module(runnableDB) of
the analysis asigned to this worker,
2) processing those jobs through an instance of the 'module class' of
the analysis assigned to this worker,
3) updating the analysis_job, analysis_stats, and hive tables to track the
progress of the job, the analysis and this worker.
Looping stops when any one of these are met:
......@@ -419,7 +419,7 @@ sub run
if($self->perform_global_cleanup) {
#have runnable cleanup any global/process files/data it may have created
$self->analysis->runnableDB->global_cleanup();
$self->analysis->process->global_cleanup();
}
$self->queen->register_worker_death($self);
......@@ -445,16 +445,21 @@ sub run_module_with_job
my $self = shift;
my $job = shift;
my $runObj = $self->analysis->runnableDB;
my $runObj = $self->analysis->process;
return 0 unless($runObj);
return 0 unless($job and ($job->hive_id eq $self->hive_id));
my $start_time = time() * 1000;
$self->queen->dbc->query_count(0);
#pass the input_id from the job into the runnableDB object
$runObj->input_id($job->input_id);
$runObj->analysis_job_id($job->dbID);
#pass the input_id from the job into the Process object
if($runObj->isa("Bio::EnsEMBL::Hive::Process")) {
$runObj->input_job($job);
$runObj->queen($self->queen);
} else {
$runObj->input_id($job->input_id);
$runObj->db($self->db);
}
$runObj->debug($self->debug);
$job->update_status('GET_INPUT');
......@@ -469,11 +474,6 @@ sub run_module_with_job
print("WRITE_OUTPUT\n") if($self->debug);
$runObj->write_output;
#runnableDB is allowed to alter its input_id on output
#This modified input_id is passed as input to the next jobs in the graph
$job->input_id($runObj->input_id);
$job->branch_code($runObj->branch_code);
$job->query_count($self->queen->dbc->query_count);
$job->runtime_msec(time()*1000 - $start_time);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment