=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisStats

=head1 DESCRIPTION

    An object that maintains counters for jobs in different states. This data is used by the Scheduler.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2020] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut

36

37 38 39
package Bio::EnsEMBL::Hive::AnalysisStats;

use strict;
40
use warnings;
41 42
use List::Util 'sum';
use POSIX;
43
use Term::ANSIColor;
44

45
use base ( 'Bio::EnsEMBL::Hive::Cacheable', 'Bio::EnsEMBL::Hive::Storable' );
46

47

48 49 50 51 52
# Overrides the default from the Cacheable parent: returns a fresh array
# reference holding the single key field name 'analysis'.
sub unikey {
    my @key_fields = ('analysis');
    return \@key_fields;
}


53 54 55 56 57 58
    ## Minimum amount of time in msec that a worker should run before reporting
    ## back to the hive. This is used when setting the batch_size automatically.
sub min_batch_time {
    # two minutes, expressed in milliseconds
    my $minutes         = 2;
    my $msec_per_minute = 60 * 1000;
    return $minutes * $msec_per_minute;
}

59

60 61 62 63 64 65
=head1 AUTOLOADED

    analysis_id / analysis

=cut

66

67 68 69 70 71 72 73
# Thin alias: forwards all arguments to analysis_id() and returns its result.
sub dbID {
    my ($self, @args) = @_;

    return $self->analysis_id(@args);
}


74 75 76 77 78
# Getter/setter for '_batch_size'.
#   - with an argument: stores it first;
#   - the stored value is initialised to 1 only when it is undefined,
#     so an explicit 0 stays 0.
# Returns the stored value.
sub batch_size {
    my ($self, @args) = @_;

    $self->{'_batch_size'}   = $args[0] if @args;
    $self->{'_batch_size'} //= 1;       # defined-or: keeps an explicit 0

    return $self->{'_batch_size'};
}

81 82 83 84
# Getter/setter for '_hive_capacity'. Stores the first argument when one is
# passed (even if it is undef) and returns the stored value.
sub hive_capacity {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_hive_capacity'} = $args[0];
    }
    return $self->{'_hive_capacity'};
}

87
# Getter/setter for '_status'. Stores the first argument when one is passed
# and returns the stored value.
sub status {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_status'} = $args[0];
    }
    return $self->{'_status'};
}
92 93


94
## counters of jobs in different states:
95

96

97
# Getter/setter for '_total_job_count'. Stores the first argument when one is
# passed and returns the stored value (undef if never set).
sub total_job_count {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_total_job_count'} = $args[0];
    }
    return $self->{'_total_job_count'};
}

103
# Getter/setter for '_semaphored_job_count'. Stores the first argument when
# one is passed and returns the stored value (undef if never set).
sub semaphored_job_count {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_semaphored_job_count'} = $args[0];
    }
    return $self->{'_semaphored_job_count'};
}

# Getter/setter for '_ready_job_count'. Stores the first argument when one is
# passed and returns the stored value (undef if never set).
sub ready_job_count {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_ready_job_count'} = $args[0];
    }
    return $self->{'_ready_job_count'};
}

115
# Getter/setter for '_done_job_count'. Stores the first argument when one is
# passed and returns the stored value (undef if never set).
sub done_job_count {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_done_job_count'} = $args[0];
    }
    return $self->{'_done_job_count'};
}

121
# Getter/setter for '_failed_job_count'. Stores the first argument when one
# is passed; an undefined stored value is initialised to 0.
# Returns the stored value (never undef).
sub failed_job_count {
    my ($self, @args) = @_;

    $self->{'_failed_job_count'}   = $args[0] if @args;
    $self->{'_failed_job_count'} //= 0;

    return $self->{'_failed_job_count'};
}

# Getter/setter for '_num_running_workers'. Stores the first argument when
# one is passed and returns the stored value (undef if never set).
sub num_running_workers {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_num_running_workers'} = $args[0];
    }
    return $self->{'_num_running_workers'};
}

134 135 136 137

## dynamic hive_capacity mode attributes:


138
# Getter/setter for '_behaviour'. Stores the first argument when one is
# passed and returns the stored value.
sub behaviour {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_behaviour'} = $args[0];
    }
    return $self->{'_behaviour'};
}

