Skip to content
Snippets Groups Projects
Commit 94086e7b authored by Patrick Meidl's avatar Patrick Meidl
Browse files

now isa Serialisable; added code to parallelise synteny rescoring

parent a7f2db25
No related branches found
No related tags found
No related merge requests found
......@@ -36,20 +36,45 @@ use strict;
use warnings;
no warnings 'uninitialized';
use Bio::EnsEMBL::IdMapping::BaseObject;
our @ISA = qw(Bio::EnsEMBL::IdMapping::BaseObject);
use Bio::EnsEMBL::IdMapping::Serialisable;
our @ISA = qw(Bio::EnsEMBL::IdMapping::Serialisable);
use Bio::EnsEMBL::Utils::Argument qw(rearrange);
use Bio::EnsEMBL::Utils::Exception qw(throw warning);
use Bio::EnsEMBL::Utils::ScriptUtils qw(path_append);
use Bio::EnsEMBL::IdMapping::SyntenyRegion;
use Bio::EnsEMBL::IdMapping::ScoredMappingMatrix;
use FindBin qw($Bin);
FindBin->again;
sub new {
my $caller = shift;
my $class = ref($caller) || $caller;
my $self = $class->SUPER::new(@_);
# initialise internal datastructure
$self->{'merged_syntenies'} = [];
my ($logger, $conf, $cache) = rearrange(['LOGGER', 'CONF', 'CACHE'], @_);
unless ($logger and ref($logger) and
$logger->isa('Bio::EnsEMBL::Utils::Logger')) {
throw("You must provide a Bio::EnsEMBL::Utils::Logger for logging.");
}
unless ($conf and ref($conf) and
$conf->isa('Bio::EnsEMBL::Utils::ConfParser')) {
throw("You must provide configuration as a Bio::EnsEMBL::Utils::ConfParser object.");
}
unless ($cache and ref($cache) and
$cache->isa('Bio::EnsEMBL::IdMapping::Cache')) {
throw("You must provide configuration as a Bio::EnsEMBL::IdMapping::Cache object.");
}
# initialise
$self->logger($logger);
$self->conf($conf);
$self->cache($cache);
$self->{'cache'} = [];
return $self;
}
......@@ -95,7 +120,11 @@ sub build_synteny {
}
# sort synteny regions
my @sorted = sort _by_overlap @synteny_regions;
#my @sorted = sort _by_overlap @synteny_regions;
my @sorted = reverse sort {
$a->source_seq_region_name cmp $b->source_seq_region_name ||
$a->source_start <=> $b->source_start ||
$a->source_end <=> $b->source_end } @synteny_regions;
$self->logger->info("SyntenyRegions before merging: ".scalar(@sorted)."\n");
......@@ -106,19 +135,22 @@ sub build_synteny {
my $last_sr = shift(@sorted);
while (my $sr = shift(@sorted)) {
#$self->logger->debug("this ".$sr->to_string."\n");
my $merged_sr = $last_sr->merge($sr);
if (! $merged_sr) {
unless ($last_merged) {
$self->add_SyntenyRegion($last_sr->stretch(2));
$last_merged = 0;
#$self->logger->debug("nnn ".$last_sr->to_string."\n");
}
$last_merged = 0;
} else {
$self->add_SyntenyRegion($merged_sr->stretch(2));
#$self->logger->debug("mmm ".$merged_sr->to_string."\n");
$last_merged = 1;
}
$last_sr = $sr;
}
......@@ -127,6 +159,10 @@ sub build_synteny {
$self->add_SyntenyRegion($last_sr->stretch(2));
$last_merged = 0;
}
#foreach my $sr (@{ $self->get_all_SyntenyRegions }) {
# $self->logger->debug("SRs ".$sr->to_string."\n");
#}
$self->logger->info("SyntenyRegions after merging: ".scalar(@{ $self->get_all_SyntenyRegions })."\n");
......@@ -135,24 +171,145 @@ sub build_synteny {
sub _by_overlap {
# first sort by seq_region
my $retval = ($a->source_seq_region_name cmp $b->source_seq_region_name);
my $retval = ($b->source_seq_region_name cmp $a->source_seq_region_name);
return $retval if ($retval);
# then sort by overlap:
# return -1 if $a is downstream, 1 if it's upstream, 0 if they overlap
if ($a->source_end < $b->source_start) { return -1; }
if ($a->source_start < $b->source_end) { return 1; }
if ($a->source_end < $b->source_start) { return 1; }
if ($a->source_start < $b->source_end) { return -1; }
return 0;
}
sub add_SyntenyRegion {
push @{ $_[0]->{'merged_syntenies'} }, $_[1];
push @{ $_[0]->{'cache'} }, $_[1];
}
sub get_all_SyntenyRegions {
return $_[0]->{'merged_syntenies'};
return $_[0]->{'cache'};
}
sub rescore_gene_matrix_lsf {
my $self = shift;
my $matrix = shift;
unless ($matrix and
$matrix->isa('Bio::EnsEMBL::IdMapping::ScoredMappingMatrix')) {
throw('Need a Bio::EnsEMBL::IdMapping::ScoredMappingMatrix.');
}
# serialise SyntenyFramework to disk
$self->logger->debug("Serialising SyntenyFramework...\n", 0, 'stamped');
$self->write_to_file;
$self->logger->debug("Done.\n", 0, 'stamped');
# split the ScoredMappingMatrix into chunks and write to disk
my $matrix_size = $matrix->size;
$self->logger->debug("Scores before rescoring: $matrix_size.\n");
my $num_jobs = $self->conf->param('synteny_rescore_jobs') || 20;
$num_jobs++;
my $dump_path = path_append($self->conf->param('dumppath'),
'matrix/synteny_rescore');
$self->logger->debug("Creating sub-matrices...\n", 0, 'stamped');
foreach my $i (1..$num_jobs) {
my $start = (int($matrix_size/($num_jobs-1)) * ($i - 1)) + 1;
my $end = int($matrix_size/($num_jobs-1)) * $i;
$self->logger->debug("$start-$end\n", 1);
my $sub_matrix = $matrix->sub_matrix($start, $end);
$sub_matrix->cache_file_name("gene_matrix_synteny$i.ser");
$sub_matrix->dump_path($dump_path);
$sub_matrix->write_to_file;
}
$self->logger->debug("Done.\n", 0, 'stamped');
# create an empty lsf log directory
my $logpath = path_append($self->logger->logpath, 'lsf_synteny_rescore');
system("rm -rf $logpath") == 0 or
$self->logger->error("Unable to delete lsf log dir $logpath: $!\n");
system("mkdir -p $logpath") == 0 or
$self->logger->error("Can't create lsf log dir $logpath: $!\n");
# build lsf command
my $lsf_name = 'idmapping_synteny_rescore_'.time;
my $options = $self->conf->create_commandline_options(
logauto => 1,
logautobase => "synteny_rescore_",
logpath => $logpath,
interactive => 0,
is_component => 1,
);
my $cmd = qq{$Bin/synteny_rescore.pl $options --index \$LSB_JOBINDEX};
my $pipe = qq{|bsub -J$lsf_name\[1-$num_jobs\] } .
qq{-o $logpath/synteny_rescore.\%I.out } .
qq{-e $logpath/synteny_rescore.\%I.err } .
$self->conf->param('lsf_opt_synteny_rescore');
# run lsf job array
$self->logger->info("Submitting $num_jobs jobs to lsf.\n");
$self->logger->debug("$cmd\n\n");
local *BSUB;
open BSUB, $pipe or
$self->logger->error("Could not open open pipe to bsub: $!\n");
print BSUB $cmd;
$self->logger->error("Error submitting synteny rescoring jobs: $!\n")
unless ($? == 0);
close BSUB;
# submit dependent job to monitor finishing of jobs
$self->logger->info("Waiting for jobs to finish...\n", 0, 'stamped');
my $dependent_job = qq{bsub -K -w "ended($lsf_name)" -q small } .
qq{-o $logpath/synteny_rescore_depend.out /bin/true};
system($dependent_job) == 0 or
$self->logger->error("Error submitting dependent job: $!\n");
$self->logger->info("All jobs finished.\n", 0, 'stamped');
# check for lsf errors
sleep(5);
my $err;
foreach my $i (1..$num_jobs) {
$err++ unless (-e "$logpath/synteny_rescore_$i.success");
}
if ($err) {
$self->logger->error("At least one of your jobs failed.\nPlease check the logfiles at $logpath for errors.\n");
}
# merge and return matrix
$self->logger->debug("Merging rescored matrices...\n");
$matrix->flush;
foreach my $i (1..$num_jobs) {
# read partial matrix created by lsf job from file
my $sub_matrix = Bio::EnsEMBL::IdMapping::ScoredMappingMatrix->new(
-DUMP_PATH => $dump_path,
-CACHE_FILE => "gene_matrix_synteny$i.ser",
);
$sub_matrix->read_from_file;
# merge with main matrix
$matrix->merge($sub_matrix);
}
$self->logger->debug("Done.\n");
$self->logger->debug("Scores after rescoring: ".$matrix->size.".\n");
return $matrix;
}
......@@ -171,7 +328,6 @@ sub rescore_gene_matrix {
my $retain_factor = 0.7;
foreach my $entry (@{ $matrix->get_all_Entries }) {
my $source_gene = $self->cache->get_by_key('genes_by_id', 'source',
$entry->source);
......@@ -185,10 +341,75 @@ sub rescore_gene_matrix {
$highest_score = $score if ($score > $highest_score);
}
#$self->logger->debug("highscore ".$entry->to_string." ".
# sprintf("%.6f\n", $highest_score));
$matrix->set_score($entry->source, $entry->target,
($entry->score * 0.7 + $highest_score * 0.3));
}
return $matrix;
}
=head2 logger
Arg[1] : (optional) Bio::EnsEMBL::Utils::Logger - the logger to set
Example : $object->logger->info("Starting ID mapping.\n");
Description : Getter/setter for logger object
Return type : Bio::EnsEMBL::Utils::Logger
Exceptions : none
Caller : constructor
Status : At Risk
: under development
=cut
sub logger {
my $self = shift;
$self->{'_logger'} = shift if (@_);
return $self->{'_logger'};
}
=head2 conf
Arg[1] : (optional) Bio::EnsEMBL::Utils::ConfParser - the configuration
to set
Example : my $dumppath = $object->conf->param('dumppath');
Description : Getter/setter for configuration object
Return type : Bio::EnsEMBL::Utils::ConfParser
Exceptions : none
Caller : constructor
Status : At Risk
: under development
=cut
sub conf {
my $self = shift;
$self->{'_conf'} = shift if (@_);
return $self->{'_conf'};
}
=head2 cache
Arg[1] : (optional) Bio::EnsEMBL::IdMapping::Cache - the cache to set
Example : $object->cache->read_from_file('source');
Description : Getter/setter for cache object
Return type : Bio::EnsEMBL::IdMapping::Cache
Exceptions : none
Caller : constructor
Status : At Risk
: under development
=cut
sub cache {
my $self = shift;
$self->{'_cache'} = shift if (@_);
return $self->{'_cache'};
}
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment