=pod

=head1 NAME

    Bio::EnsEMBL::Hive::Meadow::LSF

=head1 DESCRIPTION

    This is the 'LSF' implementation of Meadow

=head1 TODO

=over

=item LSF being temporarily unavailable

We should probably implement a method using IPC::Open3 (see Bio::EnsEMBL::Compara::Utils::RunCommand)
that captures stderr and can parse stdout on the fly.
Depending on the Meadow method, we should either retry say 1 minute later, or
return something like undef to tell the caller that no operation was done.

 Beekeeper : loop #15 ======================================================
 GarbageCollector:       Checking for lost Workers...
 GarbageCollector:       [Queen:] out of 20 Workers that haven't checked in during the last 5 seconds...
 GarbageCollector:       [LSF/EBI Meadow:]       LOST:20

 GarbageCollector:       Discovered 20 lost LSF Workers
 LSF::parse_report_source_line( "bacct -f - -l '4126850[15]' '4126850[6]' '4126835[24]' '4126850[33]' '4126835[10]' '4126835[39]' '4126850[23]' '4126835[3]' '4126835[19]' '4126835[31]' '4126835[40]' '4126835[41]' '4126850[5]' '4126850[41]' '4126850[2]' '4126850[3]' '4126835[5]' '4126835[33]' '4126850[7]' '4126850[42]'" )
 ls_getclustername(): Slave LIM configuration is not ready yet. Please give file name.
 Could not read from 'bacct -f - -l '4126850[15]' '4126850[6]' '4126835[24]' '4126850[33]' '4126835[10]' '4126835[39]' '4126850[23]' '4126835[3]' '4126835[19]' '4126835[31]' '4126835[40]' '4126835[41]' '4126850[5]' '4126850[41]' '4126850[2]' '4126850[3]' '4126835[5]' '4126835[33]' '4126850[7]' '4126850[42]''. Received the error 255

=back

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2021] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=cut

54 55 56 57

package Bio::EnsEMBL::Hive::Meadow::LSF;

use strict;
58
use warnings;
59
use Time::Piece;
60
use Time::Seconds;
61

62
use Bio::EnsEMBL::Hive::Utils ('split_for_bash', 'whoami');
63

Leo Gordon's avatar
Leo Gordon committed
64
use base ('Bio::EnsEMBL::Hive::Meadow');
65

66

Brandon Walts's avatar
Brandon Walts committed
67
our $VERSION = '5.2';       # Semantic version of the Meadow interface:
68 69 70
                            #   change the Major version whenever an incompatible change is introduced,
                            #   change the Minor version whenever the interface is extended, but compatibility is retained.

=head2 name

   Args        : None
   Description : Determine the LSF cluster_name, if an LSF meadow is available.
   Returntype  : String

=cut
# Determine the name of the LSF cluster by running "lsid".
#
# Takes no arguments.
# Returns the cluster name (a string) when the "lsid" output contains one of
# the known LSF product banners followed by a "My cluster name is ..." line.
# Returns nothing (undef in scalar context, an empty list in list context)
# when "lsid" produces no such output.
sub name {
    my $re_lsf_names    = qr/(IBM Spectrum LSF|Platform LSF|openlava project)/;
    my $re_cluster_name = qr/^My cluster name is\s+(\S+)/;

    # stderr is discarded, so a missing "lsid" binary simply yields no lines
    my @lsid_out = `lsid 2>/dev/null`;

    my $is_lsf = 0;
    foreach my $lsid_line (@lsid_out) {
        if ($lsid_line =~ $re_lsf_names) {
            $is_lsf = 1;
        } elsif ($lsid_line =~ $re_cluster_name) {
            # the cluster name only counts once a product banner has been seen
            return $1 if $is_lsf;
        }
    }

    # Explicit "nothing found": the value of a sub that falls off the end of
    # a foreach loop is not specified by Perl.
    return;
}

94

