Skip to content
Snippets Groups Projects
Commit 8cfe3d0f authored by Will Spooner's avatar Will Spooner
Browse files

Brought up-to-date with lsf_beekeeper

parent b584611b
No related branches found
No related tags found
No related merge requests found
......@@ -19,13 +19,18 @@ my $self = bless {};
$self->{'db_conf'} = {};
$self->{'db_conf'}->{'-user'} = 'ensro';
$self->{'db_conf'}->{'-port'} = 3306;
$self->{'analysis_id'} = undef;
$self->{'outdir'} = "/ecs4/work2/ensembl/jessica/data/hive-output";
$self->{'max_loops'} = 0; #unlimited
my $conf_file;
my ($help, $host, $user, $pass, $dbname, $port, $adaptor, $url);
my ($limit, $batch_size);
my ($job_limit, $batch_size);
my $loopit=0;
my $worker_limit = 50;
my $sleep_time = 5;
my $sync=0;
$self->{'overdue_limit'} = 75; #minutes
$self->{'show_analysis_stats'} = undef;
$self->{'show_worker_stats'} = undef;
GetOptions('help' => \$help,
'url=s' => \$url,
......@@ -35,18 +40,30 @@ GetOptions('help' => \$help,
'dbuser=s' => \$user,
'dbpass=s' => \$pass,
'dbname=s' => \$dbname,
'dead' => \$self->{'all_dead'},
'run' => \$self->{'run'},
'limit=i' => \$limit,
'batch_size=i' => \$batch_size
'dead' => \$self->{'check_for_dead'},
'overdue' => \$self->{'overdue_limit'},
'alldead' => \$self->{'all_dead'},
'run' => \$self->{'run'},
'jlimit=i' => \$job_limit,
'wlimit=i' => \$worker_limit,
'batch_size=i' => \$batch_size,
'loop' => \$loopit,
'sync' => \$sync,
'analysis_stats' => \$self->{'show_analysis_stats'},
'worker_stats' => \$self->{'show_worker_stats'},
'sleep=i' => \$sleep_time,
'logic_name=s' => \$self->{'logic_name'},
);
$self->{'analysis_id'} = shift if(@_);
if ($help) { usage(); }
parse_conf($self, $conf_file);
if($self->{'run'}) {
$loopit = 1;
$self->{'max_loops'} = 1;
}
my $DBA;
if($url) {
......@@ -74,12 +91,36 @@ if($url) {
my $queen = $DBA->get_Queen;
if($self->{'all_dead'}) { check_for_dead_workers($self, $queen); }
if($self->{'all_dead'}) { register_all_workers_dead($self, $queen); }
if($self->{'check_for_dead'}) { check_for_dead_workers($self, $queen); }
my $analysis = $DBA->get_AnalysisAdaptor->fetch_by_logic_name($self->{'logic_name'});
if($loopit) {
run_autonomously($self, $queen);
} elsif($analysis) {
my $stats = $analysis->stats;
if($sync) {
$queen->synchronize_AnalysisStats($stats);
$queen->check_blocking_control_rules_for_AnalysisStats($stats);
}
$stats->print_stats;
$queen->get_num_needed_workers();
} else {
$queen->synchronize_hive() if($sync);
$queen->print_analysis_status if($self->{'show_analysis_stats'});
$queen->print_running_worker_status;
$queen->synchronize_hive();
$queen->print_hive_status;
show_running_workers($self, $queen) if($self->{'show_worker_stats'});
run_next_worker_clutch($self, $queen);
$queen->get_num_running_workers();
$queen->get_num_needed_workers();
$queen->get_hive_progress();
}
exit(0);
......@@ -91,7 +132,7 @@ exit(0);
#######################
sub usage {
print "local_beekeeper.pl [options]\n";
print "lsf_beekeeper.pl [options]\n";
print " -help : print this help\n";
print " -url <url string> : url defining where hive database is located\n";
print " -conf <path> : config file describing db connection\n";
......@@ -101,10 +142,16 @@ sub usage {
print " -dbuser <name> : mysql connection user <name>\n";
print " -dbpass <pass> : mysql connection password\n";
print " -batch_size <num> : #jobs a worker can claim at once\n";
print " -limit <num> : #jobs to run before worker can die naturally\n";
print " -run : show and run the needed jobs\n";
print " -jlimit <num> : #jobs to run before worker can die naturally\n";
print " -dead : clean overdue jobs for resubmission\n";
print "local_beekeeper.pl v1.0\n";
print " -alldead : all outstanding workers\n";
print " -run : run 1 iteration of automation loop\n";
print " -loop : run autonomously, loops and sleeps\n";
print " -sleep <num> : when looping, sleep <num> minutes (default 5)\n";
print " -wlimit <num> : max # workers to create per loop\n";
print " -analysis_stats : show status of each analysis\n";
print " -worker_stats : show status of each running worker\n";
print "lsf_beekeeper.pl v1.3\n";
exit(1);
}
......@@ -128,65 +175,137 @@ sub parse_conf {
}
sub run_next_worker_clutch
{
# Looks at every overdue worker and, for those managed by LSF, asks the LSF
# queue whether the job still exists; workers that have vanished are
# registered as dead with the queen.
#
# Args:
#   $self  - beekeeper state hash; reads 'overdue_limit'
#   $queen - object providing fetch_overdue_workers() and
#            register_worker_death()
#
# Workers whose beekeeper is not 'LSF' are listed in the overdue count but
# otherwise left alone.
sub check_for_dead_workers {
  my $self  = shift;
  my $queen = shift;

  print("===== check for dead workers\n");

  # overdue_limit is multiplied by 60 before being handed to the queen
  # (presumably minutes -> seconds; confirm against fetch_overdue_workers)
  my $overdueWorkers = $queen->fetch_overdue_workers($self->{'overdue_limit'}*60);
  print(scalar(@{$overdueWorkers}), " overdue workers\n");

  foreach my $worker (@{$overdueWorkers}) {
    next unless($worker->beekeeper eq 'LSF');

    printf("%10d %35s %15s %20s(%d) : ", $worker->hive_id,$worker->host,$worker->process_id, $worker->analysis->logic_name, $worker->analysis->dbID);

    # bjobs output is filtered so that "not found" messages, the JOBID header
    # line and EXIT-state lines are dropped; anything left means the job is
    # still known to the queue
    my $cmd = "bjobs ". $worker->process_id . " 2>&1 | grep -v 'not found' | grep -v JOBID | grep -v EXIT";
    #print(" check worker with : $cmd\n");
    my $check = qx/$cmd/;

    if($check) {
      print("ALIVE and running\n");
    }
    else {
      print("worker is missing => it DIED!!\n");
      $queen->register_worker_death($worker);
    }
  }
}
my $clutches = $queen->db->get_AnalysisStatsAdaptor->fetch_by_needed_workers();
print("\n");
foreach my $analysis_stats (@{$clutches}) {
##my($analysis_id, $count) = $queen->next_clutch();
#if($count>0) {
# Registers every worker returned by fetch_overdue_workers(0) as dead.
#
# Args:
#   $self  - beekeeper state hash (not read here)
#   $queen - object providing fetch_overdue_workers() and
#            register_worker_death()
sub register_all_workers_dead {
  my $self  = shift;
  my $queen = shift;

  # a limit of 0 is used elsewhere in this script to mean "all outstanding
  # workers"
  my $overdueWorkers = $queen->fetch_overdue_workers(0);
  foreach my $worker (@{$overdueWorkers}) {
    $queen->register_worker_death($worker);
  }
}
my $cmd;
my $worker_cmd = "./runWorker.pl -logic_name " . $analysis->logic_name;
$worker_cmd .= " -conf $conf_file" if($conf_file);
$worker_cmd .= " -url $url" if($url);
if (defined $limit) {
$worker_cmd .= " -limit $limit";
} elsif ($hive_capacity < 0) {
$worker_cmd .= " -limit " . $analysis_stats->batch_size;
}
$worker_cmd .= " -batch_size $batch_size" if (defined $batch_size);
# Prints one line per overdue worker: hive id, host, process id, and the
# logic name and dbID of the worker's analysis. Read-only: nothing is
# submitted or registered.
#
# Args:
#   $self  - beekeeper state hash; reads 'overdue_limit'
#   $queen - object providing fetch_overdue_workers()
sub show_overdue_workers {
  my $self  = shift;
  my $queen = shift;

  print("===== overdue workers\n");

  # overdue_limit is multiplied by 60 before being handed to the queen
  # (presumably minutes -> seconds; confirm against fetch_overdue_workers)
  my $overdueWorkers = $queen->fetch_overdue_workers($self->{'overdue_limit'}*60);
  foreach my $worker (@{$overdueWorkers}) {
    printf("%10d %35s %15s %20s(%d)\n", $worker->hive_id,$worker->host,$worker->process_id, $worker->analysis->logic_name, $worker->analysis->dbID);
  }
}
# return of bsub looks like this
#Job <6392054> is submitted to default queue <normal>.
# Prints a header followed by one formatted line for each worker returned by
# fetch_overdue_workers(0): hive id, analysis logic name and dbID, beekeeper,
# process id, host and last check-in.
#
# Args:
#   $self  - beekeeper state hash (not read here)
#   $queen - object providing fetch_overdue_workers()
sub show_running_workers {
  my ($self, $queen) = @_;

  print("===== running workers\n");

  for my $bee (@{ $queen->fetch_overdue_workers(0) }) {
    # columns are gathered in the same order the format string consumes them
    my @columns = (
      $bee->hive_id,
      $bee->analysis->logic_name,
      $bee->analysis->dbID,
      $bee->beekeeper,
      $bee->process_id,
      $bee->host,
      $bee->last_check_in,
    );
    printf("%10d %35s(%5d) %5s:%15s %15s (%s)\n", @columns);
  }
}
# Runs the beekeeper automation loop. Each pass:
#   1. checks for dead workers,
#   2. asks the queen how many workers are running / needed and subtracts the
#      pending count,
#   3. does a hard resync when nothing is running and nothing is needed,
#   4. optionally prints hive / worker status,
#   5. caps the needed count at $worker_limit and, if any are needed, prints
#      and runs a runWorker.pl command,
#   6. stops once 'max_loops' passes have been made (when max_loops > 0).
#
# Args:
#   $self  - beekeeper state hash; reads 'max_loops', 'show_analysis_stats',
#            'show_worker_stats'
#   $queen - hive queen object
#
# Relies on the file-level $loopit, $worker_limit, $url, $job_limit,
# $batch_size and $DBA.
sub run_autonomously {
  my $self  = shift;
  my $queen = shift;

  my ($cmd, $worker_cmd);
  my $loopCount=1;
  while($loopit) {
    print("\n=======lsf_beekeeper loop ** $loopCount **==========\n");

    check_for_dead_workers($self, $queen);

    my $runCount = $queen->get_num_running_workers();
    my $load     = $queen->get_hive_current_load();
    my $count    = $queen->get_num_needed_workers();

    # workers already queued but not yet running count against the need
    my $pend_count = $self->get_pending_count();
    $count = $count - $pend_count;

    if($load==0 and $count==0 and $runCount==0) {
      #nothing running and nothing todo => do hard resync
      print("*** nothing is happening => do a hard resync\n");
      $queen->synchronize_hive();
      $count = $queen->get_num_needed_workers();
    }

    $queen->print_hive_status() if($self->{'show_analysis_stats'});
    $queen->print_worker_status() if($self->{'show_worker_stats'});

    $count = $worker_limit if($count>$worker_limit);

    if($count>0) {
      print("need $count workers\n");
      $worker_cmd = "runWorker.pl -bk LOCAL -url $url";
      $worker_cmd .= " -limit $job_limit" if(defined $job_limit);
      $worker_cmd .= " -batch_size $batch_size" if(defined $batch_size);

      # the same command is run whatever the needed count is
      $cmd = $worker_cmd;
      print("$cmd\n");
      system($cmd);
    }

    # loop control lives at the while level so that max_loops (set to 1 by
    # -run) actually ends the automation loop
    last if($self->{'max_loops'}>0 and ($loopCount >= $self->{'max_loops'}));

    $DBA->dbc->disconnect_if_idle;

    #print("sleep $sleep_time minutes\n");
    #sleep($sleep_time*60);
    $loopCount++;
  }
  printf("dbc %d disconnect cycles\n", $DBA->dbc->disconnect_count);
}
# Returns the number of workers that are queued but not yet running.
#
# For the local beekeeper this is always 0. The LSF variant of this routine
# obtained the figure from the queue instead:
#
#   my $cmd = "bjobs | grep -c PEND";
#   my $pend_count = qx/$cmd/;
#   chomp($pend_count);
#   print("$pend_count workers queued but not running\n");
#   return $pend_count;
sub get_pending_count {
  return 0; # Not needed for local
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment