AnalysisStats.pm 14.7 KB
Newer Older
1
2
3
=pod 

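=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisStats

=head1 DESCRIPTION

    An object that maintains, for a single analysis, the counters of its jobs in the different states
    (semaphored, ready, in progress, done, failed), together with the attributes used to schedule
    Workers for it (batch size, capacities, status). This data is used by the Scheduler.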

=head1 LICENSE

13
    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
14
15
16
17
18
19
20
21
22
23

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list: http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users to discuss Hive-related questions or to be notified of our updates.

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with an underscore (_).

=cut

35

36
37
38
package Bio::EnsEMBL::Hive::AnalysisStats;

use strict;
39
use warnings;
40
41
use List::Util 'sum';
use POSIX;
42
use Term::ANSIColor;
43

44
use Bio::EnsEMBL::Hive::Utils ('throw');
45
use Bio::EnsEMBL::Hive::Analysis;
46

47
use base ( 'Bio::EnsEMBL::Hive::Cacheable', 'Bio::EnsEMBL::Hive::Storable' );
48

49

50
51
52
53
54
sub unikey {
        # Overrides the default from the Cacheable parent:
        # an AnalysisStats object is uniquely identified by its analysis.
    my @key_fields = ('analysis');
    return \@key_fields;
}


55
56
57
58
59
60
    ## Minimum amount of time in msec that a worker should run before reporting
    ## back to the hive. This is used when setting the batch_size automatically.
sub min_batch_time {
    return 2*60*1000;
}

61

62
63
64
65
66
67
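=head1 AUTOLOADED

    analysis_id / analysis

    These two accessors are not defined in this file: they are provided automatically (AUTOLOADED),
    presumably by one of the parent classes (Cacheable / Storable).

=cut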

68

69
70
71
72
73
# Get/set the number of jobs a worker claims at once.
# Defaults to 1 only when undefined: an explicit 0 (or a negative value) is kept,
# and is interpreted by get_or_estimate_batch_size() as a request for dynamic estimation.
sub batch_size {
    my ($self, @args) = @_;

    $self->{'_batch_size'} = $args[0] if @args;
    $self->{'_batch_size'} //= 1;

    return $self->{'_batch_size'};
}

76
77
78
79
# Get/set the hive-level capacity of this analysis (no default: may stay undef).
sub hive_capacity {
    my $self = shift @_;
    if (@_) {
        $self->{'_hive_capacity'} = $_[0];
    }
    return $self->{'_hive_capacity'};
}

82
# Get/set the status of the analysis (e.g. 'EMPTY', 'READY', 'BLOCKED', 'DONE', ...).
sub status {
    my ($self, @value) = @_;
    $self->{'_status'} = $value[0] if scalar @value;
    return $self->{'_status'};
}
87
88


89
## counters of jobs in different states:
90

91

92
# Get/set the total number of jobs of the analysis, whatever their state.
sub total_job_count {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_total_job_count'} = $new_value[0];
    }
    return $self->{'_total_job_count'};
}

98
# Get/set the number of SEMAPHORED jobs of the analysis.
sub semaphored_job_count {
    my $self = shift;
    $self->{'_semaphored_job_count'} = $_[0] if @_;
    return $self->{'_semaphored_job_count'};
}

# Get/set the number of READY (claimable) jobs of the analysis.
sub ready_job_count {
    my ($self, @new_value) = @_;
    $self->{'_ready_job_count'} = $new_value[0] if @new_value;
    return $self->{'_ready_job_count'};
}

110
# Get/set the number of finished jobs (recalculate_from_job_counts() counts DONE + PASSED_ON here).
sub done_job_count {
    my $self = shift @_;
    if (scalar @_) {
        $self->{'_done_job_count'} = shift @_;
    }
    return $self->{'_done_job_count'};
}

116
# Get/set the number of FAILED jobs of the analysis.
# Unlike the other job counters, this one defaults to 0 when not (yet) set.
sub failed_job_count {
    my $self = shift;
    if (@_) {
        $self->{'_failed_job_count'} = shift;
    }
    $self->{'_failed_job_count'} //= 0;
    return $self->{'_failed_job_count'};
}

# Get/set the number of workers currently running for this analysis.
sub num_running_workers {
    my ($self, @value) = @_;
    $self->{'_num_running_workers'} = $value[0] if @value;
    return $self->{'_num_running_workers'};
}

129
# Get/set the number of required workers.
# NB: the meaning of this field is, again, "how many extra workers we need to add".
sub num_required_workers {
    my $self = shift;
    if (@_) {
        $self->{'_num_required_workers'} = $_[0];
    }
    return $self->{'_num_required_workers'};
}


## dynamic hive_capacity mode attributes:


139
# Get/set the 'behaviour' attribute (part of the dynamic hive_capacity mode attributes).
sub behaviour {
    my ($self, @new_value) = @_;
    $self->{'_behaviour'} = $new_value[0] if @new_value;
    return $self->{'_behaviour'};
}

# Get/set the 'input_capacity' attribute (part of the dynamic hive_capacity mode attributes).
sub input_capacity {
    my $self = shift;
    if (@_) {
        $self->{'_input_capacity'} = shift;
    }
    return $self->{'_input_capacity'};
}

# Get/set the 'output_capacity' attribute (part of the dynamic hive_capacity mode attributes).
sub output_capacity {
    my ($self, @value) = @_;
    $self->{'_output_capacity'} = $value[0] if scalar @value;
    return $self->{'_output_capacity'};
}

157

158
## dynamic hive_capacity mode counters:
159

160
161

# Get/set the average job runtime in msec (dynamic hive_capacity mode counter).
# Any false value (undef, '', '0') is normalized to 0.
sub avg_msec_per_job {
    my ($self, @value) = @_;
    $self->{'_avg_msec_per_job'} = $value[0] if @value;
    $self->{'_avg_msec_per_job'} ||= 0;
    return $self->{'_avg_msec_per_job'};
}

168
# Get/set the average time (msec) spent fetching a job's input (dynamic hive_capacity mode counter).
# Any false value (undef, '', '0') is normalized to 0.
sub avg_input_msec_per_job {
    my $self = shift;
    if (@_) {
        $self->{'_avg_input_msec_per_job'} = $_[0];
    }
    $self->{'_avg_input_msec_per_job'} ||= 0;
    return $self->{'_avg_input_msec_per_job'};
}

175
# Get/set the average time (msec) spent running a job (dynamic hive_capacity mode counter).
# Any false value (undef, '', '0') is normalized to 0.
sub avg_run_msec_per_job {
    my ($self, @new_value) = @_;
    if (@new_value) {
        $self->{'_avg_run_msec_per_job'} = $new_value[0];
    }
    $self->{'_avg_run_msec_per_job'} ||= 0;
    return $self->{'_avg_run_msec_per_job'};
}

182
# Get/set the average time (msec) spent writing a job's output (dynamic hive_capacity mode counter).
# Any false value (undef, '', '0') is normalized to 0.
sub avg_output_msec_per_job {
    my $self = shift @_;
    $self->{'_avg_output_msec_per_job'} = shift @_ if scalar @_;
    $self->{'_avg_output_msec_per_job'} ||= 0;
    return $self->{'_avg_output_msec_per_job'};
}

189

190
## other storable attributes:
191

192
# Get/set the timestamp of the last update.
# This method is called by the initial store(), at which point it returns undef.
sub when_updated {
    my ($self, @value) = @_;
    if (@value) {
        $self->{'_when_updated'} = $value[0];
    }
    return $self->{'_when_updated'};
}
197

198
# Get/set the age of the last update, in seconds relative to the local clock.
# This is mostly used to convert between server time and local time:
# setting N stores "now - N" as the local when_updated timestamp.
# Returns undef when no update time is known.
sub seconds_since_when_updated {
    my ($self, $seconds_ago) = @_;

    if (defined $seconds_ago) {
        $self->{'_when_updated'} = time() - $seconds_ago;
    }

    my $when_updated = $self->{'_when_updated'};
    return defined $when_updated ? time() - $when_updated : undef;
}

204
205
206
207
# Get/set the sync_lock attribute.
sub sync_lock {
    my $self = shift;
    if (@_) {
        $self->{'_sync_lock'} = $_[0];
    }
    return $self->{'_sync_lock'};
}

210
211
212
213
214
215
216
217

# non-storable attributes and other helper-methods:


# Re-read this object's data through its adaptor.
# Returns the adaptor's refresh() result, or the (false) adaptor value when there is no adaptor.
sub refresh {
    my $self = shift;

    my $adaptor = $self->adaptor;
    return $adaptor && $adaptor->refresh($self);
}

220
221
# Store this object's current data through its adaptor.
# Does nothing when the object is not attached to an adaptor.
sub update {
    my $self = shift;

    my $adaptor = $self->adaptor;
    return $adaptor && $adaptor->update($self);
}

228

229
# Return the batch size a worker should use for this analysis.
#   - a positive batch_size (explicitly set, or auto-initialized to 1 by batch_size()) is returned as-is;
#   - otherwise (0 or negative) the batch size is estimated so that a batch takes about min_batch_time(),
#     based on avg_msec_per_job (clamped to at least 100 msec);
#   - with no runtime stats yet, -batch_size is the first guess (or 1 when batch_size is 0).
sub get_or_estimate_batch_size {
    my $self = shift;

    my $batch_size = $self->batch_size();
    return $batch_size if $batch_size > 0;

    if (my $avg_msec_per_job = $self->avg_msec_per_job()) {
        $avg_msec_per_job = 100 if $avg_msec_per_job < 100;
        return POSIX::ceil( $self->min_batch_time() / $avg_msec_per_job );
    }

    return -$batch_size || 1;
}
246

247

248
sub inprogress_job_count {      # includes CLAIMED
    my $self = shift;

        # Every job that is neither semaphored, ready, done nor failed is counted as in progress.
        # Counters that have not been populated yet are undef: they are explicitly treated as 0,
        # so the subtraction does not emit "uninitialized value" warnings. The numeric result is unchanged.
    return    ( $self->total_job_count      // 0 )
            - ( $self->semaphored_job_count // 0 )
            - ( $self->ready_job_count      // 0 )
            - ( $self->done_job_count       // 0 )
            - ( $self->failed_job_count     // 0 );
}
256

257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
# Terminal colour (Term::ANSIColor attribute string) used to display each meta-status.
my %meta_status_2_color = (
    'BLOCKED'   => 'black on_white',
    'DONE'      => 'bright_cyan',
    'EMPTY'     => 'clear',
    'FAILED'    => 'red',
    'READY'     => 'bright_green',
    'RUNNING'   => 'bright_yellow',
);

# Analysis statuses that are displayed with the colour of another meta-status
# (statuses not listed here are their own meta-status).
my %analysis_status_2_meta_status = (
    'ALL_CLAIMED'   => 'BLOCKED',
    'LOADING'       => 'READY',
    'SYNCHING'      => 'READY',
    'WORKING'       => 'RUNNING',
);

# Meta-status (and therefore colour) associated with each job counter method.
my %count_method_2_meta_status = (
    'done_job_count'        => 'DONE',
    'failed_job_count'      => 'FAILED',
    'inprogress_job_count'  => 'RUNNING',
    'ready_job_count'       => 'READY',
    'semaphored_job_count'  => 'BLOCKED',
);

# Return $text left-padded with spaces up to $field_size characters (no padding when $field_size is
# false or the text is already long enough), optionally wrapped in the colour of $meta_status.
# The colour codes are not counted towards the padding.
sub _text_with_status_color {
    my ($field_size, $color_enabled, $text, $meta_status) = @_;

    my $text_length = length $text;
    my $padding     = '';
    if ($field_size and $text_length < $field_size) {
        $padding = ' ' x ($field_size - $text_length);
    }

    return $padding . $text unless $color_enabled;

    return $padding . color($meta_status_2_color{$meta_status}) . $text . color('reset');
}

289

290
291
# Build a compact label describing how the jobs of the analysis are distributed between states,
# e.g. "3s+5r+2i=10", where each non-zero count is followed by the first letter of its counter
# (semaphored / ready / inprogress / done / failed) and optionally coloured.
# A "=total" suffix is only added when there are zero or several non-zero categories.
# The label is left-padded with spaces up to $field_size visible characters (colour codes excluded).
#
# Returns ($breakout_label, $total_job_count, \%count_hash) where %count_hash maps every
# counter method name to its value.
sub job_count_breakout {
    my ($self, $field_size, $color_enabled) = @_;

    my $total_job_count = $self->total_job_count();
    my (%count_hash, @colored_parts, @plain_parts);

    for my $count_method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        my $count = $count_hash{$count_method} = $self->$count_method();
        next unless $count;

        my $initial = substr($count_method, 0, 1);
        push @colored_parts, _text_with_status_color(undef, $color_enabled, $count, $count_method_2_meta_status{$count_method}) . $initial;
        push @plain_parts,   $count . $initial;        # same text without colour codes, used to measure the visible width
    }

    my $breakout_label = join('+', @colored_parts);
    my $plain_label    = join('+', @plain_parts);

    if (@colored_parts != 1) {     # only provide a total if multiple or no categories available
        $breakout_label .= '=' . $total_job_count;
        $plain_label    .= '=' . $total_job_count;
    }

    my $visible_length = length $plain_label;
    if ($field_size and $visible_length < $field_size) {
        $breakout_label = (' ' x ($field_size - $visible_length)) . $breakout_label;
    }

    return ($breakout_label, $total_job_count, \%count_hash);
}

315
316
317
318
319
320
321
322
323
324
325
326
327
328
# Express avg_msec_per_job in the largest unit (day, hr, min, sec) that it reaches at least once.
# Falls back to milliseconds. Returns ($value, $unit_name).
sub friendly_avg_job_runtime {
    my $self = shift;

    my $avg_msec = $self->avg_msec_per_job;

    foreach my $unit ([86_400_000, 'day'], [3_600_000, 'hr'], [60_000, 'min'], [1_000, 'sec']) {
        my ($msec_per_unit, $unit_name) = @$unit;
        my $value = $avg_msec / $msec_per_unit;
        return ($value, $unit_name) if $value >= 1.;
    }

    return ($avg_msec, 'ms');
}
329

330
331
# Return a one-line, human-readable summary of the analysis stats. The line contains:
#   - logic_name, analysis_id and status
#   - the job breakout and the average job runtime
#   - running / required workers and hive / analysis capacities
#   - the time since the last sync
# Colours are only used when STDOUT is a terminal.
#
# $max_logic_name_length (optional, default 40): width of the logic_name column.
sub toString {
    my $self = shift @_;
    my $max_logic_name_length = shift || 40;

    my $can_do_colour                                   = (-t STDOUT ? 1 : 0);
    my ($breakout_label, $total_job_count, $count_hash) = $self->job_count_breakout(24, $can_do_colour);
    my $analysis                                        = $self->analysis;
    my ($avg_runtime, $avg_runtime_unit)                = $self->friendly_avg_job_runtime;
    my $status                                          = $self->status;

        # Worker counters may not be populated yet: print 0 without triggering "uninitialized" warnings in %d.
    my $output = sprintf("%-${max_logic_name_length}s(%3d) %s, jobs( %s ), avg:%5.1f %-3s, workers(Running:%d, Required:%d) ",
        $analysis->logic_name,
        $self->analysis_id // 0,
        _text_with_status_color(11, $can_do_colour, $status, $analysis_status_2_meta_status{$status} || $status),
        $breakout_label,
        $avg_runtime, $avg_runtime_unit,
        $self->num_running_workers  // 0,
        $self->num_required_workers // 0,
    );

    $output .=  '  h.cap:'    .( $self->hive_capacity // '-' )
               .'  a.cap:'    .( $analysis->analysis_capacity // '-')
               ."  (sync'd "  .($self->seconds_since_when_updated // 0)." sec ago)";

    return $output;
}

359
360
361
362

# Check the analysis' blocking control rules and (un)block it accordingly.
#   - no control rules: the status is left untouched and 1 is returned;
#   - any unsatisfied rule: the status is set to 'BLOCKED' and 0 is returned;
#   - all rules satisfied: a 'BLOCKED' status is replaced by 'LOADING' (any non-'BLOCKED' status would do,
#     determine_status() redefines it afterwards) and 1 is returned.
sub check_blocking_control_rules {
    my $self = shift;

    my $ctrl_rules = $self->analysis->control_rules_collection();

    return 1 unless scalar @$ctrl_rules;

        # A rule is satisfied when its condition analysis is DONE, or when that analysis can be empty
        # and has no jobs; the latter is probably safer than testing for 'EMPTY' because of the sync order.
        # The condition analysis may be network-distributed to a different database, which is why its
        # stats are obtained through the condition analysis object itself.
    my $rule_is_satisfied = sub {
        my ($ctrl_rule) = @_;

        my $condition_analysis  = $ctrl_rule->condition_analysis;
        my $condition_stats     = $condition_analysis && $condition_analysis->stats;
        my $condition_status    = $condition_stats    && $condition_stats->status;
        my $condition_cbe       = $condition_analysis && $condition_analysis->can_be_empty;
        my $condition_tjc       = $condition_stats    && $condition_stats->total_job_count;

        my $is_satisfied = ($condition_status eq 'DONE') || ($condition_cbe && !$condition_tjc);
        return $is_satisfied;
    };

        # every rule is evaluated, in order (no short-circuit)
    my $unsatisfied_count = grep { !$rule_is_satisfied->($_) } @$ctrl_rules;

    if ($unsatisfied_count) {
        $self->status('BLOCKED');       # (re)block
        return 0;
    }

    $self->status('LOADING') if $self->status eq 'BLOCKED';     # unblock, since all conditions are met
    return 1;
}


400
401
402
403
# Derive the status of a non-BLOCKED analysis from its job counters:
#   EMPTY        - no jobs at all
#   DONE/FAILED  - all jobs finished; FAILED when failed jobs exceed failed_job_tolerance (% of all jobs)
#   ALL_CLAIMED  - no claimable (ready) jobs, possibly because some are semaphored
#   WORKING      - ready jobs exist and some jobs are in progress
#   READY        - ready jobs exist but nothing is actually running
# A BLOCKED analysis is left untouched.
sub determine_status {
    my $self = shift;

    return if $self->status eq 'BLOCKED';

    my $total_job_count = $self->total_job_count;

    if (!$total_job_count) {
        $self->status('EMPTY');
    }
    elsif ($total_job_count == $self->done_job_count + $self->failed_job_count) {
        my $analysis           = $self->analysis;
        my $failed_job_count   = $self->failed_job_count;
        my $absolute_tolerance = $analysis->failed_job_tolerance * $total_job_count / 100.0;

        if ($failed_job_count > $absolute_tolerance) {
            $self->status('FAILED');
            warn       "\n##################################################\n";
            warn sprintf("##   ERROR: %-35s ##\n", $analysis->logic_name." failed!");
            warn sprintf("##     %d jobs failed (tolerance: %d (%3d%%)) ##\n", $failed_job_count, $absolute_tolerance, $analysis->failed_job_tolerance);
            warn         "##################################################\n\n";
        }
        else {
            $self->status('DONE');
        }
    }
    else {
        my $ready_job_count      = $self->ready_job_count;
        my $inprogress_job_count = $self->inprogress_job_count;

        my $new_status = !$ready_job_count      ? 'ALL_CLAIMED'
                       :  $inprogress_job_count ? 'WORKING'
                       :                          'READY';
        $self->status($new_status);
    }
}


436
437
438
439
440
441
442
443
# Refresh the job counters (only when a hash of per-status job counts is given) and then:
#   - compute num_required_workers from the READY jobs and the (possibly estimated) batch size,
#     or set it to a false value when a capacity that is defined but zero forbids scheduling;
#   - apply the blocking control rules;
#   - (re)determine the status.
sub recalculate_from_job_counts {
    my ($self, $job_counts) = @_;

    if ($job_counts) {
        my %count_for_status = %$job_counts;

        $self->semaphored_job_count( $count_for_status{'SEMAPHORED'} || 0 );
        $self->ready_job_count(      $count_for_status{'READY'}      || 0 );
        $self->failed_job_count(     $count_for_status{'FAILED'}     || 0 );
            # done here (DONE) or potentially done elsewhere (PASSED_ON)
        $self->done_job_count(       ( $count_for_status{'DONE'} // 0 ) + ( $count_for_status{'PASSED_ON'} // 0 ) );
        $self->total_job_count(      sum( values %count_for_status ) || 0 );
    }

    my $analysis             = $self->analysis();
    my $hive_capacity_allows = ( !defined( $self->hive_capacity ) or $self->hive_capacity );
    my $scheduling_allowed   = $hive_capacity_allows
                            && ( !defined( $analysis->analysis_capacity ) or $analysis->analysis_capacity );

    my $required_workers = $scheduling_allowed
                        && POSIX::ceil( $self->ready_job_count() / $self->get_or_estimate_batch_size() );
    $self->num_required_workers( $required_workers );

    $self->check_blocking_control_rules();

    $self->determine_status();
}


462
1;