# Getter/setter for '_input_capacity'. Stores the first argument when one is
# passed and returns the stored value.
sub input_capacity {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_input_capacity'} = $args[0];
    }
    return $self->{'_input_capacity'};
}

# Getter/setter for '_output_capacity'. Stores the first argument when one is
# passed and returns the stored value.
sub output_capacity {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_output_capacity'} = $args[0];
    }
    return $self->{'_output_capacity'};
}

156

157
## dynamic hive_capacity mode counters:
158

159 160

# Getter/setter for '_avg_msec_per_job'. Stores the first argument when one
# is passed; any false stored value (undef, '', 0) is normalised to 0.
# Returns the stored value.
sub avg_msec_per_job {
    my ($self, @args) = @_;

    $self->{'_avg_msec_per_job'}   = $args[0] if @args;
    $self->{'_avg_msec_per_job'} ||= 0;

    return $self->{'_avg_msec_per_job'};
}

167
# Getter/setter for '_avg_input_msec_per_job'. Stores the first argument when
# one is passed; any false stored value (undef, '', 0) is normalised to 0.
# Returns the stored value.
sub avg_input_msec_per_job {
    my ($self, @args) = @_;

    $self->{'_avg_input_msec_per_job'}   = $args[0] if @args;
    $self->{'_avg_input_msec_per_job'} ||= 0;

    return $self->{'_avg_input_msec_per_job'};
}

174
# Getter/setter for '_avg_run_msec_per_job'. Stores the first argument when
# one is passed; any false stored value (undef, '', 0) is normalised to 0.
# Returns the stored value.
sub avg_run_msec_per_job {
    my ($self, @args) = @_;

    $self->{'_avg_run_msec_per_job'}   = $args[0] if @args;
    $self->{'_avg_run_msec_per_job'} ||= 0;

    return $self->{'_avg_run_msec_per_job'};
}

181
# Getter/setter for '_avg_output_msec_per_job'. Stores the first argument
# when one is passed; any false stored value (undef, '', 0) is normalised
# to 0. Returns the stored value.
sub avg_output_msec_per_job {
    my ($self, @args) = @_;

    $self->{'_avg_output_msec_per_job'}   = $args[0] if @args;
    $self->{'_avg_output_msec_per_job'} ||= 0;

    return $self->{'_avg_output_msec_per_job'};
}

188

189
## other storable attributes:
190

191
# Getter/setter for '_when_updated'.
# This method is called by the initial store() [at which point it returns undef].
sub when_updated {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_when_updated'} = $args[0];
    }
    return $self->{'_when_updated'};
}
196

197
# We fetch the server difference, store local time in the memory object, and
# use the local difference.
#   $value (optional): number of seconds elapsed since the update; when it is
#                      defined, '_when_updated' is set to (local now - $value).
# Returns the number of seconds between '_when_updated' and the local clock,
# or undef when '_when_updated' has never been set.
sub seconds_since_when_updated {
    my ($self, $value) = @_;

    if (defined $value) {
        $self->{'_when_updated'} = time() - $value;
    }

    my $updated_at = $self->{'_when_updated'};
    return defined($updated_at) ? time() - $updated_at : undef;
}

203 204 205 206 207 208
# Tracks the freshness of the object (store local time, use the local difference).
#   $value (optional): seconds elapsed since the fetch; when it is defined,
#                      '_last_fetch' is set to (local now - $value).
# Returns the seconds elapsed since '_last_fetch', or undef if it was never set.
sub seconds_since_last_fetch {
    my ($self, $value) = @_;

    if (defined $value) {
        $self->{'_last_fetch'} = time() - $value;
    }

    my $fetched_at = $self->{'_last_fetch'};
    return defined($fetched_at) ? time() - $fetched_at : undef;
}

209 210 211 212
# Getter/setter for '_sync_lock'. Stores the first argument when one is
# passed and returns the stored value.
sub sync_lock {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_sync_lock'} = $args[0];
    }
    return $self->{'_sync_lock'};
}

215 216 217 218 219

# non-storable attributes and other helper-methods:


# Re-reads this object through its adaptor when it is considered stale.
#   $seconds_fresh (optional): maximum acceptable age in seconds.
# The adaptor's refresh() is invoked when an adaptor is attached AND either
#   - no $seconds_fresh was given, or
#   - the age of the object is unknown (seconds_since_last_fetch is undef), or
#   - the object is older than $seconds_fresh.
# Returns whatever the adaptor's refresh() returns, or nothing (a false value)
# when no refresh was performed.
sub refresh {
    my ($self, $seconds_fresh) = @_;

    my $seconds_since_last_fetch = $self->seconds_since_last_fetch;

    my $adaptor = $self->adaptor
        or return;

    my $is_stale = !defined($seconds_fresh)
                || !defined($seconds_since_last_fetch)
                || $seconds_fresh < $seconds_since_last_fetch;

    return unless $is_stale;
    return $adaptor->refresh($self);
}

229

230 231
# Hands this object to the adaptor's update_stats_and_monitor(); does nothing
# when no adaptor is attached.
# Returns the adaptor call's result, or nothing (a false value) without an adaptor.
sub update {
    my ($self) = @_;

    my $adaptor = $self->adaptor
        or return;

    return $adaptor->update_stats_and_monitor($self);
}

238

239
# Returns the batch size to use for this analysis.
#   $remaining_job_count (optional, default 0)
#       FIXME: a better estimate would be $self->claimed_job_count when it is introduced
#
# 1. A positive batch_size (explicitly set, or auto-initialised by
#    $self->batch_size) is taken as is. A non-positive one is a request for
#    dynamic estimation:
#      - with collected stats: enough jobs to fill min_batch_time, where the
#        average job time is clamped to at least 100 msec;
#      - without stats: -batch_size as a first guess, or 1 if that is zero.
# 2. TailTrimming correction: the result is capped by the number of jobs left
#    per worker, where the number of workers is half way between the running
#    and the required ones.
sub get_or_estimate_batch_size {
    my ($self, $remaining_job_count) = @_;
    $remaining_job_count ||= 0;

    my $batch_size = $self->batch_size;

    unless( $batch_size > 0 ) {     # a request for dynamic estimation
        if( my $avg_msec_per_job = $self->avg_msec_per_job ) {      # further estimations from collected stats
            $avg_msec_per_job = 100 if $avg_msec_per_job < 100;
            $batch_size = POSIX::ceil( $self->min_batch_time / $avg_msec_per_job );
        } else {                    # first estimation when no stats are available
            $batch_size = -$batch_size || 1;
        }
    }

        # TailTrimming correction aims at meeting the requirement half way
        # (unset counters are counted as 0 rather than triggering warnings):
    my $running_workers  = $self->num_running_workers // 0;
    my $required_workers = $self->estimate_num_required_workers($remaining_job_count);
    my $num_of_workers   = POSIX::ceil( ($running_workers + $required_workers) / 2 );

    if( $num_of_workers ) {
        my $jobs_to_do    = ($self->ready_job_count // 0) + $remaining_job_count;
        my $jobs_per_head = $jobs_to_do / $num_of_workers;
        my $tt_batch_size = POSIX::floor( $jobs_per_head );

        if( (0 < $tt_batch_size) && ($tt_batch_size < $batch_size) ) {
            $batch_size = $tt_batch_size;
        } elsif( !$tt_batch_size ) {
            $batch_size = POSIX::ceil( $jobs_per_head );    # essentially, 0 or 1
        }
    }

    return $batch_size;
}
274

275

276 277 278 279 280 281 282 283 284
# Estimates how many workers this analysis needs; this 'max allowed' total
# includes the ones that are currently running.
#   $remaining_job_count (optional, default 0)
#       FIXME: a better estimate would be $self->claimed_job_count when it is introduced
#
# Starts from ready_job_count + $remaining_job_count and caps it by:
#   - hive_capacity (when defined and non-negative): the running workers plus
#     the share of the capacity left over by the current hive load
#     (the load is taken as 0 when no adaptor is attached);
#   - the analysis' analysis_capacity (when defined and non-negative).
# Unset counters are counted as 0.
sub estimate_num_required_workers {
    my ($self, $remaining_job_count) = @_;
    $remaining_job_count ||= 0;

        # this 'max' estimation can still be zero
    my $num_required_workers = ($self->ready_job_count // 0) + $remaining_job_count;

        # what is the currently attainable maximum defined via hive_capacity?
    my $h_cap = $self->hive_capacity;
    if( defined($h_cap) and $h_cap >= 0 ) {
        my $adaptor           = $self->adaptor;
        my $hive_current_load = $adaptor ? $adaptor->db->get_RoleAdaptor->get_hive_current_load() : 0;
        my $h_max             = ($self->num_running_workers // 0) + POSIX::floor( $h_cap * ( 1.0 - $hive_current_load ) );

        $num_required_workers = $h_max if $h_max < $num_required_workers;
    }

        # what is the currently attainable maximum defined via analysis_capacity?
    my $a_max = $self->analysis->analysis_capacity;
    if( defined($a_max) and $a_max >= 0 and $a_max < $num_required_workers ) {
        $num_required_workers = $a_max;
    }

    return $num_required_workers;
}


301
# Number of jobs that are neither semaphored, ready, done nor failed
# (includes CLAIMED): total minus the four other counters.
# Unset counters are counted as 0 rather than triggering warnings.
sub inprogress_job_count {
    my ($self) = @_;

    my $accounted_for = ($self->semaphored_job_count // 0)
                      + ($self->ready_job_count      // 0)
                      + ($self->done_job_count       // 0)
                      + ($self->failed_job_count     // 0);

    return ($self->total_job_count // 0) - $accounted_for;
}
309

310 311 312 313 314 315 316 317 318
# Term::ANSIColor attribute string used to display each meta-status.
my %meta_status_2_color = (
    'DONE'      => 'bright_cyan',
    'RUNNING'   => 'bright_yellow',
    'READY'     => 'bright_green',
    'BLOCKED'   => 'black on_white',
    'EMPTY'     => 'clear',
    'FAILED'    => 'red',
);

# "Support for colors 8 through 15 (the bright_ variants) was added in
# Term::ANSIColor 3.00, included in Perl 5.13.3."
# http://perldoc.perl.org/Term/ANSIColor.html#COMPATIBILITY
# With an older module, fall back to the non-bright variants.
if ($Term::ANSIColor::VERSION < '3.00') {
    s/bright_// for values %meta_status_2_color;    # values() aliases the hash entries, so this edits in place
}

330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
# Analysis statuses that are displayed with the colour of another meta-status.
my %analysis_status_2_meta_status = (
    LOADING     => 'READY',
    SYNCHING    => 'READY',
    ALL_CLAIMED => 'BLOCKED',
    WORKING     => 'RUNNING',
);

# Meta-status (and hence colour) associated with each job counter method.
my %count_method_2_meta_status = (
    semaphored_job_count => 'BLOCKED',
    ready_job_count      => 'READY',
    inprogress_job_count => 'RUNNING',
    done_job_count       => 'DONE',
    failed_job_count     => 'FAILED',
);

# Plain function (not a method) that formats a piece of text for display.
#   $field_size    : when true, the text is left-padded with spaces up to this width
#   $color_enabled : when true, the text is wrapped in ANSI colour codes
#   $text          : the text to show
#   $meta_status   : key of %meta_status_2_color that selects the colour
# The padding is computed from the bare text, so the colour codes do not count
# towards the field width.
sub _text_with_status_color {
    my ($field_size, $color_enabled, $text, $meta_status) = @_;

    my $padding = '';
    if ($field_size and length($text) < $field_size) {
        $padding = ' ' x ($field_size - length($text));
    }

    my $shown = $color_enabled
        ? color($meta_status_2_color{$meta_status}) . $text . color('reset')
        : $text;

    return $padding . $shown;
}

353

354 355
# Builds a compact label that breaks the job total down by state,
# e.g. "3s+5r+2i=10" (each count is followed by the first letter of its
# counter method).
#   $field_size    : when true, the label is left-padded to this visible width
#   $color_enabled : when true, each count is wrapped in ANSI colour codes
# Only non-zero counters appear in the label; the total is appended unless
# exactly one counter is shown.
# Returns a list: (label, total_job_count, hashref of counter method => count).
sub job_count_breakout {
    my ($self, $field_size, $color_enabled) = @_;

    my $visible_length  = 0;        # label length without the (invisible) colour codes
    my @count_list      = ();
    my %count_hash      = ();
    my $total_job_count = $self->total_job_count();

    foreach my $count_method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        my $count = $count_hash{$count_method} = $self->$count_method();
        next unless $count;         # zero/undef counters are recorded in the hash but not displayed

        $visible_length += length("$count") + 1;
        push @count_list, _text_with_status_color(undef, $color_enabled, $count, $count_method_2_meta_status{$count_method}).substr($count_method,0,1);
    }

    my $breakout_label = join('+', @count_list);
    $visible_length += scalar(@count_list)-1 if @count_list;    # the '+' separators

    if( scalar(@count_list) != 1 ) {    # only provide a total if multiple or no categories available
        $breakout_label .= '='.$total_job_count;
        $visible_length += 1+length("$total_job_count");
    }

    if( $field_size and $visible_length < $field_size ) {
        $breakout_label = ' ' x ($field_size - $visible_length) . $breakout_label;
    }

    return ($breakout_label, $total_job_count, \%count_hash);
}

379 380 381 382 383 384 385 386 387 388 389 390 391 392
# Expresses avg_msec_per_job in the largest unit (day, hr, min, sec) in which
# it amounts to at least 1; falls back to milliseconds.
# Returns a list: (value in that unit, unit name).
sub friendly_avg_job_runtime {
    my ($self) = @_;

    my $avg_msec = $self->avg_msec_per_job;

    foreach my $unit ( [24*3600*1000, 'day'], [3600*1000, 'hr'], [60*1000, 'min'], [1000, 'sec'] ) {
        my ($msec_per_unit, $unit_name) = @$unit;
        my $scaled = $avg_msec / $msec_per_unit;

        return ($scaled, $unit_name) if $scaled >= 1;
    }

    return ($avg_msec, 'ms');
}
393

394 395
# Returns a one-line, human-readable summary of this object: logic name,
# analysis id, status, job count breakout, average job runtime, worker
# numbers, capacities and the time since the last update.
#   $max_logic_name_length (optional, default 40): width of the logic name column.
# Colours are only used when STDOUT is a terminal.
sub toString {
    my ($self, $max_logic_name_length) = @_;
    $max_logic_name_length ||= 40;

    my $can_do_colour                       = (-t STDOUT ? 1 : 0);
    my ($breakout_label)                    = $self->job_count_breakout(24, $can_do_colour);
    my $analysis                            = $self->analysis;
    my ($avg_runtime, $avg_runtime_unit)    = $self->friendly_avg_job_runtime;

    my $output = sprintf("%-${max_logic_name_length}s(%3d) %s, jobs( %s ), avg:%5.1f %-3s, workers(Running:%d, Est.Required:%d) ",
        $analysis->logic_name,
        $self->analysis_id // 0,

            # statuses without an entry in the mapping act as their own meta-status
        _text_with_status_color(11, $can_do_colour, $self->status, $analysis_status_2_meta_status{$self->status} || $self->status),

        $breakout_label,

        $avg_runtime, $avg_runtime_unit,

        $self->num_running_workers,
        $self->estimate_num_required_workers,
    );

    $output .= join('',
        '  h.cap:',     ( $self->hive_capacity // '-' ),
        '  a.cap:',     ( $analysis->analysis_capacity // '-' ),
        "  (sync'd ",   ( $self->seconds_since_when_updated // 0 ), " sec ago)",
    );

    return $output;
}

423 424 425 426

# Checks the control rules of this analysis and (un)blocks it accordingly.
# A rule's condition is satisfied when the condition analysis' stats have the
# status 'DONE', or when that analysis can be empty and has no jobs.
#   - all conditions satisfied: a 'BLOCKED' status is replaced by 'LOADING';
#   - otherwise: the status is set to 'BLOCKED'.
# The status is left untouched when there are no control rules.
# Returns 1 if all conditions are satisfied (or there are none), 0 otherwise.
sub check_blocking_control_rules {
    my ($self) = @_;

    my $ctrl_rules = $self->analysis->control_rules_collection();

    my $all_conditions_satisfied = 1;

    if(scalar @$ctrl_rules) {    # there are blocking ctrl_rules to check

        foreach my $ctrl_rule (@$ctrl_rules) {
                # use this method because the condition_analysis objects can be
                # network distributed to a different database, so use its adaptor
                # to get the AnalysisStats object
            my $condition_analysis  = $ctrl_rule->condition_analysis;
            my $condition_stats     = $condition_analysis && $condition_analysis->stats;
            my $condition_status    = $condition_stats    && $condition_stats->status;
            my $condition_cbe       = $condition_analysis && $condition_analysis->can_be_empty;
            my $condition_tjc       = $condition_stats    && $condition_stats->total_job_count;

                # the status is undefined when the analysis or its stats are missing,
                # hence the defined() guard before the string comparison
            my $this_condition_satisfied = (defined($condition_status) && $condition_status eq 'DONE')
                        || ($condition_cbe && !$condition_tjc);             # probably safer than saying ($condition_status eq 'EMPTY') because of the sync order

            $all_conditions_satisfied = 0 unless $this_condition_satisfied;
        }

        if($all_conditions_satisfied) {
            if(($self->status // '') eq 'BLOCKED') {    # unblock, since all conditions are met
                $self->status('LOADING');               # anything that is not 'BLOCKED' will do, it will be redefined in the following subroutine
            }
        } else {    # (re)block
            $self->status('BLOCKED');
        }
    }

    return $all_conditions_satisfied;
}


464 465 466 467
# Derives the status from the job counters, unless the current status is
# 'BLOCKED' (which is left alone):
#   EMPTY        - there are no jobs at all
#   FAILED/DONE  - every job is done or failed; FAILED (with a warning) when
#                  the failed jobs exceed the analysis' failed_job_tolerance,
#                  which is a percentage of the total
#   READY        - there are claimable jobs, but nothing actually running
#   ALL_CLAIMED  - there are no claimable jobs, possibly because some are semaphored
#   WORKING      - there are claimable jobs and jobs in progress
sub determine_status {
    my ($self) = @_;

    return if ($self->status // '') eq 'BLOCKED';   # an unset status counts as 'not blocked'

    my $total_job_count = $self->total_job_count;

    if( !$total_job_count ) {

        $self->status('EMPTY');

    } elsif( $total_job_count == $self->done_job_count + $self->failed_job_count ) {   # all jobs of the analysis have been finished
        my $analysis            = $self->analysis;
        my $failed_job_count    = $self->failed_job_count;
        my $absolute_tolerance  = $analysis->failed_job_tolerance * $total_job_count / 100.0;

        if ($failed_job_count > $absolute_tolerance) {
            $self->status('FAILED');
            warn       "\n##################################################\n";
            warn sprintf("##   ERROR: %-35s ##\n", $analysis->logic_name." failed!");
            warn sprintf("##     %d jobs failed (tolerance: %d (%3d%%)) ##\n", $failed_job_count, $absolute_tolerance, $analysis->failed_job_tolerance);
            warn         "##################################################\n\n";
        } else {
            $self->status('DONE');
        }
    } elsif( $self->ready_job_count && !$self->inprogress_job_count ) { # there are claimable jobs, but nothing actually running

        $self->status('READY');

    } elsif( !$self->ready_job_count ) {                                # there are no claimable jobs, possibly because some are semaphored

        $self->status('ALL_CLAIMED');

    } elsif( $self->inprogress_job_count ) {

        $self->status('WORKING');
    }
}


500 501 502 503 504 505 506 507
# Refreshes the counters and the status of this object.
#   $job_counts (optional): hashref of job status => number of jobs.
#       When given, the semaphored/ready/failed/done/total counters are reset
#       from it; missing entries count as 0, 'PASSED_ON' jobs are counted as
#       done, and the total is the sum of all values in the hash.
# The control rules are then checked and the status is re-determined,
# whether or not $job_counts was given.
sub recalculate_from_job_counts {
    my ($self, $job_counts) = @_;

        # only update job_counts if given the hash:
    if($job_counts) {
        my $done_here       = $job_counts->{'DONE'}      // 0;
        my $done_elsewhere  = $job_counts->{'PASSED_ON'} // 0;     # potentially done elsewhere

        $self->semaphored_job_count( $job_counts->{'SEMAPHORED'} || 0 );
        $self->ready_job_count(      $job_counts->{'READY'}      || 0 );
        $self->failed_job_count(     $job_counts->{'FAILED'}     || 0 );
        $self->done_job_count(       $done_here + $done_elsewhere );
        $self->total_job_count(      sum( values %$job_counts )  || 0 );   # sum() of an empty list is undef
    }

    $self->check_blocking_control_rules();

    $self->determine_status();
}


518
1;