Skip to content
Snippets Groups Projects
Commit 80e8066d authored by Patrick Meidl's avatar Patrick Meidl
Browse files

added support for different cache methods (currently implemented:...

added support for different cache methods (currently implemented: build_cache_all() and build_cache_by_seq_region()
parent e29b80c4
No related branches found
No related tags found
No related merge requests found
......@@ -66,6 +66,7 @@ use FindBin qw($Bin);
use Bio::EnsEMBL::Utils::ConfParser;
use Bio::EnsEMBL::Utils::Logger;
use Bio::EnsEMBL::Utils::ScriptUtils qw(dynamic_use path_append);
use Bio::EnsEMBL::Utils::Exception qw(throw warning);
# parse configuration and commandline arguments
my $conf = new Bio::EnsEMBL::Utils::ConfParser(
......@@ -89,6 +90,7 @@ $conf->parse_options(
'region=s' => 0,
'biotypes=s@' => 0,
'lsf_opt_dump_cache|lsfoptdumpcache=s' => 0,
'cache_method=s' => 0,
);
# set default logpath
......@@ -111,63 +113,12 @@ my $logger = new Bio::EnsEMBL::Utils::Logger(
# initialise log
$logger->init_log($conf->list_param_values);
my %jobs;
# create empty directory for logs
my $logpath = path_append($conf->param('logpath'), 'dump_by_seq_region');
system("rm -rf $logpath") == 0 or
$logger->error("Unable to delete lsf log dir $logpath: $!\n");
system("mkdir -p $logpath") == 0 or
$logger->error("Can't create lsf log dir $logpath: $!\n");
# submit jobs to lsf
foreach my $dbtype (qw(source target)) {
$logger->info("\n".ucfirst($dbtype)." db...\n", 0, 'stamped');
my $schema = $conf->param("${dbtype}schema") || 'latest';
my $cache_builder = "build_cache_$schema";
no strict 'refs';
&$cache_builder($dbtype);
}
# monitor progress
my $err;
my $total = scalar(keys %jobs);
my @types;
while (keys %jobs) {
foreach my $type (keys %jobs) {
my $err_log = "$logpath/dump_by_seq_region.$type.err";
# there's an error if the lsf error logfile has non-zero length
$err++ if (-s $err_log);
# the job has finished if you find the error logfile
delete($jobs{$type}) if (-e $err_log);
push @types, $type;
}
$logger->info("Jobs waiting: ".scalar(keys %jobs)."/$total.\r");
sleep(3) if (scalar(keys %jobs));
}
$logger->info("\n\n");
# check if anything went wrong
sleep(5);
foreach my $type (@types) {
#warn "$logpath/dump_by_seq_region.$type.success\n";
$err++ unless (-e "$logpath/dump_by_seq_region.$type.success");
}
my $retval = 0;
if ($err) {
$logger->warning("At least one of your jobs failed.\n");
$logger->warning("Please check $logpath and ".$logger->logpath."/".$logger->logfile." for errors.\n");
$retval = 1;
}
# determin cache method to use.
# this can be used to support different caching strategies or access to old
# database schemas.
my $cache_method = $conf->param('cache_method') || 'build_cache_by_seq_region';
no strict 'refs';
my $retval = &$cache_method;
# finish logfile
$logger->finish_log;
......@@ -178,64 +129,136 @@ exit($retval);
### END main ###
sub build_cache_latest {
my $dbtype = shift;
sub build_cache_by_seq_region {
my $cache_impl = 'Bio::EnsEMBL::IdMapping::Cache';
my %jobs = ();
# create empty directory for logs
my $logpath = path_append($conf->param('logpath'), 'dump_by_seq_region');
system("rm -rf $logpath") == 0 or
$logger->error("Unable to delete lsf log dir $logpath: $!\n");
system("mkdir -p $logpath") == 0 or
$logger->error("Can't create lsf log dir $logpath: $!\n");
# load the cache implementation
my $cache_impl = 'Bio::EnsEMBL::IdMapping::Cache';
dynamic_use($cache_impl);
my $cache = $cache_impl->new(
-LOGGER => $logger,
-CONF => $conf,
);
my $dba = $cache->get_DBAdaptor($dbtype);
my $sa = $dba->get_SliceAdaptor;
$logger->info("Submitting jobs to lsf...\n", 1);
# submit jobs to lsf
foreach my $dbtype (qw(source target)) {
foreach my $slice_name (@{ $cache->slice_names($dbtype) }) {
&bsubmit($dbtype, $slice_name, $cache, $logpath);
}
$logger->info("\n".ucfirst($dbtype)." db...\n", 0, 'stamped');
}
# determine which slices need to be done
my $filename = "$dbtype.dump_cache.slices.txt";
open(my $fh, '>', "$logpath/$filename") or
throw("Unable to open $logpath/$filename for writing: $!");
my $num_jobs = 0;
foreach my $slice_name (@{ $cache->slice_names($dbtype) }) {
my $type = "$dbtype.$slice_name";
unless ($cache->cache_file_exists($type)) {
print $fh "$slice_name\n";
$num_jobs++;
}
}
sub bsubmit {
my $dbtype = shift;
my $slice_name = shift;
my $cache = shift;
my $logpath = shift;
$logger->info("$slice_name\n", 2);
my $type = "$dbtype.$slice_name";
close($fh);
# build lsf command
my $lsf_name = 'dump_by_seq_region_'.time;
my $concurrent = $conf->param('dump_cache_concurrent_jobs') || 200;
unless ($cache->cache_file_exists($type)) {
my $options = $conf->create_commandline_options(
logauto => 1,
logautobase => "dump_by_seq_region_${type}",
logautobase => "dump_by_seq_region",
interactive => 0,
is_component => 1,
dbtype => $dbtype,
slice_name => $slice_name,
cache_impl => ref($cache),
cache_impl => $cache_impl,
);
my $cmd = "bsub ".
"-o $logpath/dump_by_seq_region.$type.out ".
"-e $logpath/dump_by_seq_region.$type.err ".
$conf->param('lsf_opt_dump_cache') . " " .
"./dump_by_seq_region.pl $options";
my $cmd = qq{./dump_by_seq_region.pl $options --index \$LSB_JOBINDEX};
my $pipe = qq{|bsub -J$lsf_name\[1-$num_jobs\]\%$concurrent } .
qq{-o $logpath/dump_by_seq_region.$dbtype.\%I.out } .
qq{-e $logpath/dump_by_seq_region.$dbtype.\%I.err } .
$conf->param('lsf_opt_dump_cache');
# run lsf job array
$logger->info("\nSubmitting $num_jobs jobs to lsf.\n");
$logger->debug("$cmd\n\n");
system("$cmd") == 0
or $logger->error("Error running dump_by_seq_region.pl: $!");
local *BSUB;
open BSUB, $pipe or
$logger->error("Could not open open pipe to bsub: $!\n");
print BSUB $cmd;
$logger->error("Error submitting jobs: $!\n")
unless ($? == 0);
close BSUB;
# submit dependent job to monitor finishing of jobs
$logger->info("Waiting for jobs to finish...\n", 0, 'stamped');
my $dependent_job = qq{bsub -K -w "ended($lsf_name)" -q small } .
qq{-o $logpath/dump_cache.$dbtype.depend.out /bin/true};
system($dependent_job) == 0 or
$logger->error("Error submitting dependent job: $!\n");
$logger->info("All jobs finished.\n", 0, 'stamped');
# check for lsf errors
sleep(5);
my $err;
foreach my $i (1..$num_jobs) {
$err++ unless (-e "$logpath/dump_by_seq_region.$dbtype.$i.success");
}
if ($err) {
$logger->error("At least one of your jobs failed.\nPlease check the logfiles at $logpath for errors.\n");
return 1;
}
}
return 0;
}
sub build_cache_all {
# load the cache implementation
my $cache_impl = 'Bio::EnsEMBL::IdMapping::Cache';
dynamic_use($cache_impl);
my $cache = $cache_impl->new(
-LOGGER => $logger,
-CONF => $conf,
);
# submit jobs to lsf
foreach my $dbtype (qw(source target)) {
$logger->info("\n".ucfirst($dbtype)." db...\n", 0, 'stamped');
$logger->info("Building cache for whole genome...\n", 1);
my $i = 0;
my $size = 0;
($i, $size) = $cache->build_cache_all($dbtype);
$logger->info("Done with $dbtype (genes: $i, filesize: $size.\n", 1,
'stamped');
}
# mark job as submitted
$jobs{$type} = 1;
}
return 0;
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment