-
Andy Yates authored
[ENSCORESW-550]. Made the pipeline run on the new farm infrastructure by editing memory requirements. The original version had memory requirements in KB. New farm infrastructure says we specify in MB. This is good so now all memory requirement params are all in the same order of magnitude. The final collection job now specifies its memory requirements. This is a good thing again. Currently this is set to 5MB.
29afdeac
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
SubmitMapper.pm 30.21 KiB
package XrefMapper::SubmitMapper;
use strict;
use vars '@ISA';
@ISA = qw{ XrefMapper::BasicMapper };
use warnings;
use XrefMapper::BasicMapper;
use Cwd;
use DBI;
use File::Basename;
use IPC::Open3;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
##################################################################
# JOB 1 Do exonerate jobs and get core info
##################################################################
# Also load database with connection details of the core database for use later in JOBS
# First, get all info from core that is needed, e.g. stable_ids etc.
# Process the direct xrefs by putting them in the object_xref table
# dump the fasta files
# submit exonerate jobs (fill tables mapping, mapping_jobs) if not already done
# submit coordinate xrefs if not processed
sub new {
    my ( $class, $mapper ) = @_;

    # Construct a SubmitMapper, copying the configuration handles off the
    # supplied mapper so this object can drive the submission jobs itself.
    my $self = bless {}, $class;

    $self->core( $mapper->core );
    $self->xref( $mapper->xref );
    $self->mapper($mapper);
    $self->verbose( $mapper->verbose );
    $self->nofarm( $mapper->nofarm );

    return $self;
}
sub mapper {
    my ( $self, $new_mapper ) = @_;

    # Getter/setter for the parent mapper object.
    if ( defined $new_mapper ) {
        $self->{_mapper} = $new_mapper;
    }
    return $self->{_mapper};
}
# NOTE(review): this looks like an unfinished stub.  It declares lexicals but
# never unpacks @_ and never stores anything, so calling it is a no-op.
# Confirm whether it should be implemented or removed.
sub store_core_database_details{
my ($self, $port, $user, $pass, $dbname, $dir);
}
##############################################################################
# dump fasta files code
##############################################################################
=head2 dump_seqs
Arg[1]: xref object which holds info needed for the dump of xref
Description: Dumps out the files for the mapping. Xref object should hold
the value of the databases and source to be used.
Returntype : none
Exceptions : will die if species not known or an error occurs while
: trying to write to files.
Caller : general
=cut
sub dump_seqs{
my ($self, $location) = @_;
# Dumping the xref sequences can take a long time; allow the (unused) core
# connection to drop while we work against the xref database ...
$self->core->dbc->disconnect_if_idle(1);
$self->core->dbc->disconnect_when_inactive(1);
$self->dump_xref();
$self->core->dbc->disconnect_if_idle(0);
$self->core->dbc->disconnect_when_inactive(0);
# ... and vice versa: let the xref handle idle out while dumping core.
$self->xref->dbc->disconnect_when_inactive(1);
$self->xref->dbc->disconnect_if_idle(1);
$self->dump_ensembl($location);
$self->xref->dbc->disconnect_if_idle(0);
$self->xref->dbc->disconnect_when_inactive(0);
}
# Skip dumping: reuse the exonerate methods already stored in the xref
# database and point the core dna/protein file attributes at the fasta files
# produced by a previous run.
sub no_dump_xref {
my ($self) = @_;
my @method= @{$self->get_stored_methods()};
$self->method(\@method);
$self->core->dna_file($self->core->dir."/".$self->core->species."_dna.fasta");
$self->core->protein_file($self->core->dir."/".$self->core->species."_protein.fasta");
}
=head2 dump_xref
Arg[1]: xref object which holds info on method and files.
Description: Dumps the Xref data as fasta file(s)
Returntype : none
Exceptions : none
Caller : dump_seqs
=cut
=head2 get_stored_methods
Description: Retrieves exonerate methods stored in the source_mapping_method table in the xref database
Returntype : arrayref
Exceptions : none
Caller : no_dump_xref
=cut
sub get_stored_methods {
    my ($self) = @_;

    # Fetch every distinct exonerate method recorded in the
    # source_mapping_method table of the xref database, sorted by name.
    my $query_sth = $self->xref()->dbc->prepare(
        "select distinct method from source_mapping_method order by method");
    $query_sth->execute();

    my @stored_methods;
    while ( my ($method_name) = $query_sth->fetchrow_array() ) {
        push @stored_methods, $method_name;
    }
    $query_sth->finish;

    return \@stored_methods;
}
# Dump the xref sequences (dna and peptide) to per-method fasta files.
# Determines which exonerate method applies to each source (default plus any
# per-source overrides from set_methods), records the choices in the
# source_mapping_method table, then writes one pair of fasta files per method.
# Respects the dumpcheck flag: if all expected files already exist, no
# re-dumping is done.
sub dump_xref {
    my ($self) = @_;

    my $xref = $self->xref();

    # Default the dump directory if nothing has been configured.
    if ( !defined( $xref->dir() ) ) {
        if ( defined( $self->dir ) ) {
            # NOTE(review): storing dir() into species() looks suspicious --
            # presumably this was meant to be $xref->dir($self->dir); confirm
            # before changing, as downstream code may rely on it.
            $xref->species( $self->dir );
            $self->species_id( $self->get_id_from_species_name( $self->species ) );
        }
        else {
            $xref->dir(".");
        }
    }

    # @method will hold every exonerate method used for sequence mapping.
    my @method;
    my ( $default_method, $override_method_for_source );
    if ( $self->mapper->can('set_methods') ) {
        # A species-specific module may override the method configuration.
        ( $default_method, $override_method_for_source ) = $self->mapper->set_methods();
    }
    else {
        ( $default_method, $override_method_for_source ) = $self->set_methods();
    }
    push( @method, $default_method );
    print "Default exonerate method is $default_method\n" if ( $self->verbose );

    # %override_method_for_source is keyed on exonerate methods; values are
    # arrays of xref source names.
    my %override_method_for_source = %{$override_method_for_source};

    # All source ids for which the default exonerate method was overridden.
    my @all_source_ids;

    # Like %override_method_for_source, but the values are arrays of the
    # corresponding source ids rather than source names.
    my %methods_and_source_ids;
    my %source_mapping_method;

    # Only sources which have xrefs in the primary_xref table (which stores
    # sequences) are relevant here.  Multiple sources can share a name but
    # differ in priority_description, so one name can map to multiple ids.
    # %source_ids: source name => arrayref of source ids.
    my %source_ids;
    my $source_sth = $xref->dbc->prepare("select distinct name, source_id from primary_xref join xref using(xref_id) join source using(source_id) order by name;");
    $source_sth->execute();
    while ( my ( $name, $source_id ) = $source_sth->fetchrow_array() ) {
        push( @{ $source_ids{$name} }, $source_id );
    }
    $source_sth->finish();

    if (%override_method_for_source) {
        print "Default exonerate method overridden for some sources\n" if ( $self->verbose );
        foreach my $method ( keys %override_method_for_source ) {
            # Convert source names to source ids.
            my @source_names    = @{ $override_method_for_source{$method} };
            my $a_source_exists = 0;
            foreach my $source_name (@source_names) {
                # A source name may be assigned to one exonerate method only.
                if ( exists( $source_mapping_method{$source_name} ) ) {
                    die "$source_name source name defined more than once in set_methods method (SubmitMapper method which can be overriden in species.pm module)\n";
                }
                $source_mapping_method{$source_name} = $method;

                # BUG FIX: the original used "my @source_ids = ... if (...)",
                # which is undefined behaviour in Perl ("my" with a statement
                # modifier) and could leak a stale value from the previous
                # loop iteration.  Declare first, then assign conditionally.
                my @ids_for_name;
                @ids_for_name = @{ $source_ids{$source_name} }
                    if exists $source_ids{$source_name};

                if (@ids_for_name) {
                    $a_source_exists = 1;
                    foreach my $source_id (@ids_for_name) {
                        push @{ $methods_and_source_ids{$method} }, $source_id;
                        push @all_source_ids, $source_id;
                    }
                }
                else {
                    print "WARNING: source id for $source_name not found. Xrefs for this source will not be sequence mapped. There are no xrefs from this source in the primary_xref table.\n" if ( $self->verbose );
                }
            }
            # Only add $method if at least one source id will use it.
            if ($a_source_exists) {
                push( @method, $method );
            }
        }
    }

    # Store source ids and their mapping methods in source_mapping_method.
    my $insert_src_method_sth = $xref->dbc->prepare("insert into source_mapping_method values(?,?)");
    foreach my $source_name ( keys %source_ids ) {
        my $method = exists( $source_mapping_method{$source_name} )
            ? $source_mapping_method{$source_name}
            : $default_method;
        foreach my $source_id ( @{ $source_ids{$source_name} } ) {
            $insert_src_method_sth->execute( $source_id, $method );
            print "Will use $method method for source id $source_id, $source_name\n" if ( $self->verbose );
        }
    }
    $insert_src_method_sth->finish();

    $self->method( \@method );

    # If dumpcheck is set and every expected fasta file exists, skip dumping.
    if ( defined( $self->mapper->dumpcheck() ) ) {
        my $skip = 1;
        foreach my $method (@method) {
            $skip = 0 if !-e $xref->dir() . "/xref_" . $method . "_dna.fasta";
            $skip = 0 if !-e $xref->dir() . "/xref_" . $method . "_peptide.fasta";
        }
        if ($skip) {
            print "Xref fasta files found and will be used (No new dumping)\n" if ( $self->verbose );
            return;
        }
    }

    print "Dumping Xref fasta files\n" if ( $self->verbose() );
    foreach my $method (@method) {
        for my $sequence_type ( 'dna', 'peptide' ) {
            my $filename = $xref->dir() . "/xref_" . $method . "_" . $sequence_type . ".fasta";
            open( my $DH, ">", $filename ) || die "Could not open $filename";

            my $sql = "SELECT p.xref_id, p.sequence, x.species_id , x.source_id ";
            $sql .= " FROM primary_xref p, xref x ";
            $sql .= " WHERE p.xref_id = x.xref_id AND ";
            $sql .= " p.sequence_type ='" . $sequence_type . "' ";
            # For the default method exclude sources whose method was overridden.
            if ( $method eq $default_method && scalar(@all_source_ids) > 0 ) {
                $sql .= "AND x.source_id not in (" . join( ',', @all_source_ids ) . ")";
            }
            # For a non-default method only select its own sources.
            if ( $method ne $default_method ) {
                $sql .= "AND x.source_id in (" . join( ',', @{ $methods_and_source_ids{$method} } ) . ")";
            }

            my $sth = $xref->dbc->prepare($sql);
            $sth->execute();
            while ( my @row = $sth->fetchrow_array() ) {
                $row[1] =~ s/(.{60})/$1\n/g;    # wrap sequence at 60 columns
                print $DH ">" . $row[0] . "\n" . $row[1] . "\n";
            }
            close $DH;
            $sth->finish();
        }
    }

    # Flag completion in the status table.
    my $sth = $xref->dbc->prepare("insert into process_status (status, date) values('xref_fasta_dumped',now())");
    $sth->execute();
    $sth->finish;
    return;
}
=head2 dump_ensembl
Description: Dumps the ensembl data to a file in fasta format.
Returntype : none
Exceptions : none
Caller : dump_seqs
=cut
# Decide how to dump the core sequences: if there are fewer genes than
# gene-bearing seq_regions it is cheaper to fetch gene-by-gene; otherwise
# walk the toplevel slices.
sub dump_ensembl{
my ($self) = @_;
# Count the distinct seq_regions that carry genes.
my $num_seqs = 0;
my $sth = $self->core->dbc->prepare("select count(distinct (seq_region_id)) from gene");
$sth->execute;
$sth->bind_columns(\$num_seqs);
$sth->fetch;
$sth->finish;
# Count the genes themselves.
my $num_genes;
$sth = $self->core->dbc->prepare("select count(1) from gene");
$sth->execute;
$sth->bind_columns(\$num_genes);
$sth->fetch;
$sth->finish;
# my $ignore_genes = $self->fetch_alt_allele_list();
# comment out until we know what we want to do with alt alleles.
# For now no genes are ignored (empty hash).
my %temp_hash;
my $ignore_genes = \%temp_hash;
if ($num_genes < $num_seqs) {
$self->fetch_and_dump_seq_via_genes($ignore_genes);
}
else {
$self->fetch_and_dump_seq_via_toplevel($ignore_genes);
}
}
# Return a hashref (gene_id => 1) of non-reference alt-allele genes, i.e.
# genes that callers may want to skip when dumping sequences.
# NOTE(review): $sth is never finish()ed here -- harmless with DBI's
# auto-cleanup, but inconsistent with the rest of the file.
sub fetch_alt_allele_list {
my ($self) = @_;
my %allele;
my $sth = $self->xref->dbc->prepare(" select gene_id from alt_allele where is_reference = 0");
$sth->execute();
my $gene_id;
$sth->bind_columns(\$gene_id);
while ($sth->fetch()){
$allele{$gene_id} = 1;
}
return \%allele;
}
# Dump all transcript (dna) and translation (peptide) sequences by walking
# every toplevel slice.  Writes <species>_dna.fasta and <species>_protein.fasta
# into the core dump directory, sanity-checks the entry counts with grep, and
# records 'core_fasta_dumped' in process_status.
# Arg[1]: hashref of gene dbIDs to skip.
# Dies on any file-write problem or if the entry counts do not match.
sub fetch_and_dump_seq_via_toplevel {
    my ( $self, $ignore_genes ) = @_;

    my $inc_dupes  = 0;    # do not include duplicate regions like PARs
    my $inc_nonref = 1;    # include non-reference regions like haplotypes
    my $ensembl = $self->core;
    $self->add_meta_pair( "dump_method", "fetch_and_dump_seq_via_toplevel" );

    # Store the ensembl dna/protein fasta file names (dump dir defaults to cwd).
    if ( !defined( $ensembl->dir() ) ) {
        $ensembl->dir(".");
    }
    $ensembl->dna_file( $ensembl->dir . "/" . $ensembl->species . "_dna.fasta" );
    $ensembl->protein_file( $ensembl->dir . "/" . $ensembl->species . "_protein.fasta" );

    # Reuse existing dumps when dumpcheck is set and both files exist.
    if ( defined( $self->mapper->dumpcheck() ) and -e $ensembl->protein_file() and -e $ensembl->dna_file() ) {
        my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_fasta_dumped',now())");
        $sth->execute();
        $sth->finish;    # added: statement handle was left unfinished
        print "Ensembl Fasta files found (no new dumping)\n" if ( $self->verbose() );
        return;
    }

    print "Dumping Ensembl Fasta files\n" if ( $self->verbose() );
    open( my $dnah, ">", $ensembl->dna_file() )
        or die( "Could not open dna file for writing: " . $ensembl->dna_file . "\n" );
    open( my $peph, ">", $ensembl->protein_file() )
        or die( "Could not open protein file for writing: " . $ensembl->protein_file . "\n" );

    my $db            = Bio::EnsEMBL::DBSQL::DBAdaptor->new( -dbconn => $ensembl->dbc );
    my $slice_adaptor = $db->get_SliceAdaptor();
    my $slices = $slice_adaptor->fetch_all( 'toplevel', undef, $inc_nonref, $inc_dupes ) || [];

    my $script_count = 0;    # transcripts written
    my $lation_count = 0;    # translations written
    my $ama  = $slice_adaptor->db()->get_AssemblyMapperAdaptor();
    my $seqa = $slice_adaptor->db()->get_SequenceAdaptor();

    while ( my $slice = shift @$slices ) {
        # Genes on this slice, sorted by start coordinate.
        my @genes = sort { $a->start() <=> $b->start() }
            @{ $slice->get_all_Genes( undef, undef, 1, undef, undef ) };

        while ( my $gene = shift @genes ) {
            # Skip gene segments and any explicitly ignored genes.
            next if $gene->biotype eq 'J_segment';
            next if $gene->biotype eq 'D_segment';
            next if $ignore_genes->{ $gene->dbID };

            foreach my $transcript ( @{ $gene->get_all_Transcripts() } ) {
                my $seq = $transcript->spliced_seq();
                $seq =~ s/(.{60})/$1\n/g;    # wrap sequence at 60 columns
                # BUG FIX: the original used "print ... || die"; || binds to
                # the last print argument, so the die could never trigger.
                print $dnah ">" . $transcript->dbID() . "\n" . $seq . "\n"
                    or die "Error writing for transcript " . $transcript->dbID . "\n$!\n";
                $script_count++;

                my $trans       = $transcript->translation();
                my $translation = $transcript->translate();
                if ( defined($translation) ) {
                    my $pep_seq = $translation->seq();
                    $pep_seq =~ s/(.{60})/$1\n/g;
                    print $peph ">" . $trans->dbID() . "\n" . $pep_seq . "\n"
                        or die "Error writing for translation " . $trans->dbID . "\n$!\n";
                    $lation_count++;
                }
            }
        }

        # Zap caches to keep memory use bounded on large genomes.
        %{ $slice_adaptor->{'sr_name_cache'} } = ();
        %{ $slice_adaptor->{'sr_id_cache'} }   = ();
        $ama->delete_cache();
        %{ $seqa->{'seq_cache'} } = ();
    }

    # BUG FIX: "close $fh || die" never died either; use low-precedence "or".
    close $dnah or die "unable to close dna file\n$!\n";
    close $peph or die "unable to close peptide file\n$!\n";
    sleep(10);    # give the disks a chance

    #####################################################
    # Sanity check as some of the disks get timeouts etc.
    #####################################################
    my $line = 'grep "^>" ' . $ensembl->dna_file() . ' | wc -l';
    my $out  = `$line`;
    chomp $out;
    if ( $out != $script_count ) {
        die "Problem writing DNA file as there should be $script_count entries but file has only $out\n";
    }

    $line = 'grep "^>" ' . $ensembl->protein_file() . ' | wc -l';
    $out  = `$line`;
    chomp $out;
    if ( $out != $lation_count ) {
        die "Problem writing PEPTIDE file as there should be $lation_count entries but file has only $out\n";
    }

    # Typo fix in the progress message ("Transaltions" -> "Translations").
    print $script_count . " Transcripts dumped " . $lation_count . " Translations dumped\n" if ( $self->verbose );

    #######################################
    # Update the database about the status.
    #######################################
    my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_fasta_dumped',now())");
    $sth->execute();
    $sth->finish;
}
=head2 fetch_and_dump_seq_via_genes
Description: Dumps the ensembl data to a file in fasta format.
Returntype : none
Exceptions : will die if there are errors in db connection or file creation.
Caller : dump_ensembl
=cut
# Dump all transcript (dna) and translation (peptide) sequences by fetching
# every gene directly (used when genes are fewer than gene-bearing
# seq_regions).  Writes the same fasta files, performs the same grep-based
# sanity checks and records the same status as the toplevel variant.
# Arg[1]: hashref of gene dbIDs to skip.
# Dies on any file-write problem or if the entry counts do not match.
sub fetch_and_dump_seq_via_genes {
    my ( $self, $ignore_genes ) = @_;

    my $ensembl = $self->core;
    my $db            = Bio::EnsEMBL::DBSQL::DBAdaptor->new( -dbconn => $ensembl->dbc );
    my $slice_adaptor = $db->get_SliceAdaptor();
    my $ama           = $db->get_AssemblyMapperAdaptor();
    my $seqa          = $db->get_SequenceAdaptor();
    my $gene_adaptor  = $db->get_GeneAdaptor();

    # Store the ensembl dna/protein fasta file names (dump dir defaults to cwd).
    if ( !defined( $ensembl->dir() ) ) {
        $ensembl->dir(".");
    }
    $ensembl->dna_file( $ensembl->dir . "/" . $ensembl->species . "_dna.fasta" );
    $ensembl->protein_file( $ensembl->dir . "/" . $ensembl->species . "_protein.fasta" );

    $self->add_meta_pair( "dump_method", "fetch_and_dump_seq_via_genes" );

    # Reuse existing dumps when dumpcheck is set and both files exist.
    if ( defined( $self->mapper->dumpcheck() ) and -e $ensembl->protein_file() and -e $ensembl->dna_file() ) {
        my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_fasta_dumped',now())");
        $sth->execute();
        $sth->finish;    # added: statement handle was left unfinished
        print "Ensembl Fasta files found (no new dumping)\n" if ( $self->verbose() );
        return;
    }

    print "Dumping Ensembl Fasta files\n" if ( $self->verbose() );
    open( my $dnah, ">", $ensembl->dna_file() )
        or die( "Could not open dna file for writing: " . $ensembl->dna_file . "\n" );
    open( my $peph, ">", $ensembl->protein_file() )
        or die( "Could not open protein file for writing: " . $ensembl->protein_file . "\n" );

    # Fetch every gene.  (For test purposes a single fetch_by_stable_id call
    # can be substituted here, as in the original.)
    my @genes = @{ $gene_adaptor->fetch_all() };

    my $script_count = 0;    # transcripts written
    my $lation_count = 0;    # translations written
    foreach my $gene (@genes) {
        # Skip gene segments and any explicitly ignored genes.
        next if $gene->biotype eq 'J_segment';
        next if $gene->biotype eq 'D_segment';
        next if $ignore_genes->{ $gene->dbID };

        foreach my $transcript ( @{ $gene->get_all_Transcripts() } ) {
            my $seq = $transcript->spliced_seq();
            $seq =~ s/(.{60})/$1\n/g;    # wrap sequence at 60 columns
            # BUG FIX: the original used "print ... || die"; || binds to the
            # last print argument, so the die could never trigger.
            print $dnah ">" . $transcript->dbID() . "\n" . $seq . "\n"
                or die "Error writing for transcript " . $transcript->dbID . "\n$!\n";
            $script_count++;

            my $trans       = $transcript->translation();
            my $translation = $transcript->translate();
            if ( defined($translation) ) {
                my $pep_seq = $translation->seq();
                $pep_seq =~ s/(.{60})/$1\n/g;
                print $peph ">" . $trans->dbID() . "\n" . $pep_seq . "\n"
                    or die "Error writing for translation " . $trans->dbID . "\n$!\n";
                $lation_count++;
            }
        }

        # Zap caches to keep memory use bounded on large genomes.
        %{ $slice_adaptor->{'sr_name_cache'} } = ();
        %{ $slice_adaptor->{'sr_id_cache'} }   = ();
        $ama->delete_cache();
        %{ $seqa->{'seq_cache'} } = ();
    }

    # BUG FIX: "close $fh || die" never died either; use low-precedence "or".
    close $dnah or die "unable to close dna file\n$!\n";
    close $peph or die "unable to close peptide file\n$!\n";
    sleep(10);    # give the disks a chance

    #####################################################
    # Sanity check as some of the disks get timeouts etc.
    #####################################################
    my $line = 'grep "^>" ' . $ensembl->dna_file() . ' | wc -l';
    my $out  = `$line`;
    chomp $out;
    if ( $out != $script_count ) {
        die "Problem writing DNA file as there should be $script_count entries but file has only $out\n";
    }

    $line = 'grep "^>" ' . $ensembl->protein_file() . ' | wc -l';
    $out  = `$line`;
    chomp $out;
    if ( $out != $lation_count ) {
        die "Problem writing PEPTIDE file as there should be $lation_count entries but file has only $out\n";
    }

    # Typo fix in the progress message ("Transaltions" -> "Translations").
    print $script_count . " Transcripts dumped " . $lation_count . " Translations dumped\n" if ( $self->verbose );

    #######################################
    # Update the database about the status.
    #######################################
    my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_fasta_dumped',now())");
    $sth->execute();
    $sth->finish;
}
=head2 set_methods
Description: Specifies the default exonerate method and non default methods which should be used for
one or more sources.
Returns a string containing the default method and a hash reference.
The hash key is an exonerate method, the corresponding value is an array of source names
whose xrefs are to be mapped using the exonerate method in the hash key.
Multiple sources can be matched but only those with xrefs in the primary_xref table
which holds sequence data are considered. If a particular source is not found, a warning
will be written out by the caller.
This method can be overridden in species.pm
Returntype : string, hash of arrays
Example : my ($default_method, $override_method_for_source) = $self->set_methods();
Exceptions : none
Caller : dump_xref
=cut
sub set_methods {
    # Default exonerate method applied to every source unless overridden.
    my $default_method = 'ExonerateGappedBest1';

    # Map: exonerate method => list of source names whose xrefs should be
    # mapped with that method instead of the default.
    my %override_method_for_source = (
        ExonerateGappedBest_100_perc_id =>
            [ 'Uniprot/SWISSPROT', 'Uniprot/SPTREMBL' ],
        ExonerateGappedBest5 => [
            'RefSeq_mRNA', 'RefSeq_mRNA_predicted',
            'RefSeq_ncRNA', 'RefSeq_ncRNA_predicted',
        ],
    );

    return $default_method, \%override_method_for_source;
}
###################################################################################################
# exonerate subs
###################################################################################################
=head2 jobcount
Arg [1] : (optional)
Example : $mapper->jobcount(1004);
Description: Getter / Setter for number of jobs submitted.
Returntype : scalar
Exceptions : none
=cut
sub jobcount {
    my ( $self, $count ) = @_;

    # Getter/setter for the number of jobs submitted.
    $self->{_jobcount} = $count if defined $count;
    return $self->{_jobcount};
}
sub method {
    my ( $self, $methods_ref ) = @_;

    # Getter/setter for the arrayref of exonerate method names.
    $self->{_method} = $methods_ref if defined $methods_ref;
    return $self->{_method};
}
=head2 build_list_and_map
Arg[1]: xref object which holds info on method and files.
Description: runs the mapping of the list of files with species methods
Returntype : none
Exceptions : none
Caller : general
=cut
# Build (method, query_fasta, target_fasta) triples for every dumped xref
# fasta file that exists and is non-empty, then hand the list to
# run_mapping().  Dna and peptide files are handled independently, so a
# method with only one of the two still gets mapped.
sub build_list_and_map {
    my ($self) = @_;

    my @list = ();
    foreach my $method ( @{ $self->method() } ) {
        my $q_dna_file = $self->xref->dir . "/xref_" . $method . "_dna.fasta";
        if ( -e $q_dna_file and -s $q_dna_file ) {
            push @list, [ $method, $q_dna_file, $self->core->dna_file() ];
        }

        my $q_pep_file = $self->xref->dir . "/xref_" . $method . "_peptide.fasta";
        if ( -e $q_pep_file and -s $q_pep_file ) {
            # Consistency fix: reuse $q_pep_file instead of rebuilding the
            # same path inline (the dna branch already reused its variable).
            push @list, [ $method, $q_pep_file, $self->core->protein_file() ];
        }
    }
    $self->run_mapping( \@list );
}
=head2 fix_mappings
Example : none
Description: submit mapping jobs to LSF for those that had problems, and wait for them to finish.
Returntype : none
Exceptions : none
Caller : general
=cut
# Find mapping jobs marked FAILED, remove any partial results they wrote to
# object_xref/identity_xref/go_xref, delete their old map/out/err files,
# resubmit them to the farm, then submit a depend job to wait for completion
# and finally re-check the error files.
sub fix_mappings {
my $self = shift;
# 1) find the broken jobs
# 2) remove the object_xrefs from object_xref_start to object_xref_end
# Plus identity xrefs and go_xrefs for these
my $rm_object_xrefs_sth = $self->xref->dbc->prepare("delete o from object_xref o where object_xref_id >= ? and object_xref_id <= ?");
my $rm_identity_xrefs_sth = $self->xref->dbc->prepare("delete i from identity_xref i where object_xref_id >= ? and object_xref_id <= ?");
my $rm_go_xrefs_sth = $self->xref->dbc->prepare("delete g from go_xref g where object_xref_id >= ? and object_xref_id <= ?");
my $reset_object_xref_limits_sth = $self ->xref->dbc->prepare("update mapping_jobs set object_xref_start = null, object_xref_end = null where job_id = ? and array_number = ?");
# 3) remove map, err and out files for each
# 4) rerun the mapping jobs SETTING NEW DATA IN mapping_jobs
my $sql = 'SELECT mapping.job_id, map_file, out_file, err_file, object_xref_start, object_xref_end, array_number, command_line, method, root_dir from mapping_jobs, mapping where mapping.job_id = mapping_jobs.job_id and status = "FAILED"';
my $sth = $self->xref->dbc->prepare($sql);
$sth->execute();
my ($job_id, $map_file, $out_file, $err_file, $object_xref_start, $object_xref_end, $array_number, $command_line, $method, $root_dir);
$sth->bind_columns(\$job_id, \$map_file, \$out_file, \$err_file, \$object_xref_start, \$object_xref_end, \$array_number, \$command_line, \$method, \$root_dir);
my @running_methods;
my @job_names;
while ($sth->fetch){
my $start = undef;
my $end = undef;
# The stored command line contains the literal $LSB_JOBINDEX placeholder;
# substitute this job's array index before resubmitting.
$command_line =~ s/\$LSB_JOBINDEX/$array_number/g;
if(defined($object_xref_start) and $object_xref_start){
$start = $object_xref_start;
}
if(defined($object_xref_end) and $object_xref_end){
$end = $object_xref_end;
}
# Neither limit set: the job wrote nothing, so nothing to clean up.
if(!defined($start) and !defined($end)){
}
# Only one limit set: cannot safely delete a half-open range, skip job.
elsif(!defined($start) or !defined($end)){
print STDERR "Could not clean up for ".$job_id."[".$array_number."]\n";
next;
}
else{ # REMOVE the entries
print "removing object_xref etc from $start to $end\n";
$rm_object_xrefs_sth->execute($start, $end);
$rm_identity_xrefs_sth->execute($start, $end);
$rm_go_xrefs_sth->execute($start, $end);
$reset_object_xref_limits_sth->execute($job_id, $array_number);
}
# Remove the stale output files from the failed run.
unlink($root_dir."/".$map_file);
unlink($root_dir."/".$out_file);
unlink($root_dir."/".$err_file);
# Run the Mapping.
my $obj_name = "XrefMapper::Methods::$method";
# check that the appropriate object exists
eval "require $obj_name";
if($@) {
warn("Could not find object $obj_name corresponding to mapping method $method, skipping\n$@");
} else {
my $obj = $obj_name->new($self->mapper);
print "DO resubmit for $array_number\n";
my $job_name = $obj->resubmit_exonerate($self->mapper, $command_line, $out_file, $err_file, $job_id, $array_number, $root_dir);
print "Job name is $job_name\n";
push @job_names, $job_name;
push @running_methods, $obj;
my $sth = $self->mapper->xref->dbc->prepare('update mapping_jobs set status = "SUBMITTED"'." where job_id = $job_id and array_number = $array_number");
$sth->execute();
$sth->finish;
sleep 1; # make sure unique names really are unique
$self->jobcount(1);
}
}
$sth->finish;
# submit depend job to wait for all mapping jobs
foreach my $method( @running_methods ){
# Submit all method-specific depend jobs
if( $method->can('submit_depend_job') ){
$method->submit_depend_job;
}
}
# Submit generic depend job. Defaults to LSF IF any exist.
if(defined($job_names[0])){
$self->submit_depend_job($self->core->dir, @job_names);
}
$self->check_err($self->core->dir);
}
=head2 run_mapping
Arg[1] : List of lists of (method, query, target)
Arg[2] :
Example : none
Description: Create and submit mapping jobs to LSF, and wait for them to finish.
Returntype : none
Exceptions : none
Caller : general
=cut
# Create and submit one exonerate mapping job per (method, query, target)
# triple, then submit a depend job that waits for all of them and finally
# check the .err files for problems.
sub run_mapping {
my ($self, $lists) = @_;
# delete old output files in target directory if we're going to produce new ones
my $dir = $self->core->dir();
print "Deleting out, err and map files from output dir: $dir\n" if($self->verbose());
unlink (glob("$dir/*.map"));
unlink (glob("$dir/*.out"));
unlink (glob("$dir/*.err"));
$self->remove_all_old_output_files();
#disconnect so that we can then reconnect after the long mapping bit.
$self->core->dbc->disconnect_if_idle(1);
$self->xref->dbc->disconnect_if_idle(1);
$self->core->dbc->disconnect_when_inactive(1);
$self->xref->dbc->disconnect_when_inactive(1);
# foreach method, submit the appropriate job & keep track of the job name
# note we check if use_existing_mappings is set here, not earlier, as we
# still need to instantiate the method object in order to fill
# method_query_threshold and method_target_threshold
my @job_names;
my @running_methods;
foreach my $list (@$lists){
my ($method, $queryfile ,$targetfile) = @$list;
# Each mapping method is implemented by a class under XrefMapper::Methods.
my $obj_name = "XrefMapper::Methods::$method";
# check that the appropriate object exists
eval "require $obj_name";
if($@) {
warn("Could not find object $obj_name corresponding to mapping method $method, skipping\n$@");
} else {
my $obj = $obj_name->new($self->mapper);
my $job_name = $obj->run($queryfile, $targetfile, $self);
push @job_names, $job_name;
push @running_methods, $obj;
sleep 1; # make sure unique names really are unique
$self->jobcount(($self->jobcount||0)+$obj->jobcount);
}
} # foreach method
# submit depend job to wait for all mapping jobs
foreach my $method( @running_methods ){
# Submit all method-specific depend jobs
if( $method->can('submit_depend_job') ){
$method->submit_depend_job;
}
}
# Submit generic depend job. Defaults to LSF
$self->submit_depend_job($self->core->dir, @job_names);
# Re-enable normal connection behaviour now the long mapping step is over.
$self->core->dbc->disconnect_if_idle(0);
$self->xref->dbc->disconnect_if_idle(0);
$self->core->dbc->disconnect_when_inactive(0);
$self->xref->dbc->disconnect_when_inactive(0);
$self->check_err($self->core->dir);
} # run_mapping
sub nofarm {
    my ( $self, $flag ) = @_;

    # Getter/setter for the "run locally, not on the farm" flag.
    $self->{_nofarm} = $flag if defined $flag;
    return $self->{_nofarm};
}
sub check_err {
    my ( $self, $dir ) = @_;

    # Warn on STDERR about any non-empty .err file in $dir -- a non-zero
    # size usually means an exonerate job hit problems.
    for my $err ( glob("$dir/*.err") ) {
        next unless -s $err;
        print STDERR "\n\n*** Warning: $err has non-zero size; may indicate"
                   . " problems with exonerate run\n\n\n";
    }
}
=head2 submit_depend_job
Arg[1] : List of job names.
Arg[2] :
Example : none
Description: Submit an LSF job that waits for other jobs to finish.
Returntype : none
Exceptions : none
Caller : general
=cut
# Submit an interactive (-K) LSF job that does nothing but wait for the named
# mapping jobs to finish, then record 'mapping_finished' in process_status.
# Arg[1]: root output directory (depend.out / depend.err are written there).
# Arg[2+]: LSF job names to wait on.
# When nofarm is set there is nothing to wait for: the status is recorded
# immediately and the sub returns.
sub submit_depend_job {
    my ( $self, $root_dir, @job_names ) = @_;

    if ( defined( $self->nofarm ) ) {
        my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('mapping_finished',now())");
        $sth->execute();
        $sth->finish;
        return;
    }

    # Submit a job that does nothing but wait on the main jobs to finish.
    # It is submitted interactively so the exec does not return until
    # everything is finished.

    # Build the -w 'ended(job1) && ended(job2)' dependency clause.
    my $ended_str = '-w "';
    my $i = 0;
    foreach my $job (@job_names) {
        $ended_str .= "ended($job)";
        $ended_str .= " && " if ( $i < $#job_names );
        $i++;
    }
    $ended_str .= '"';

    my $queue = $self->mapper->farm_queue || 'small';
    my $jobid = 0;

    # Memory requirements are specified in MB on the new farm infrastructure;
    # the depend job needs almost nothing (5 MB).
    # BUG FIX: the original assignment was missing its trailing semicolon,
    # which is a Perl syntax error.
    my $memory_resources = q{-M 5 -R"select[mem>5] rusage[mem=5]"};
    my $com  = "bsub -K -q $queue $memory_resources -o $root_dir/depend.out -e $root_dir/depend.err $ended_str /bin/true";
    my $line = `$com`;

    if ( $line =~ /^Job <(\d+)> is submitted/ ) {
        $jobid = $1;
        print "LSF job ID for Depend job: $jobid (job array with 1 job)\n";
    }

    if ( !$jobid ) {
        # Something went wrong
        warn("Job submission failed:\n$@\n");
    }
    else {
        my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('mapping_finished',now())");
        $sth->execute();
        $sth->finish;
    }
}
sub remove_all_old_output_files {
    my ($self) = @_;

    my $out_dir = $self->core->dir();
    print "Deleting txt and sql files from output dir: $out_dir\n" if ( $self->verbose );

    # glob() splits its pattern on whitespace, so one call removes both
    # *.txt and *.sql files.
    unlink( glob("$out_dir/*.txt $out_dir/*.sql") );

    # $self->cleanup_projections_file(); # now to be done when we load core.
}
1;