LSF.pm 12.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12
=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::Meadow::LSF

=head1 DESCRIPTION

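    This is the 'LSF' (IBM Platform Load Sharing Facility) implementation of the Meadow interface:
    it submits, monitors and kills Hive workers through the LSF commands bsub, bjobs, bkill and bacct.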

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2020] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates.

=cut

31 32 33 34

package Bio::EnsEMBL::Hive::Meadow::LSF;

use strict;
35
use warnings;
36
use Time::Piece;
37
use Time::Seconds;
38

39 40
use Bio::EnsEMBL::Hive::Utils ('split_for_bash');

Leo Gordon's avatar
Leo Gordon committed
41
use base ('Bio::EnsEMBL::Hive::Meadow');
42

43

44
our $VERSION = '3.1';       # Semantic version of the Meadow interface:
45 46 47
                            #   change the Major version whenever an incompatible change is introduced,
                            #   change the Minor version whenever the interface is extended, but compatibility is retained.

48
=head2 name

   Args:       : None
   Description : Determine the LSF cluster_name, if an LSF meadow is available,
                 by running `lsid` and reading its "My cluster name is ..." line.
   Returntype  : String, or undef (empty list in list context) when lsid is not
                 available or does not report a cluster name.

=cut

sub name {
    my $mcni = 'My cluster name is';
    my @lsid_out = `lsid 2>/dev/null`;

    foreach my $lsid_line (@lsid_out) {
        if ($lsid_line =~ /^\Q$mcni\E\s+(\S+)/) {
            return $1;
        }
    }

    # No LSF cluster detected: return explicitly instead of relying on the
    # unspecified value of a foreach loop that ran to completion.
    return;
}

sub get_current_worker_process_id {
    my ($self) = @_;

    # LSF exports the job id and the array index into the environment of every job it runs.
    my ($job_id, $job_index) = @ENV{'LSB_JOBID', 'LSB_JOBINDEX'};

    # Without both variables this process cannot be identified as an LSF job.
    die "Could not establish the process_id" unless defined($job_id) and defined($job_index);

    # Elements of a job array (index > 0) are addressed as "JOBID[INDEX]"; plain jobs by JOBID alone.
    return ($job_index > 0) ? "$job_id\[$job_index\]" : $job_id;
}

sub count_pending_workers_by_rc_name {
    my ($self) = @_;

    my $jnp = $self->job_name_prefix();
    # "-u all" has been removed to ensure one user's PEND processes
    #   do not affect another user helping to run the same pipeline.
    my $cmd = "bjobs -w -J '${jnp}*' 2>/dev/null";

#    warn "LSF::count_pending_workers_by_rc_name() running cmd:\n\t$cmd\n";

    my %pending_this_meadow_by_rc_name = ();
    my $total_pending_this_meadow = 0;

    foreach my $line (qx{$cmd}) {
        # STAT is the third column of "bjobs -w" output; test it directly so that a job
        # whose *name* merely contains "PEND" is not counted as pending.
        my $status = (split ' ', $line)[2];
        next unless defined($status) and $status eq 'PEND';

        # The resource-class name is taken from the job name, which this regex assumes looks
        # like <prefix><rc_name>-<number>, optionally followed by an array index "[N]".
        if ($line =~ /\b\Q$jnp\E(\S+)\-\d+(\[\d+\])?\b/) {
            $pending_this_meadow_by_rc_name{$1}++;
            $total_pending_this_meadow++;
        }
    }

    return (\%pending_this_meadow_by_rc_name, $total_pending_this_meadow);
}

sub count_running_workers {
    my $self                        = shift @_;
    my $meadow_users_of_interest    = shift @_ || [ 'all' ];

    my $jnp = $self->job_name_prefix();

    my $total_running_worker_count = 0;

    foreach my $meadow_user (@$meadow_users_of_interest) {
        my $cmd = "bjobs -w -J '${jnp}*' -u $meadow_user 2>/dev/null";

#        warn "LSF::count_running_workers() running cmd:\n\t$cmd\n";

        # Count rows whose STAT column (third field of "bjobs -w") is RUN. Matching /RUN/
        # anywhere in the line would also count e.g. pending jobs whose name contains "RUN".
        my $meadow_user_worker_count = grep {
            my $status = (split ' ', $_)[2];
            defined($status) && $status eq 'RUN'
        } qx{$cmd};

        $total_running_worker_count += $meadow_user_worker_count;
    }

    return $total_running_worker_count;
}

sub status_of_all_our_workers { # returns a hashref
    my $self                        = shift @_;
    my $meadow_users_of_interest    = shift @_ || [ 'all' ];

    my $jnp = $self->job_name_prefix();

    my %status_hash = ();

    foreach my $meadow_user (@$meadow_users_of_interest) {
        my $cmd = "bjobs -w -J '${jnp}*' -u $meadow_user 2>/dev/null";

#        warn "LSF::status_of_all_our_workers() running cmd:\n\t$cmd\n";

        foreach my $line (`$cmd`) {
            my ($group_pid, $user, $status, $queue, $submission_host, @rest) = split(' ', $line);

            next unless defined $status;    # blank or truncated line
            next if(($group_pid eq 'JOBID') or ($status eq 'DONE') or ($status eq 'EXIT'));

            # EXEC_HOST is empty for jobs that have not started yet (e.g. PEND), which shifts
            # JOB_NAME one column to the left. Locate the job name by its prefix rather than by
            # position, falling back to the usual JOB_NAME column.
            my $job_name;
            if (length $jnp) {
                ($job_name) = grep { /^\Q$jnp\E/ } @rest;
            }
            $job_name = $rest[1] unless defined $job_name;

            my $worker_pid = $group_pid;
            if(defined($job_name) and $job_name=~/(\[\d+\])$/ and $worker_pid!~/\[\d+\]$/) {   # account for the difference in LSF 9.1.1.1 vs LSF 9.1.2.0  bjobs' output
                $worker_pid .= $1;
            }
            $status_hash{$worker_pid} = $status;
        }
    }

    return \%status_hash;
}

sub check_worker_is_alive_and_mine {
    my ($self, $worker) = @_;

    my $wpid = $worker->process_id();
    # $ENV{'USER'} is not always set (e.g. under cron); fall back to the login name of the
    # real uid, otherwise "bjobs -u" would consume the job id as its user argument.
    my $this_user = defined($ENV{'USER'}) ? $ENV{'USER'} : scalar(getpwuid($<));
    # The pid is single-quoted so that array ids like "123[4]" are not shell glob patterns.
    # stderr is discarded: bjobs prints error messages (e.g. for an unknown job id) there,
    # and merging them into stdout made such messages look like a live job.
    my $cmd = qq{bjobs -u $this_user '$wpid' 2>/dev/null};

    my @bjobs_out = qx/$cmd/;
#    warn "LSF::check_worker_is_alive_and_mine() running cmd:\n\t$cmd\n";

    my $is_alive_and_mine = 0;
    foreach my $bjobs_line (@bjobs_out) {
        my ($jobid, undef, $status) = split(' ', $bjobs_line);
        next unless defined $status;                        # blank or non-tabular line
        next if $jobid eq 'JOBID';                          # the header line
        next if $status eq 'DONE' or $status eq 'EXIT';     # finished jobs are not alive
        # *SUSP, UNKWN, and ZOMBI are "alive" for the purposes of this call
        # which is typically used to see if the process can be killed.
        # Can't search for line containing the job id, since it may be
        # formatted differently in bjob output than in $worker->process_id()
        # (e.g. for array jobs).
        $is_alive_and_mine = 1;
    }
    return $is_alive_and_mine;
}

sub kill_worker {
    my ($self, $worker, $fast) = @_;

    # '-r' asks bkill for a quick removal instead of waiting for the job to die.
    # Passing the arguments as a list bypasses the shell, so array-job ids such as
    # "123[4]" reach bkill verbatim instead of being expanded as shell glob patterns.
    my @cmd = ('bkill', ($fast ? ('-r') : ()), $worker->process_id());

#    warn "LSF::kill_worker() running cmd:\n\t@cmd\n";

    return system(@cmd);
}

sub _convert_to_datetime {      # a private subroutine that can recover missing year from an incomplete date and then transforms it into SQL's datetime for storage
    my ($weekday, $yearless, $real_year) = @_;

    if($real_year) {
        my $datetime = Time::Piece->strptime("$yearless $real_year", '%b %d %T %Y');
        return $datetime->date.' '.$datetime->hms;
    }

    my $now       = Time::Piece->new;
    my $curr_year = $now->year();
    # strptime() builds a UTC-based time while the report is presumably in local time,
    # so allow one day of slack before rejecting a candidate as lying in the future.
    my $latest_epoch = $now->epoch + ONE_DAY;

    # Walk back from the current year until the weekday matches
    # (the Gregorian calendar repeats every 28 years, at least within 1901-2099).
    foreach my $years_back (0 .. 27) {
        my $candidate_year = $curr_year - $years_back;
        my $datetime = eval { Time::Piece->strptime("$yearless $candidate_year", '%b %d %T %Y') };
        next unless defined $datetime;                  # skip candidates strptime cannot parse
        next if $datetime->epoch > $latest_epoch;       # a finished job cannot be dated in the future
        if($datetime->wdayname eq $weekday) {
            return $datetime->date.' '.$datetime->hms;
        }
    }

    return; # could not guess the year
}

sub parse_report_source_line {
    my ($self, $bacct_source_line) = @_;

    warn "LSF::parse_report_source_line( \"$bacct_source_line\" )\n";

    # Maps the LSF termination reason of a job onto the Hive cause_of_death vocabulary
    my %status_2_cod = (
        'TERM_MEMLIMIT'     => 'MEMLIMIT',
        'TERM_RUNLIMIT'     => 'RUNLIMIT',
        'TERM_OWNER'        => 'KILLED_BY_USER',    # bkill     (wait until it dies)
        'TERM_FORCE_OWNER'  => 'KILLED_BY_USER',    # bkill -r  (quick remove)
    );

    # Multipliers converting bacct's memory units into megabytes
    my %units_2_megs = (
        'K' => 1.0/1024,
        'M' => 1,
        'G' => 1024,
        'T' => 1024*1024,
    );

    # Converts a memory figure such as "39M" into megabytes. Returns an explicit undef
    # (keeping the hash constructor below balanced) when the value is missing or not in
    # <number><K|M|G|T> form, rather than silently computing 0.
    my $to_megs = sub {
        my ($value) = @_;
        return undef unless defined($value) and $value =~ /^([\d\.]+)([KMGT])$/;
        return $1 * $units_2_megs{$2};
    };

    my %report_entry = ();

    # The report is read one record at a time; records are separated by this dashed line
    local $/ = "------------------------------------------------------------------------------\n\n";

    # 2-arg open is part of the interface: callers pass a pipeline such as "bacct -l ... |"
    # (a plain file name works too). Failing to open is reported but not fatal.
    my $bacct_fh;
    unless (open($bacct_fh, $bacct_source_line)) {
        warn "LSF::parse_report_source_line() could not open \"$bacct_source_line\": $!\n";
        return \%report_entry;
    }

    my $header = <$bacct_fh>;   # skip the header record

    while (my $record = <$bacct_fh>) {
        chomp $record;          # strips the record separator

        # warn "RECORD:\n$record";

        my @lines = split(/\n/, $record);
        next unless @lines;

        if( my ($process_id) = $lines[0]=~/^Job <(\d+(?:\[\d+\])?)>/) {

            # Both start as empty strings; the former ('' x 2) produced ONE string and left $exception_status undef
            my ($exit_status, $exception_status) = ('') x 2;
            my ($when_died, $cause_of_death);
            my (@keys, @values);
            my $line_has_key_values = 0;
            foreach (@lines) {
                if( /^(\w+)\s+(\w+\s+\d+\s+\d+:\d+:\d+)(?:\s+(\d{4}))?:\s+Completed\s<(\w+)>(?:\.|;\s+(\w+))/ ) {
                    $when_died      = _convert_to_datetime($1, $2, $3);
                    $cause_of_death = $5 && $status_2_cod{$5};
                    $exit_status = $4 . ($5 ? "/$5" : '');
                }
                elsif(/^\s*EXCEPTION STATUS:\s*(.*?)\s*$/) {
                    $exception_status = $1;
                    $exception_status =~s/\s+/;/g;
                }
                elsif(/^\s*CPU_T/) {
                    # header row of the usage table; the next line holds the matching values
                    @keys = split(/\s+/, ' '.$_);
                    $line_has_key_values = 1;
                }
                elsif($line_has_key_values) {
                    @values = split(/\s+/, ' '.$_);
                    $line_has_key_values = 0;
                }
            }

            my %usage;  @usage{@keys} = @values;

            #warn join(', ', map {sprintf('%s=%s', $_, $usage{$_})} (sort keys %usage)), "\n";

            $report_entry{ $process_id } = {
                    # entries for 'worker' table:
                'when_died'         => $when_died,
                'cause_of_death'    => $cause_of_death,

                    # entries for 'worker_resource_usage' table:
                'exit_status'       => $exit_status,
                'exception_status'  => $exception_status,
                'mem_megs'          => $to_megs->($usage{'MEM'}),
                'swap_megs'         => $to_megs->($usage{'SWAP'}),
                'pending_sec'       => $usage{'WAIT'},
                'cpu_sec'           => $usage{'CPU_T'},
                'lifespan_sec'      => $usage{'TURNAROUND'},
            };
        }
    }
    close $bacct_fh;    # the exit status of a bacct pipe is not checked

    return \%report_entry;
}

sub get_report_entries_for_process_ids {
    my $self = shift @_;    # take $self off first: everything left in @_ is a process id

    my %combined_report_entries;

    # Accounting can be switched off in the meadow config; report nothing in that case.
    return \%combined_report_entries if $self->config_get('AccountingDisabled');

    my @remaining_pids = @_;
    while (@remaining_pids) {
        # At most 20 ids per bacct call: can't fit too many pids on one shell cmdline.
        my @batch = splice(@remaining_pids, 0, 20);
        my $cmd   = 'bacct -l ' . join(' ', map { "'$_'" } @batch) . ' |';

#       warn "LSF::get_report_entries_for_process_ids() running cmd:\n\t$cmd\n";

        my $batch_of_report_entries = $self->parse_report_source_line( $cmd );
        # later batches overwrite earlier entries with the same key
        @combined_report_entries{ keys %$batch_of_report_entries } = values %$batch_of_report_entries;
    }

    return \%combined_report_entries;
}

sub get_report_entries_for_time_interval {
    my ($self, $from_time, $to_time, $username) = @_;

    # Accounting can be switched off in the meadow config; report nothing in that case.
    return +{} if $self->config_get('AccountingDisabled');

    # Re-format the '%Y-%m-%d %H:%M:%S' timestamps into the form bacct -C expects;
    # the upper bound is extended by two minutes.
    my $input_format = '%Y-%m-%d %H:%M:%S';
    my $bacct_format = '%Y/%m/%d/%H:%M';
    my $bacct_from   = Time::Piece->strptime($from_time, $input_format)->strftime($bacct_format);
    my $bacct_to     = ( Time::Piece->strptime($to_time, $input_format) + 2*ONE_MINUTE )->strftime($bacct_format);

    my $user_filter = $username ? "-u $username" : '';
    my $cmd = "bacct -l -C $bacct_from,$bacct_to $user_filter |";

#        warn "LSF::get_report_entries_for_time_interval() running cmd:\n\t$cmd\n";

    return $self->parse_report_source_line( $cmd );
}

sub submit_workers {
    my ($self, $worker_cmd, $required_worker_count, $iteration, $rc_name, $rc_specific_submission_cmd_args, $submit_log_subdir) = @_;

    my $job_array_common_name               = $self->job_array_common_name($rc_name, $iteration);
    # More than one worker is submitted as a single job array with indices 1..N
    my $array_index_range                   = ($required_worker_count > 1) ? "[1-${required_worker_count}]" : '';
    my $job_array_name_with_indices         = $job_array_common_name . $array_index_range;
    my $meadow_specific_submission_cmd_args = $self->config_get('SubmissionOptions');

    # Without a log directory the job's output is discarded; otherwise one file per job id (%J)
    # and array index (%I) is written into $submit_log_subdir.
    my ($submit_stdout_file, $submit_stderr_file) = ('/dev/null', '/dev/null');
    if ($submit_log_subdir) {
        ($submit_stdout_file, $submit_stderr_file) = map { $submit_log_subdir . $_ }
            ("/log_${rc_name}_%J_%I.out", "/log_${rc_name}_%J_%I.err");
    }

    $ENV{'LSB_STDOUT_DIRECT'} = 'y';  # unbuffer the output of the bsub command

    my @cmd = (
        'bsub',
        '-o', $submit_stdout_file,
        '-e', $submit_stderr_file,
        '-J', $job_array_name_with_indices,
        split_for_bash($rc_specific_submission_cmd_args),
        split_for_bash($meadow_specific_submission_cmd_args),
        $worker_cmd,
    );

    print "Executing [ ".$self->signature." ] \t\t".join(' ', @cmd)."\n";

    # A non-zero status aborts the beekeeper so that the user can check the submission syntax
    my $submission_status = system( @cmd );
    die "Could not submit job(s): $!, $?" if $submission_status;
    return $submission_status;
}

1;