95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
# Work out the LSF process id of the current process from the environment.
#
# Reads LSB_JOBID and LSB_JOBINDEX. Returns "JOBID[INDEX]" when the index is
# greater than zero, and the plain JOBID otherwise.
# Dies when either of the two variables is not defined.
sub get_current_worker_process_id {
    my $self = shift @_;

    my ($job_id, $array_index) = @ENV{'LSB_JOBID', 'LSB_JOBINDEX'};

    unless (defined($job_id) and defined($array_index)) {
        die "Could not establish the process_id";
    }

    # an index of 0 means the id is reported without an "[index]" suffix
    return $job_id unless $array_index>0;

    return $job_id . '[' . $array_index . ']';
}

112

113 114 115 116 117 118 119 120
# Remove the LSB_JOBID and LSB_JOBINDEX variables from this process's
# environment, i.e. the two variables get_current_worker_process_id() reads.
sub deregister_local_process {
    my $self = shift @_;

    delete $ENV{LSB_JOBID};

    # last statement on purpose: its value is what the sub hands back
    return delete $ENV{LSB_JOBINDEX};
}


121
# List the jobs that "bjobs" currently reports for the given users.
#
# Args    : $meadow_users_of_interest - optional arrayref of user names;
#           when missing or empty, the single pseudo-user 'all' is queried.
# Returns : arrayref of [ $worker_pid, $user, $status ] triples, one per
#           "bjobs -w" line, leaving out
#             - the header line and jobs whose status is DONE or EXIT,
#             - jobs whose name contains 'Hive-' but does not start with
#               $self->job_name_prefix() (they belong to another pipeline),
#             - lines too short to carry a status column (e.g. blank lines).
sub status_of_all_our_workers { # returns an arrayref
    my ($self, $meadow_users_of_interest) = @_;

    $meadow_users_of_interest = [ 'all' ] unless ($meadow_users_of_interest && scalar(@$meadow_users_of_interest));

    my $jnp = $self->job_name_prefix();

    my @status_list = ();

    foreach my $meadow_user (@$meadow_users_of_interest) {
        my $cmd = "bjobs -w -u $meadow_user 2>/dev/null";

#        warn "LSF::status_of_all_our_workers() running cmd:\n\t$cmd\n";

        foreach my $line (`$cmd`) {
            # split(' ', ...) ignores leading whitespace, so the columns stay aligned
            my ($group_pid, $user, $status, $queue, $submission_host, $running_host, $job_name) = split(' ', $line);

            # a line without a status column cannot describe a job
            next unless defined $status;
            $job_name = '' unless defined $job_name;

            # skip the header line and jobs that are done
            next if(($group_pid eq 'JOBID') or ($status eq 'DONE') or ($status eq 'EXIT'));

            # skip the hive jobs that belong to another pipeline
            next if (($job_name =~ /Hive-/) and (index($job_name, $jnp) != 0));

            my $worker_pid = $group_pid;
            if($job_name=~/(\[\d+\])$/ and $worker_pid!~/\[\d+\]$/) {   # account for the difference in LSF 9.1.1.1 vs LSF 9.1.2.0  bjobs' output
                $worker_pid .= $1;
            }
            push @status_list, [$worker_pid, $user, $status];
        }
    }

    return \@status_list;
}

156

157
# Ask "bjobs" whether the given worker's job still exists for the current user.
#
# Args    : $worker - object providing process_id()
# Returns : 1 when "bjobs -u <current user> <process id>" prints at least one
#           line that is neither the header, nor a DONE/EXIT job, nor the
#           "... is not found" message, nor blank; 0 otherwise.
sub check_worker_is_alive_and_mine {
    my ($self, $worker) = @_;

    my $wpid      = $worker->process_id();
    my $this_user = whoami();

    # Single quotes stop the shell from treating the "[index]" part of an
    # array job id as a glob pattern.
    my $cmd = qq{bjobs -u '$this_user' '$wpid' 2>&1};

#    warn "LSF::check_worker_is_alive_and_mine() running cmd:\n\t$cmd\n";

    foreach my $bjobs_line (qx/$cmd/) {
        # *SUSP, UNKWN, and ZOMBI are "alive" for the purposes of this call
        # which is typically used to see if the process can be killed.
        # Can't search for line containing the job id, since it may be
        # formatted differently in bjob output than in $worker->process_id()
        # (e.g. for array jobs), so we exclude the header by excluding "JOBID"
        next if $bjobs_line =~ /JOBID|DONE|EXIT/;

        # stderr is merged into the output, so the "Job <N> is not found"
        # message arrives here as well and must not count as a live job
        next if $bjobs_line =~ /is not found/;

        next unless $bjobs_line =~ /\S/;

        return 1;
    }

    return 0;
}

180

181
# Kill the LSF job of the given worker with "bkill".
#
# Args    : $worker - object providing process_id()
#           $fast   - when true, "bkill -r" is used instead of plain "bkill"
# Returns : the exit code of bkill (0 on success); 255 when bkill could not
#           be started at all or was terminated by a signal.
sub kill_worker {
    my ($self, $worker, $fast) = @_;

    my @cmd = ('bkill', ($fast ? ('-r') : ()), $worker->process_id());

    # list form of system(): the process id never passes through a shell
    my $exec_status = system(@cmd);

    # -1 means the command could not be run; the low 7 bits carry a signal
    # number. Neither case has a meaningful exit code in the high byte.
    return 255 if $exec_status == -1 or ($exec_status & 127);

    return ( $exec_status >> 8 );
}

194

195 196
# A private subroutine that can recover a missing year from an incomplete date
# and then transforms it into SQL's datetime for storage.
#
# Args    : $weekday   - abbreviated weekday name as produced by
#                        Time::Piece's wdayname (e.g. 'Mon')
#           $yearless  - date without a year, in '%b %d %T' form
#                        (e.g. 'Nov 24 10:00:05')
#           $real_year - optional four-digit year; when given, $weekday is
#                        not consulted
# Returns : 'YYYY-MM-DD HH:MM:SS', or nothing when no year within the last
#           28 years puts that date on the given weekday.
sub _convert_to_datetime {
    my ($weekday, $yearless, $real_year) = @_;

    if($real_year) {
        my $datetime = Time::Piece->strptime("$yearless $real_year", '%b %d %T %Y');
        return $datetime->date.' '.$datetime->hms;
    }

    # Day of month as written in the input; used below to spot candidate
    # years in which the date does not exist.
    my ($expected_mday) = $yearless =~ /^\s*\S+\s+(\d+)/;

    my $curr_year = Time::Piece->new->year();

    foreach my $years_back (0..27) {    # The Gregorian calendar repeats every 28 years
        my $candidate_year = $curr_year - $years_back;
        my $datetime = Time::Piece->strptime("$yearless $candidate_year", '%b %d %T %Y');

        # "Feb 29" in a non-leap year is rolled over to Mar 1 instead of being
        # rejected; such a candidate must not be allowed to match the weekday.
        next if defined($expected_mday) and $datetime->mday != $expected_mday;

        if($datetime->wdayname eq $weekday) {
            return $datetime->date.' '.$datetime->hms;
        }
    }

    return; # could not guess the year
}


219
# Run a "bacct -l"-style command and parse its per-job records.
#
# Args    : $bacct_source_line - the command line to run; its output is read
#           through a pipe and split on the line of dashes followed by a blank
#           line. The first chunk (the report header) is discarded.
# Returns : hashref keyed by process id ("JOBID" or "JOBID[INDEX]", taken from
#           the "Job <...>" line that starts a record). Each value is a hashref:
#             meadow_host, when_born     - from the "Dispatched to <host>" line
#             when_died, cause_of_death  - from the "Completed <...>" line
#             exit_status                - '' unless a "Completed" line was seen
#             exception_status           - '' unless an "EXCEPTION STATUS:" line
#                                          was seen; words joined with ';'
#             mem_megs, swap_megs        - MEM/SWAP columns converted to megabytes,
#                                          0 when the column is absent or not of
#                                          the form <number><K|M|G|T>
#             pending_sec, cpu_sec, lifespan_sec - WAIT, CPU_T, TURNAROUND columns
# Dies    : when the pipe cannot be opened, or when the command exits non-zero.
sub parse_report_source_line {
    my ($self, $bacct_source_line) = @_;

    print "LSF::parse_report_source_line( \"$bacct_source_line\" )\n";

    # Complete list of exit codes is available at
    # https://www.ibm.com/support/knowledgecenter/SSETD4_9.1.3/lsf_admin/termination_reasons_lsf.html
    my %status_2_cod = (
        'TERM_MEMLIMIT'     => 'MEMLIMIT',
        'TERM_RUNLIMIT'     => 'RUNLIMIT',
        'TERM_OWNER'        => 'KILLED_BY_USER',    # bkill     (wait until it dies)
        'TERM_FORCE_OWNER'  => 'KILLED_BY_USER',    # bkill -r  (quick remove)
        'TERM_BUCKET_KILL'  => 'KILLED_BY_USER',    # bkill -b  (kills large numbers of jobs as soon as possible)
        'TERM_REQUEUE_OWNER'=> 'KILLED_BY_USER',    # Job killed and requeued by owner
    );

    my %units_2_megs = (
        'K' => 1.0/1024,
        'M' => 1,
        'G' => 1024,
        'T' => 1024*1024,
    );

    # Converts e.g. '512M' or '1.5G' into megabytes; anything else counts as 0.
    my $to_megs = sub {
        my ($amount) = @_;
        return 0 unless defined($amount) and $amount =~ /^([\d\.]+)([KMGT])$/;
        return $1 * $units_2_megs{$2};
    };

    # one record per read: records are separated by a line of dashes plus an empty line
    local $/ = "------------------------------------------------------------------------------\n\n";
    open(my $bacct_fh, '-|', $bacct_source_line)
        or die "Could not run '$bacct_source_line': $!\n";
    my $header = <$bacct_fh>; # skip the header

    my %report_entry = ();

    # read record by record instead of loading the whole report into memory
    while (defined(my $record = <$bacct_fh>)) {
        chomp $record;

        # warn "RECORD:\n$record";

        my @lines = split(/\n/, $record);
        next unless @lines;

        my ($process_id) = $lines[0] =~ /^Job <(\d+(?:\[\d+\])?)>/;
        next unless defined $process_id;    # e.g. the trailing summary chunk

        my ($exit_status, $exception_status) = ('') x 2;
        my ($when_born, $meadow_host);
        my ($when_died, $cause_of_death);
        my (@keys, @values);
        my $line_has_key_values = 0;    # set by the column-header line, cleared by the values line

        foreach my $line (@lines) {
            if( my ($weekday, $yearless, $year, $host) = $line =~ /^(\w+)\s+(\w+\s+\d+\s+\d+:\d+:\d+)(?:\s+(\d{4}))?:\s+(?:\[\d+\]\s+)?[Dd]ispatched to\s<([\w\-\.]+)>/ ) {
                $when_born      = _convert_to_datetime($weekday, $yearless, $year);
                $meadow_host    = $host;
            }
            elsif( my ($end_weekday, $end_yearless, $end_year, $state, $term_reason) = $line =~ /^(\w+)\s+(\w+\s+\d+\s+\d+:\d+:\d+)(?:\s+(\d{4}))?:\s+Completed\s<(\w+)>(?:\.|;\s+(\w+))/ ) {
                $when_died      = _convert_to_datetime($end_weekday, $end_yearless, $end_year);
                # a termination reason without a dedicated mapping is reported generically
                $cause_of_death = $term_reason && ($status_2_cod{$term_reason} || 'SEE_EXIT_STATUS');
                $exit_status    = $state . ($term_reason ? "/$term_reason" : '');
            }
            elsif( $line =~ /^\s*EXCEPTION STATUS:\s*(.*?)\s*$/ ) {
                $exception_status = $1;
                $exception_status =~ s/\s+/;/g;
            }
            elsif( $line =~ /^\s*CPU_T/ ) {
                # column names; the matching values are on a following line
                @keys = split(' ', $line);
                $line_has_key_values = 1;
            }
            elsif($line_has_key_values) {
                @values = split(' ', $line);
                $line_has_key_values = 0;
            }
        }

        my %usage;  @usage{@keys} = @values;

        #warn join(', ', map {sprintf('%s=%s', $_, $usage{$_})} (sort keys %usage)), "\n";

        $report_entry{ $process_id } = {
                # entries for 'worker' table:
            'meadow_host'       => $meadow_host,
            'when_born'         => $when_born,
            'when_died'         => $when_died,
            'cause_of_death'    => $cause_of_death,

                # entries for 'worker_resource_usage' table:
            'exit_status'       => $exit_status,
            'exception_status'  => $exception_status,
            'mem_megs'          => $to_megs->( $usage{'MEM'} ),
            'swap_megs'         => $to_megs->( $usage{'SWAP'} ),
            'pending_sec'       => $usage{'WAIT'},
            'cpu_sec'           => $usage{'CPU_T'},
            'lifespan_sec'      => $usage{'TURNAROUND'},
        };
    }
    close $bacct_fh;    # sets $? to the status of the command

    my $exit = $? >> 8;
    die "Could not read from '$bacct_source_line'. Received the error $exit\n" if $exit;

    return \%report_entry;
}


