=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisStats

=head1 DESCRIPTION

    An object that maintains counters for jobs in different states. This data is used by the Scheduler.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut

package Bio::EnsEMBL::Hive::AnalysisStats;

use strict;
39
use warnings;
40
41
use List::Util 'sum';
use POSIX;
42
use Term::ANSIColor;
43

44
use base ( 'Bio::EnsEMBL::Hive::Cacheable', 'Bio::EnsEMBL::Hive::Storable' );
45

46

47
48
49
50
51
# Overrides the default from the Cacheable parent: returns a reference to the
# list of field names that make up the unique key (here just 'analysis').
sub unikey {
    my @key_fields = ( 'analysis' );

    return \@key_fields;
}


52
53
54
55
56
57
    ## Minimum amount of time in msec that a worker should run before reporting
    ## back to the hive. This is used when setting the batch_size automatically.
# Returns the minimum batch time in milliseconds (two minutes).
sub min_batch_time {
    my $minutes          = 2;
    my $msec_per_minute  = 60 * 1000;

    return $minutes * $msec_per_minute;
}

58

59
60
61
62
63
64
=head1 AUTOLOADED

    analysis_id / analysis

=cut

65

66
67
68
69
70
# Getter/setter for the '_batch_size' attribute.
# When an argument is given it is stored first. An undefined value is then
# initialized to 1; a value defined as 0 stays 0.
# Returns the (possibly just initialized) batch size.
sub batch_size {
    my $self = shift;

    $self->{'_batch_size'} = shift if @_;
    $self->{'_batch_size'} //= 1;    # only initialize when undefined, so if defined as 0 will stay 0

    return $self->{'_batch_size'};
}

73
74
75
76
# Getter/setter for the '_hive_capacity' attribute.
# Stores the argument when one is given (an explicit undef is stored as undef);
# returns the current value, which may be undef.
sub hive_capacity {
    my $self = shift;

    $self->{'_hive_capacity'} = shift if @_;

    return $self->{'_hive_capacity'};
}

79
# Getter/setter for the '_status' attribute.
# Stores the argument when one is given; returns the current value.
sub status {
    my $self = shift;

    $self->{'_status'} = shift if @_;

    return $self->{'_status'};
}
84
85


86
## counters of jobs in different states:
87

88

89
# Getter/setter for the '_total_job_count' counter.
# Stores the argument when one is given; returns the current value.
sub total_job_count {
    my $self = shift;

    $self->{'_total_job_count'} = shift if @_;

    return $self->{'_total_job_count'};
}

95
# Getter/setter for the '_semaphored_job_count' counter.
# Stores the argument when one is given; returns the current value.
sub semaphored_job_count {
    my $self = shift;

    $self->{'_semaphored_job_count'} = shift if @_;

    return $self->{'_semaphored_job_count'};
}

# Getter/setter for the '_ready_job_count' counter.
# Stores the argument when one is given; returns the current value.
sub ready_job_count {
    my $self = shift;

    $self->{'_ready_job_count'} = shift if @_;

    return $self->{'_ready_job_count'};
}

107
# Getter/setter for the '_done_job_count' counter.
# Stores the argument when one is given; returns the current value.
sub done_job_count {
    my $self = shift;

    $self->{'_done_job_count'} = shift if @_;

    return $self->{'_done_job_count'};
}

113
# Getter/setter for the '_failed_job_count' counter.
# Stores the argument when one is given; an undefined value is initialized
# to 0, so this accessor never returns undef.
sub failed_job_count {
    my $self = shift;

    $self->{'_failed_job_count'} = shift if @_;
    $self->{'_failed_job_count'} //= 0;

    return $self->{'_failed_job_count'};
}

# Getter/setter for the '_num_running_workers' counter.
# Stores the argument when one is given; returns the current value.
sub num_running_workers {
    my $self = shift;

    $self->{'_num_running_workers'} = shift if @_;

    return $self->{'_num_running_workers'};
}

126
127
128
129

## dynamic hive_capacity mode attributes:


130
# Getter/setter for the '_behaviour' attribute.
# Stores the argument when one is given; returns the current value.
sub behaviour {
    my $self = shift;

    $self->{'_behaviour'} = shift if @_;

    return $self->{'_behaviour'};
}

# Getter/setter for the '_input_capacity' attribute.
# Stores the argument when one is given; returns the current value.
sub input_capacity {
    my $self = shift;

    $self->{'_input_capacity'} = shift if @_;

    return $self->{'_input_capacity'};
}

# Getter/setter for the '_output_capacity' attribute.
# Stores the argument when one is given; returns the current value.
sub output_capacity {
    my $self = shift;

    $self->{'_output_capacity'} = shift if @_;

    return $self->{'_output_capacity'};
}

148

149
## dynamic hive_capacity mode counters:
150

151
152

# Getter/setter for the '_avg_msec_per_job' attribute.
# Stores the argument when one is given; any false value (undef, '', 0) is
# normalized to 0, so this accessor always returns a defined value.
sub avg_msec_per_job {
    my $self = shift;

    $self->{'_avg_msec_per_job'} = shift if @_;
    $self->{'_avg_msec_per_job'} ||= 0;

    return $self->{'_avg_msec_per_job'};
}

159
# Getter/setter for the '_avg_input_msec_per_job' attribute.
# Stores the argument when one is given; any false value (undef, '', 0) is
# normalized to 0, so this accessor always returns a defined value.
sub avg_input_msec_per_job {
    my $self = shift;

    $self->{'_avg_input_msec_per_job'} = shift if @_;
    $self->{'_avg_input_msec_per_job'} ||= 0;

    return $self->{'_avg_input_msec_per_job'};
}

166
# Getter/setter for the '_avg_run_msec_per_job' attribute.
# Stores the argument when one is given; any false value (undef, '', 0) is
# normalized to 0, so this accessor always returns a defined value.
sub avg_run_msec_per_job {
    my $self = shift;

    $self->{'_avg_run_msec_per_job'} = shift if @_;
    $self->{'_avg_run_msec_per_job'} ||= 0;

    return $self->{'_avg_run_msec_per_job'};
}

173
# Getter/setter for the '_avg_output_msec_per_job' attribute.
# Stores the argument when one is given; any false value (undef, '', 0) is
# normalized to 0, so this accessor always returns a defined value.
sub avg_output_msec_per_job {
    my $self = shift;

    $self->{'_avg_output_msec_per_job'} = shift if @_;
    $self->{'_avg_output_msec_per_job'} ||= 0;

    return $self->{'_avg_output_msec_per_job'};
}

180

181
## other storable attributes:
182

183
# Getter/setter for the '_when_updated' attribute.
# This method is called by the initial store() [at which point it returns undef].
sub when_updated {
    my $self = shift;

    $self->{'_when_updated'} = shift if @_;

    return $self->{'_when_updated'};
}
188

189
# Getter/setter for the age of the last update, in seconds.
# We fetch the server difference, store local time in the memory object, and
# use the local difference: a defined $value is converted to a local
# timestamp (time() - $value) and kept under '_when_updated'.
# Returns the number of seconds elapsed since that timestamp, or undef when
# no timestamp has been recorded.
sub seconds_since_when_updated {
    my ( $self, $value ) = @_;

    $self->{'_when_updated'} = time() - $value if defined $value;

    return defined( $self->{'_when_updated'} ) ? time() - $self->{'_when_updated'} : undef;
}

195
196
197
198
199
200
# Tracks the freshness of the object: stores a local timestamp and reports
# the local difference.
# A defined $value records '_last_fetch' as (now - $value); the return value
# is the number of seconds since '_last_fetch', or undef if it was never set.
sub seconds_since_last_fetch {
    my ( $self, $value ) = @_;

    if( defined $value ) {
        $self->{'_last_fetch'} = time() - $value;
    }

    my $fetched_at = $self->{'_last_fetch'};

    return defined($fetched_at) ? time() - $fetched_at : undef;
}

201
202
203
204
# Getter/setter for the '_sync_lock' attribute.
# Stores the argument when one is given; returns the current value.
sub sync_lock {
    my $self = shift;

    $self->{'_sync_lock'} = shift if @_;

    return $self->{'_sync_lock'};
}

207
208
209
210
211

# non-storable attributes and other helper-methods:


# Asks the adaptor to refresh this object when it is considered stale.
#
#   $seconds_fresh : optional maximum acceptable age in seconds.
#
# The refresh is performed when an adaptor is attached AND any of these holds:
#   - $seconds_fresh was not given,
#   - the time of the last fetch is unknown,
#   - the object is older than $seconds_fresh.
# Returns whatever the adaptor's refresh() returns, or nothing (false) when
# no refresh was performed.
sub refresh {
    my ( $self, $seconds_fresh )    = @_;
    my $seconds_since_last_fetch    = $self->seconds_since_last_fetch;

    my $adaptor = $self->adaptor
        or return;

    if( !defined($seconds_fresh) or !defined($seconds_since_last_fetch) or $seconds_fresh < $seconds_since_last_fetch ) {
        return $adaptor->refresh($self);
    }

    return;
}

221

222
223
# Passes this object to the adaptor's update() when an adaptor is attached.
# Returns the adaptor's result, or the (false) adaptor value when there is none.
sub update {
    my $self = shift;

    my $adaptor = $self->adaptor;

    return $adaptor && $adaptor->update($self);
}

230

231
# Returns the batch size to use, either as configured or estimated.
#
#   $remaining_job_count : optional number of extra jobs to account for (defaults to 0).
#
# A positive batch_size is taken as is. Otherwise it is a request for dynamic estimation:
#   - with collected stats, enough jobs to fill min_batch_time
#     (avg_msec_per_job is clamped to at least 100 msec);
#   - without stats, -batch_size is taken as the first guess (1 if that is zero).
# The result is then subject to the TailTrimming correction below.
sub get_or_estimate_batch_size {
    my $self                = shift @_;
    my $remaining_job_count = shift @_ || 0;    # FIXME: a better estimate would be $self->claimed_job_count when it is introduced

    my $batch_size = $self->batch_size;

    if( $batch_size <= 0 ) {    # not positive: it is a request for dynamic estimation

        if( my $avg_msec_per_job = $self->avg_msec_per_job ) {      # further estimations from collected stats

            $avg_msec_per_job = 100 if($avg_msec_per_job<100);

            $batch_size = POSIX::ceil( $self->min_batch_time / $avg_msec_per_job );

        } else {    # first estimation when no stats are available (take -$batch_size as first guess, if not zero)

            $batch_size = -$batch_size || 1;
        }
    }

        # TailTrimming correction aims at meeting the requirement half way:
    if( my $num_of_workers = POSIX::ceil( ($self->num_running_workers + $self->estimate_num_required_workers($remaining_job_count))/2 ) ) {

        my $jobs_to_do      = $self->ready_job_count + $remaining_job_count;
        my $tt_batch_size   = POSIX::floor( $jobs_to_do / $num_of_workers );

        if( (0 < $tt_batch_size) && ($tt_batch_size < $batch_size) ) {
            $batch_size = $tt_batch_size;       # fewer jobs per worker than a full batch: trim the batch
        } elsif(!$tt_batch_size) {
            $batch_size = POSIX::ceil( $jobs_to_do / $num_of_workers ); # essentially, 0 or 1
        }
    }

    return $batch_size;
}
266

267

268
269
270
271
272
273
274
275
276
# Estimates how many workers this analysis needs.
# This 'max allowed' total includes the ones that are currently running.
#
#   $remaining_job_count : optional number of extra jobs to account for (defaults to 0).
#
# Starts from ready_job_count + $remaining_job_count and caps the figure by
# hive_capacity (scaled by the current hive load reported through the
# adaptor) and by the analysis' analysis_capacity, each only when defined
# and non-negative.
sub estimate_num_required_workers {
    my $self                = shift @_;
    my $remaining_job_count = shift @_ || 0;    # FIXME: a better estimate would be $self->claimed_job_count when it is introduced

    my $num_required_workers = $self->ready_job_count + $remaining_job_count;   # this 'max' estimation can still be zero

    my $h_cap = $self->hive_capacity;
    if( defined($h_cap) and $h_cap>=0 ) {   # what is the currently attainable maximum defined via hive_capacity?
        my $adaptor             = $self->adaptor;
        my $hive_current_load   = $adaptor ? $adaptor->db->get_RoleAdaptor->get_hive_current_load() : 0;   # no adaptor: load is taken as 0
        my $h_max               = $self->num_running_workers + POSIX::floor( $h_cap * ( 1.0 - $hive_current_load ) );

        $num_required_workers = $h_max if($h_max < $num_required_workers);
    }

    my $a_max = $self->analysis->analysis_capacity;
    if( defined($a_max) and $a_max>=0 ) {   # what is the currently attainable maximum defined via analysis_capacity?
        $num_required_workers = $a_max if($a_max < $num_required_workers);
    }

    return $num_required_workers;
}


293
# Derives the number of jobs currently in progress (includes CLAIMED):
# the total minus the semaphored, ready, done and failed counts.
sub inprogress_job_count {
    my $self = shift;

    return    $self->total_job_count
            - $self->semaphored_job_count
            - $self->ready_job_count
            - $self->done_job_count
            - $self->failed_job_count;
}
301

302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# Maps a meta-status name to the Term::ANSIColor attribute string used to display it.
my %meta_status_2_color = (
    'DONE'      => 'bright_cyan',
    'RUNNING'   => 'bright_yellow',
    'READY'     => 'bright_green',
    'BLOCKED'   => 'black on_white',
    'EMPTY'     => 'clear',
    'FAILED'    => 'red',
);

# Maps those analysis statuses that are not themselves meta-status names onto a meta-status
# (statuses absent from this table are used as the meta-status directly by toString).
my %analysis_status_2_meta_status = (
    'LOADING'       => 'READY',
    'SYNCHING'      => 'READY',
    'ALL_CLAIMED'   => 'BLOCKED',
    'WORKING'       => 'RUNNING',
);

# Maps each job-counter method name to the meta-status whose colour is used for that counter.
my %count_method_2_meta_status = (
    'semaphored_job_count'  => 'BLOCKED',
    'ready_job_count'       => 'READY',
    'inprogress_job_count'  => 'RUNNING',
    'done_job_count'        => 'DONE',
    'failed_job_count'      => 'FAILED',
);

# Left-pads a piece of text to a field width and optionally colours it.
#
#   $field_size    : minimum width; false means no padding.
#   $color_enabled : true to wrap the text in ANSI colour sequences.
#   $text          : the text to display (undef is treated as an empty string).
#   $meta_status   : key into %meta_status_2_color selecting the colour.
#
# The padding is computed from the plain text, so the escape sequences do not
# count towards the width. When $meta_status is undefined or has no colour in
# the table, the text is returned uncoloured rather than passing an undefined
# attribute to color().
sub _text_with_status_color {
    my ( $field_size, $color_enabled, $text, $meta_status ) = @_;

    $text //= '';

    my $padding = ($field_size and length($text) < $field_size) ? ' ' x ($field_size - length($text)) : '';
    my $colour  = ($color_enabled and defined($meta_status)) ? $meta_status_2_color{$meta_status} : undef;

    return $padding . (defined($colour) ? color($colour).$text.color('reset') : $text);
}

334

335
336
# Builds a compact label that breaks the job total down by state, e.g. "3r+2i+5d=10".
#
#   $field_size    : optional minimum visible width; the label is left-padded to it.
#   $color_enabled : true to colour each count by its meta-status.
#
# Each non-zero count is followed by the first letter of its counter method
# name (s, r, i, d, f). The "=total" suffix is only provided if multiple or
# no categories are available. The visible length is tracked separately from
# the label, because colour escape sequences must not count towards padding.
#
# Returns a list: (label, total_job_count, hashref of counter method name => count).
sub job_count_breakout {
    my $self            = shift;
    my $field_size      = shift;
    my $color_enabled   = shift;

    my $this_length     = 0;    # visible length of the label (colour sequences excluded)
    my @count_list      = ();
    my %count_hash      = ();
    my $total_job_count = $self->total_job_count();

    foreach my $count_method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        if( my $count = $count_hash{$count_method} = $self->$count_method() ) {
            $this_length += length("$count") + 1;   # the digits plus the one-letter suffix
            push @count_list, _text_with_status_color(undef, $color_enabled, $count, $count_method_2_meta_status{$count_method}).substr($count_method,0,1);
        }
    }

    my $breakout_label = join('+', @count_list);
    $this_length += scalar(@count_list)-1 if @count_list;   # the '+' separators

    if(scalar(@count_list)!=1) {    # only provide a total if multiple or no categories available
        $breakout_label .= '='.$total_job_count;
        $this_length    += 1+length("$total_job_count");
    }

    $breakout_label = ' ' x ($field_size - $this_length) . $breakout_label if $field_size and $this_length<$field_size;

    return ($breakout_label, $total_job_count, \%count_hash);
}

