Commit 2d7324e7 authored by Jessica Severin

consolidated local_beekeeper and lsf_beekeeper into a single script

The program has evolved into the primary portal for user interaction
with the Hive, and roughly 95% of its functions are compute-resource
agnostic, so it makes more sense to have internal switches selecting which
compute resource to submit to and check. Expanded the local logic to allow
multiple local cpus: submit into the background (fork), check with 'ps',
kill with 'kill -9'.
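For example, after this change the single script might be driven in either mode like so (illustrative invocations only; the host and database names in the URL are placeholders):

    beekeeper.pl -url mysql://ensro@myhost:3306/my_hive -loop          # LSF mode (default): workers submitted with bsub
    beekeeper.pl -url mysql://ensro@myhost:3306/my_hive -loop -local   # LOCAL mode: runWorker.pl forked into the background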
parent 331fc631
#!/usr/local/ensembl/bin/perl -w
use strict;
use DBI;
use Getopt::Long;
use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::Queen;
use Bio::EnsEMBL::Hive::URLFactory;
use Sys::Hostname;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor;
# ok this is a hack, but I'm going to pretend I've got an object here
# by creating a blessed hash ref and passing it around like an object
# this is to avoid using global variables in functions, and to consolidate
# the globals into a nice '$self' package
my $self = bless {};
$self->{'db_conf'} = {};
$self->{'db_conf'}->{'-user'} = 'ensro';
$self->{'db_conf'}->{'-port'} = 3306;
$self->{'max_loops'} = 0; #unlimited
$self->{'beekeeper_type'} = 'LSF';
$self->{'local_cpus'} = 2;
$| = 1;
my $conf_file;
my ($help, $host, $user, $pass, $dbname, $port, $adaptor, $url);
my ($job_limit, $batch_size);
my $loopit=0;
my $worker_limit = 50;
my $sleep_time = 3;
my $sync=0;
my $local=undef;
$self->{'overdue_limit'} = 75; #minutes
$self->{'show_analysis_stats'} = undef;
$self->{'show_worker_stats'} = undef;
GetOptions('help' => \$help,
'url=s' => \$url,
'conf=s' => \$conf_file,
'dbhost=s' => \$host,
'dbport=i' => \$port,
'dbuser=s' => \$user,
'dbpass=s' => \$pass,
'dbname=s' => \$dbname,
'local' => \$local,
'dead' => \$self->{'check_for_dead'},
'killworker=i' => \$self->{'kill_worker_id'},
'overdue=i' => \$self->{'overdue_limit'},
'alldead' => \$self->{'all_dead'},
'run' => \$self->{'run'},
'jlimit=i' => \$job_limit,
'wlimit=i' => \$worker_limit,
'batch_size=i' => \$batch_size,
'loop' => \$loopit,
'no_pend' => \$self->{'no_pend_adjust'},
'sync' => \$sync,
'analysis_stats' => \$self->{'show_analysis_stats'},
'worker_stats' => \$self->{'show_worker_stats'},
'sleep=i' => \$sleep_time,
'logic_name=s' => \$self->{'logic_name'},
'failed_jobs' => \$self->{'show_failed_jobs'},
'reset_job_id=i' => \$self->{'reset_job_id'},
'reset_all_jobs_for_analysis_id=i' => \$self->{'reset_all_jobs_for_analysis_id'},
'm=s' => \$self->{'lsf_machine_option'},
);
if ($help) { usage(); }
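# -local switches the beekeeper from the default 'LSF' mode to 'LOCAL';
# the submit/check/kill logic below branches on $self->{'beekeeper_type'}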
if($local) {
$self->{'beekeeper_type'} ='LOCAL';
}
parse_conf($self, $conf_file);
if($self->{'run'}) {
$loopit = 1;
$self->{'max_loops'} = 1;
}
my $DBA;
if($url) {
$DBA = Bio::EnsEMBL::Hive::URLFactory->fetch($url);
die("Unable to connect to $url\n") unless($DBA);
} else {
if($host) { $self->{'db_conf'}->{'-host'} = $host; }
if($port) { $self->{'db_conf'}->{'-port'} = $port; }
if($dbname) { $self->{'db_conf'}->{'-dbname'} = $dbname; }
if($user) { $self->{'db_conf'}->{'-user'} = $user; }
if($pass) { $self->{'db_conf'}->{'-pass'} = $pass; }
unless(defined($self->{'db_conf'}->{'-host'})
and defined($self->{'db_conf'}->{'-user'})
and defined($self->{'db_conf'}->{'-dbname'}))
{
print "\nERROR : must specify host, user, and database to connect\n\n";
usage();
}
# connect to database specified
$DBA = new Bio::EnsEMBL::Hive::DBSQL::DBAdaptor(%{$self->{'db_conf'}});
}
$self->{'dba'} = $DBA;
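# the Queen adaptor is used below to register and clean up workers,
# synchronize analysis stats, and report on hive progress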
my $queen = $DBA->get_Queen;
if($self->{'reset_job_id'}) { $queen->reset_and_fetch_job_by_dbID($self->{'reset_job_id'}); };
if($self->{'reset_all_jobs_for_analysis_id'}) { reset_all_jobs_for_analysis_id($self); }
if($self->{'all_dead'}) { register_all_workers_dead($self, $queen); }
if($self->{'check_for_dead'}) { check_for_dead_workers($self, $queen); }
if($self->{'kill_worker_id'}) { kill_worker($self, $queen); }
my $analysis = $DBA->get_AnalysisAdaptor->fetch_by_logic_name($self->{'logic_name'});
if($loopit) {
run_autonomously($self, $queen);
} elsif($analysis) {
my $stats = $analysis->stats;
if($sync) {
$queen->synchronize_AnalysisStats($stats);
$queen->check_blocking_control_rules_for_AnalysisStats($stats);
}
$stats->print_stats;
} else {
$queen->synchronize_hive() if($sync);
$queen->print_analysis_status if($self->{'show_analysis_stats'});
$queen->print_running_worker_status;
show_running_workers($self) if($self->{'show_worker_stats'});
#show_failed_workers($self);
$queen->get_num_needed_workers();
$queen->get_hive_progress();
show_failed_jobs($self) if($self->{'show_failed_jobs'});
}
exit(0);
#######################
#
# subroutines
#
#######################
sub usage {
print "beekeeper.pl [options]\n";
print " -help : print this help\n";
print " -url <url string> : url defining where hive database is located\n";
print " -conf <path> : config file describing db connection\n";
print " -dbhost <machine> : mysql database host <machine>\n";
print " -dbport <port#> : mysql port number\n";
print " -dbname <name> : mysql database <name>\n";
print " -dbuser <name> : mysql connection user <name>\n";
print " -dbpass <pass> : mysql connection password\n";
print " -batch_size <num> : #jobs a worker can claim at once\n";
print " -jlimit <num> : #jobs to run before worker can die naturally\n";
print " -dead : clean overdue jobs for resubmission\n";
print " -overdue <min> : worker overdue minutes checking if dead\n";
print " -alldead : all outstanding workers\n";
print " -run : run 1 iteration of automation loop\n";
print " -loop : run autonomously, loops and sleeps\n";
print " -local : run jobs on local CPU (fork)\n";
print " -lsf : run jobs on LSF compute resource (bsub)\n";
print " -no_pend : don't adjust needed workers by pending workers\n";
print " -m <string> : passes <string> to LSF bsub -m option\n";
print " -sleep <num> : when looping, sleep <num> minutes (default 3min)\n";
print " -wlimit <num> : max # workers to create per loop\n";
print " -analysis_stats : show status of each analysis\n";
print " -worker_stats : show status of each running worker\n";
print " -failed_jobs : show all failed jobs\n";
print " -reset_job_id <num> : reset a job back to READY so it can be rerun\n";
print " -reset_all_jobs_for_analysis_id <num>\n";
print " : reset jobs back to READY so it can be rerun\n";
print "beekeeper.pl v1.6\n";
exit(1);
}
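# parse_conf: loads a Perl config file via do(); the file is expected to return an
# array ref of hashes, and any entry with TYPE 'COMPARA' or 'DATABASE' supplies the
# db connection parameters. A minimal example (illustrative only) would be a file containing:
#   [ { 'TYPE' => 'DATABASE', '-host' => 'myhost', '-user' => 'ensro', '-dbname' => 'my_hive' } ]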
sub parse_conf {
my $self = shift;
my $conf_file = shift;
if($conf_file and (-e $conf_file)) {
#read configuration file from disk
my @conf_list = @{do $conf_file};
foreach my $confPtr (@conf_list) {
#print("HANDLE type " . $confPtr->{TYPE} . "\n");
if(($confPtr->{TYPE} eq 'COMPARA') or ($confPtr->{TYPE} eq 'DATABASE')) {
$self->{'db_conf'} = $confPtr;
}
}
}
}
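# kill_worker: terminates the worker named by -killworker, but only if it belongs to
# this beekeeper type and is still alive; uses 'bkill' for LSF workers and 'kill -9'
# for LOCAL workers on this host, then records the death with the Queen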
sub kill_worker {
my $self = shift;
my $queen = shift;
my $worker = $queen->_fetch_by_hive_id($self->{'kill_worker_id'});
return unless($worker->beekeeper eq $self->{'beekeeper_type'});
return if(defined($worker->cause_of_death));
printf("KILL: %10d %35s %15s %20s(%d) : ",
$worker->hive_id, $worker->host, $worker->process_id,
$worker->analysis->logic_name, $worker->analysis->dbID);
if(($self->{'beekeeper_type'} eq 'LSF') and ($self->lsf_check_worker($worker))) {
my $cmd = 'bkill ' . $worker->process_id;
system($cmd);
}
if(($self->{'beekeeper_type'} eq 'LOCAL') and
($worker->host eq hostname) and
($self->local_check_worker($worker)))
{
my $cmd = 'kill -9 ' . $worker->process_id;
system($cmd);
}
$queen->register_worker_death($worker);
}
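# check_for_dead_workers: fetches workers that have not checked in within the overdue
# limit, verifies each against its compute resource, and registers the missing ones as dead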
sub check_for_dead_workers {
my $self = shift;
my $queen = shift;
print("===== check for dead workers\n");
my $overdueWorkers = $queen->fetch_overdue_workers($self->{'overdue_limit'}*60);
print(scalar(@{$overdueWorkers}), " overdue workers\n");
foreach my $worker (@{$overdueWorkers}) {
next unless($worker->beekeeper eq $self->{'beekeeper_type'});
next if(($self->{'beekeeper_type'} eq 'LOCAL') and
($worker->host ne hostname));
printf("%10d %35s %15s %20s(%d) : ",
$worker->hive_id, $worker->host, $worker->process_id,
$worker->analysis->logic_name, $worker->analysis->dbID);
my $is_alive;
$is_alive = $self->lsf_check_worker($worker) if($self->{'beekeeper_type'} eq 'LSF');
$is_alive = $self->local_check_worker($worker) if($self->{'beekeeper_type'} eq 'LOCAL');
if($is_alive) {
print("ALIVE and running\n");
} else {
print("worker is missing => it DIED!!\n");
$queen->register_worker_death($worker);
}
}
}
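# worker liveness checks: LSF workers are probed with 'bjobs', LOCAL workers with 'ps';
# both return non-empty output when the process is still running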
sub lsf_check_worker {
my $self = shift;
my $worker = shift;
my $cmd = "bjobs ". $worker->process_id . " 2>&1 | grep -v 'not found' | grep -v JOBID | grep -v EXIT";
#print(" check worker with : $cmd\n");
my $is_alive = qx/$cmd/;
return $is_alive;
}
sub local_check_worker {
my $self = shift;
my $worker = shift;
my $cmd = "ps ". $worker->process_id . " 2>&1 | grep " . $worker->process_id;
my $is_alive = qx/$cmd/;
return $is_alive;
}
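# register_all_workers_dead: marks every outstanding worker as dead without any
# process check (used by -alldead)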
sub register_all_workers_dead {
my $self = shift;
my $queen = shift;
my $overdueWorkers = $queen->fetch_overdue_workers(0);
foreach my $worker (@{$overdueWorkers}) {
$queen->register_worker_death($worker);
}
}
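# show_overdue_workers: lists workers that have not checked in within the overdue
# limit (helper not currently wired to a command-line option)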
sub show_overdue_workers {
my $self = shift;
my $queen = shift;
print("===== overdue workers\n");
my $overdueWorkers = $queen->fetch_overdue_workers($self->{'overdue_limit'}*60);
foreach my $worker (@{$overdueWorkers}) {
printf("%10d %35s %15s %20s(%d)\n", $worker->hive_id,$worker->host,$worker->process_id, $worker->analysis->logic_name, $worker->analysis->dbID);
}
}
sub show_running_workers {
my $self = shift;
my $queen = $self->{'dba'}->get_Queen;
print("===== running workers\n");
my $worker_list = $queen->fetch_overdue_workers(0);
foreach my $worker (@{$worker_list}) {
printf("%10d %35s(%5d) %5s:%15s %15s (%s)\n",
$worker->hive_id,
$worker->analysis->logic_name,
$worker->analysis->dbID,
$worker->beekeeper,
$worker->process_id,
$worker->host,
$worker->last_check_in);
}
}
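# show_failed_jobs: lists every failed job with its analysis and input_id (-failed_jobs)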
sub show_failed_jobs {
my $self = shift;
print("===== failed jobs\n");
my $failed_job_list = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_all_failed_jobs;
foreach my $job (@{$failed_job_list}) {
my $analysis = $self->{'dba'}->get_AnalysisAdaptor->fetch_by_dbID($job->analysis_id);
printf("job_id=%d %35s(%5d) input_id='%s'\n",
$job->dbID,
$analysis->logic_name,
$analysis->dbID,
$job->input_id);
}
}
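# show_failed_workers: lists workers that crashed; the calls to it above are currently commented out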
sub show_failed_workers {
my $self = shift;
my $queen = $self->{'dba'}->get_Queen;
print("===== CRASHED workers\n");
my $worker_list = $queen->fetch_failed_workers;
foreach my $worker (@{$worker_list}) {
printf("%10d %35s(%5d) %5s:%15s %15s (%s)\n",
$worker->hive_id,
$worker->analysis->logic_name,
$worker->analysis->dbID,
$worker->beekeeper,
$worker->process_id,
$worker->host,
$worker->last_check_in);
}
}
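# run_autonomously: the -loop/-run driver. Each pass cleans up dead workers, asks the
# Queen how many workers are needed, submits them (a bsub job array for LSF, a background
# runWorker.pl process for LOCAL), then sleeps. Note that the runWorker.pl command line it
# builds passes -url, so looping mode expects the hive to have been specified via -url.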
sub run_autonomously {
my $self = shift;
my $queen = shift;
unless(`runWorker.pl`) {
print("can't find runWorker.pl script. Please make sure it's in your path\n");
exit(1);
}
my ($cmd, $worker_cmd);
my $loopCount=1;
while($loopit) {
print("\n=======lsf_beekeeper loop ** $loopCount **==========\n");
check_for_dead_workers($self, $queen);
$queen->print_analysis_status if($self->{'show_analysis_stats'});
$queen->print_running_worker_status;
#show_failed_workers($self);
my $runCount = $queen->get_num_running_workers();
my $load = $queen->get_hive_current_load();
my $count = $queen->get_num_needed_workers();
if($self->{'beekeeper_type'} eq 'LSF') {
$count = $count - $self->get_lsf_pending_count();
}
if($load==0 and $count==0 and $runCount==0) {
#nothing running and nothing todo => do hard resync
print("*** nothing is happening => do a hard resync\n");
$queen->synchronize_hive();
$count = $queen->get_num_needed_workers();
}
$count = $worker_limit if($count>$worker_limit);
my $logic_name = $self->{'logic_name'};
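# submit the needed workers: as an LSF job array via bsub, or as a background
# runWorker.pl process when running LOCAL and a local cpu is free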
if($count>0) {
print("need $count workers\n");
$worker_cmd = "runWorker.pl -url $url -bk ". $self->{'beekeeper_type'};
$worker_cmd .= " -limit $job_limit" if(defined $job_limit);
$worker_cmd .= " -batch_size $batch_size" if(defined $batch_size);
$worker_cmd .= " -logic_name $logic_name" if(defined $logic_name);
$cmd = undef;
if($self->{'beekeeper_type'} eq 'LSF') {
if($count>1) { $cmd = "bsub -JHL$loopCount\[1-$count\]";}
else { $cmd = "bsub -JHL$loopCount";}
$cmd .= " -m '" . $self->{'lsf_machine_option'} ."'" if($self->{'lsf_machine_option'});
$cmd .= " ".$worker_cmd;
} elsif(($self->{'beekeeper_type'} eq 'LOCAL') and
($self->get_local_running_count() < $self->{'local_cpus'}))
{
$cmd = "$worker_cmd &";
}
if($cmd) {
print("$cmd\n");
system($cmd);
}
}
$queen->get_hive_progress();
last if($self->{'max_loops'}>0 and ($loopCount >= $self->{'max_loops'}));
$DBA->dbc->disconnect_if_idle;
print("sleep $sleep_time minutes. Next loop at ",scalar localtime(time+$sleep_time*60),".\n");
sleep($sleep_time*60);
$loopCount++;
}
printf("dbc %d disconnect cycles\n", $DBA->dbc->disconnect_count);
}
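# get_lsf_pending_count: counts PENDing jobs in 'bjobs' output so the number of newly
# submitted workers can be reduced accordingly (disabled with -no_pend)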
sub get_lsf_pending_count {
my $self = shift;
return 0 if($self->{'no_pend_adjust'});
my $cmd = "bjobs | grep -c PEND";
my $pend_count = qx/$cmd/;
chomp($pend_count);
print("$pend_count workers queued on LSF but not running\n");
return $pend_count;
}
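# get_local_running_count: counts runWorker.pl processes visible to 'ps' so that no
# more than 'local_cpus' workers run on this machine at once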
sub get_local_running_count {
my $self = shift;
my $cmd = "ps -a | grep runWorker | grep -v grep | wc -l";
my $run_count = qx/$cmd/;
chomp($run_count);
print("$run_count workers running locally\n");
return $run_count;
}
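# reset_all_jobs_for_analysis_id: puts every job of the given analysis back to READY
# and resynchronizes that analysis' stats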
sub reset_all_jobs_for_analysis_id {
my $self = shift;
my $analysis = $self->{'dba'}->get_AnalysisAdaptor->
fetch_by_dbID($self->{'reset_all_jobs_for_analysis_id'});
$self->{'dba'}->get_AnalysisJobAdaptor->reset_all_jobs_for_analysis_id($analysis->dbID);
$self->{'dba'}->get_Queen->synchronize_AnalysisStats($analysis->stats);
}