Commit a5c33fec authored by Will Spooner's avatar Will Spooner
Browse files

Mapper that runs exonerate jobs using PBS rather than LSF

parent 6a1a6ac1
package XrefMapper::Methods::PBSExonerateGappedBest1;
use strict;
#use File::Basename;
#use IPC::Open3;
use Sys::Hostname;
use XrefMapper::Methods::ExonerateBasic;
use vars '@ISA';
@ISA = qw{XrefMapper::Methods::ExonerateBasic};
my $exonerate_path = "exonerate";
sub options {
return ('--model', 'affine:local', '--subopt', 'no', '--bestn', '1');
}
sub query_identity_threshold {
return 90;
}
sub target_identity_threshold {
return 90;
}
sub submit_exonerate{
my ($self, $query, $target, $root_dir, @options) = @_;
# Exonerate can run individual chunks of a large job.
# Determine the number of chunks that will be used, and add to total
my $num_jobs
= XrefMapper::Methods::ExonerateBasic::calculate_num_jobs($query);
$self->jobcount($self->jobcount()+$num_jobs);
$num_jobs || return; # Check we have jobs to run
# Build a template representing the exonerate command
my $options_str = join( ' ', @options );
my $shell_command = <<"EOF";
#!/bin/sh
$exonerate_path \\
--target $target \\
--query $query \\
--querychunktotal $num_jobs \\
--querychunkid %d \\
--showvulgar FALSE \\
--showalignment FALSE \\
--ryo "xref:%%qi:%%ti:%%ei:%%ql:%%tl:%%qab:%%qae:%%tab:%%tae:%%C:%%s\\n" \\
$options_str \\
| grep '^xref'
exit
EOF
#warn( sprintf $shell_command, 1 );
# Determine the output filename
my $host = Sys::Hostname::hostname;
my $query_alphabet = $query =~ /peptide.fasta$/ ? 'peptide' : 'dna';
my $outfile_root = sprintf( "%s:%s/%s_%s",
$host,
$root_dir,
$self->get_class_name(),
$query_alphabet );
# Run PBS qsub for each job chunk.
# First set a job that depends on the completion of all other jobs
my $jobid = $self->submit_qsub( q( echo 'sleep 1' ),
[ -W => "depend=on:$num_jobs" ] );
# Run each job chunk
my $jobname = join '.', 'X', ( $host =~ /^(\w+)/ ), $$;
foreach( my $i=1; $i<=$num_jobs; $i++ ){
my $outfile = sprintf( '%s_%s.map', $outfile_root, $i );
my $errfile = sprintf( '%s_%s.err', $outfile_root, $i );
my $chunkid = $self->submit_qsub
( sprintf( $shell_command, $i ),
[ '-N' => $jobname,
'-W' => "depend=beforeany:$jobid",
'-o' => $outfile,
'-e' => $errfile ] );
}
# BasicMapper calls a method called 'submit_depend_job' that waits
# till all the lsf jobs have completed. We're not using LSF, so this
# approach does not work. Solution: wait here until all jobs are done.
# Not as efficient for a large cluster, but we may have enough work to
# do to saturate a small one. TODO: abstract 'submit_depend_job' to the
# Method class instances.
$self->global_jobid( $jobid );
#$self->submit_depend_job( $jobid );
return $jobid;
}
#----------------------------------------------------------------------
# Get/set the 'global' job ID that groups all chunked jobs in the method
sub global_jobid{
my $key = '_global_jobid';
my $self = shift;
if( @_ ){ $self->{$key} = shift }
return $self->{$key};
}
#----------------------------------------------------------------------
# Waits in-process untill the specified jobid has completed
sub submit_depend_job{
my $self = shift;
$self->jobcount || return; # Check that we are running jobs
my $jobid = $self->global_jobid || die( "global_jobid unset!" );
my $depend = join( ':', 'afterany', $jobid );
my $id = $self->submit_qsub( q( echo 'sleep 1' ),
[ '-W' => 'block=true',
'-W' => "depend=$depend" ] );
return $jobid;
}
#----------------------------------------------------------------------
# A wrapper for submitting PBS qsub jobs;
# First the READER (this process) opens a pipe (-|) on the WRITER.
# The WRITER then opens a pipe (|-) on the RUNNER.
# The RUNNER then execs qsub with command line options,
# the WRITER writes the script to the RUNNER,
# and any output is collected by the READER.
# Returns the qsub job ID.
sub submit_qsub{
my $self = shift;
my $script = shift || die( "Need a script to submit to qsub!" );
my @qsub_args = @{ shift || [] };
local *QSUB;
local *QSUB_READER;
my $jobid;
my $writer_pid;
if( $writer_pid = open( QSUB_READER, '-|' ) ){
# READER - Reads stdout from RUNNER process
while( <QSUB_READER> ){
if( /^(\d+)/ ){
$jobid = $1;
print( "Job ID $1 submitted to qsub\n" );
}
}
close( QSUB_READER );
}
else{
unless( defined($writer_pid) ){ die("Could not fork : $!" ) }
my $runner_pid;
if( $runner_pid = open(QSUB, '|-') ){
# WRITER - Writes command to RUNNER process
print QSUB $script;
close QSUB;
unless( $? == 0 ){ die( "qsub failed; non-zero status" ) }
exit(0);
}
else{
# RUNNER - Runs the command; STDIN,STDOUT attached to WRITER,READER.
unless( defined($runner_pid) ){ die("Could not fork : $!" ) }
#warn join( " ", 'qsub', @qsub_args );
exec( 'qsub', @qsub_args );
die("Could not exec qsub : $!");
}
}
return $jobid;
}
1;
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment