AnalysisStats.pm 16.4 KB
Newer Older
1
2
3
=pod

=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisStats

=head1 DESCRIPTION

    An object that maintains counters for jobs in different states. This data is used by the Scheduler.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut

35

36
37
38
package Bio::EnsEMBL::Hive::AnalysisStats;

use strict;
39
use warnings;
40
41
use List::Util 'sum';
use POSIX;
42
use Term::ANSIColor;
43

44
use Bio::EnsEMBL::Hive::Utils ('throw');
45
use Bio::EnsEMBL::Hive::Analysis;
46

47
use base ( 'Bio::EnsEMBL::Hive::Cacheable', 'Bio::EnsEMBL::Hive::Storable' );
48

49

50
51
52
53
54
sub unikey {
        # Overrides the default from the Cacheable parent:
        # returns a fresh arrayref holding the single key field name 'analysis'.
    my @key_fields = ( 'analysis' );

    return \@key_fields;
}


55
56
57
58
59
60
    ## Minimum amount of time in msec that a worker should run before reporting
    ## back to the hive. This is used when setting the batch_size automatically.
sub min_batch_time {
        # 2 minutes expressed in milliseconds (2 * 60 sec * 1000 msec)
    my $two_minutes_in_msec = 2 * 60 * 1000;

    return $two_minutes_in_msec;
}

61

62
63
64
65
66
67
=head1 AUTOLOADED

    analysis_id / analysis

=cut

68

69
70
71
72
73
sub batch_size {
        # Getter/setter for the '_batch_size' field.
        # Any explicit argument overwrites the stored value; an undefined
        # value is then replaced by 1, whereas a defined 0 is kept as 0.
    my ($self, @new_value) = @_;

    if( @new_value ) {
        $self->{'_batch_size'} = $new_value[0];
    }
    $self->{'_batch_size'} //= 1;

    return $self->{'_batch_size'};
}

76
77
78
79
sub hive_capacity {
        # Getter/setter for the '_hive_capacity' field (no default: may be undef).
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_hive_capacity'} = $new_value[0];
    }

    return $self->{'_hive_capacity'};
}

82
sub status {
        # Getter/setter for the '_status' field (no default: may be undef).
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_status'} = $new_value[0];
    }

    return $self->{'_status'};
}
87
88


89
## counters of jobs in different states:
90

91

92
sub total_job_count {
        # Getter/setter for the '_total_job_count' field (no default: may be undef).
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_total_job_count'} = $new_value[0];
    }

    return $self->{'_total_job_count'};
}

98
sub semaphored_job_count {
        # Getter/setter for the '_semaphored_job_count' field (no default: may be undef).
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_semaphored_job_count'} = $new_value[0];
    }

    return $self->{'_semaphored_job_count'};
}

sub ready_job_count {
        # Getter/setter for the '_ready_job_count' field (no default: may be undef).
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_ready_job_count'} = $new_value[0];
    }

    return $self->{'_ready_job_count'};
}

110
sub done_job_count {
        # Getter/setter for the '_done_job_count' field (no default: may be undef).
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_done_job_count'} = $new_value[0];
    }

    return $self->{'_done_job_count'};
}

116
sub failed_job_count {
        # Getter/setter for the '_failed_job_count' field.
        # Unlike the other job counters in this file, an undefined value is replaced by 0.
    my ($self, @new_value) = @_;

    if( @new_value ) {
        $self->{'_failed_job_count'} = $new_value[0];
    }
    $self->{'_failed_job_count'} //= 0;

    return $self->{'_failed_job_count'};
}

sub num_running_workers {
        # Getter/setter for the '_num_running_workers' field (no default: may be undef).
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_num_running_workers'} = $new_value[0];
    }

    return $self->{'_num_running_workers'};
}

129
130
131
132

## dynamic hive_capacity mode attributes:


133
sub behaviour {
        # Getter/setter for the '_behaviour' field (no default: may be undef).
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_behaviour'} = $new_value[0];
    }

    return $self->{'_behaviour'};
}

sub input_capacity {
        # Getter/setter for the '_input_capacity' field (no default: may be undef).
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_input_capacity'} = $new_value[0];
    }

    return $self->{'_input_capacity'};
}

sub output_capacity {
        # Getter/setter for the '_output_capacity' field (no default: may be undef).
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_output_capacity'} = $new_value[0];
    }

    return $self->{'_output_capacity'};
}

151

152
## dynamic hive_capacity mode counters:
153

154
155

sub avg_msec_per_job {
        # Getter/setter for the '_avg_msec_per_job' field.
        # Any false value (undef, 0, empty string) is normalized to numeric 0.
    my ($self, @new_value) = @_;

    if( @new_value ) {
        $self->{'_avg_msec_per_job'} = $new_value[0];
    }
    $self->{'_avg_msec_per_job'} ||= 0;

    return $self->{'_avg_msec_per_job'};
}

162
sub avg_input_msec_per_job {
        # Getter/setter for the '_avg_input_msec_per_job' field.
        # Any false value (undef, 0, empty string) is normalized to numeric 0.
    my ($self, @new_value) = @_;

    if( @new_value ) {
        $self->{'_avg_input_msec_per_job'} = $new_value[0];
    }
    $self->{'_avg_input_msec_per_job'} ||= 0;

    return $self->{'_avg_input_msec_per_job'};
}

169
sub avg_run_msec_per_job {
        # Getter/setter for the '_avg_run_msec_per_job' field.
        # Any false value (undef, 0, empty string) is normalized to numeric 0.
    my ($self, @new_value) = @_;

    if( @new_value ) {
        $self->{'_avg_run_msec_per_job'} = $new_value[0];
    }
    $self->{'_avg_run_msec_per_job'} ||= 0;

    return $self->{'_avg_run_msec_per_job'};
}

176
sub avg_output_msec_per_job {
        # Getter/setter for the '_avg_output_msec_per_job' field.
        # Any false value (undef, 0, empty string) is normalized to numeric 0.
    my ($self, @new_value) = @_;

    if( @new_value ) {
        $self->{'_avg_output_msec_per_job'} = $new_value[0];
    }
    $self->{'_avg_output_msec_per_job'} ||= 0;

    return $self->{'_avg_output_msec_per_job'};
}

183

184
## other storable attributes:
185

186
sub when_updated {
        # Getter/setter for the '_when_updated' field.
        # This method is called by the initial store() [at which point it returns undef].
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_when_updated'} = $new_value[0];
    }

    return $self->{'_when_updated'};
}
191

192
sub seconds_since_when_updated {
        # We fetch the server difference, store local time in the memory object, and use the local difference.
        #
        # With a defined argument (an age in seconds) the '_when_updated' field is set to (now - age).
        # Returns the number of seconds between '_when_updated' and now, or undef if that field is not set.
    my ($self, $seconds_ago) = @_;

    if( defined($seconds_ago) ) {
        $self->{'_when_updated'} = time() - $seconds_ago;
    }

    my $local_stamp = $self->{'_when_updated'};

    return undef unless defined($local_stamp);  # explicit undef (a one-element list in list context), as before
    return time() - $local_stamp;
}

198
199
200
201
202
203
sub seconds_since_last_fetch {
        # Track the freshness of the object (store local time, use the local difference).
        #
        # With a defined argument (an age in seconds) the '_last_fetch' field is set to (now - age).
        # Returns the number of seconds between '_last_fetch' and now, or undef if that field is not set.
    my ($self, $seconds_ago) = @_;

    if( defined($seconds_ago) ) {
        $self->{'_last_fetch'} = time() - $seconds_ago;
    }

    my $local_stamp = $self->{'_last_fetch'};

    return undef unless defined($local_stamp);  # explicit undef (a one-element list in list context), as before
    return time() - $local_stamp;
}

204
205
206
207
sub sync_lock {
        # Getter/setter for the '_sync_lock' field (no default: may be undef).
    my ($self, @new_value) = @_;

    if( @new_value ) {      # any explicit argument, even undef, overwrites the stored value
        $self->{'_sync_lock'} = $new_value[0];
    }

    return $self->{'_sync_lock'};
}

210
211
212
213
214

# non-storable attributes and other helper-methods:


