=pod

=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisStats

=head1 DESCRIPTION

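    An object that maintains, for a single analysis, the counters of its jobs in the different states
    (semaphored, ready, in progress, done, failed), its runtime statistics and its status.
    This data is used by the Scheduler.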

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2021] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates.

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually prefixed with an underscore (_).

=cut

package Bio::EnsEMBL::Hive::AnalysisStats;

use strict;
40
use warnings;
41 42
use List::Util 'sum';
use POSIX;
43
use Term::ANSIColor;
44

45
use base ( 'Bio::EnsEMBL::Hive::Storable' );
46

47

48 49 50 51 52
# Override of the default unique key inherited from the Cacheable parent:
# an AnalysisStats object is identified by the analysis it describes.
sub unikey {
    my @unique_key_columns = ( 'analysis' );
    return \@unique_key_columns;
}


53 54 55 56 57 58
    ## Minimum amount of time (in msec) that a worker should run before reporting
    ## back to the hive. It is used when the batch_size is estimated automatically
    ## (see get_or_estimate_batch_size).
sub min_batch_time {
    my $two_minutes_in_msec = 2 * 60 * 1000;
    return $two_minutes_in_msec;
}

=head1 AUTOLOADED

    analysis_id / analysis

    These accessors are not defined in this file; they are provided automatically (autoloaded).

=cut

# The dbID of an AnalysisStats object is the id of its analysis:
# all arguments (getter or setter usage) are forwarded to analysis_id().
sub dbID {
    my ($self, @args) = @_;
    return $self->analysis_id(@args);
}


74
# Getter/setter for the status of the analysis
# (e.g. 'EMPTY', 'BLOCKED', 'LOADING', 'READY', 'ALL_CLAIMED', 'WORKING', 'DONE', 'FAILED').
sub status {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_status'} = $new_value[0];
    }
    return $self->{'_status'};
}
79

80 81 82 83 84
# Getter/setter for the "excluded" flag (toString() then reports the status as 'EXCLUDED').
sub is_excluded {
    my $self = $_[0];
    $self->{'_is_excluded'} = $_[1] if @_ > 1;
    return $self->{'_is_excluded'};
}
85

86
## counters of jobs in different states:
87

88

89
# Getter/setter: total number of jobs of this analysis, whatever their state.
sub total_job_count {
    my ($self, @value) = @_;
    $self->{'_total_job_count'} = $value[0] if scalar(@value);
    return $self->{'_total_job_count'};
}

95
# Getter/setter: number of jobs in the SEMAPHORED state.
sub semaphored_job_count {
    my $self = $_[0];
    if (@_ > 1) {
        $self->{'_semaphored_job_count'} = $_[1];
    }
    return $self->{'_semaphored_job_count'};
}

# Getter/setter: number of jobs in the READY state (the claimable ones).
sub ready_job_count {
    my ($self, @value) = @_;
    if (@value) {
        $self->{'_ready_job_count'} = $value[0];
    }
    return $self->{'_ready_job_count'};
}

107
# Getter/setter: number of jobs counted as done
# (recalculate_from_job_counts() stores DONE + PASSED_ON here).
sub done_job_count {
    my $self = $_[0];
    $self->{'_done_job_count'} = $_[1] if scalar(@_) > 1;
    return $self->{'_done_job_count'};
}

113
# Getter/setter: number of FAILED jobs.
# Unlike the other counters it is never undef: an unset (or undef) value becomes 0.
sub failed_job_count {
    my ($self, @value) = @_;
    $self->{'_failed_job_count'} = $value[0] if @value;
    return $self->{'_failed_job_count'} //= 0;
}

# Getter/setter: number of workers currently running for this analysis.
sub num_running_workers {
    my ($self, @value) = @_;
    if (scalar @value) {
        $self->{'_num_running_workers'} = $value[0];
    }
    return $self->{'_num_running_workers'};
}

126

127
## runtime stats:
128

129 130

# Getter/setter: average time per job, in msec.
# Any false value (undef, '', 0) is normalised to 0, so the getter never returns undef.
sub avg_msec_per_job {
    my ($self, @value) = @_;
    $self->{'_avg_msec_per_job'} = $value[0] if @value;
    return $self->{'_avg_msec_per_job'} ||= 0;
}

137
# Getter/setter for avg_input_msec_per_job (msec per job).
# Any false value (undef, '', 0) is normalised to 0, so the getter never returns undef.
sub avg_input_msec_per_job {
    my $self = $_[0];
    $self->{'_avg_input_msec_per_job'} = $_[1] if @_ > 1;
    return $self->{'_avg_input_msec_per_job'} ||= 0;
}

144
# Getter/setter for avg_run_msec_per_job (msec per job).
# Any false value (undef, '', 0) is normalised to 0, so the getter never returns undef.
sub avg_run_msec_per_job {
    my ($self, @value) = @_;
    if (@value) {
        $self->{'_avg_run_msec_per_job'} = $value[0];
    }
    return $self->{'_avg_run_msec_per_job'} ||= 0;
}

151
# Getter/setter for avg_output_msec_per_job (msec per job).
# Any false value (undef, '', 0) is normalised to 0, so the getter never returns undef.
sub avg_output_msec_per_job {
    my $self = $_[0];
    if (scalar(@_) > 1) {
        $self->{'_avg_output_msec_per_job'} = $_[1];
    }
    return $self->{'_avg_output_msec_per_job'} ||= 0;
}

158

159
## other storable attributes:
160

161
# Getter/setter for the (local) epoch time of the last update;
# seconds_since_when_updated() also writes this field.
sub when_updated {                   # this method is called by the initial store() [at which point it returns undef]
    my ($self, @new_value) = @_;
    $self->{'_when_updated'} = $new_value[0] if @new_value;
    return $self->{'_when_updated'};
}
166

167
# Passing $value (an age in seconds, as measured on the server) records the update
# time as local "now - $value". Returns the number of seconds elapsed since that
# time on the local clock, or undef if it was never recorded.
sub seconds_since_when_updated {     # we fetch the server difference, store local time in the memory object, and use the local difference
    my ($self, $server_side_age) = @_;

    $self->{'_when_updated'} = time() - $server_side_age if defined $server_side_age;

    my $updated_at = $self->{'_when_updated'};
    return defined($updated_at) ? time() - $updated_at : undef;
}

173 174 175 176 177 178
# Freshness of this object, measured with the local clock.
# Passing $value (seconds since the data was fetched) records the fetch time as "now - $value".
# Returns the seconds elapsed since that fetch, or undef if no fetch was ever recorded.
sub seconds_since_last_fetch {      # track the freshness of the object (store local time, use the local difference)
    my ($self, $fetch_age) = @_;

    if (defined $fetch_age) {
        $self->{'_last_fetch'} = time() - $fetch_age;
    }

    my $fetched_at = $self->{'_last_fetch'};
    return defined($fetched_at) ? time() - $fetched_at : undef;
}

179 180 181 182
# Getter/setter for the sync_lock attribute.
sub sync_lock {
    my $self = $_[0];
    if (@_ > 1) {
        $self->{'_sync_lock'} = $_[1];
    }
    return $self->{'_sync_lock'};
}

185 186 187 188 189

# non-storable attributes and other helper-methods:


# Re-read this object through its adaptor, unless the in-memory copy is fresh enough.
#   $seconds_fresh : maximum acceptable age (in seconds) of the last fetch;
#                    when undef, or when the last fetch time is unknown, the object is always refreshed.
# Returns the result of the adaptor's refresh() when a refresh happens;
# otherwise returns nothing (undef in scalar context, empty list in list context).
sub refresh {
    my ($self, $seconds_fresh)      = @_;
    my $seconds_since_last_fetch    = $self->seconds_since_last_fetch;

    if( $self->adaptor
    and (!defined($seconds_fresh) or !defined($seconds_since_last_fetch) or $seconds_fresh < $seconds_since_last_fetch) ) {
        return $self->adaptor->refresh($self);
    }

    return;     # no adaptor, or still fresh enough: explicit instead of leaking the if-condition's value
}

199

200 201
# Store this object's statistics through the adaptor's update_stats_and_monitor(),
# if it has an adaptor. Returns that call's result, or the (false) adaptor value otherwise.
sub update {
    my ($self) = @_;

    my $adaptor = $self->adaptor;
    return $adaptor && $adaptor->update_stats_and_monitor($self);
}

208

209
# Returns the batch_size to use for this analysis.
#   $remaining_job_count (optional, default 0): jobs expected on top of ready_job_count.
# A positive analysis batch_size is used as is. Zero or negative is a request for dynamic estimation:
#   - with runtime stats, aim for batches lasting about min_batch_time
#     (jobs faster than 100 msec are treated as taking 100 msec);
#   - without stats, -batch_size is the first guess (1 when batch_size is 0).
# The result then gets a TailTrimming correction, computed against a worker count
# half-way between the running and the required number of workers.
sub get_or_estimate_batch_size {
    my ($self, $remaining_job_count) = @_;
    $remaining_job_count ||= 0;     # FIXME: a better estimate would be $self->claimed_job_count when it is introduced

    my $batch_size = $self->analysis->batch_size;

    if ($batch_size <= 0) {                         # not a fixed size: estimate one
        my $avg_msec_per_job = $self->avg_msec_per_job;
        if ($avg_msec_per_job) {                    # estimation from the collected stats
            my $effective_msec = ($avg_msec_per_job < 100) ? 100 : $avg_msec_per_job;
            $batch_size = POSIX::ceil( $self->min_batch_time / $effective_msec );
        }
        else {                                      # first estimation, no stats available yet
            $batch_size = -$batch_size || 1;
        }
    }

        # TailTrimming correction aims at meeting the requirement half way:
    my $num_of_workers = POSIX::ceil( ($self->num_running_workers + $self->estimate_num_required_workers($remaining_job_count)) / 2 );
    return $batch_size unless $num_of_workers;

    my $jobs_to_do    = $self->ready_job_count + $remaining_job_count;
    my $tt_batch_size = POSIX::floor( $jobs_to_do / $num_of_workers );

    if (!$tt_batch_size) {                          # less than one whole job per worker
        return POSIX::ceil( $jobs_to_do / $num_of_workers );    # essentially, 0 or 1
    }
    if ($tt_batch_size > 0 and $tt_batch_size < $batch_size) {
        return $tt_batch_size;
    }
    return $batch_size;
}
244

245

246 247 248 249 250 251
# Estimate of the number of workers this analysis could use, including the ones that are already running.
#   $remaining_job_count (optional, default 0): jobs expected on top of ready_job_count.
# Starts from one worker per ready (+ remaining) job, then caps it by:
#   - hive_capacity (when defined and >= 0): running workers + the share of hive_capacity left free by the hive's current load;
#   - analysis_capacity (when defined and >= 0).
# The result is never negative.
sub estimate_num_required_workers {     # this 'max allowed' total includes the ones that are currently running
    my $self                = shift @_;
    my $remaining_job_count = shift @_ || 0;    # FIXME: a better estimate would be $self->claimed_job_count when it is introduced

        # undefined counters are treated as 0 (as the arithmetic did before, minus the warnings)
    my $num_required_workers = ($self->ready_job_count // 0) + $remaining_job_count;   # this 'max' estimation can still be zero

    my $h_cap = $self->analysis->hive_capacity;
    if( defined($h_cap) and $h_cap>=0) {  # what is the currently attainable maximum defined via hive_capacity?
        my $hive_current_load = $self->hive_pipeline->get_cached_hive_current_load();
        my $h_max = ($self->num_running_workers // 0) + POSIX::floor( $h_cap * ( 1.0 - $hive_current_load ) );
        if($h_max < $num_required_workers) {
            $num_required_workers = $h_max;
        }
    }
    my $a_max = $self->analysis->analysis_capacity;
    if( defined($a_max) and $a_max>=0 ) {   # what is the currently attainable maximum defined via analysis_capacity?
        if($a_max < $num_required_workers) {
            $num_required_workers = $a_max;
        }
    }

        # Whenever the cached load exceeds 1.0, floor($h_cap * (1 - load)) is negative and can push
        # $h_max below zero; a negative number of workers is meaningless, so clamp at zero.
    $num_required_workers = 0 if $num_required_workers < 0;

    return $num_required_workers;
}


271
# Number of jobs that are neither semaphored, ready, done nor failed,
# i.e. the jobs currently in progress.
sub inprogress_job_count {      # includes CLAIMED
    my $self = shift;

    my $in_progress = $self->total_job_count;
    foreach my $counter (qw(semaphored_job_count ready_job_count done_job_count failed_job_count)) {
        $in_progress -= $self->$counter();
    }
    return $in_progress;
}
279

280 281 282 283 284 285 286 287 288
# Term::ANSIColor attribute string used to paint each "meta-status".
my %meta_status_2_color = (
    'BLOCKED'   => 'black on_white',
    'DONE'      => 'bright_cyan',
    'EMPTY'     => 'clear',
    'FAILED'    => 'red',
    'READY'     => 'bright_green',
    'RUNNING'   => 'bright_yellow',
);

# "Support for colors 8 through 15 (the bright_ variants) was added in
# Term::ANSIColor 3.00, included in Perl 5.13.3."
# http://perldoc.perl.org/Term/ANSIColor.html#COMPATIBILITY
# With an older module, fall back to the plain colour by dropping the "bright_" prefix.
if ($Term::ANSIColor::VERSION < '3.00') {
    s/bright_// foreach values %meta_status_2_color;     # values() aliases the hash entries, so this edits them in place
}

# Analysis statuses that are displayed with the colour of another meta-status
# (statuses missing from this table are looked up directly as meta-statuses).
my %analysis_status_2_meta_status = (
    'ALL_CLAIMED'   => 'BLOCKED',
    'EXCLUDED'      => 'FAILED',
    'LOADING'       => 'READY',
    'SYNCHING'      => 'READY',
    'WORKING'       => 'RUNNING',
);

# Meta-status (hence colour) of each job counter.
my %count_method_2_meta_status = (
    'done_job_count'        => 'DONE',
    'failed_job_count'      => 'FAILED',
    'inprogress_job_count'  => 'RUNNING',
    'ready_job_count'       => 'READY',
    'semaphored_job_count'  => 'BLOCKED',
);

# Returns $text right-aligned in a field of $field_size characters (no padding when
# $field_size is false or the text is already that wide). When $color_enabled is true,
# the text is wrapped in the colour of $meta_status. Padding is computed on the visible
# text, so the escape codes do not skew the alignment.
#   ($field_size, $color_enabled, $text, $meta_status)
sub _text_with_status_color {
    my ($field_size, $color_enabled, $text, $meta_status) = @_;

    my $visible_text = $text // '';
    my $padding = ($field_size and length($visible_text) < $field_size) ? ' ' x ($field_size - length($visible_text)) : '';

        # Only colourize when a colour is known for this meta-status: otherwise color()
        # would be handed undef, which is not a valid attribute.
    my $color_name = defined($meta_status) ? $meta_status_2_color{$meta_status} : undef;
    if ($color_enabled and defined($color_name)) {
        return $padding . color($color_name) . $visible_text . color('reset');
    }
    return $padding . $visible_text;
}

324

325 326
# Compact, optionally coloured breakout of the job counters, e.g. "3r+2i+5d=10".
# Each non-zero counter is shown as <count><first letter of the counter name>
# (s=semaphored, r=ready, i=in progress, d=done, f=failed), joined with '+'.
# The total is appended as "=<total>" unless exactly one counter is non-zero.
#   $field_size    : if true, left-pad the label with spaces up to this visible width
#   $color_enabled : wrap each count in the colour of its meta-status
# Returns ($breakout_label, $total_job_count, \%count_hash), where the hash holds every counter (zeros included).
sub job_count_breakout {
    my ($self, $field_size, $color_enabled) = @_;

    my $total_job_count = $self->total_job_count();
    my %count_hash;
    my @labelled_counts;
    my $visible_width = 0;      # length of the label without the colour escape codes

    foreach my $method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        my $count = $count_hash{$method} = $self->$method();
        next unless $count;

        $visible_width += length("$count") + 1;     # the digits plus the one-letter suffix
        push @labelled_counts,
            _text_with_status_color(undef, $color_enabled, $count, $count_method_2_meta_status{$method})
            . substr($method, 0, 1);
    }

    my $breakout_label = join('+', @labelled_counts);
    $visible_width += scalar(@labelled_counts) - 1 if @labelled_counts;    # the '+' separators

    if (scalar(@labelled_counts) != 1) {            # only provide a total if multiple or no categories available
        $breakout_label .= '=' . $total_job_count;
        $visible_width  += 1 + length("$total_job_count");
    }

    if ($field_size and $visible_width < $field_size) {
        $breakout_label = (' ' x ($field_size - $visible_width)) . $breakout_label;
    }

    return ($breakout_label, $total_job_count, \%count_hash);
}

350 351 352 353 354 355 356 357 358 359 360 361 362 363
# Average job runtime expressed in the largest unit (day, hr, min, sec) in which it is
# at least 1. Returns ($value, $unit); below one second, returns ($avg_msec, 'ms').
sub friendly_avg_job_runtime {
    my $self = shift;

    my $avg_msec = $self->avg_msec_per_job;

    foreach my $unit ([86_400_000, 'day'], [3_600_000, 'hr'], [60_000, 'min'], [1000, 'sec']) {
        my ($msec_per_unit, $unit_name) = @$unit;
        my $value = $avg_msec / $msec_per_unit;
        return ($value, $unit_name) if $value >= 1;
    }
    return ($avg_msec, 'ms');
}
364

365 366
# One-line, human-readable summary of the analysis: logic_name, analysis_id, status,
# job breakout, average job runtime, running / estimated required workers,
# hive and analysis capacities, and the age of the last sync.
#   $max_logic_name_length (optional): width of the logic_name column;
#                                      defaults to the length of this analysis' logic_name.
# Colours are only used when STDOUT is a terminal.
sub toString {
    my $self = shift @_;

    my $can_do_colour                        = (-t STDOUT ? 1 : 0);
    my ($breakout_label)                     = $self->job_count_breakout(24, $can_do_colour);
    my $analysis                             = $self->analysis;
    my ($avg_runtime, $avg_runtime_unit)     = $self->friendly_avg_job_runtime;
    my $max_logic_name_length                = shift @_ || length($analysis->logic_name);

        # an excluded analysis is reported as such, whatever its actual status;
        # an undefined status is shown as an empty field instead of raising warnings
    my $status_text = $self->is_excluded ? 'EXCLUDED' : ($self->status // '');

    my $output = sprintf("%-${max_logic_name_length}s(%3d) %s, jobs( %s ), avg:%5.1f %-3s, workers(Running:%d, Est.Required:%d) ",
        $analysis->logic_name,
        $self->analysis_id // 0,
        _text_with_status_color(11, $can_do_colour, $status_text, $analysis_status_2_meta_status{$status_text} || $status_text),
        $breakout_label,
        $avg_runtime, $avg_runtime_unit,
        $self->num_running_workers,
        $self->estimate_num_required_workers,
    );
    $output .=  '  h.cap:'    .( $analysis->hive_capacity // '-' )
               .'  a.cap:'    .( $analysis->analysis_capacity // '-')
               ."  (sync'd "  .($self->seconds_since_when_updated // 0)." sec ago)";

    return $output;
}

398 399

# Evaluate the control rules that can block this analysis.
#   $no_die : passed through to each rule's condition_analysis()
# A condition is satisfied when its analysis is DONE, or when it can_be_empty and has no jobs.
# A missing condition analysis or missing stats object counts as unsatisfied and stops the scan.
# Side effects: each examined condition's stats are refreshed first. This object's status becomes
# 'BLOCKED' if some condition is unsatisfied, and goes from 'BLOCKED' to 'LOADING' once all are satisfied.
# Returns 1 when all conditions are satisfied (or there are no rules), 0 otherwise.
sub check_blocking_control_rules {
    my ($self, $no_die) = @_;

    my $ctrl_rules = $self->analysis->control_rules_collection();

    return 1 unless scalar(@$ctrl_rules);      # no blocking ctrl_rules to check

    my $all_conditions_satisfied = 1;

    RULE:
    foreach my $ctrl_rule (@$ctrl_rules) {
        my $condition_analysis = $ctrl_rule->condition_analysis(undef, $no_die);
        my $condition_stats    = $condition_analysis && $condition_analysis->stats;

        if (!$condition_analysis or !$condition_stats) {
            $all_conditions_satisfied = 0;
            last RULE;
        }

        # Make sure we use fresh properties of the AnalysisStats object
        # (especially relevant in the case of foreign pipelines, since
        # local objects are periodically refreshed)
        $condition_stats->refresh();

        my $condition_status = $condition_stats->status;
        my $condition_cbe    = $condition_analysis->can_be_empty;
        my $condition_tjc    = $condition_stats->total_job_count;

            # checking for no jobs is probably safer than ($condition_status eq 'EMPTY') because of the sync order
        $all_conditions_satisfied = 0
            unless ($condition_status eq 'DONE') || ($condition_cbe && !$condition_tjc);
    }

    if ($all_conditions_satisfied) {
            # unblock: anything that is not 'BLOCKED' will do, determine_status() redefines it afterwards
        $self->status('LOADING') if $self->status eq 'BLOCKED';
    }
    else {
        $self->status('BLOCKED');       # (re)block
    }

    return $all_conditions_satisfied;
}


452 453 454 455
# Derive the status from the job counters, unless the analysis is BLOCKED
# (only check_blocking_control_rules() lifts that status):
#   EMPTY       - no jobs at all
#   DONE/FAILED - every job is done or failed; FAILED when more jobs failed than
#                 failed_job_tolerance (a percentage of the total) allows
#   ALL_CLAIMED - no claimable jobs, possibly because some are semaphored
#   READY       - claimable jobs, but nothing actually in progress
#   WORKING     - claimable jobs and jobs in progress
sub determine_status {
    my $self = shift;

    if ($self->status ne 'BLOCKED') {
        my $total_jobs = $self->total_job_count;
        my $ready_jobs = $self->ready_job_count;

        if (!$total_jobs) {
            $self->status('EMPTY');
        }
        elsif ($total_jobs == $self->done_job_count + $self->failed_job_count) {   # all jobs of the analysis have been finished
            my $max_tolerated_failures = $self->analysis->failed_job_tolerance * $total_jobs / 100.0;
            $self->status( ($self->failed_job_count > $max_tolerated_failures) ? 'FAILED' : 'DONE' );
        }
        elsif (!$ready_jobs) {
            $self->status('ALL_CLAIMED');
        }
        elsif (!$self->inprogress_job_count) {
            $self->status('READY');
        }
        else {
            $self->status('WORKING');
        }
    }
}


484 485 486 487 488 489 490 491
# Update the job counters from a hash of per-status job counts (when given),
# then re-check the blocking control rules and re-derive the status.
#   $job_counts (optional): hashref of job status => number of jobs
# Returns whatever determine_status() returns.
sub recalculate_from_job_counts {
    my ($self, $job_counts) = @_;

    if ($job_counts) {      # the counters are only updated when a hash is given
        my @counter_updates = (
            [ 'semaphored_job_count', $job_counts->{'SEMAPHORED'} || 0 ],
            [ 'ready_job_count',      $job_counts->{'READY'}      || 0 ],
            [ 'failed_job_count',     $job_counts->{'FAILED'}     || 0 ],
                # done here or potentially done elsewhere:
            [ 'done_job_count',       ( $job_counts->{'DONE'} // 0 ) + ( $job_counts->{'PASSED_ON'} // 0 ) ],
            [ 'total_job_count',      sum( values %$job_counts ) || 0 ],
        );
        foreach my $update (@counter_updates) {
            my ($setter, $new_count) = @$update;
            $self->$setter($new_count);
        }
    }

    $self->check_blocking_control_rules();

    return $self->determine_status();
}


502
1;