Meadow.pm 4 KB
Newer Older
1
2
3
4
5
6
# A Meadow is an abstract interface to one of several implementations of the Workers' process manager.
#
# A Meadow knows how to check and change the actual status of Workers.

package Bio::EnsEMBL::Hive::Meadow;

7
8
9
10
use Sys::Hostname;
use Bio::EnsEMBL::Hive::Meadow::LSF;
use Bio::EnsEMBL::Hive::Meadow::LOCAL;

11
12
13
14
15
use strict;

# Constructor.
#
# Arguments:
#   $class - either a fully qualified class name (anything containing '::'),
#            or a short Meadow type name such as 'LSF' or 'LOCAL', which is
#            expanded to 'Bio::EnsEMBL::Hive::Meadow::<type>'.
#   @_     - remaining arguments are taken as key/value pairs and become the
#            initial contents of the object's hash.
#
# Returns: a hash-based object blessed into the resolved class.
sub new {
    my $class = shift @_;

    unless($class=~/::/) {
            # The '::' separator is required here: without it a short name like 'LSF'
            # would be blessed into the non-existent 'Bio::EnsEMBL::Hive::MeadowLSF'.
        $class = 'Bio::EnsEMBL::Hive::Meadow::'.$class;
    }

    return bless { @_ }, $class;
}

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# Works out which Meadow the current process is running under.
#
# The LSF Meadow is asked for the current worker process id first; if that
# call dies, the LOCAL Meadow is asked instead.
#
# Returns: the list ($type, $pid, $exechost), where $type is 'LSF' or 'LOCAL',
#          $pid is whatever the chosen Meadow's get_current_worker_process_id()
#          returned, and $exechost is the result of Sys::Hostname::hostname().
sub guess_current_type_pid_exechost {
    my ($self) = @_;

        # optimistic default: assume LSF until the LSF call proves otherwise
    my $meadow_type = 'LSF';
    my $process_id  = eval { Bio::EnsEMBL::Hive::Meadow::LSF->get_current_worker_process_id() };

    if($@) {    # the LSF attempt died, so fall back to the LOCAL Meadow
        $process_id  = Bio::EnsEMBL::Hive::Meadow::LOCAL->get_current_worker_process_id();
        $meadow_type = 'LOCAL';
    }

    my $exec_host = hostname();

    return ($meadow_type, $process_id, $exec_host);
}

41
42
43
44
# Returns the last '::'-separated component of the class this object is
# blessed into (expected to be 'LOCAL' or 'LSF' for the known Meadows).
sub type {
    my $object_class = ref(shift @_);

    return (split(/::/, $object_class))[-1];
}

Leo Gordon's avatar
Leo Gordon committed
45
46
47
48
49
50
51
52
53
# Getter/setter for the pipeline name; if set, it provides a filter for
# job-related queries.
#
# When called with an argument (even an undefined one) the stored value is
# replaced by it. Always returns the currently stored value.
sub pipeline_name {
    my ($self, @new_value) = @_;

    if(@new_value) {    # a value was passed explicitly (it may be undef)
        $self->{'_pipeline_name'} = $new_value[0];
    }

    return $self->{'_pipeline_name'};
}

54
55
56
57
58
59
60
61
62
# Getter/setter for general options that different Meadows can plug into the
# submission command.
#
# When called with an argument the stored value is replaced by it.
# Returns the stored value, or an empty string if that value is false.
sub meadow_options {
    my ($self, @new_value) = @_;

    if(@new_value) {
        $self->{'_meadow_options'} = $new_value[0];
    }

    my $options = $self->{'_meadow_options'};

    return $options ? $options : '';
}

63
64
65
66
67
68
# Builds the common prefix of job names: '<pipeline_name>-Hive' when
# pipeline_name() returns a true value, otherwise just 'Hive'.
sub job_name_prefix {
    my ($self) = @_;

    my $pipeline_part = '';
    if($self->pipeline_name()) {
        $pipeline_part = $self->pipeline_name() . '-';
    }

    return $pipeline_part . 'Hive';
}