sub refresh {
        # Re-fetches this object through its adaptor when it is considered stale.
        #
        #   $seconds_fresh : optional maximum acceptable age (in seconds) of the in-memory object.
        #
        # The object is considered stale when no $seconds_fresh is given, when the time of
        # the last fetch is unknown, or when the last fetch is older than $seconds_fresh.
        # Returns whatever the adaptor's refresh() returns; without an adaptor, or when the
        # object is still fresh, nothing is refreshed.
    my ($self, $seconds_fresh) = @_;

    my $age_of_last_fetch = $self->seconds_since_last_fetch;
    my $adaptor           = $self->adaptor;

    my $is_stale = !( defined($seconds_fresh) and defined($age_of_last_fetch) and $seconds_fresh >= $age_of_last_fetch );

    if( $adaptor and $is_stale ) {
        return $adaptor->refresh($self);
    }
}

224

225
226
sub update {
        # Passes this object to its adaptor's update() method; does nothing when there is no adaptor.
    my ($self) = @_;

    my $adaptor = $self->adaptor;

    if( $adaptor ) {
        $adaptor->update($self);
    }
}

233

234
sub get_or_estimate_batch_size {
        # Returns the number of jobs a worker should claim in one batch.
        #
        #   $remaining_job_count : optional number of extra jobs to take into account (defaults to 0).
        #                          FIXME: a better estimate would be $self->claimed_job_count when it is introduced
        #
        # A positive $self->batch_size is used as is. Otherwise the size is estimated from
        # avg_msec_per_job so that a batch lasts about min_batch_time, or, when no stats are
        # available, -batch_size is taken as a first guess (1 if batch_size is 0).
        # The result is then subject to the TailTrimming correction below.
    my ($self, $remaining_job_count) = @_;
    $remaining_job_count ||= 0;

    my $batch_size = $self->batch_size;

    unless( $batch_size > 0 ) {     # not positive: it is a request for dynamic estimation

        my $avg_msec_per_job = $self->avg_msec_per_job;

        if( $avg_msec_per_job ) {   # further estimations from collected stats

            $avg_msec_per_job = 100 if( $avg_msec_per_job < 100 );    # the average is floored at 100 msec

            $batch_size = POSIX::ceil( $self->min_batch_time / $avg_msec_per_job );

        } else {                    # first estimation when no stats are available

            $batch_size = -$batch_size || 1;
        }
    }

        # TailTrimming correction aims at meeting the requirement half way:
    my $half_way_total  = ( $self->num_running_workers + $self->estimate_num_required_workers($remaining_job_count) ) / 2;
    my $num_of_workers  = POSIX::ceil( $half_way_total );

    return $batch_size unless( $num_of_workers );

    my $jobs_to_do      = $self->ready_job_count + $remaining_job_count;
    my $jobs_per_worker = $jobs_to_do / $num_of_workers;
    my $tt_batch_size   = POSIX::floor( $jobs_per_worker );

    if( !$tt_batch_size ) {
        $batch_size = POSIX::ceil( $jobs_per_worker );  # essentially, 0 or 1
    } elsif( (0 < $tt_batch_size) && ($tt_batch_size < $batch_size) ) {
        $batch_size = $tt_batch_size;
    }

    return $batch_size;
}
269

270

271
272
273
274
275
276
277
278
279
sub estimate_num_required_workers {
        # Estimates how many workers this analysis could use.
        # This 'max allowed' total includes the ones that are currently running.
        #
        #   $remaining_job_count : optional number of extra jobs to take into account (defaults to 0).
        #                          FIXME: a better estimate would be $self->claimed_job_count when it is introduced
        #
        # Starts from (ready_job_count + $remaining_job_count) and caps it by the limits derived
        # from hive_capacity and analysis_capacity (each only when defined and non-negative).
    my ($self, $remaining_job_count) = @_;
    $remaining_job_count ||= 0;

    my $num_required_workers = $self->ready_job_count + $remaining_job_count;   # this 'max' estimation can still be zero

        # what is the currently attainable maximum defined via hive_capacity?
    my $h_cap = $self->hive_capacity;
    if( defined($h_cap) and $h_cap >= 0 ) {
        my $adaptor           = $self->adaptor;
        my $hive_current_load = $adaptor ? $adaptor->db->get_RoleAdaptor->get_hive_current_load() : 0;
        my $spare_capacity    = POSIX::floor( $h_cap * ( 1.0 - $hive_current_load ) );
        my $h_max             = $self->num_running_workers + $spare_capacity;

        $num_required_workers = $h_max if( $h_max < $num_required_workers );
    }

        # what is the currently attainable maximum defined via analysis_capacity?
    my $a_max = $self->analysis->analysis_capacity;
    if( defined($a_max) and $a_max >= 0 and $a_max < $num_required_workers ) {
        $num_required_workers = $a_max;
    }

    return $num_required_workers;
}