318
# Collect accounting entries for the given process ids.
#
# Args    : a list of process ids.
# Returns : hashref with the merged results of
#           $self->parse_report_source_line() over all batches; empty when the
#           'AccountingDisabled' config option is true or no ids were given.
#
# The ids are handed to "bacct" 20 at a time, each wrapped in single quotes,
# preceded by the 'BacctExtraOptions' config value (if any).
sub get_report_entries_for_process_ids {
    my ($self, @process_ids) = @_;

    my %combined_report_entries = ();

    return \%combined_report_entries if $self->config_get('AccountingDisabled');

    my $bacct_opts = $self->config_get('BacctExtraOptions') || "";

    while (my @pid_batch = splice(@process_ids, 0, 20)) {  # can't fit too many pids on one shell cmdline
        my $quoted_pids = join(' ', map { "'$_'" } @pid_batch);
        my $cmd = "bacct $bacct_opts -l $quoted_pids";

#       warn "LSF::get_report_entries_for_process_ids() running cmd:\n\t$cmd\n";

        my $batch_of_report_entries = $self->parse_report_source_line( $cmd );

        # merge in place; later batches overwrite earlier entries with the same id
        @combined_report_entries{ keys %$batch_of_report_entries } = values %$batch_of_report_entries;
    }

    return \%combined_report_entries;
}

339

