Commit 41aa6834 authored by Leo Gordon's avatar Leo Gordon
Browse files

*** empty log message ***

parent 8a07e8f7
......@@ -16,12 +16,35 @@ sub type { # should return 'LOCAL' or 'LSF'
return (reverse split(/::/, ref(shift @_)))[0];
}
sub pipeline_name { # if set, provides a filter for job-related queries
my $self = shift @_;
if(scalar(@_)) { # new value is being set (which can be undef)
$self->{'_pipeline_name'} = shift @_;
}
return $self->{'_pipeline_name'};
}
sub generate_job_name {
my ($self, $worker_count, $iteration) = @_;
return ($self->pipeline_name() ? $self->pipeline_name().'-' : '').'HL'.$iteration
. (($worker_count > 1) ? "[1-${worker_count}]" : '');
}
sub responsible_for_worker {
my ($self, $worker) = @_;
return $worker->beekeeper() eq $self->type();
}
sub status_of_all_my_workers { # Returns undef if it is not implemented in the derived class (which returns a hashref).
# You should check the return value and use other means if it is not defined.
my ($self) = @_;
return undef;
}
sub check_worker_is_alive {
my ($self, $worker) = @_;
......@@ -66,10 +89,10 @@ sub submitted_workers_limit { # if set, provides a cut-off on the number of work
}
sub limit_workers {
my ($self, $worker_count, $hive_name) = @_;
my ($self, $worker_count) = @_;
if($self->can('count_pending_workers') and $self->pending_adjust()) {
my $pending_count = $self->count_pending_workers($hive_name);
my $pending_count = $self->count_pending_workers();
$worker_count -= $pending_count;
}
......
......@@ -47,7 +47,7 @@ sub kill_worker {
}
sub submit_workers {
my ($self, $worker_cmd, $worker_count, $jobname) = @_;
my ($self, $worker_cmd, $worker_count, $iteration) = @_;
my $cmd = "$worker_cmd &";
......
......@@ -7,20 +7,36 @@ use strict;
use base 'Bio::EnsEMBL::Hive::Meadow';
sub count_pending_workers {
my ($self, $name) = @_;
my ($self) = @_;
my $cmd;
if ($name) {
$cmd = "bjobs -w | grep '$name-HL' | grep -c PEND";
} else {
$cmd = "bjobs -w | grep -c PEND";
my $cmd = "bjobs -w ";
if(my $pipeline_name = $self->pipeline_name()) {
$cmd .= " | grep '${pipeline_name}-HL'";
}
$cmd .= " | grep -c PEND";
my $pend_count = qx/$cmd/;
chomp($pend_count);
return $pend_count;
}
sub status_of_all_my_workers { # returns a hashref
my ($self) = @_;
my $cmd = 'bjobs -w 2>&1 | grep -v "No unfinished job found" | grep -v JOBID | grep -v DONE | grep -v EXIT';
if(my $pipeline_name = $self->pipeline_name()) {
$cmd .= " | grep '${pipeline_name}-HL'";
}
my %status_hash = ();
foreach my $line (`$cmd`) {
my ($worker_pid, $user, $status, $queue) = split(/\s+/, $line);
$status_hash{$worker_pid} = $status;
}
return \%status_hash;
}
sub check_worker_is_alive {
my ($self, $worker) = @_;
......@@ -50,14 +66,12 @@ sub lsf_options {
}
sub submit_workers {
my ($self, $worker_cmd, $worker_count, $jobname) = @_;
if($worker_count>1) {
$jobname .= "[1-${worker_count}]";
}
my ($self, $worker_cmd, $worker_count, $iteration) = @_;
my $job_name = $self->generate_job_name($worker_count, $iteration);
my $lsf_options = $self->lsf_options();
my $cmd = "bsub -o /dev/null -J\"${jobname}\" $lsf_options $worker_cmd";
my $cmd = "bsub -o /dev/null -J\"${job_name}\" $lsf_options $worker_cmd";
print "SUBMITTING_CMD:\t\t$cmd\n";
system($cmd);
......
......@@ -345,10 +345,20 @@ sub fetch_overdue_workers {
sub fetch_failed_workers {
my $self = shift;
my $constraint = "h.cause_of_death='FATALITY' ";
return $self->_generic_fetch($constraint);
}
sub fetch_dead_workers_with_jobs {
my $self = shift;
# select h.hive_id from hive h, analysis_job WHERE h.hive_id=analysis_job.hive_id AND h.cause_of_death AND analysis_job.status not in ('DONE', 'READY','FAILED') group by h.hive_id
my $constraint = "h.cause_of_death ";
my $join = [[['analysis_job', 'j'], " h.hive_id=j.hive_id AND j.status NOT IN ('DONE', 'READY', 'FAILED') GROUP BY h.hive_id"]];
return $self->_generic_fetch($constraint, $join);
}
=head2 synchronize_hive
......@@ -371,16 +381,21 @@ sub synchronize_hive {
my $list_of_analyses = $this_analysis ? [$this_analysis] : $self->db->get_AnalysisAdaptor->fetch_all;
print "Synchronizing the hive (".scalar(@$list_of_analyses)." analyses this time) \n";
print STDERR "\nSynchronizing the hive (".scalar(@$list_of_analyses)." analyses this time):\n";
foreach my $analysis (@$list_of_analyses) {
$self->synchronize_AnalysisStats($analysis->stats);
print STDERR '.';
}
print STDERR "\n";
print STDERR "Checking blocking control rules:\n";
foreach my $analysis (@$list_of_analyses) {
$self->check_blocking_control_rules_for_AnalysisStats($analysis->stats);
print STDERR '.';
}
print STDERR "\n";
print((time() - $start_time), " secs to synchronize_hive\n");
print STDERR ''.((time() - $start_time))." seconds to synchronize_hive\n\n";
}
......@@ -743,15 +758,15 @@ sub print_analysis_status {
}
sub print_running_worker_status
{
sub print_running_worker_status {
my $self = shift;
my $total = 0;
print("HIVE LIVE WORKERS====\n");
print "====== Live workers according to Queen:\n";
my $sql = "select logic_name, count(*) from hive, analysis ".
"where hive.analysis_id=analysis.analysis_id and hive.cause_of_death='' ".
"group by hive.analysis_id";
my $total = 0;
my $sth = $self->prepare($sql);
$sth->execute();
while((my $logic_name, my $count)=$sth->fetchrow_array()) {
......@@ -759,7 +774,7 @@ sub print_running_worker_status
$total += $count;
}
printf(" %d total workers\n", $total);
print("=====================\n");
print "===========================\n";
$sth->finish;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment