Commit 3266ecfd authored by Ian Longden's avatar Ian Longden
Browse files

New subroutine fix_mappings added as well as removing the complicated forking...

New subroutine fix_mappings added as well as removing the complicated forking process in favour of just using back ticks
parent 6e50552f
......@@ -412,6 +412,124 @@ sub build_list_and_map {
}
=head2 fix_mappings
Example : none
Description: submit mapping jobs to LSF for those that had problems, and wait for them to finish.
Returntype : none
Exceptions : none
Caller : general
=cut
sub fix_mappings {
my $self = shift;
# 1) find the broken jobs
# 2) remove the object_xrefs from object_xref_start to object_xref_end
# Plus identity xrefs and go_xrefs for these
my $rm_object_xrefs_sth = $self->xref->dbc->prepare("delete o from object_xref o where object_xref_id >= ? and object_xref_id <= ?");
my $rm_identity_xrefs_sth = $self->xref->dbc->prepare("delete i from identity_xref i where object_xref_id >= ? and object_xref_id <= ?");
my $rm_go_xrefs_sth = $self->xref->dbc->prepare("delete g from go_xref g where object_xref_id >= ? and object_xref_id <= ?");
my $reset_object_xref_limits_sth = $self ->xref->dbc->prepare("update mapping_jobs set object_xref_start = null, object_xref_end = null where job_id = ? and array_number = ?");
# 3) remove map, err and out files for each
# 4) rerun the mapping jobs SETTING NEW DATA IN mapping_jobs
my $sql = 'SELECT mapping.job_id, map_file, out_file, err_file, object_xref_start, object_xref_end, array_number, command_line, method, root_dir from mapping_jobs, mapping where mapping.job_id = mapping_jobs.job_id and status = "FAILED"';
my $sth = $self->xref->dbc->prepare($sql);
$sth->execute();
my ($job_id, $map_file, $out_file, $err_file, $object_xref_start, $object_xref_end, $array_number, $command_line, $method, $root_dir);
$sth->bind_columns(\$job_id, \$map_file, \$out_file, \$err_file, \$object_xref_start, \$object_xref_end, \$array_number, \$command_line, \$method, \$root_dir);
my @running_methods;
my @job_names;
while ($sth->fetch){
my $start = undef;
my $end = undef;
$command_line =~ s/\$LSB_JOBINDEX/$array_number/g;
if(defined($object_xref_start) and $object_xref_start){
$start = $object_xref_start;
}
if(defined($object_xref_end) and $object_xref_end){
$end = $object_xref_end;
}
if(!defined($start) and !defined($end)){
}
elsif(!defined($start) or !defined($end)){
print STDERR "Could not clean up for ".$job_id."[".$array_number."]\n";
next;
}
else{ # REMOVE the entries
print "removing object_xref etc from $start to $end\n";
$rm_object_xrefs_sth->execute($start, $end);
$rm_identity_xrefs_sth->execute($start, $end);
$rm_go_xrefs_sth->execute($start, $end);
$reset_object_xref_limits_sth->execute($job_id, $array_number);
}
unlink($map_file);
unlink($out_file);
unlink($err_file);
# Run the Mapping.
my $obj_name = "XrefMapper::Methods::$method";
# check that the appropriate object exists
eval "require $obj_name";
if($@) {
warn("Could not find object $obj_name corresponding to mapping method $method, skipping\n$@");
} else {
my $obj = $obj_name->new();
print "DO resubmit for $array_number\n";
my $job_name = $obj->resubmit_exonerate($self->mapper, $command_line, $out_file, $err_file, $job_id, $array_number, $root_dir);
print "Job name is $job_name\n";
push @job_names, $job_name;
push @running_methods, $obj;
my $sth = $self->mapper->xref->dbc->prepare('update mapping_jobs set status = "SUBMITTED"'." where job_id = $job_id and array_number = $array_number");
$sth->execute();
$sth->finish;
sleep 1; # make sure unique names really are unique
$self->jobcount(1);
}
}
$sth->finish;
# submit depend job to wait for all mapping jobs
foreach my $method( @running_methods ){
# Submit all method-specific depend jobs
if( $method->can('submit_depend_job') ){
$method->submit_depend_job;
}
}
# Submit generic depend job. Defaults to LSF IF any exist.
if(defined($job_names[0])){
$self->submit_depend_job($self->core->dir, @job_names);
}
$self->check_err($self->core->dir);
}
=head2 run_mapping
Arg[1] : List of lists of (method, query, target)
......@@ -543,60 +661,34 @@ sub submit_depend_job {
my @depend_bsub = ('bsub', '-K');
# build -w 'ended(job1) && ended(job2)' clause
my $ended_str = "-w ";
my $ended_str = '-w "';
my $i = 0;
foreach my $job (@job_names) {
$ended_str .= "ended($job)";
$ended_str .= " && " if ($i < $#job_names);
$i++;
}
$ended_str .= '"';
push @depend_bsub, $ended_str;
# rest of command
push @depend_bsub, ('-q', 'small', '-o', "$root_dir/depend.out", '-e', "$root_dir/depend.err");
my $jobid = 0;
eval {
my $pid;
my $reader;
my $com = "bsub -K -q small -o $root_dir/depend.out -e $root_dir/depend.err $ended_str /bin/true";
local *BSUB;
local *BSUB_READER;
if ( ( $reader = open( BSUB_READER, '-|' ) ) ) {
while (<BSUB_READER>) {
if (/^Job <(\d+)> is submitted/) {
$jobid = $1;
print "LSF job ID for depend job: $jobid\n" if($self->verbose);
}
}
close(BSUB_READER);
} else {
die("Could not fork : $!\n") unless ( defined($reader) );
open( STDERR, ">&STDOUT" );
if ( ( $pid = open( BSUB, '|-' ) ) ) {
print BSUB "/bin/true\n";
close BSUB;
if ( $? != 0 ) {
die( "bsub exited with non-zero status ($?) "
. "- job not submitted\n" );
}
} else {
if ( defined($pid) ) {
exec(@depend_bsub);
die("Could not exec bsub : $!\n");
} else {
die("Could not fork : $!\n");
}
}
exit(0);
}
};
my $line = `$com`;
if ($line =~ /^Job <(\d+)> is submitted/) {
$jobid = $1;
print "LSF job ID for Depend job: $jobid (job array with 1 job)\n";
}
if ($@) {
if (!$jobid) {
# Something went wrong
warn("Job submission failed:\n$@\n");
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment