Commit 86cc763f authored by Ian Longden

new xref mapping code. Added for safety and will not affect the old code yet

parent 2ab43b85
package XrefMapper::CoreInfo;
use vars '@ISA';
@ISA = qw{ XrefMapper::BasicMapper };
use strict;
use warnings;
use XrefMapper::BasicMapper;
use Cwd;
use DBI;
use File::Basename;
use IPC::Open3;
# Get info from the core database.
# Need to load tables:-
#
# gene_transcript_translation
# gene_stable_id
# transcript_stable_id
# translation_stable_id
# Also for Direct xref process these and add to object_xref
# also add dependents while we are at it.
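#
# For reference, a minimal sketch of the helper tables this module fills in the
# xref database (column names are taken from the INSERT statements below; the
# types and keys are assumptions and the real xref schema may differ):
#
#   CREATE TABLE gene_transcript_translation (
#     gene_id        INT,
#     transcript_id  INT,
#     translation_id INT            -- NULL for non-coding transcripts
#   );
#
#   CREATE TABLE gene_stable_id (   -- likewise transcript_stable_id, translation_stable_id
#     internal_id INT,
#     stable_id   VARCHAR(128)
#   );
#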
sub new {
my($class, $mapper) = @_;
my $self ={};
bless $self,$class;
$self->core($mapper->core);
$self->xref($mapper->xref);
return $self;
}
sub test_return_codes {
my $self = shift;
my $sth = $self->xref->dbc->prepare("create table arse_test like object_xref");
$sth->execute();
$sth->finish;
my $ins_ox_ignore = $self->xref->dbc->prepare("insert into arse_test (object_xref_id, ensembl_id, xref_id, ensembl_object_type, linkage_type) values(?, ?,?,?,?)");
local $ins_ox_ignore->{RaiseError};
my $object_xref = 1;
my $out = $ins_ox_ignore->execute(1, 1, 1, "Gene", 'DIRECT');
print "one failed: $@\n" if $@;
print "successfull insert = $out\n";
if($ins_ox_ignore->err){
print $ins_ox_ignore->errstr."\n";
}
$out = $ins_ox_ignore->execute(2, 1, 1, "Gene", 'DIRECT');
if($ins_ox_ignore->err){
my $err = $ins_ox_ignore->errstr;
if($err =~ /Duplicate/){
print "2 not added becouse duplicate (correct)\n";
}
else{
die "Problem loading error is $err\n";
}
}
else{
print "AHHHHHHH insert 2 okay\n";
}
#same primary key should blow up
$out = $ins_ox_ignore->execute(1, 3, 3, "RUBBISH", 'DIRECT');
if($ins_ox_ignore->err){
my $err = $ins_ox_ignore->errstr;
if($err =~ /Duplicate/){
print "Not added becouse duplicate (WRNG)\n";
}
else{
$ins_ox_ignore->finish;
die "Problem loading data error is $err\n";
}
}
print "END\n";
}
sub get_core_data {
my $self = shift;
# gene_transcript_translation
# gene_stable_id
# transcript_stable_id
# translation_stable_id
my $object_xref_id;
my $ox_sth = $self->xref->dbc->prepare("select max(object_xref_id) from object_xref");
$ox_sth->execute();
$ox_sth->bind_columns(\$object_xref_id);
$ox_sth->fetch();
$ox_sth->finish;
# load table gene_transcript_translation
my $ins_sth = $self->xref->dbc->prepare("insert into gene_transcript_translation (gene_id, transcript_id, translation_id) values (?, ?, ?)");
my $sql = "select tn.gene_id, tn.transcript_id, tl.translation_id from transcript tn left join translation tl on tl.transcript_id = tn.transcript_id";
my $sth = $self->core->dbc->prepare($sql);
$sth->execute();
my ($gene_id, $transcript_id, $translation_id);
$sth->bind_columns(\$gene_id, \$transcript_id, \$translation_id);
while($sth->fetch()){
$ins_sth->execute($gene_id, $transcript_id, $translation_id);
}
$ins_sth->finish;
$sth->finish;
# load table xxx_stable_id
my ($id, $stable_id);
foreach my $table (qw(gene transcript translation)){
my $sth = $self->core->dbc->prepare("select ".$table."_id, stable_id from ".$table."_stable_id");
my $ins_sth = $self->xref->dbc->prepare("insert into ".$table."_stable_id (internal_id, stable_id) values(?, ?)");
$sth->execute();
$sth->bind_columns(\$id, \$stable_id);
while($sth->fetch){
$ins_sth->execute($id, $stable_id);
}
$ins_sth->finish;
$sth->finish;
}
$sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_data_loaded',now())");
$sth->execute();
$sth->finish;
# Now process the direct xrefs and add data to the object xrefs remember dependent xrefs.
my $ins_ox_sth = $self->xref->dbc->prepare("insert into object_xref (object_xref_id, ensembl_id, xref_id, ensembl_object_type, linkage_type) values(?, ?,?,?,?)");
local $ins_ox_sth->{RaiseError}; # catch duplicates ourselves rather than dying
local $ins_ox_sth->{PrintError}; # cut down on error messages
my $ins_go_sth = $self->xref->dbc->prepare("insert into go_xref (object_xref_id, linkage_type, source_xref_id) values(?,?,?)");
my $dep_sth = $self->xref->dbc->prepare("select dependent_xref_id, linkage_annotation from dependent_xref where master_xref_id = ?");
my $stable_sql=(<<SQL);
SELECT dx.general_xref_id, s.internal_id, dx.ensembl_stable_id
FROM TYPE_direct_xref dx left join TYPE_stable_id s on s.stable_id = dx.ensembl_stable_id
SQL
foreach my $table (qw(gene transcript translation)){
my ($xref_id, $internal_id, $stable_id);
my $sql = $stable_sql;
$sql =~ s/TYPE/$table/g;
my $sth = $self->xref->dbc->prepare($sql);
# print "sql = $sql\n";
$sth->execute();
$sth->bind_columns(\$xref_id, \$internal_id, \$stable_id);
my $count =0;
my $duplicate_direct_count = 0;
my $duplicate_dependent_count = 0;
while($sth->fetch){
if(!defined($internal_id)){ # not found: either it is already an internal id or the stable_id no longer exists
if($stable_id =~ /^\d+$/){
$internal_id = $stable_id;
}
else{
print "COuld not find stable id $stable_id in table to get the internal id hence ignoring!!!\n";
next;
}
}
$object_xref_id++;
$count++;
my @master_xref_ids;
$ins_ox_sth->execute($object_xref_id, $internal_id, $xref_id, $table, 'DIRECT');
if($ins_ox_sth->err){
$duplicate_direct_count++;
next; #duplicate
}
else{
push @master_xref_ids, $xref_id;
}
while(my $master_xref_id = pop(@master_xref_ids)){
my ($dep_xref_id, $link);
$dep_sth->execute($master_xref_id);
$dep_sth->bind_columns(\$dep_xref_id, \$link);
while($dep_sth->fetch){
$object_xref_id++;
$ins_ox_sth->execute($object_xref_id, $internal_id, $dep_xref_id, $table, 'DEPENDENT');
if($ins_ox_sth->err){
my $err = $ins_ox_sth->errstr;
if($err =~ /Duplicate/){
$duplicate_dependent_count++;
next;
}
else{
die "Problem loading error is $err\n";
}
}
push @master_xref_ids, $dep_xref_id; # get the dependent, dependents just in case
if(defined($link) and $link ne ""){ # we have a go term linkage type
$ins_go_sth->execute($object_xref_id, $link, $master_xref_id);
}
}
}
}
$sth->finish;
if($duplicate_direct_count or $duplicate_dependent_count){
print "duplicate entrys ignored for $duplicate_direct_count direct xrefs and $duplicate_dependent_count dependent xrefs\n";
}
print $count." direct_xrefs added to ensembl ".$table."s\n";
}
$ins_go_sth->finish;
$ins_ox_sth->finish;
$dep_sth->finish;
$sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('direct_xrefs_parsed',now())");
$sth->execute();
$sth->finish;
}
1;
package XrefMapper::LoadMapper;
use vars '@ISA';
@ISA = qw{ XrefMapper::BasicMapper };
use strict;
use warnings;
use XrefMapper::BasicMapper;
use Cwd;
use DBI;
use File::Basename;
use IPC::Open3;
sub update_core {
}
1;
package XrefMapper::ProcessMappings;
use vars '@ISA';
@ISA = qw{ XrefMapper::BasicMapper };
use strict;
use warnings;
use XrefMapper::BasicMapper;
use Cwd;
use DBI;
use File::Basename;
use IPC::Open3;
##################################################################
# JOB 2
##################################################################
# process exonerate mapping file (include checks)
# Save all data to object_xref. Plus remember to add dependents
# process priority xrefs to leave those excluded flagged as such
# Do tests on xref database wrt to core before saving data.
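#
# Each line of an exonerate .map file is colon separated; the fields, in the
# order they are unpacked in process_map_file() below, are:
#   label:query_id:target_id:identity:query_length:target_length:
#   query_start:query_end:target_start:target_end:cigar_line:score
# e.g. (taken from the duplicate-match example further down):
#   xref:934818:155760:54:1617:9648:73:12:3456:3517: M 61:242
#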
sub new {
my($class, $mapper) = @_;
my $self ={};
bless $self,$class;
$self->core($mapper->core);
$self->xref($mapper->xref);
return $self;
}
sub process_mappings {
my ($self) = @_;
# get the jobs from the mapping table
# for i =1 i < jobnum{
# if( Not parsed already see mapping_jobs){
# check the .err, .out and .map files in that order.
# put data into object_xref, identity_xref, go_xref etc...
# add data to mapping_job table
# }
# }
my %query_cutoff;
my %target_cutoff;
my ($job_id, $percent_query_cutoff, $percent_target_cutoff);
my $sth = $self->xref->dbc->prepare("select job_id, percent_query_cutoff, percent_target_cutoff from mapping");
$sth->execute();
$sth->bind_columns(\$job_id, \$percent_query_cutoff, \$percent_target_cutoff);
while($sth->fetch){
$query_cutoff{$job_id} = $percent_query_cutoff;
$target_cutoff{$job_id} = $percent_target_cutoff;
}
$sth->finish;
my ($root_dir, $map, $status, $out, $err, $array_number);
my $map_sth = $self->xref->dbc->prepare("select root_dir, map_file, status, out_file, err_file, array_number, job_id from mapping_jobs");
$map_sth->execute();
$map_sth->bind_columns(\$root_dir, \$map, \$status, \$out, \$err, \$array_number, \$job_id);
my $already_processed_count = 0;
my $processed_count = 0;
my $error_count = 0;
my $stat_sth = $self->xref->dbc->prepare("update mapping_jobs set status = ? where job_id = ? and array_number = ?");
while($map_sth->fetch()){
my $err_file = $root_dir."/".$err;
my $out_file = $root_dir."/".$out;
my $map_file = $root_dir."/".$map;
if($status eq "SUCCESS"){
$already_processed_count++;
}
else{
if(-s $err_file){
$error_count++;
print "Problem $err_file is non zero\n";
if(open(ERR,"<$err_file")){
while(<ERR>){
print "#".$_;
}
}
else{
print "No file exists $err_file???\n Resubmit this job\n";
}
if($status eq "SUBMITTED"){
$stat_sth->execute('FAILED',$job_id, $array_number);
}
}
else{ #err file checks out so process the mapping file.
if(-e $map_file){
if($self->process_map_file($map_file, $query_cutoff{$job_id}, $target_cutoff{$job_id} ) >= 0){
$processed_count++;
$stat_sth->execute('SUCCESS',$job_id, $array_number);
}
else{
$error_count++;
$stat_sth->execute('FAILED',$job_id, $array_number);
}
}
else{
$error_count++;
print "Could not open file $map_file???\n Resubmit this job\n";
$stat_sth->execute('FAILED',$job_id, $array_number);
}
}
}
}
$map_sth->finish;
$stat_sth->finish;
print "already processed = $already_processed_count, processed = $processed_count, errors = $error_count\n";
if(!$error_count){
my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('mapping_processed',now())");
$sth->execute();
}
}
#return number of lines parsed if successful. -1 for fail
sub process_map_file{
my ($self, $map_file, $query_cutoff, $target_cutoff) = @_;
my $ret = 1;
my $ensembl_type = "Translation";
if($map_file =~ /_dna_/){
$ensembl_type = "Transcript";
}
if(!open(MAP ,"<$map_file")){
print "Could not open file $map_file\n Resubmit this job??\n";
return -1;
}
my $total_lines = 0;
my $root_dir = $self->core->dir;
my $ins_go_sth = $self->xref->dbc->prepare("insert ignore into go_xref (object_xref_id, linkage_type, source_xref_id) values(?,?,?)");
my $dep_sth = $self->xref->dbc->prepare("select dependent_xref_id, linkage_annotation from dependent_xref where master_xref_id = ?");
my $object_xref_id;
my $sth = $self->xref->dbc->prepare("select max(object_xref_id) from object_xref");
$sth->execute();
$sth->bind_columns(\$object_xref_id);
$sth->fetch();
$sth->finish;
my $object_xref_sth = $self->xref->dbc->prepare("insert into object_xref (object_xref_id, ensembl_id,ensembl_object_type, xref_id, linkage_type, ox_status ) values (?, ?, ?, ?, ?, ?)");
local $object_xref_sth->{RaiseError}; #catch duplicates
local $object_xref_sth->{PrintError}; # cut down on error messages
my $identity_xref_sth = $self->xref->dbc->prepare("insert into identity_xref (object_xref_id, query_identity, target_identity, hit_start, hit_end, translation_start, translation_end, cigar_line, score ) values (?, ?, ?, ?, ?, ?, ?, ?, ?)");
while(<MAP>){
$total_lines++;
chomp();
my ($label, $query_id, $target_id, $identity, $query_length, $target_length, $query_start, $query_end, $target_start, $target_end, $cigar_line, $score) = split(/:/, $_);
# calculate percentage identities
my $query_identity = int (100 * $identity / $query_length);
my $target_identity = int (100 * $identity / $target_length);
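# e.g. with illustrative numbers: identity = 450, query_length = 500 and
# target_length = 600 gives query_identity = int(100*450/500) = 90 and
# target_identity = int(100*450/600) = 75.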
my $status = "DUMP_OUT";
if($query_identity < $query_cutoff and $target_identity < $target_cutoff){
$status = "FAILED_CUTOFF";
}
$object_xref_id++;
$object_xref_sth->execute($object_xref_id, $target_id, $ensembl_type, $query_id, 'SEQUENCE_MATCH', $status) ;
if($object_xref_sth->err){
my $err = $object_xref_sth->errstr;
if($err =~ /Duplicate/){
# can get match from same xref and ensembl entity e.g.
# ensembl/ExonerateGappedBest1_dna_569.map:xref:934818:155760:54:1617:9648:73:12:3456:3517: M 61:242
# ensembl/ExonerateGappedBest1_dna_569.map:xref:934818:151735:58:1617:10624:73:6:5329:5397: M 48 D 1 M 19:242
next;
}
else{
die "Problem loading error is $err\n";
}
}
$cigar_line =~ s/ //g;
$cigar_line =~ s/([MDI])(\d+)/$2$1/ig;
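# e.g. the exonerate-style string " M 48 D 1 M 19" (as in the .map example
# above) becomes "48M1D19M" after the two substitutions.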
$identity_xref_sth->execute($object_xref_id, $query_identity, $target_identity, $query_start+1, $query_end, $target_start+1, $target_end, $cigar_line, $score) || die "Problem loading identity_xref";
my @master_xref_ids;
push @master_xref_ids, $query_id;
while(my $master_xref_id = pop(@master_xref_ids)){
my ($dep_xref_id, $link);
$dep_sth->execute($master_xref_id);
$dep_sth->bind_columns(\$dep_xref_id, \$link);
while($dep_sth->fetch){
$object_xref_id++;
$object_xref_sth->execute($object_xref_id, $target_id, $ensembl_type, $dep_xref_id, 'DEPENDENT', $status);
if($object_xref_sth->err){
my $err = $object_xref_sth->errstr;
if($err =~ /Duplicate/){
# $duplicate_dependent_count++;
next;
}
else{
die "Problem loading error is $err\n";
}
}
push @master_xref_ids, $dep_xref_id; # get the dependent, dependents just in case
if(defined($link) and $link ne ""){ # we have a go term linkage type
$ins_go_sth->execute($object_xref_id, $link, $master_xref_id);
}
}
}
}
close MAP;
$dep_sth->finish;
$ins_go_sth->finish;
$object_xref_sth->finish;
$identity_xref_sth->finish;
return $total_lines;
}
1;
package XrefMapper::ProcessPaired;
use vars '@ISA';
@ISA = qw{ XrefMapper::BasicMapper };
use strict;
use warnings;
use XrefMapper::BasicMapper;
use Cwd;
use DBI;
use File::Basename;
use IPC::Open3;
sub new {
my($class, $mapper) = @_;
my $self ={};
bless $self,$class;
$self->xref($mapper->xref);
return $self;
}
sub process{
my ($self) = @_;
print "Prscess Pairs\n";
my $object_xref_id;
my $sth = $self->xref->dbc->prepare("select MAX(object_xref_id) from object_xref");
$sth->execute;
$sth->bind_columns(\$object_xref_id);
$sth->fetch;
$object_xref_id++;
print "Starting at object_xref of $object_xref_id\n";
my $psth = $self->xref->dbc->prepare("select p.accession1, p.accession2 from pairs p");
my $ox_count_sth = $self->xref->dbc->prepare('select count(1) from object_xref ox, xref x where ox.xref_id = x.xref_id and ox.ox_status = "DUMP_OUT" and x.accession = ?');
my $ox_transcript_sth = $self->xref->dbc->prepare('select gtt.transcript_id from object_xref ox, xref x, gene_transcript_translation gtt where ox.ox_status = "DUMP_OUT" and ox.xref_id = x.xref_id and gtt.translation_id = ox.ensembl_id and x.accession = ?');
my $ox_translation_sth = $self->xref->dbc->prepare('select gtt.translation_id from object_xref ox, xref x, gene_transcript_translation gtt where ox.ox_status = "DUMP_OUT" and ox.xref_id = x.xref_id and gtt.transcript_id = ox.ensembl_id and x.accession = ?');
my $xref_sth = $self->xref->dbc->prepare("select xref_id from xref where accession = ?");
my $ox_insert_sth = $self->xref->dbc->prepare("insert into object_xref (object_xref_id, xref_id, ensembl_id, ensembl_object_type, linkage_type, ox_status) values(?, ?, ?, ?, 'INFERRED_PAIR', 'DUMP_OUT')");
local $ox_insert_sth->{RaiseError}; #catch duplicates
local $ox_insert_sth->{PrintError}; # cut down on error messages
my $ox_get_id_sth = $self->xref->dbc->prepare("select object_xref_id,ox_status from object_xref where xref_id = ? and ensembl_id = ? and ensembl_object_type = ?");
my $ox_update_sth = $self->xref->dbc->prepare('update object_xref set ox_status = "DUMP_OUT", linkage_type = "INFERRED_PAIR" where object_xref_id = ?');
my ($acc1, $acc2);
$psth->execute() || die "execute failed";
my $refseq_count = 0;
my %change;
$psth->bind_columns(\$acc1, \$acc2);
while($psth->fetch()){
my $count1;
my $count2;
$ox_count_sth->execute($acc1); # translation alignment
$ox_count_sth->bind_columns(\$count1);
$ox_count_sth->fetch;
$ox_count_sth->execute($acc2); # transcript alignment
$ox_count_sth->bind_columns(\$count2);
$ox_count_sth->fetch;
if(( $count1 and $count2) || (!($count1) and !($count2)) ){
next; # either both matched or neither did.
}
if($count1){
#need xref_id for acc2
my $xref_id;
$xref_sth->execute($acc2);
$xref_sth->bind_columns(\$xref_id);
if(!$xref_sth->fetch){
# print "Could not find xref_id for accession $acc2\n";
next;
}
next if(!defined($xref_id));
# print "$acc2\t$xref_id (search using $acc1)\n";
# insert new object_xref
# trap the error code. if duplicate then just set linkage_type = "INFERRED_PAIR" and ox_status = "DUMP_OUT"
# "maybe" the original failed the cutoff!!! so it will have an entry already but no good.
$ox_transcript_sth->execute($acc1);
my $transcript_id=undef;
$ox_transcript_sth->bind_columns(\$transcript_id);
while($ox_transcript_sth->fetch){
if(defined($transcript_id)){ # remember not all transcripts have translations.
$object_xref_id++;
$ox_insert_sth->execute($object_xref_id, $xref_id, $transcript_id, "Transcript") ;
if($ox_insert_sth->err){
my $err = $ox_insert_sth->errstr;
if($err =~ /Duplicate/){
$change{"UPDATE"}++;
# duplicate this can happen as it might have failed the cutoff
# find the old object_xref_id and the update the status's
my $old_object_xref_id=undef;
my $status;
$ox_get_id_sth->execute($xref_id, $transcript_id, "Transcript");
$ox_get_id_sth->bind_columns(\$old_object_xref_id, \$status);
$ox_get_id_sth->fetch();
if($status eq "DUMP_OUT"){
print "Problem status for object_xref_id is DUMP_OUT but this should never happen as it was not found earlier??? (transcript_id = $transcript_id, $xref_id\n";
}
if(!defined($old_object_xref_id)){
die "Duplicate but can't find the original?? xref_id = $xref_id, ensembl_id = $transcript_id, type = Transcript\n";
}
$ox_update_sth->execute($old_object_xref_id)|| die "Could not set update for object_xref_id = $old_object_xref_id";
}
else{
die "Problem loading error is $err\n";
}
}
else{
$change{"NEW"}++;
}
# print "insert $xref_id transcript $transcript_id ........\n";
$refseq_count++;
}
}
}
elsif($count2){
my $xref_id;
$xref_sth->execute($acc1);
$xref_sth->bind_columns(\$xref_id);
if(!$xref_sth->fetch){
# print "Could not find xref_id for accession $acc1\n";
next;
}
next if(!defined($xref_id));
# print "$acc1\t$xref_id (search using $acc2)\n";
# insert new object_xref
# trap the error code. if duplicate then just set linkage_type = "INFERRED_PAIR" and ox_status = "DUMP_OUT"
# "maybe" the original failed the cutoff!!! so it will have an entry already but no good.
$ox_translation_sth->execute($acc2);
my $translation_id = undef;
$ox_translation_sth->bind_columns(\$translation_id);
while($ox_translation_sth->fetch){
if(defined($translation_id)){ # remember not all transcripts have translations.
$object_xref_id++;
$ox_insert_sth->execute($object_xref_id, $xref_id, $translation_id, "Translation") ;
if($ox_insert_sth->err){
$change{"UPDATE"}++;
my $err = $ox_insert_sth->errstr;
if($err =~ /Duplicate/){
# duplicate this can happen as it might have failed the cutoff
# find the old object_xref_id and the update the status's
my $old_object_xref_id=undef;
my $status;
$ox_get_id_sth->execute($xref_id, $translation_id, "Translation");
$ox_get_id_sth->bind_columns(\$old_object_xref_id,\$status);
$ox_get_id_sth->fetch();
if($status eq "DUMP_OUT"){
print "Problem status for object_xref_id is DUMP_OUT but this should never happen as it was not found earlier??? (trasnlation_id = $translation_id, $xref_id\n";
}
if(!defined($old_object_xref_id)){
die "Duplicate but can't find the original?? xref_id = $xref_id, ensembl_id = $translation_id, type = Translation\n";
}
$ox_update_sth->execute($old_object_xref_id)|| die "Could not set update for object_xref_id = $old_object_xref_id";
}
else{
die "Problem loading error is $err\n";
}
}
else{
$change{"NEW"}++;
}
# print "insert $xref_id translation $translation_id ........\n";
$refseq_count++;
}
}
}
else{
print "HMMM how did i get here. This should be impossible. {logic error]\n";
}
}
$psth->finish;
$ox_count_sth->finish;
$ox_transcript_sth->finish;
$ox_translation_sth->finish;
$xref_sth->finish;
foreach my $key (keys %change){
print "\t$key\t".$change{$key}."\n";
}
print "$refseq_count new relations ships added\n";
}
1;
package XrefMapper::ProcessPrioritys;
use vars '@ISA';
@ISA = qw{ XrefMapper::BasicMapper };
use strict;
use warnings;
use XrefMapper::BasicMapper;
use Cwd;
use DBI;
use File::Basename;
use IPC::Open3;
# Process the priority xrefs.
#
# 1) create a list of source "names" that are priority xrefs
#
# 2) Just to be sure set all ox_status in object_xref to 'DUMP_OUT'
# set dumped in xref to NULL
#
# 3) for each of the source names
# set ox_status to 'FAILED_PRIORITY' for those not the best match
# Also do this for its dependents
#
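#
# Illustrative example (accession and priority values are made up): if
# accession Q12345 from a priority source has object_xrefs at priority 1 and
# priority 2, the priority-2 entries are set to ox_status = 'FAILED_PRIORITY'
# and only the priority-1 match keeps 'DUMP_OUT'; ties between non-DIRECT
# matches at the best priority are broken below using
# query_identity + target_identity.
#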
sub new {
my($class, $mapper) = @_;
my $self ={};
bless $self,$class;
# $self->core($mapper->core);
$self->xref($mapper->xref);
return $self;
}
sub get_priority_names{
my ($self) = @_;
my $psth = $self->xref->dbc->prepare("select s.priority_description, s.name from source s, xref x where x.source_id = s.source_id group by s.priority_description, s.name order by s.name") || die "prepare failed";
$psth->execute() || die "execute failed";
my @names;
my %seen;
my $last_name = "rubbish";
my ($desc,$name);
$psth->bind_columns(\$desc,\$name);
while($psth->fetch()){
if($name eq $last_name and !defined($seen{$name})){
push @names, $name;
$seen{$name} = 1;
}
$last_name = $name;
}
return @names;
}
sub process {
my ($self) = @_;
my @names = $self->get_priority_names();
print "The foillowing will be processed as priority xrefs\n";
foreach my $name (@names){
print "\t$name\n";
}
my $sth = $self->xref->dbc->prepare('select ox.object_xref_id, x.accession, s.priority, ox.linkage_type from object_xref ox, xref x, source s where x.xref_id = ox.xref_id and x.source_id = s.source_id and s.name = ? and ox.ox_status = "DUMP_OUT" order by x.accession, s.priority');
my $seq_sth = $self->xref->dbc->prepare('select ox.object_xref_id, (ix.query_identity + ix.target_identity) as identity from object_xref ox, xref x, source s, identity_xref ix where ix.object_xref_id = ox.object_xref_id and x.xref_id = ox.xref_id and ox.ox_status = "DUMP_OUT" and x.source_id = s.source_id and s.name = ? and x.accession = ? order by identity DESC');
my $dep_sth = $self->xref->dbc->prepare('select distinct dox.object_xref_id, (ix.query_identity+ ix.target_identity) as identity from dependent_xref dx, object_xref dox, xref x, source s, object_xref mox, identity_xref ix where ix.object_xref_id = mox.object_xref_id and mox.xref_id = dx.master_xref_id and dx.dependent_xref_id = dox.xref_id and dox.xref_id = x.xref_id and s.source_id = x.source_id and x.accession = ? and s.name = ? and dox.ox_status = "DUMP_OUT"');
my $update_ox_sth = $self->xref->dbc->prepare('update object_xref set ox_status = "FAILED_PRIORITY" where object_xref_id = ?');
foreach my $name (@names){
my %priority_clash_seq;
my %priority_clash_depend;
print "processing $name source\n";
$sth->execute($name);
my ($ox,$acc,$priority, $type);
$sth->bind_columns(\$ox,\$acc,\$priority,\$type);
my $last_acc = "";
my $top_priority = 0;
while($sth->fetch()){
if($acc eq $last_acc){
if($priority == $top_priority and $type ne "DIRECT"){
if($type eq "SEQUENCE_MATCH"){
$priority_clash_seq{$acc} = 1;
}
elsif($type eq "DEPENDENT"){
$priority_clash_depend{$acc} = 1;
}
else{
print "type $type????? $acc, $name\n";
}
}
else{
$update_ox_sth->execute($ox);
}
}
else{
$top_priority = $priority;
}
$last_acc = $acc;
}
# Easy case of sequence matches
my $identity;
# print "need to sort out those with the same priority status:-\n";
foreach my $key (keys %priority_clash_seq){
$seq_sth->execute($name, $key);
$seq_sth->bind_columns(\$ox, \$identity);
$seq_sth->fetch(); # keep first one
while( $seq_sth->fetch()){
$update_ox_sth->execute($ox);
}
}
# now the hard one dependent xrefs!!
foreach my $key (keys %priority_clash_depend){
$dep_sth->execute($key, $name);
$dep_sth->bind_columns(\$ox, \$identity);
$dep_sth->fetch(); # keep first one
while( $dep_sth->fetch()){
$update_ox_sth->execute($ox); # remove others
}
}
}
#Sanity check make sure only one instance of each priority xref in object_xref table with ox_status = DUMP_OUT
foreach my $name (@names){
print "checking $name source\n";
$sth->execute($name);
my ($ox,$acc,$priority, $type);
$sth->bind_columns(\$ox,\$acc,\$priority,\$type);
my $last_acc = "";
while($sth->fetch()){
if($acc eq $last_acc){
print "ERROR: $acc still has more than one viable entry in object_xref\n";
}
$last_acc = $acc;
}
}
$seq_sth->finish;
$sth->finish;
$update_ox_sth->finish;
$sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('prioritys_flagged',now())");
$sth->execute();
$sth->finish;
}
1;
package XrefMapper::SubmitMapper;
use vars '@ISA';
@ISA = qw{ XrefMapper::BasicMapper };
use strict;
use warnings;
use XrefMapper::BasicMapper;
use Cwd;
use DBI;
use File::Basename;
use IPC::Open3;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
##################################################################
# JOB 1 Do exonerate jobs and get core info
##################################################################
# Also load database with connection details of the core database for use later in JOBS
# One get all info from core that is needed. :- stable_id's etc.
# Process the direct xrefs by putting them in the object_xref table
# dump the fasta files
# submit exonerate jobs (fill tables mapping, mapping_jobs) if not already done
# submit coordinate xrefs if not processed
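#
# A minimal sketch of how this module is expected to be driven (the mapper
# construction and call sequence here are assumptions for illustration, not
# defined in this file):
#
#   my $submitter = XrefMapper::SubmitMapper->new($mapper);  # hypothetical constructor
#   $submitter->dump_seqs();           # dump xref and ensembl fasta files
#   $submitter->build_list_and_map();  # submit the exonerate jobs
#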
sub store_core_database_details{
my ($self, $port, $user, $pass, $dbname, $dir) = @_;
}
##############################################################################
# dump fasta files code
##############################################################################
=head2 dump_seqs
Arg[1]: xref object which holds info needed for the dump of xref
Description: Dumps out the files for the mapping. Xref object should hold
the value of the databases and source to be used.
Returntype : none
Exceptions : will die if species not known or an error occurs while
: trying to write to files.
Caller : general
=cut
sub dump_seqs{
my ($self, $location) = @_;
$self->dump_xref();
$self->dump_ensembl($location);
}
sub no_dump_xref {
my ($self) = @_;
my @method=();
my @lists =@{$self->get_set_lists()};
my $i=0;
my $k = 0;
foreach my $list (@lists){
$method[$k++] = shift @$list;
}
$self->method(\@method);
$self->core->dna_file($self->core->dir."/".$self->core->species."_dna.fasta");
$self->core->protein_file($self->core->dir."/".$self->core->species."_protein.fasta");
}
=head2 dump_xref
Arg[1]: xref object which holds info on method and files.
Description: Dumps the Xref data as fasta file(s)
Returntype : none
Exceptions : none
Caller : dump_seqs
=cut
sub dump_xref{
my ($self) = @_;
my $xref =$self->xref();
if(!defined($xref->dir())){
if(defined($self->dir)){
$xref->dir($self->dir);
}
else{
$xref->dir(".");
}
}
my @method=();
my @lists =@{$self->get_set_lists()};
my $i=0;
if(defined($self->dumpcheck())){
my $skip = 1;
foreach my $list (@lists){
if(!-e $xref->dir()."/xref_".$i."_dna.fasta"){
$skip = 0;
}
if(!-e $xref->dir()."/xref_".$i."_peptide.fasta"){
$skip = 0;
}
$i++;
}
if($skip){
my $k = 0;
foreach my $list (@lists){
$method[$k++] = shift @$list;
}
$self->method(\@method);
print "Xref fasta files found and will be used (No new dumping)\n";
return;
}
}
print "Dumping Xref fasta files\n";
for my $sequence_type ('dna', 'peptide') {
my $filename = $xref->dir() . "/xref_0_" . $sequence_type . ".fasta";
open(XREF_DUMP,">$filename") || die "Could not open $filename";
my $sql = "SELECT p.xref_id, p.sequence, x.species_id , x.source_id ";
$sql .= " FROM primary_xref p, xref x ";
$sql .= " WHERE p.xref_id = x.xref_id AND ";
$sql .= " p.sequence_type ='$sequence_type' ";
my $sth = $xref->dbc->prepare($sql);
$sth->execute();
while(my @row = $sth->fetchrow_array()){
$row[1] =~ s/(.{60})/$1\n/g;
print XREF_DUMP ">".$row[0]."\n".$row[1]."\n";
}
close(XREF_DUMP);
$sth->finish();
}
my $sth = $xref->dbc->prepare("insert into process_status (status, date) values('xref_fasta_dumped',now())");
$sth->execute();
return;
}
=head2 dump_ensembl
Description: Dumps the ensembl data to a file in fasta format.
Returntype : none
Exceptions : none
Caller : dump_seqs
=cut
sub dump_ensembl{
my ($self, $location) = @_;
$self->fetch_and_dump_seq($location);
}
=head2 fetch_and_dump_seq
Description: Dumps the ensembl data to a file in fasta format.
Returntype : none
Exceptions : will die if there are errors in db connection or file creation.
Caller : dump_ensembl
=cut
sub fetch_and_dump_seq{
my ($self) = @_;
my $ensembl = $self->core;
my $db = new Bio::EnsEMBL::DBSQL::DBAdaptor(-dbconn => $ensembl->dbc);
#
# store ensembl dna file name and open it
#
if(!defined($ensembl->dir())){
$ensembl->dir(".");
}
$ensembl->dna_file($ensembl->dir."/".$ensembl->species."_dna.fasta");
#
# store ensembl protein file name and open it
#
$ensembl->protein_file($ensembl->dir."/".$ensembl->species."_protein.fasta");
if(defined($self->dumpcheck()) and -e $ensembl->protein_file() and -e $ensembl->dna_file()){
print "Ensembl Fasta files found (no new dumping)\n";
return;
}
print "Dumping Ensembl Fasta files\n";
open(DNA,">".$ensembl->dna_file())
|| die("Could not open dna file for writing: ".$ensembl->dna_file."\n");
open(PEP,">".$ensembl->protein_file())
|| die("Could not open protein file for writing: ".$ensembl->protein_file."\n");
my $gene_adaptor = $db->get_GeneAdaptor();
# fetch by location, or everything if not defined
my @genes;
my $constraint;
# TEST PURPOSES ONLY#################################################
#####################################################################
@genes = @{$gene_adaptor->fetch_all()};
# push @genes, $gene_adaptor->fetch_by_stable_id("ENSG00000139618");
#####################################################################
my $max = undef;
my $i =0;
my $rna = 0;
foreach my $gene (@genes){
next if $gene->biotype eq 'J_segment';
next if $gene->biotype eq 'D_segment';
foreach my $transcript (@{$gene->get_all_Transcripts()}) {
$i++;
my $seq = $transcript->spliced_seq();
$seq =~ s/(.{60})/$1\n/g;
print DNA ">" . $transcript->dbID() . "\n" .$seq."\n";
my $trans = $transcript->translation();
my $translation = $transcript->translate();
if(defined($translation)){
my $pep_seq = $translation->seq();
$pep_seq =~ s/(.{60})/$1\n/g;
print PEP ">".$trans->dbID()."\n".$pep_seq."\n";
}
}
last if(defined($max) and $i > $max);
}
close DNA;
close PEP;
my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('core_fasta_dumped',now())");
$sth->execute();
}
=head2 get_set_lists
Description: specifies the list of databases and source to be used in the
: generation of one or more data sets.
Returntype : list of lists
Example : my @lists =@{$self->get_set_lists()};
Exceptions : none
Caller : dump_xref
=cut
sub get_set_lists{
my ($self) = @_;
return [["ExonerateGappedBest1", ["*","*"]]];
}
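# For illustration only (assumed semantics: each entry is [method, [species, source]]
# with '*' as a wildcard), a species/source-restricted set might look like:
#   return [["ExonerateGappedBest1", ["homo_sapiens", "RefSeq_dna"]]];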
###################################################################################################
# exonerate subs
###################################################################################################
=head2 jobcount
Arg [1] : (optional)
Example : $mapper->jobcount(1004);
Description: Getter / Setter for number of jobs submitted.
Returntype : scalar
Exceptions : none
=cut
sub jobcount {
my ($self, $arg) = @_;
(defined $arg) &&
($self->{_jobcount} = $arg );
return $self->{_jobcount};
}
sub method{
my ($self, $arg) = @_;
(defined $arg) &&
($self->{_method} = $arg );
return $self->{_method};
}
=head2 build_list_and_map
Arg[1]: xref object which holds info on method and files.
Description: runs the mapping of the list of files with species methods
Returntype : none
Exceptions : none
Caller : general
=cut
sub build_list_and_map {
my ($self) = @_;
my @list=();
my $i = 0;
foreach my $method (@{$self->method()}){
my @dna=();
my $q_dna_file = $self->xref->dir."/xref_".$i."_dna.fasta";
if (-e $q_dna_file and -s $q_dna_file) {
push @dna, $method;
push @dna, $q_dna_file;
push @dna, $self->core->dna_file();
push @list, \@dna;
}
my @pep=();
my $q_pep_file = $self->xref->dir."/xref_".$i."_peptide.fasta";
if (-e $q_pep_file and -s $q_pep_file) {
push @pep, $method;
push @pep, $self->xref->dir."/xref_".$i."_peptide.fasta";
push @pep, $self->core->protein_file();
push @list, \@pep;
}
$i++;
}
$self->run_mapping(\@list);
}
=head2 run_mapping
Arg[1] : List of lists of (method, query, target)
Arg[2] :
Example : none
Description: Create and submit mapping jobs to LSF, and wait for them to finish.
Returntype : none
Exceptions : none
Caller : general
=cut
sub run_mapping {
my ($self, $lists) = @_;
# delete old output files in target directory if we're going to produce new ones
my $dir = $self->core->dir();
print "Deleting out, err and map files from output dir: $dir\n";
unlink (<$dir/*.map $dir/*.out $dir/*.err>);
$self->remove_all_old_output_files();
#disconnect so that we can then reconnect after the long mapping bit.
$self->core->dbc->disconnect_if_idle(1);
$self->xref->dbc->disconnect_if_idle(1);
$self->core->dbc->disconnect_when_inactive(1);
$self->xref->dbc->disconnect_when_inactive(1);
# foreach method, submit the appropriate job & keep track of the job name
# note we check if use_existing_mappings is set here, not earlier, as we
# still need to instantiate the method object in order to fill
# method_query_threshold and method_target_threshold
my @job_names;
my @running_methods;
foreach my $list (@$lists){
my ($method, $queryfile ,$targetfile) = @$list;
my $obj_name = "XrefMapper::Methods::$method";
# check that the appropriate object exists
eval "require $obj_name";
if($@) {
warn("Could not find object $obj_name corresponding to mapping method $method, skipping\n$@");
} else {
my $obj = $obj_name->new();
# $method_query_threshold{$method} = $obj->query_identity_threshold();
# $method_target_threshold{$method} = $obj->target_identity_threshold();
# my $job_name = $obj->run($queryfile, $targetfile, $self->core->dir(),$self->nofarm);
my $job_name = $obj->run($queryfile, $targetfile, $self);
push @job_names, $job_name;
push @running_methods, $obj;
sleep 1; # make sure unique names really are unique
$self->jobcount($self->jobcount+$obj->jobcount);
}
} # foreach method
# submit depend job to wait for all mapping jobs
foreach my $method( @running_methods ){
# Submit all method-specific depend jobs
if( $method->can('submit_depend_job') ){
$method->submit_depend_job;
}
}
# Submit generic depend job. Defaults to LSF
$self->submit_depend_job($self->core->dir, @job_names);
$self->check_err($self->core->dir);
} # run_mapping
sub nofarm{
my ($self, $arg) = @_;
(defined $arg) &&
($self->{_nofarm} = $arg );
return $self->{_nofarm};
}
sub check_err {
my ($self, $dir) = @_;
foreach my $err (glob("$dir/*.err")) {
print "\n\n*** Warning: $err has non-zero size; may indicate".
" problems with exonerate run\n\n\n" if (-s $err);
}
}
=head2 submit_depend_job
Arg[1] : List of job names.
Arg[2] :
Example : none
Description: Submit an LSF job that waits for other jobs to finish.
Returntype : none
Exceptions : none
Caller : general
=cut
sub submit_depend_job {
my ($self, $root_dir, @job_names) = @_;
if(defined($self->nofarm)){
return;
}
# Submit a job that does nothing but wait on the main jobs to
# finish. This job is submitted interactively so the exec does not
# return until everything is finished.
# build up the bsub command; first part
my @depend_bsub = ('bsub', '-K');
# build -w 'ended(job1) && ended(job2)' clause
my $ended_str = "-w ";
my $i = 0;
foreach my $job (@job_names) {
$ended_str .= "ended($job)";
$ended_str .= " && " if ($i < $#job_names);
$i++;
}
push @depend_bsub, $ended_str;
# rest of command
push @depend_bsub, ('-q', 'small', '-o', "$root_dir/depend.out", '-e', "$root_dir/depend.err");
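# With two jobs named job1 and job2 the submission below is roughly equivalent to
# (illustrative job names; the command /bin/true is fed to bsub on stdin):
#   echo /bin/true | bsub -K -w 'ended(job1) && ended(job2)' -q small \
#        -o $root_dir/depend.out -e $root_dir/depend.err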
my $jobid = 0;
eval {
my $pid;
my $reader;
local *BSUB;
local *BSUB_READER;
if ( ( $reader = open( BSUB_READER, '-|' ) ) ) {
while (<BSUB_READER>) {
#print "YES:$_";
if (/^Job <(\d+)> is submitted/) {
$jobid = $1;
print "LSF job ID for depend job: $jobid\n";
}
}
close(BSUB_READER);
} else {
die("Could not fork : $!\n") unless ( defined($reader) );
open( STDERR, ">&STDOUT" );
if ( ( $pid = open( BSUB, '|-' ) ) ) {
print BSUB "/bin/true\n";
close BSUB;
if ( $? != 0 ) {
die( "bsub exited with non-zero status ($?) "
. "- job not submitted\n" );
}
} else {
if ( defined($pid) ) {
exec(@depend_bsub);
die("Could not exec bsub : $!\n");
} else {
die("Could not fork : $!\n");
}
}
exit(0);
}
};
if ($@) {
# Something went wrong
warn("Job submission failed:\n$@\n");
}
else{
my $sth = $self->xref->dbc->prepare("insert into process_status (status, date) values('mapping_finished',now())");
$sth->execute();
}
}
sub remove_all_old_output_files{
my ($self) =@_;
my $dir = $self->core->dir();
print "Deleting txt and sql files from output dir: $dir\n";
unlink(<$dir/*.txt $dir/*.sql>);
# $self->cleanup_projections_file(); # now to be done when we load core.
}
1;