340 341 342
# Collect accounting entries for jobs within a time interval.
#
# Args    : $from_time, $to_time - 'YYYY-MM-DD HH:MM:SS' strings
#           $username            - optional; adds "-u $username" to the command
# Returns : whatever $self->parse_report_source_line() returns for the
#           "bacct ... -l -C FROM,TO" command, or an empty hashref when the
#           'AccountingDisabled' config option is true.
#
# Both times are passed to bacct as YYYY/MM/DD/HH:MM (seconds are dropped);
# two minutes are added to the end of the interval.
sub get_report_entries_for_time_interval {
    my ($self, $from_time, $to_time, $username) = @_;

    return {} if $self->config_get('AccountingDisabled');

    my $sql_format   = '%Y-%m-%d %H:%M:%S';
    my $bacct_format = '%Y/%m/%d/%H:%M';

    my $from_timepiece = Time::Piece->strptime($from_time, $sql_format);
    my $to_timepiece   = Time::Piece->strptime($to_time, $sql_format) + 2*ONE_MINUTE;

    $from_time = $from_timepiece->strftime($bacct_format);
    $to_time   = $to_timepiece->strftime($bacct_format);

    my $bacct_opts = $self->config_get('BacctExtraOptions') || "";
    my $cmd = "bacct $bacct_opts -l -C $from_time,$to_time ".($username ? "-u $username" : '');

#    warn "LSF::get_report_entries_for_time_interval() running cmd:\n\t$cmd\n";

    return $self->parse_report_source_line( $cmd );
}


364
# Submit one or more workers with "bsub" and report their process ids.
#
# Args    : $worker_cmd                      - command each job runs (last bsub argument)
#           $required_worker_count           - number of workers; more than one is
#                                              submitted as a job array "[1-N]"
#           $iteration, $rc_name             - passed to $self->job_array_common_name()
#           $rc_specific_submission_cmd_args - extra bsub arguments, split with split_for_bash()
#           $submit_log_subdir               - optional directory for log_<rc_name>_%J_%I.out/.err;
#                                              without it both streams go to /dev/null
# Returns : arrayref of process ids: [ "JOBID[1]", ..., "JOBID[N]" ] for an
#           array submission, [ "JOBID" ] for a single job.
# Dies    : when bsub cannot be started, or when its output contains no
#           "Job <N> is submitted to" line.
# Side effect: sets LSB_STDOUT_DIRECT=y in this process's environment.
sub submit_workers_return_meadow_pids {
    my ($self, $worker_cmd, $required_worker_count, $iteration, $rc_name, $rc_specific_submission_cmd_args, $submit_log_subdir) = @_;

    my $job_array_common_name               = $self->job_array_common_name($rc_name, $iteration);
    my $array_required                      = $required_worker_count > 1;
    my $job_array_name_with_indices         = $job_array_common_name . ($array_required ? "[1-${required_worker_count}]" : '');
    my $meadow_specific_submission_cmd_args = $self->config_get('SubmissionOptions');

    my ($submit_stdout_file, $submit_stderr_file) = ('/dev/null') x 2;

    if($submit_log_subdir) {
        $submit_stdout_file = $submit_log_subdir . "/log_${rc_name}_%J_%I.out";
        $submit_stderr_file = $submit_log_subdir . "/log_${rc_name}_%J_%I.err";
    }

    $ENV{'LSB_STDOUT_DIRECT'} = 'y';  # unbuffer the output of the bsub command

    my @cmd = ('bsub',
        '-o', $submit_stdout_file,
        '-e', $submit_stderr_file,
        '-J', $job_array_name_with_indices,
        split_for_bash($rc_specific_submission_cmd_args),
        split_for_bash($meadow_specific_submission_cmd_args),
        $worker_cmd
    );

    print "Executing [ ".$self->signature." ] \t\t".join(' ', @cmd)."\n";

    my $lsf_jobid;

    # list form of the piped open: the arguments never pass through a shell
    open(my $bsub_output_fh, "-|", @cmd)
        or die "Could not submit job(s): $!, $?";  # let's abort the beekeeper and let the user check the syntax

    while(my $line = <$bsub_output_fh>) {
        if($line=~/^Job \<(\d+)\> is submitted to/) {
            $lsf_jobid = $1;
        } else {
            warn $line;     # assuming it is a temporary blockage that might resolve itself with time
        }
    }
    close $bsub_output_fh;

    die "Submission unsuccessful\n" unless $lsf_jobid;

    return [ $lsf_jobid ] unless $array_required;

    return [ map { $lsf_jobid.'['.$_.']' } (1..$required_worker_count) ];
}

1;