Leo Gordon's avatar
Leo Gordon committed
69
# Composes a job name from the object's job_name_prefix(), the resource class
# id and the iteration number, as '<prefix><rc_id>_<iteration>'.
#
# Arguments:
#   $worker_count - number of workers; when greater than 1 a '[1-<count>]'
#                   suffix is appended to the name
#   $iteration    - iteration number embedded in the name
#   $rc_id        - resource class id; a false/missing value is treated as 0
sub generate_job_name {
    my $self         = shift @_;
    my $worker_count = shift @_;
    my $iteration    = shift @_;
    my $rc_id        = shift(@_) || 0;

    my $range_suffix = '';
    if($worker_count > 1) {
        $range_suffix = "[1-${worker_count}]";
    }

    return join('', $self->job_name_prefix(), $rc_id, '_', $iteration, $range_suffix);
}

78
79
80
81
82
83
# Tells whether the given worker belongs to this Meadow: true when the
# worker's beekeeper() value is string-equal to this Meadow's type().
sub responsible_for_worker {
    my $self   = shift @_;
    my $worker = shift @_;

    my $worker_meadow_type = $worker->beekeeper();
    my $own_meadow_type    = $self->type();

    return $worker_meadow_type eq $own_meadow_type;
}

84
# Abstract method taking ($self, $worker).
# This base implementation always dies, asking for a derived method to be
# used instead.
sub check_worker_is_alive_and_mine {
    my $self   = shift @_;
    my $worker = shift @_;

    die "Please use a derived method";
}

# Abstract method taking ($self, $worker).
# This base implementation always dies, asking for a derived method to be
# used instead.
sub kill_worker {
    my $self   = shift @_;
    my $worker = shift @_;

    die "Please use a derived method";
}

# --------------[(combinable) means of adjusting the number of submitted workers]----------------------

# Getter/setter for the limit on the total number of running workers.
# If set, and the Meadow ->can('count_running_workers'), it provides a
# cut-off on the number of workers being submitted.
#
# When called with an argument (even an undefined one) the stored value is
# replaced by it. Always returns the currently stored value.
sub total_running_workers_limit {
    my ($self, @new_value) = @_;

    if(@new_value) {    # a value was passed explicitly (it may be undef)
        $self->{'_total_running_workers_limit'} = $new_value[0];
    }

    return $self->{'_total_running_workers_limit'};
}

# Getter/setter for the pending-adjust flag.
# If set, and the Meadow ->can('count_pending_workers'), limit_workers()
# subtracts the pending workers from the number of workers being submitted.
#
# When called with an argument (even an undefined one) the stored value is
# replaced by it. Always returns the currently stored value.
sub pending_adjust {
    my ($self, @new_value) = @_;

    if(@new_value) {    # a value was passed explicitly (it may be undef)
        $self->{'_pending_adjust'} = $new_value[0];
    }

    return $self->{'_pending_adjust'};
}

# Getter/setter for the submission limit; if set, it provides a cut-off on
# the number of workers being submitted.
#
# When called with an argument (even an undefined one) the stored value is
# replaced by it. Always returns the currently stored value.
sub submitted_workers_limit {
    my ($self, @new_value) = @_;

    if(@new_value) {    # a value was passed explicitly (it may be undef)
        $self->{'_submitted_workers_limit'} = $new_value[0];
    }

    return $self->{'_submitted_workers_limit'};
}

# Reduces a requested number of workers according to the adjustments
# configured on this Meadow, applied in this order:
#   1. if the Meadow can count pending workers and pending_adjust() is true,
#      the pending count is subtracted from the request;
#   2. if submitted_workers_limit() is defined, the request is capped by it;
#   3. if the Meadow can count running workers and
#      total_running_workers_limit() is defined, the request is capped by the
#      remaining slots (limit minus currently running workers).
#
# Arguments:
#   $worker_count - the number of workers the caller would like to submit
#
# Returns: the adjusted number of workers; a negative result is replaced by 0.
sub limit_workers {
    my $self      = shift @_;
    my $requested = shift @_;

    if($self->can('count_pending_workers') && $self->pending_adjust()) {
        $requested -= $self->count_pending_workers();
    }

    my $submit_cap = $self->submitted_workers_limit;
    if(defined($submit_cap) && ($submit_cap < $requested)) {
        $requested = $submit_cap;
    }

    if($self->can('count_running_workers')) {
        my $running_cap = $self->total_running_workers_limit;

        if(defined($running_cap)) {
            my $free_slots = $running_cap - $self->count_running_workers();

            $requested = $free_slots if($free_slots < $requested);
        }
    }

        # the arithmetic above can go below zero, which is reported as "nothing to submit"
    return ($requested < 0) ? 0 : $requested;
}

1;