Commit 8773ee3f authored by Leo Gordon's avatar Leo Gordon
Browse files

hive_id renamed worker_id

parent 1b5aa1d0
......@@ -56,10 +56,10 @@ sub input_id {
return $self->{'_input_id'};
}
sub hive_id {
sub worker_id {
my $self = shift;
$self->{'_hive_id'} = shift if(@_);
return $self->{'_hive_id'};
$self->{'_worker_id'} = shift if(@_);
return $self->{'_worker_id'};
}
sub analysis_id {
......
......@@ -182,11 +182,11 @@ sub fetch_by_claim_analysis {
}
sub fetch_by_run_analysis {
my ($self,$hive_id,$analysis_id) = @_;
my ($self,$worker_id,$analysis_id) = @_;
throw("fetch_by_run_analysis must have hive_id") unless($hive_id);
throw("fetch_by_run_analysis must have worker_id") unless($worker_id);
throw("fetch_by_run_analysis must have analysis_id") unless($analysis_id);
my $constraint = "a.status='RUN' AND a.hive_id=$hive_id AND a.analysis_id='$analysis_id'";
my $constraint = "a.status='RUN' AND a.worker_id=$worker_id AND a.analysis_id='$analysis_id'";
return $self->_generic_fetch($constraint);
}
......@@ -323,7 +323,7 @@ sub _columns {
a.analysis_id
a.input_id
a.job_claim
a.hive_id
a.worker_id
a.status
a.retry_count
a.completed
......@@ -360,7 +360,7 @@ sub _objs_from_sth {
$job->analysis_id($column{'analysis_id'});
$job->input_id($column{'input_id'});
$job->job_claim($column{'job_claim'});
$job->hive_id($column{'hive_id'});
$job->worker_id($column{'worker_id'});
$job->status($column{'status'});
$job->retry_count($column{'retry_count'});
$job->completed($column{'completed'});
......@@ -425,11 +425,11 @@ sub reclaim_job {
my $uuid = $ug->create();
$job->job_claim($ug->to_string( $uuid ));
my $sql = "UPDATE analysis_job SET status='CLAIMED', job_claim=?, hive_id=? WHERE analysis_job_id=?";
my $sql = "UPDATE analysis_job SET status='CLAIMED', job_claim=?, worker_id=? WHERE analysis_job_id=?";
#print("$sql\n");
my $sth = $self->prepare($sql);
$sth->execute($job->job_claim, $job->hive_id, $job->dbID);
$sth->execute($job->job_claim, $job->worker_id, $job->dbID);
$sth->finish;
}
......@@ -450,19 +450,19 @@ sub store_out_files {
return unless($job);
my $sql = sprintf("DELETE from analysis_job_file WHERE hive_id=%d and analysis_job_id=%d",
$job->hive_id, $job->dbID);
my $sql = sprintf("DELETE from analysis_job_file WHERE worker_id=%d and analysis_job_id=%d",
$job->worker_id, $job->dbID);
$self->dbc->do($sql);
return unless($job->stdout_file or $job->stderr_file);
$sql = "INSERT ignore INTO analysis_job_file (analysis_job_id, hive_id, retry, type, path) VALUES ";
$sql = "INSERT ignore INTO analysis_job_file (analysis_job_id, worker_id, retry, type, path) VALUES ";
if($job->stdout_file) {
$sql .= sprintf("(%d,%d,%d,'STDOUT','%s')", $job->dbID, $job->hive_id,
$sql .= sprintf("(%d,%d,%d,'STDOUT','%s')", $job->dbID, $job->worker_id,
$job->retry_count, $job->stdout_file);
}
$sql .= "," if($job->stdout_file and $job->stderr_file);
if($job->stderr_file) {
$sql .= sprintf("(%d,%d,%d,'STDERR','%s')", $job->dbID, $job->hive_id,
$sql .= sprintf("(%d,%d,%d,'STDERR','%s')", $job->dbID, $job->worker_id,
$job->retry_count, $job->stderr_file);
}
......@@ -479,10 +479,10 @@ sub claim_jobs_for_worker {
my $ug = new Data::UUID;
my $uuid = $ug->create();
my $claim = $ug->to_string( $uuid );
#print("claiming jobs for hive_id=", $worker->hive_id, " with uuid $claim\n");
#print("claiming jobs for worker_id=", $worker->worker_id, " with uuid $claim\n");
my $sql_base = "UPDATE analysis_job SET job_claim='$claim'".
" , hive_id='". $worker->hive_id ."'".
" , worker_id='". $worker->worker_id ."'".
" , status='CLAIMED'".
" WHERE job_claim='' and status='READY'".
" AND analysis_id='" .$worker->analysis->dbID. "'";
......@@ -523,27 +523,27 @@ sub reset_dead_jobs_for_worker {
my $worker = shift;
throw("must define worker") unless($worker);
#added hive_id index to analysis_job table which made this operation much faster
#added worker_id index to analysis_job table which made this operation much faster
my ($sql, $sth);
my $max_retry_count = $worker->analysis->stats->max_retry_count();
#first just reset the claimed jobs, these don't need a retry_count index increment
$sql = "UPDATE analysis_job SET job_claim='', status='READY'".
" WHERE status='CLAIMED'".
" AND hive_id='" . $worker->hive_id ."'";
" AND worker_id='" . $worker->worker_id ."'";
$sth = $self->prepare($sql);
$sth->execute();
$sth->finish;
#print(" done update CLAIMED\n");
# an update with select on status and hive_id took 4seconds per worker to complete,
# an update with select on status and worker_id took 4seconds per worker to complete,
# while doing a select followed by update on analysis_job_id returned almost instantly
$sql = "UPDATE analysis_job SET job_claim='', status='READY'".
" ,retry_count=retry_count+1".
" WHERE status in ('GET_INPUT','RUN','WRITE_OUTPUT')".
" AND retry_count<$max_retry_count".
" AND hive_id='" . $worker->hive_id ."'";
" AND worker_id='" . $worker->worker_id ."'";
#print("$sql\n");
$sth = $self->prepare($sql);
$sth->execute();
......@@ -553,7 +553,7 @@ sub reset_dead_jobs_for_worker {
" ,retry_count=retry_count+1".
" WHERE status in ('GET_INPUT','RUN','WRITE_OUTPUT')".
" AND retry_count>=$max_retry_count".
" AND hive_id='" . $worker->hive_id ."'";
" AND worker_id='" . $worker->worker_id ."'";
#print("$sql\n");
$sth = $self->prepare($sql);
$sth->execute();
......@@ -567,7 +567,7 @@ sub reset_dead_job_by_dbID {
my $self = shift;
my $job_id = shift;
#added hive_id index to analysis_job table which made this operation much faster
#added worker_id index to analysis_job table which made this operation much faster
my $sql;
#first just reset the claimed jobs, these don't need a retry_count index increment
......@@ -577,7 +577,7 @@ sub reset_dead_job_by_dbID {
$self->dbc->do($sql);
#print(" done update CLAIMED\n");
# an update with select on status and hive_id took 4seconds per worker to complete,
# an update with select on status and worker_id took 4seconds per worker to complete,
# while doing a select followed by update on analysis_job_id returned almost instantly
$sql = "
......@@ -624,7 +624,7 @@ sub reset_job_by_dbID {
my ($sql, $sth);
#first just reset the claimed jobs, these don't need a retry_count index increment
$sql = "UPDATE analysis_job SET hive_id=0, job_claim='', status='READY', retry_count=0 WHERE analysis_job_id=?";
$sql = "UPDATE analysis_job SET worker_id=0, job_claim='', status='READY', retry_count=0 WHERE analysis_job_id=?";
$sth = $self->prepare($sql);
$sth->execute($analysis_job_id);
$sth->finish;
......
......@@ -143,10 +143,10 @@ sub create_new_worker {
my $sth = $self->prepare($sql);
$sth->execute($pid, $analysisStats->analysis_id, $beekeeper, $host);
my $hive_id = $sth->{'mysql_insertid'};
my $worker_id = $sth->{'mysql_insertid'};
$sth->finish;
my $worker = $self->_fetch_by_hive_id($hive_id);
my $worker = $self->fetch_by_worker_id($worker_id);
$worker=undef unless($worker and $worker->analysis);
if($worker and $analysisStats) {
......@@ -176,7 +176,7 @@ sub register_worker_death {
$sql .= " ,status='DEAD'";
$sql .= " ,work_done='" . $worker->work_done . "'";
$sql .= " ,cause_of_death='". $worker->cause_of_death ."'";
$sql .= " WHERE hive_id='" . $worker->hive_id ."'";
$sql .= " WHERE worker_id='" . $worker->worker_id ."'";
my $sth = $self->prepare($sql);
$sth->execute();
......@@ -186,7 +186,7 @@ sub register_worker_death {
$self->db->get_AnalysisStatsAdaptor->update_status($worker->analysis->dbID, "ALL_CLAIMED");
}
if($worker->cause_of_death eq "FATALITY") {
#print("FATAL DEATH Arrrrgggghhhhhhhh (hive_id=",$worker->hive_id,")\n");
#print("FATAL DEATH Arrrrgggghhhhhhhh (worker_id=",$worker->worker_id,")\n");
$self->db->get_AnalysisJobAdaptor->reset_dead_jobs_for_worker($worker);
}
......@@ -206,7 +206,7 @@ sub worker_check_in {
return unless($worker);
my $sql = "UPDATE hive SET last_check_in=now()";
$sql .= " ,work_done='" . $worker->work_done . "'";
$sql .= " WHERE hive_id='" . $worker->hive_id ."'";
$sql .= " WHERE worker_id='" . $worker->worker_id ."'";
my $sth = $self->prepare($sql);
$sth->execute();
......@@ -281,7 +281,7 @@ sub worker_reclaim_job {
my $job = shift;
return undef unless($job and $worker);
$job->hive_id($worker->hive_id);
$job->worker_id($worker->worker_id);
$self->db->get_AnalysisJobAdaptor->reclaim_job($job);
return $job;
}
......@@ -293,7 +293,7 @@ sub worker_register_job_done {
my $job = shift;
return unless($job);
return unless($job->dbID and $job->adaptor and $job->hive_id);
return unless($job->dbID and $job->adaptor and $job->worker_id);
return unless($worker and $worker->analysis and $worker->analysis->dbID);
$job->update_status('DONE');
......@@ -354,10 +354,10 @@ sub fetch_failed_workers {
sub fetch_dead_workers_with_jobs {
my $self = shift;
# select h.hive_id from hive h, analysis_job WHERE h.hive_id=analysis_job.hive_id AND h.cause_of_death!='' AND analysis_job.status not in ('DONE', 'READY','FAILED') group by h.hive_id
# select h.worker_id from hive h, analysis_job WHERE h.worker_id=analysis_job.worker_id AND h.cause_of_death!='' AND analysis_job.status not in ('DONE', 'READY','FAILED') group by h.worker_id
my $constraint = "h.cause_of_death!='' ";
my $join = [[['analysis_job', 'j'], " h.hive_id=j.hive_id AND j.status NOT IN ('DONE', 'READY', 'FAILED') GROUP BY h.hive_id"]];
my $join = [[['analysis_job', 'j'], " h.worker_id=j.worker_id AND j.status NOT IN ('DONE', 'READY', 'FAILED') GROUP BY h.worker_id"]];
return $self->_generic_fetch($constraint, $join);
}
......@@ -625,7 +625,7 @@ sub get_num_running_workers {
sub enter_status {
my ($self, $worker, $status) = @_;
$self->dbc->do("UPDATE hive SET status = '$status' WHERE hive_id = ".$worker->hive_id);
$self->dbc->do("UPDATE hive SET status = '$status' WHERE worker_id = ".$worker->worker_id);
}
=head2 get_num_needed_workers
......@@ -844,7 +844,7 @@ sub _pick_best_analysis_for_new_worker {
}
=head2 _fetch_by_hive_id
=head2 fetch_by_worker_id
Arg [1] : int $id
the unique database identifier for the feature to be obtained
......@@ -857,7 +857,7 @@ sub _pick_best_analysis_for_new_worker {
=cut
sub _fetch_by_hive_id {
sub fetch_by_worker_id {
my ($self,$id) = @_;
unless(defined $id) {
......@@ -866,7 +866,7 @@ sub _fetch_by_hive_id {
my @tabs = $self->_tables;
my $constraint = "h.hive_id = $id";
my $constraint = "h.worker_id = $id";
#return first element of _generic_fetch list
my ($obj) = @{$self->_generic_fetch($constraint)};
......@@ -950,7 +950,7 @@ sub _tables {
sub _columns {
my $self = shift;
return qw (h.hive_id
return qw (h.worker_id
h.analysis_id
h.beekeeper
h.host
......@@ -976,7 +976,7 @@ sub _objs_from_sth {
my $worker = new Bio::EnsEMBL::Hive::Worker;
$worker->init;
$worker->hive_id($column{'hive_id'});
$worker->worker_id($column{'worker_id'});
$worker->beekeeper($column{'beekeeper'});
$worker->host($column{'host'});
$worker->process_id($column{'process_id'});
......
......@@ -240,10 +240,10 @@ sub job_limit_reached {
sub hive_id {
sub worker_id {
my( $self, $value ) = @_;
$self->{'_hive_id'} = $value if($value);
return $self->{'_hive_id'};
$self->{'_worker_id'} = $value if($value);
return $self->{'_worker_id'};
}
sub host {
......@@ -306,11 +306,11 @@ use Digest::MD5 qw(md5_hex);
sub output_dir {
my ($self, $outdir) = @_;
if ($outdir and (-d $outdir)) {
my $hive_id = $self->hive_id;
my (@hex) = md5_hex($hive_id) =~ m/\G(..)/g;
my $worker_id = $self->worker_id;
my (@hex) = md5_hex($worker_id) =~ m/\G(..)/g;
# If you want more than one level of directories, change $hex[0]
# below into an array slice. e.g @hex[0..1] for two levels.
$outdir = join('/', $outdir, $hex[0], 'hive_id_' . $hive_id);
$outdir = join('/', $outdir, $hex[0], 'worker_id_' . $worker_id);
system("mkdir -p $outdir") && die "Could not create $outdir\n";
$self->{'_output_dir'} = $outdir;
}
......@@ -327,7 +327,7 @@ sub perform_global_cleanup {
sub print_worker {
my $self = shift;
print("WORKER: hive_id=",$self->hive_id,
print("WORKER: worker_id=",$self->worker_id,
" analysis_id=(",$self->analysis->dbID,")",$self->analysis->logic_name,
" host=",$self->host,
" pid=",$self->process_id,
......@@ -521,7 +521,7 @@ sub run
}
} while (!$self->cause_of_death); # /Worker's lifespan loop
$self->queen->dbc->do("UPDATE hive SET status = 'DEAD' WHERE hive_id = ".$self->hive_id);
$self->queen->dbc->do("UPDATE hive SET status = 'DEAD' WHERE worker_id = ".$self->worker_id);
if($self->perform_global_cleanup) {
#have runnable cleanup any global/process files/data it may have created
......@@ -555,7 +555,7 @@ sub run_module_with_job
my $runObj = $self->analysis->process;
return 0 unless($runObj);
return 0 unless($job and ($job->hive_id eq $self->hive_id));
return 0 unless($job and ($job->worker_id eq $self->worker_id));
my $init_time = time() * 1000;
$self->queen->dbc->query_count(0);
......
......@@ -176,12 +176,12 @@ sub main {
if($check_for_dead) { check_for_dead_workers($self, $queen, 1); }
if ($kill_worker_id) {
my $worker = $queen->_fetch_by_hive_id($kill_worker_id);
my $worker = $queen->fetch_by_worker_id($kill_worker_id);
if( $self->{'meadow'}->responsible_for_worker($worker)
and not defined($worker->cause_of_death())) {
printf("KILL: %10d %35s %15s %20s(%d) : ",
$worker->hive_id, $worker->host, $worker->process_id,
$worker->worker_id, $worker->host, $worker->process_id,
$worker->analysis->logic_name, $worker->analysis->dbID);
$self->{'meadow'}->kill_worker($worker);
......@@ -311,7 +311,7 @@ sub show_given_workers {
foreach my $worker (@{$worker_list}) {
printf("%10d %35s(%5d) %5s:%15s %15s (%s)\n",
$worker->hive_id,
$worker->worker_id,
$worker->analysis->logic_name,
$worker->analysis->dbID,
$worker->beekeeper,
......
#!/usr/local/ensembl/bin/perl -w
use strict;
use DBI;
use Getopt::Long;
use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::Queen;
use Bio::EnsEMBL::Hive::URLFactory;
use Sys::Hostname;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor;
# ok this is a hack, but I'm going to pretend I've got an object here
# by creating a blessed hash ref and passing it around like an object
# this is to avoid using global variables in functions, and to consolidate
# the globals into a nice '$self' package
my $self = bless {};
# Connection defaults; -user/-port may be overridden by -conf file or CLI flags below.
$self->{'db_conf'} = {};
$self->{'db_conf'}->{'-user'} = 'ensro';
$self->{'db_conf'}->{'-port'} = 3306;
$self->{'max_loops'} = 0; #unlimited
my $conf_file;
my ($help, $host, $user, $pass, $dbname, $port, $adaptor, $url);
my ($job_limit, $batch_size);
my $loopit=0;
my $worker_limit = 50;
my $sleep_time = 5;
my $sync=0;
$self->{'overdue_limit'} = 75; #minutes
$self->{'show_analysis_stats'} = undef;
$self->{'show_worker_stats'} = undef;
# NOTE(review): 'overdue' below is declared as a boolean flag but is bound to
# 'overdue_limit' (default 75 minutes) — passing -overdue sets the limit to 1,
# silently shrinking the dead-worker window. Presumably 'overdue=i' was intended;
# confirm before changing, as it alters the CLI contract.
GetOptions('help' => \$help,
'url=s' => \$url,
'conf=s' => \$conf_file,
'dbhost=s' => \$host,
'dbport=i' => \$port,
'dbuser=s' => \$user,
'dbpass=s' => \$pass,
'dbname=s' => \$dbname,
'dead' => \$self->{'check_for_dead'},
'overdue' => \$self->{'overdue_limit'},
'alldead' => \$self->{'all_dead'},
'run' => \$self->{'run'},
'jlimit=i' => \$job_limit,
'wlimit=i' => \$worker_limit,
'batch_size=i' => \$batch_size,
'loop' => \$loopit,
'sync' => \$sync,
'analysis_stats' => \$self->{'show_analysis_stats'},
'worker_stats' => \$self->{'show_worker_stats'},
'sleep=i' => \$sleep_time,
'logic_name=s' => \$self->{'logic_name'},
);
if ($help) { usage(); }
# Conf file (if any) is applied first; explicit CLI flags below override it.
parse_conf($self, $conf_file);
# -run means: perform exactly one iteration of the autonomous loop.
if($self->{'run'}) {
$loopit = 1;
$self->{'max_loops'} = 1;
}
my $DBA;
if($url) {
# A URL fully specifies the connection; host/port/user/pass/dbname flags are ignored.
$DBA = Bio::EnsEMBL::Hive::URLFactory->fetch($url);
die("Unable to connect to $url\n") unless($DBA);
} else {
if($host) { $self->{'db_conf'}->{'-host'} = $host; }
if($port) { $self->{'db_conf'}->{'-port'} = $port; }
if($dbname) { $self->{'db_conf'}->{'-dbname'} = $dbname; }
if($user) { $self->{'db_conf'}->{'-user'} = $user; }
if($pass) { $self->{'db_conf'}->{'-pass'} = $pass; }
unless(defined($self->{'db_conf'}->{'-host'})
and defined($self->{'db_conf'}->{'-user'})
and defined($self->{'db_conf'}->{'-dbname'}))
{
print "\nERROR : must specify host, user, and database to connect\n\n";
usage();
}
# connect to database specified
$DBA = new Bio::EnsEMBL::Hive::DBSQL::DBAdaptor(%{$self->{'db_conf'}});
}
my $queen = $DBA->get_Queen;
# One-shot maintenance actions requested by flags, performed before any reporting.
if($self->{'all_dead'}) { register_all_workers_dead($self, $queen); }
if($self->{'check_for_dead'}) { check_for_dead_workers($self, $queen); }
my $analysis = $DBA->get_AnalysisAdaptor->fetch_by_logic_name($self->{'logic_name'});
if($loopit) {
# Autonomous mode: loop (possibly once, see -run) until max_loops is hit.
run_autonomously($self, $queen);
} elsif($analysis) {
# Single-analysis mode: report (and optionally -sync) just this analysis.
my $stats = $analysis->stats;
if($sync) {
$queen->synchronize_AnalysisStats($stats);
$queen->check_blocking_control_rules_for_AnalysisStats($stats);
}
$stats->print_stats;
$queen->get_num_needed_workers();
} else {
# Whole-hive report mode.
$queen->synchronize_hive() if($sync);
$queen->print_analysis_status if($self->{'show_analysis_stats'});
$queen->print_running_worker_status;
show_running_workers($self, $queen) if($self->{'show_worker_stats'});
$queen->get_num_running_workers();
$queen->get_num_needed_workers();
$queen->get_hive_progress();
}
exit(0);
#######################
#
# subroutines
#
#######################
# Print the command-line help text and exit with a non-zero status.
# Fix: document the -overdue, -sync and -logic_name options, which GetOptions
# accepts but which were missing from the help output.
sub usage {
    print "lsf_beekeeper.pl [options]\n";
    print "  -help                  : print this help\n";
    print "  -url <url string>      : url defining where hive database is located\n";
    print "  -conf <path>           : config file describing db connection\n";
    print "  -dbhost <machine>      : mysql database host <machine>\n";
    print "  -dbport <port#>        : mysql port number\n";
    print "  -dbname <name>         : mysql database <name>\n";
    print "  -dbuser <name>         : mysql connection user <name>\n";
    print "  -dbpass <pass>         : mysql connection password\n";
    print "  -batch_size <num>      : #jobs a worker can claim at once\n";
    print "  -jlimit <num>          : #jobs to run before worker can die naturally\n";
    print "  -dead                  : clean overdue jobs for resubmission\n";
    print "  -overdue               : check for overdue (presumed dead) workers\n";
    print "  -alldead               : all outstanding workers\n";
    print "  -run                   : run 1 iteration of automation loop\n";
    print "  -loop                  : run autonomously, loops and sleeps\n";
    print "  -sleep <num>           : when looping, sleep <num> minutes (default 5)\n";
    print "  -wlimit <num>          : max # workers to create per loop\n";
    print "  -sync                  : synchronize analysis stats before reporting\n";
    print "  -analysis_stats        : show status of each analysis\n";
    print "  -worker_stats          : show status of each running worker\n";
    print "  -logic_name <name>     : restrict reporting to this analysis\n";
    print "lsf_beekeeper.pl v1.3\n";

    exit(1);
}
# Load an optional Perl-syntax configuration file (a file whose last expression
# is an arrayref of hashrefs) and install the first COMPARA/DATABASE entry
# found as this beekeeper's db connection parameters.
sub parse_conf {
    my $self      = shift;
    my $conf_file = shift;

    if($conf_file and (-e $conf_file)) {
        # 'do FILE' evaluates the file; it must return an arrayref of config hashes.
        my $entries = do $conf_file;
        foreach my $entry (@{$entries}) {
            #print("HANDLE type " . $entry->{TYPE} . "\n");
            if(($entry->{TYPE} eq 'COMPARA') or ($entry->{TYPE} eq 'DATABASE')) {
                $self->{'db_conf'} = $entry;
            }
        }
    }
}
# Scan for workers that have not checked in within the overdue limit and, for
# LSF-submitted workers, ask LSF (bjobs) whether the process still exists.
# A worker whose LSF job is gone is registered dead with the Queen so its
# claimed jobs can be reset.
#
# Fix: $worker->hive_id was renamed to $worker->worker_id in this commit
# (see Worker.pm's accessor rename); the old call would now die with an
# "unknown method" error.
sub check_for_dead_workers {
    my $self  = shift;
    my $queen = shift;

    print("===== check for dead workers\n");
    my $overdueWorkers = $queen->fetch_overdue_workers($self->{'overdue_limit'}*60);
    print(scalar(@{$overdueWorkers}), " overdue workers\n");
    foreach my $worker (@{$overdueWorkers}) {
        # Only LSF-managed workers can be probed via bjobs.
        if($worker->beekeeper eq 'LSF') {
            printf("%10d %35s %15s %20s(%d) : ",
                   $worker->worker_id, $worker->host, $worker->process_id,
                   $worker->analysis->logic_name, $worker->analysis->dbID);
            # grep away the "not found" / header / EXIT lines so that any
            # remaining output means the job is still alive.
            # NOTE(review): process_id comes from the hive database and is
            # interpolated into a shell command — assumed trustworthy here.
            my $cmd = "bjobs ". $worker->process_id . " 2>&1 | grep -v 'not found' | grep -v JOBID | grep -v EXIT";
            #print(" check worker with : $cmd\n");
            my $check = qx/$cmd/;
            unless($check) {
                print("worker is missing => it DIED!!\n");
                $queen->register_worker_death($worker);
            }
            else {
                print("ALIVE and running\n");
            }
        }
    }
}
# Unconditionally register every outstanding worker as dead.
# An overdue limit of 0 makes fetch_overdue_workers return all living workers.
sub register_all_workers_dead {
    my ($self, $queen) = @_;

    foreach my $worker (@{ $queen->fetch_overdue_workers(0) }) {
        $queen->register_worker_death($worker);
    }
}
# Print one line per worker that has not checked in within the overdue limit.
#
# Fix: $worker->hive_id was renamed to $worker->worker_id in this commit
# (see Worker.pm's accessor rename); the old call would now die with an
# "unknown method" error.
sub show_overdue_workers {
    my $self  = shift;
    my $queen = shift;

    print("===== overdue workers\n");
    my $overdueWorkers = $queen->fetch_overdue_workers($self->{'overdue_limit'}*60);
    foreach my $worker (@{$overdueWorkers}) {
        printf("%10d %35s %15s %20s(%d)\n",
               $worker->worker_id, $worker->host, $worker->process_id,
               $worker->analysis->logic_name, $worker->analysis->dbID);
    }
}
sub show_running_workers {
my $self = shift;
my $queen = shift;
print("===== running workers\n");
my $worker_list = $queen->fetch_overdue_workers(0);
foreach my $worker (@{$worker_list}) {
printf("%10d %35s(%5d) %5s:%15s %15s (%s)\n",
$worker->hive_id,
$worker->analysis->logic_name,
$worker->analysis->dbID,
$worker->beekeeper,
$worker->process_id,