296
sub inprogress_job_count {      # includes CLAIMED
        # Derived counter: the total number of jobs minus the ones that are
        # semaphored, ready, done or failed.
        #
        # Counters that have never been set are undefined; they are treated as 0 here so that
        # a partially initialized object does not emit "uninitialized value" warnings
        # (the numeric result is the same as before).
    my $self = shift;

    return    ( $self->total_job_count      // 0 )
            - ( $self->semaphored_job_count // 0 )
            - ( $self->ready_job_count      // 0 )
            - ( $self->done_job_count       // 0 )
            - ( $self->failed_job_count     // 0 );
}
304

305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
my %meta_status_2_color = (
    'DONE'      => 'bright_cyan',
    'RUNNING'   => 'bright_yellow',
    'READY'     => 'bright_green',
    'BLOCKED'   => 'black on_white',
    'EMPTY'     => 'clear',
    'FAILED'    => 'red',
);

my %analysis_status_2_meta_status = (
    'LOADING'       => 'READY',
    'SYNCHING'      => 'READY',
    'ALL_CLAIMED'   => 'BLOCKED',
    'WORKING'       => 'RUNNING',
);

my %count_method_2_meta_status = (
    'semaphored_job_count'  => 'BLOCKED',
    'ready_job_count'       => 'READY',
    'inprogress_job_count'  => 'RUNNING',
    'done_job_count'        => 'DONE',
    'failed_job_count'      => 'FAILED',
);

sub _text_with_status_color {
    my $field_size = shift;
    my $color_enabled = shift;

    my $padding = ($field_size and length($_[0]) < $field_size) ? ' ' x ($field_size - length($_[0])) : '';
    return $padding . ($color_enabled ? color($meta_status_2_color{$_[1]}).$_[0].color('reset') : $_[0]);
}

337

338
339
sub job_count_breakout {
        # Builds a textual breakdown of the job counters, e.g. "3r+2i+5d=10".
        #
        #   $field_size     : optional minimum visible width; the label is left-padded with spaces to reach it
        #   $color_enabled  : true if each count should be wrapped in ANSI colour codes
        #
        # Only non-zero counters are listed, each followed by the first letter of its
        # method name. The total is appended unless exactly one counter is listed.
        # Returns a list: (label, total_job_count, hashref of all five counters keyed by method name).
    my ($self, $field_size, $color_enabled) = @_;

    my $visible_length  = 0;    # length of the label as displayed, i.e. not counting the colour codes
    my @labelled_counts = ();
    my %count_hash      = ();
    my $total_job_count = $self->total_job_count();

    for my $count_method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        my $count = $count_hash{$count_method} = $self->$count_method();

        next unless( $count );  # zero (or unset) counters are recorded in the hash, but not displayed

        $visible_length += length("$count") + 1;    # the digits plus the one-letter suffix

        my $coloured_count = _text_with_status_color(undef, $color_enabled, $count, $count_method_2_meta_status{$count_method});
        push @labelled_counts, $coloured_count.substr($count_method,0,1);
    }

    my $breakout_label = join('+', @labelled_counts);
    if( @labelled_counts ) {
        $visible_length += scalar(@labelled_counts)-1;  # the '+' separators
    }

    if( scalar(@labelled_counts) != 1 ) {   # only provide a total if multiple or no categories available
        $breakout_label .= '='.$total_job_count;
        $visible_length += 1+length("$total_job_count");
    }

    if( $field_size and $visible_length < $field_size ) {
        $breakout_label = ' ' x ($field_size - $visible_length) . $breakout_label;
    }

    return ($breakout_label, $total_job_count, \%count_hash);
}

363
364
365
366
367
368
369
370
371
372
373
374
375
376
sub friendly_avg_job_runtime {
        # Converts avg_msec_per_job into the largest unit in which it is at least 1.
        # Returns a list: (value, unit), where unit is one of 'day', 'hr', 'min', 'sec' or 'ms'.
    my ($self) = @_;

    my $avg_msec = $self->avg_msec_per_job;

        # from the largest unit down to the smallest:
    my @msec_in_unit = (
        [ 'day' => 24*3600*1000 ],
        [ 'hr'  =>    3600*1000 ],
        [ 'min' =>      60*1000 ],
        [ 'sec' =>         1000 ],
    );

    for my $unit (@msec_in_unit) {
        my ($unit_name, $msec_per_unit) = @$unit;
        my $scaled = $avg_msec / $msec_per_unit;

        return ($scaled, $unit_name) if( $scaled >= 1. );
    }

    return ($avg_msec, 'ms');
}
377

378
379
sub toString {
        # Returns a one-line, human-readable summary of this object: logic_name, analysis_id,
        # status, job count breakout, average job runtime, worker numbers, capacities and
        # the time since the last sync.
        #
        #   $max_logic_name_length : optional width of the logic_name column (defaults to 40)
        #
        # Colour codes are only produced when STDOUT is a terminal.
    my ($self, $max_logic_name_length) = @_;
    $max_logic_name_length ||= 40;

    my $can_do_colour                       = (-t STDOUT) ? 1 : 0;
    my ($breakout_label)                    = $self->job_count_breakout(24, $can_do_colour);
    my $analysis                            = $self->analysis;
    my ($avg_runtime, $avg_runtime_unit)    = $self->friendly_avg_job_runtime;

    my $status          = $self->status;
    my $meta_status     = $analysis_status_2_meta_status{$status} || $status;   # statuses without a mapping are used directly
    my $status_label    = _text_with_status_color(11, $can_do_colour, $status, $meta_status);

    my $format = "%-${max_logic_name_length}s(%3d) %s, jobs( %s ), avg:%5.1f %-3s, workers(Running:%d, Est.Required:%d) ";
    my @values = (
        $analysis->logic_name,
        $self->analysis_id // 0,
        $status_label,
        $breakout_label,
        $avg_runtime, $avg_runtime_unit,
        $self->num_running_workers,
        $self->estimate_num_required_workers,
    );
    my $output = sprintf($format, @values);

        # undefined capacities are shown as '-', an unknown sync time as 0:
    my $h_cap_label     = $self->hive_capacity // '-';
    my $a_cap_label     = $analysis->analysis_capacity // '-';
    my $seconds_synced  = $self->seconds_since_when_updated // 0;

    $output .= '  h.cap:'.$h_cap_label;
    $output .= '  a.cap:'.$a_cap_label;
    $output .= "  (sync'd ".$seconds_synced." sec ago)";

    return $output;
}

407
408
409
410

sub check_blocking_control_rules {
        # Checks the control rules of this object's analysis and (un)blocks the object accordingly.
        #
        # A rule is satisfied when the status of its condition analysis is 'DONE', or when
        # that analysis can be empty and has no jobs. If any rule is not satisfied, the status
        # is set to 'BLOCKED'; if all are satisfied and the status is 'BLOCKED', it is set to 'LOADING'.
        # Without any control rules the status is left untouched.
        #
        # Returns 1 if all conditions are satisfied (or there are none), 0 otherwise.
    my $self = shift;

    my $ctrl_rules = $self->analysis->control_rules_collection();

    my $all_conditions_satisfied = 1;

    if(scalar @$ctrl_rules) {    # there are blocking ctrl_rules to check

        foreach my $ctrl_rule (@$ctrl_rules) {
                #use this method because the condition_analysis objects can be
                #network distributed to a different database so use its adaptor to get
                #the AnalysisStats object
            my $condition_analysis  = $ctrl_rule->condition_analysis;
            my $condition_stats     = $condition_analysis && $condition_analysis->stats;
            my $condition_status    = $condition_stats    && $condition_stats->status;
            my $condition_cbe       = $condition_analysis && $condition_analysis->can_be_empty;
            my $condition_tjc       = $condition_stats    && $condition_stats->total_job_count;

                # the condition analysis, its stats or its status may be missing,
                # so only compare the status once we know it is defined:
            my $condition_is_done   = defined($condition_status) && ($condition_status eq 'DONE');

            my $this_condition_satisfied = $condition_is_done
                        || ($condition_cbe && !$condition_tjc);             # probably safer than saying ($condition_status eq 'EMPTY') because of the sync order

            unless( $this_condition_satisfied ) {
                $all_conditions_satisfied = 0;
            }
        }

        if($all_conditions_satisfied) {
            if( ($self->status // '') eq 'BLOCKED') {   # unblock, since all conditions are met (an unset status is not 'BLOCKED')
                $self->status('LOADING');       # anything that is not 'BLOCKED' will do, it will be redefined in the following subroutine
            }
        } else {    # (re)block
            $self->status('BLOCKED');
        }
    }

    return $all_conditions_satisfied;
}


448
449
450
451
sub determine_status {
        # Derives the status of this object from its job counters, unless it is 'BLOCKED'
        # (in which case the status is left alone):
        #
        #   EMPTY        no jobs at all
        #   FAILED/DONE  every job is either done or failed; FAILED if the number of failed
        #                jobs exceeds the analysis' failed_job_tolerance (a percentage of all jobs)
        #   READY        there are ready jobs, but none in progress
        #   ALL_CLAIMED  there are no ready jobs
        #   WORKING      there are both ready jobs and jobs in progress
    my ($self) = @_;

    if( $self->status ne 'BLOCKED' ) {

        my $total_job_count = $self->total_job_count;

        if( !$total_job_count ) {

            $self->status('EMPTY');

        } elsif( $total_job_count == $self->done_job_count + $self->failed_job_count ) {   # all jobs of the analysis have been finished

            my $analysis            = $self->analysis;
            my $tolerance_percent   = $analysis->failed_job_tolerance;
            my $absolute_tolerance  = $tolerance_percent * $total_job_count / 100.0;
            my $failed_job_count    = $self->failed_job_count;

            if( $failed_job_count > $absolute_tolerance ) {
                $self->status('FAILED');

                    # four separate warnings that together draw a boxed error message:
                warn       "\n##################################################\n";
                warn sprintf("##   ERROR: %-35s ##\n", $analysis->logic_name." failed!");
                warn sprintf("##     %d jobs failed (tolerance: %d (%3d%%)) ##\n", $failed_job_count, $absolute_tolerance, $tolerance_percent);
                warn         "##################################################\n\n";
            } else {
                $self->status('DONE');
            }

        } elsif( $self->ready_job_count && !$self->inprogress_job_count ) { # there are claimable jobs, but nothing actually running

            $self->status('READY');

        } elsif( !$self->ready_job_count ) {                                # there are no claimable jobs, possibly because some are semaphored

            $self->status('ALL_CLAIMED');

        } elsif( $self->inprogress_job_count ) {                            # claimable jobs and running jobs at the same time

            $self->status('WORKING');
        }
    }
}


484
485
486
487
488
489
490
491
sub recalculate_from_job_counts {
        # Optionally loads the job counters from a hash of per-status counts, then
        # re-checks the blocking control rules and re-determines the status.
        #
        #   $job_counts : optional hashref { STATUS => count }; the keys read by name are
        #                 SEMAPHORED, READY, FAILED, DONE and PASSED_ON, while the total is
        #                 the sum of all values in the hash.
    my ($self, $job_counts) = @_;

        # only update job_counts if given the hash:
    if( $job_counts ) {
        my $done_here       = $job_counts->{'DONE'} // 0;
        my $done_elsewhere  = $job_counts->{'PASSED_ON'} // 0;
        my $all_jobs        = sum( values %$job_counts ) || 0;  # sum() of an empty list is undef

        $self->semaphored_job_count( $job_counts->{'SEMAPHORED'} || 0 );
        $self->ready_job_count(      $job_counts->{'READY'} || 0 );
        $self->failed_job_count(     $job_counts->{'FAILED'} || 0 );
        $self->done_job_count(       $done_here + $done_elsewhere );    # done here or potentially done elsewhere
        $self->total_job_count(      $all_jobs );
    }

    $self->check_blocking_control_rules();

    $self->determine_status();
}


502
1;