360
361
362
363
364
365
366
367
368
369
370
371
372
373
# Expresses avg_msec_per_job in the largest unit in which it is at least 1.
# Returns a two-element list: (value, unit name), where the unit is one of
# 'day', 'hr', 'min', 'sec', or 'ms' when the average is below one second.
sub friendly_avg_job_runtime {
    my ($self) = @_;

    my $avg_msec = $self->avg_msec_per_job;

        # candidate units, from the largest to the smallest:
    my @unit_table = (
        [ 'day', 24*3600*1000 ],
        [ 'hr',     3600*1000 ],
        [ 'min',      60*1000 ],
        [ 'sec',         1000 ],
    );

    foreach my $unit (@unit_table) {
        my ($unit_name, $msec_in_unit) = @$unit;
        my $scaled = $avg_msec / $msec_in_unit;

        return ($scaled, $unit_name) if($scaled >= 1.);
    }

    return ($avg_msec, 'ms');
}
374

375
376
# Renders a one-line, human-readable summary of this analysis' statistics.
#
#   $max_logic_name_length : optional width of the logic_name column (defaults to 40).
#
# The line contains the logic name, analysis_id, status, the job count
# breakout, the average job runtime, the running and estimated-required
# worker counts, the hive/analysis capacities ('-' when undefined) and the
# time since the last sync. Colour is used only when STDOUT is a terminal.
sub toString {
    my $self                    = shift @_;
    my $max_logic_name_length   = shift || 40;

    my $can_do_colour                       = (-t STDOUT ? 1 : 0);
    my ($breakout_label)                    = $self->job_count_breakout(24, $can_do_colour);    # only the label is needed here
    my $analysis                            = $self->analysis;
    my ($avg_runtime, $avg_runtime_unit)    = $self->friendly_avg_job_runtime;
    my $status                              = $self->status;

    my $output = sprintf("%-${max_logic_name_length}s(%3d) %s, jobs( %s ), avg:%5.1f %-3s, workers(Running:%d, Est.Required:%d) ",
        $analysis->logic_name,
        $self->analysis_id // 0,

        _text_with_status_color(11, $can_do_colour, $status, $analysis_status_2_meta_status{$status} || $status),

        $breakout_label,

        $avg_runtime, $avg_runtime_unit,

        $self->num_running_workers,
        $self->estimate_num_required_workers,
    );

    $output .=  '  h.cap:'    .( $self->hive_capacity // '-' )
               .'  a.cap:'    .( $analysis->analysis_capacity // '-')
               ."  (sync'd "  .($self->seconds_since_when_updated // 0)." sec ago)";

    return $output;
}

404
405
406
407

# Checks the analysis' control rules and (un)blocks this object accordingly.
#
# A rule's condition is satisfied when the condition analysis' stats have
# status 'DONE', or when that analysis can be empty and has no jobs. A rule
# whose condition analysis or stats cannot be obtained is not satisfied.
#
# When there are rules: if all are satisfied and the status is 'BLOCKED', the
# status becomes 'LOADING'; if any is not satisfied, the status becomes 'BLOCKED'.
# Without rules the status is left untouched.
#
# Returns 1 when all conditions are satisfied (or there are no rules), 0 otherwise.
sub check_blocking_control_rules {
    my $self = shift;

    my $ctrl_rules = $self->analysis->control_rules_collection();

    my $all_conditions_satisfied = 1;

    if(scalar @$ctrl_rules) {    # there are blocking ctrl_rules to check

        foreach my $ctrl_rule (@$ctrl_rules) {
                # use this method because the condition_analysis objects can be
                # network distributed to a different database, so use its adaptor to get
                # the AnalysisStats object
            my $condition_analysis  = $ctrl_rule->condition_analysis;
            my $condition_stats     = $condition_analysis && $condition_analysis->stats;
            my $condition_status    = $condition_stats    && $condition_stats->status;
            my $condition_cbe       = $condition_analysis && $condition_analysis->can_be_empty;
            my $condition_tjc       = $condition_stats    && $condition_stats->total_job_count;

                # the status may be undefined (missing analysis/stats), so guard the string comparison:
            my $this_condition_satisfied = (defined($condition_status) && $condition_status eq 'DONE')
                        || ($condition_cbe && !$condition_tjc);             # probably safer than saying ($condition_status eq 'EMPTY') because of the sync order

            unless( $this_condition_satisfied ) {
                $all_conditions_satisfied = 0;
            }
        }

        if($all_conditions_satisfied) {
            if($self->status eq 'BLOCKED') {    # unblock, since all conditions are met
                $self->status('LOADING');       # anything that is not 'BLOCKED' will do, it will be redefined in the following subroutine
            }
        } else {    # (re)block
            $self->status('BLOCKED');
        }
    }

    return $all_conditions_satisfied;
}


445
446
447
448
# Derives the status from the job counters, unless the status is 'BLOCKED'
# (which is left untouched):
#   EMPTY       - there are no jobs at all;
#   FAILED/DONE - all jobs are done or failed; FAILED when the number of
#                 failed jobs exceeds the analysis' failed_job_tolerance
#                 (a percentage of the total), in which case a banner is
#                 also printed via warn;
#   READY       - there are claimable jobs, but nothing actually running;
#   ALL_CLAIMED - there are no claimable jobs, possibly because some are semaphored;
#   WORKING     - there are claimable jobs and some jobs in progress.
sub determine_status {
    my $self = shift;

    if($self->status ne 'BLOCKED') {
        if( !$self->total_job_count ) {

            $self->status('EMPTY');

        } elsif( $self->total_job_count == $self->done_job_count + $self->failed_job_count ) {   # all jobs of the analysis have been finished
            my $analysis = $self->analysis;
            my $absolute_tolerance = $analysis->failed_job_tolerance * $self->total_job_count / 100.0;     # percentage converted into a job count

            if ($self->failed_job_count > $absolute_tolerance) {
                $self->status('FAILED');

                warn       "\n##################################################\n";
                warn sprintf("##   ERROR: %-35s ##\n", $analysis->logic_name." failed!");
                warn sprintf("##     %d jobs failed (tolerance: %d (%3d%%)) ##\n", $self->failed_job_count, $absolute_tolerance, $analysis->failed_job_tolerance);
                warn         "##################################################\n\n";
            } else {
                $self->status('DONE');
            }
        } elsif( $self->ready_job_count && !$self->inprogress_job_count ) { # there are claimable jobs, but nothing actually running

            $self->status('READY');

        } elsif( !$self->ready_job_count ) {                                # there are no claimable jobs, possibly because some are semaphored

            $self->status('ALL_CLAIMED');

        } elsif( $self->inprogress_job_count ) {

            $self->status('WORKING');
        }
    }
}


481
482
483
484
485
486
487
488
# Updates the job counters from a hash of per-status counts and then
# re-evaluates the blocking rules and the status.
#
#   $job_counts : optional hashref of status name => number of jobs.
#
# The counters are only updated if the hash is given. 'DONE' and 'PASSED_ON'
# are both counted as done; the total is the sum of all values in the hash.
# The control rules check and the status determination run in either case.
sub recalculate_from_job_counts {
    my ($self, $job_counts) = @_;

        # only update job_counts if given the hash:
    if($job_counts) {
        $self->semaphored_job_count( $job_counts->{'SEMAPHORED'} || 0 );
        $self->ready_job_count(      $job_counts->{'READY'} || 0 );
        $self->failed_job_count(     $job_counts->{'FAILED'} || 0 );
        $self->done_job_count(       ( $job_counts->{'DONE'} // 0 ) + ($job_counts->{'PASSED_ON'} // 0 ) ); # done here or potentially done elsewhere
        $self->total_job_count(      sum( values %$job_counts ) || 0 );     # sum() of an empty list is undef, hence the fallback
    }

    $self->check_blocking_control_rules();

    $self->determine_status();
}


499
1;