Meadow.pm 3.03 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
# A Meadow is an abstract interface for one of several implementations of Workers' process manager.
#
# A Meadow knows how to check&change the actual status of Workers

package Bio::EnsEMBL::Hive::Meadow;

use strict;

sub new {
    my $class = shift @_;

    return bless { @_ }, $class;
}

sub type { # should return 'LOCAL' or 'LSF'
    return (reverse split(/::/, ref(shift @_)))[0];
}

Leo Gordon's avatar
Leo Gordon committed
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
sub pipeline_name { # if set, provides a filter for job-related queries
    my $self = shift @_;

    if(scalar(@_)) { # new value is being set (which can be undef)
        $self->{'_pipeline_name'} = shift @_;
    }
    return $self->{'_pipeline_name'};
}

sub generate_job_name {
    my ($self, $worker_count, $iteration) = @_;

    return ($self->pipeline_name() ? $self->pipeline_name().'-' : '').'HL'.$iteration
         . (($worker_count > 1) ? "[1-${worker_count}]" : '');
}

35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
sub responsible_for_worker {
    my ($self, $worker) = @_;

    return $worker->beekeeper() eq $self->type();
}

sub check_worker_is_alive {
    my ($self, $worker) = @_;

    die "Please use a derived method";
}

sub kill_worker {
    my ($self, $worker) = @_;

    die "Please use a derived method";
}

# --------------[(combinable) means of adjusting the number of submitted workers]----------------------

sub total_running_workers_limit { # if set and ->can('count_running_workers'),
                                  # provides a cut-off on the number of workers being submitted
    my $self = shift @_;

    if(scalar(@_)) { # new value is being set (which can be undef)
        $self->{'_total_running_workers_limit'} = shift @_;
    }
    return $self->{'_total_running_workers_limit'};
}

sub pending_adjust { # if set and ->can('count_pending_workers'),
                     # provides a cut-off on the number of workers being submitted
    my $self = shift @_;

    if(scalar(@_)) { # new value is being set (which can be undef)
        $self->{'_pending_adjust'} = shift @_;
    }
    return $self->{'_pending_adjust'};
}

sub submitted_workers_limit { # if set, provides a cut-off on the number of workers being submitted
    my $self = shift @_;

    if(scalar(@_)) { # new value is being set (which can be undef)
        $self->{'_submitted_workers_limit'} = shift @_;
    }
    return $self->{'_submitted_workers_limit'};
}

sub limit_workers {
Leo Gordon's avatar
Leo Gordon committed
85
    my ($self, $worker_count) = @_;
86 87

    if($self->can('count_pending_workers') and $self->pending_adjust()) {
Leo Gordon's avatar
Leo Gordon committed
88
        my $pending_count = $self->count_pending_workers();
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113

        $worker_count -= $pending_count;
    }

    if(defined(my $submit_limit = $self->submitted_workers_limit)) {
        if($submit_limit < $worker_count) {

            $worker_count = $submit_limit;
        }
    }

    if($self->can('count_running_workers') and defined(my $total_limit = $self->total_running_workers_limit)) {
        my $available_slots = $total_limit - $self->count_running_workers();
        if($available_slots < $worker_count) {

            $worker_count = $available_slots;
        }
    }

    $worker_count = 0 if ($worker_count<0);

    return $worker_count;
}

1;