Commit 004f42af authored by Leo Gordon's avatar Leo Gordon
Browse files

copied the Meadow stuff from the development branch

parent 8623c5d2
# A Meadow is an abstract interface for one of several implementations of Workers' process manager.
#
# A Meadow knows how to check&change the actual status of Workers
package Bio::EnsEMBL::Hive::Meadow;
use strict;
sub new {
my $class = shift @_;
return bless { @_ }, $class;
}
sub type { # should return 'LOCAL' or 'LSF'
return (reverse split(/::/, ref(shift @_)))[0];
}
sub responsible_for_worker {
my ($self, $worker) = @_;
return $worker->beekeeper() eq $self->type();
}
sub check_worker_is_alive {
my ($self, $worker) = @_;
die "Please use a derived method";
}
sub kill_worker {
my ($self, $worker) = @_;
die "Please use a derived method";
}
# --------------[(combinable) means of adjusting the number of submitted workers]----------------------
sub total_running_workers_limit { # if set and ->can('count_running_workers'),
# provides a cut-off on the number of workers being submitted
my $self = shift @_;
if(scalar(@_)) { # new value is being set (which can be undef)
$self->{'_total_running_workers_limit'} = shift @_;
}
return $self->{'_total_running_workers_limit'};
}
sub pending_adjust { # if set and ->can('count_pending_workers'),
# provides a cut-off on the number of workers being submitted
my $self = shift @_;
if(scalar(@_)) { # new value is being set (which can be undef)
$self->{'_pending_adjust'} = shift @_;
}
return $self->{'_pending_adjust'};
}
sub submitted_workers_limit { # if set, provides a cut-off on the number of workers being submitted
my $self = shift @_;
if(scalar(@_)) { # new value is being set (which can be undef)
$self->{'_submitted_workers_limit'} = shift @_;
}
return $self->{'_submitted_workers_limit'};
}
sub limit_workers {
my ($self, $worker_count, $hive_name) = @_;
if($self->can('count_pending_workers') and $self->pending_adjust()) {
my $pending_count = $self->count_pending_workers($hive_name);
$worker_count -= $pending_count;
}
if(defined(my $submit_limit = $self->submitted_workers_limit)) {
if($submit_limit < $worker_count) {
$worker_count = $submit_limit;
}
}
if($self->can('count_running_workers') and defined(my $total_limit = $self->total_running_workers_limit)) {
my $available_slots = $total_limit - $self->count_running_workers();
if($available_slots < $worker_count) {
$worker_count = $available_slots;
}
}
$worker_count = 0 if ($worker_count<0);
return $worker_count;
}
1;
# This is the 'Local' implementation of Meadow
package Bio::EnsEMBL::Hive::Meadow::LOCAL;
use strict;
use Sys::Hostname;
use base 'Bio::EnsEMBL::Hive::Meadow';
sub count_running_workers {
my $self = shift @_;
my $cmd = 'ps -a | grep runWorker.pl | grep -v grep | wc -l';
my $run_count = qx/$cmd/;
chomp($run_count);
return $run_count;
}
sub responsible_for_worker {
my ($self, $worker) = @_;
return ($worker->beekeeper() eq $self->type()) and ($worker->host eq hostname());
}
sub check_worker_is_alive {
my ($self, $worker) = @_;
my $cmd = 'ps '. $worker->process_id . ' 2>&1 | grep ' . $worker->process_id;
my $is_alive = qx/$cmd/;
return $is_alive;
}
sub kill_worker {
my ($self, $worker) = @_;
if( $self->responsible_for_worker($worker) ) {
if($self->check_worker_is_alive($worker)) {
my $cmd = 'kill -9 '.$worker->process_id();
system($cmd);
} else {
warn 'Cannot kill worker '.$worker->process_id().' because it is not running';
}
} else {
warn 'Cannot kill worker '.$worker->process_id().'@'.$worker->host.' it is probably running on a different host';
}
}
sub submit_workers {
my ($self, $worker_cmd, $worker_count, $jobname) = @_;
my $cmd = "$worker_cmd &";
foreach (1..$worker_count) {
print "SUBMITTING_CMD:\t\t$cmd\n";
system($cmd);
}
}
1;
# This is the 'LSF' implementation of Meadow
package Bio::EnsEMBL::Hive::Meadow::LSF;
use strict;
use base 'Bio::EnsEMBL::Hive::Meadow';
sub count_pending_workers {
my ($self, $name) = @_;
my $cmd;
if ($name) {
$cmd = "bjobs -w | grep '$name-HL' | grep -c PEND";
} else {
$cmd = "bjobs -w | grep -c PEND";
}
my $pend_count = qx/$cmd/;
chomp($pend_count);
return $pend_count;
}
sub check_worker_is_alive {
my ($self, $worker) = @_;
my $cmd = 'bjobs '. $worker->process_id . ' 2>&1 | grep -v "not found" | grep -v JOBID | grep -v EXIT';
my $is_alive = qx/$cmd/;
return $is_alive;
}
sub kill_worker {
my ($self, $worker) = @_;
if($self->check_worker_is_alive($worker)) {
my $cmd = 'bkill '.$worker->process_id();
system($cmd);
} else {
warn 'Cannot kill worker '.$worker->process_id().' because it is not running';
}
}
sub lsf_options {
my $self = shift @_;
if(scalar(@_)) {
$self->{'_lsf_options'} = shift @_;
}
return $self->{'_lsf_options'} || '';
}
sub submit_workers {
my ($self, $worker_cmd, $worker_count, $jobname) = @_;
if($worker_count>1) {
$jobname .= "[1-${worker_count}]";
}
my $lsf_options = $self->lsf_options();
my $cmd = "bsub -o /dev/null -J\"${jobname}\" $lsf_options $worker_cmd";
print "SUBMITTING_CMD:\t\t$cmd\n";
system($cmd);
}
1;
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment