=pod

=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisStats

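=head1 DESCRIPTION

    An object that maintains the counters of an analysis' jobs in their different states
    (semaphored, ready, in progress, done and failed), together with the analysis status and
    its running/required worker counts. This data is used by the Scheduler.
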
=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2018] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates.

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually prefixed with an underscore (_).

=cut

36

37
38
39
package Bio::EnsEMBL::Hive::AnalysisStats;

use strict;
use POSIX ();
use Scalar::Util ('weaken');

use Bio::EnsEMBL::Utils::Argument ('rearrange');
use Bio::EnsEMBL::Utils::Exception ('throw');
use Bio::EnsEMBL::Hive::Analysis;

use base ( 'Bio::EnsEMBL::Storable' );  # inherit dbID(), adaptor() and new() methods

48

49
50
51
52
53
54
55
## Minimum amount of time (in msec) that a worker should run before reporting
## back to the hive. Used when estimating the batch_size automatically.
sub min_batch_time {
    # two minutes, expressed in milliseconds
    my $two_minutes_in_msec = 2 * 60 * 1000;
    return $two_minutes_in_msec;
}


56
# Constructor. Storable arguments are handled by the parent class; every other named
# argument (e.g. -batch_size => 5) is passed to the setter method of the same name.
# Setters run in the order listed below, and only for arguments whose value is defined.
sub new {
    my $class = shift;

    my $self = $class->SUPER::new( @_ );    # deal with Storable stuff

    my @attribute_names = qw(
        analysis_id batch_size hive_capacity status
        total_job_count semaphored_job_count ready_job_count done_job_count failed_job_count num_running_workers num_required_workers
        behaviour input_capacity output_capacity avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job
        seconds_since_last_update sync_lock
    );  # NB: seconds_since_last_update is an input_column_mapped field

    # rearrange() returns the argument values in the same order as the names it is given
    my @values = rearrange( [ @attribute_names ], @_ );

    foreach my $index (0 .. $#attribute_names) {
        next unless defined( $values[$index] );
        my $setter = $attribute_names[$index];
        $self->$setter( $values[$index] );
    }

    return $self;
}
92

93

94
95
## pre-settable storable object's getters/setters:

96
# Getter/setter for the analysis_id. It is stored as this object's dbID,
# so all arguments are simply forwarded to dbID().
sub analysis_id {   # an alias
    my $self = shift;
    return $self->dbID(@_);
}

101
102
103
104
105
# Getter/setter for batch_size. An undefined value reads as 1. The default is applied
# only when the value is undefined, so an explicit 0 (or a negative value) is kept as
# given; get_or_estimate_batch_size() treats non-positive values as a request for estimation.
sub batch_size {
    my $self = shift;
    $self->{'_batch_size'} = shift if(@_);
    $self->{'_batch_size'} = 1 unless(defined($self->{'_batch_size'}));
    return $self->{'_batch_size'};
}

108
109
110
111
# Getter/setter for hive_capacity. No default is applied, so it reads as undef until set.
sub hive_capacity {
    my $self = shift;
    $self->{'_hive_capacity'} = shift if(@_);
    return $self->{'_hive_capacity'};
}

114
# Getter/setter for the analysis status string, e.g. 'READY', 'BLOCKED' or 'DONE'.
# Setting it here only changes the in-memory value; update_status() also persists it.
sub status {
    my $self = shift;
    $self->{'_status'} = shift if(@_);
    return $self->{'_status'};
}
119
120


121
## counters of jobs in different states:
122

123

124
# Getter/setter for the total number of jobs of this analysis (all states). No default is applied.
sub total_job_count {
    my $self = shift;
    $self->{'_total_job_count'} = shift if(@_);
    return $self->{'_total_job_count'};
}

130
# Getter/setter for the number of semaphored jobs. No default is applied.
sub semaphored_job_count {
    my $self = shift;
    $self->{'_semaphored_job_count'} = shift if(@_);
    return $self->{'_semaphored_job_count'};
}

# Getter/setter for the number of ready (claimable) jobs. No default is applied.
sub ready_job_count {
    my $self = shift;
    $self->{'_ready_job_count'} = shift if(@_);
    return $self->{'_ready_job_count'};
}

142
# Getter/setter for the number of finished jobs. The counter is labelled
# "Done+Pass" in toString(). No default is applied.
sub done_job_count {
    my $self = shift;
    $self->{'_done_job_count'} = shift if(@_);
    return $self->{'_done_job_count'};
}

148
# Getter/setter for the number of failed jobs. Unlike the other counters it reads
# as 0 while undefined, including after an explicit undef is set.
sub failed_job_count {
    my $self = shift;
    $self->{'_failed_job_count'} = shift if(@_);
    $self->{'_failed_job_count'} = 0 unless(defined($self->{'_failed_job_count'}));
    return $self->{'_failed_job_count'};
}

# Getter/setter for the number of workers currently running this analysis. No default is applied.
sub num_running_workers {
    my $self = shift;
    $self->{'_num_running_workers'} = shift if(@_);
    return $self->{'_num_running_workers'};
}

161
# Getter/setter for num_required_workers. No default is applied.
sub num_required_workers {      # NB: the meaning of this field is, again, "how many extra workers we need to add"
    my $self = shift;
    $self->{'_num_required_workers'} = shift if(@_);
    return $self->{'_num_required_workers'};
}


## dynamic hive_capacity mode attributes:


171
# Getter/setter for the hive_capacity "behaviour" attribute (the dynamic hive_capacity
# mode group). No default is applied.
sub behaviour {
    my $self = shift;
    $self->{'_behaviour'} = shift if(@_);
    return $self->{'_behaviour'};
}

# Getter/setter for input_capacity (dynamic hive_capacity mode attribute). No default is applied.
sub input_capacity {
    my $self = shift;
    $self->{'_input_capacity'} = shift if(@_);
    return $self->{'_input_capacity'};
}

# Getter/setter for output_capacity (dynamic hive_capacity mode attribute). No default is applied.
sub output_capacity {
    my $self = shift;
    $self->{'_output_capacity'} = shift if(@_);
    return $self->{'_output_capacity'};
}

189

190
## dynamic hive_capacity mode counters:
191

192
193

# Getter/setter for the average msec per job. Any false value (undef, '', 0) is
# normalised to 0. get_or_estimate_batch_size() treats 0 as "no stats collected yet".
sub avg_msec_per_job {
    my $self = shift;
    $self->{'_avg_msec_per_job'} = shift if(@_);
    $self->{'_avg_msec_per_job'} ||= 0;
    return $self->{'_avg_msec_per_job'};
}

200
# Getter/setter for the average input msec per job. Any false value (undef, '', 0)
# is normalised to 0.
sub avg_input_msec_per_job {
    my $self = shift;
    $self->{'_avg_input_msec_per_job'} = shift if(@_);
    $self->{'_avg_input_msec_per_job'} ||= 0;
    return $self->{'_avg_input_msec_per_job'};
}

207
# Getter/setter for the average run msec per job. Any false value (undef, '', 0)
# is normalised to 0.
sub avg_run_msec_per_job {
    my $self = shift;
    $self->{'_avg_run_msec_per_job'} = shift if(@_);
    $self->{'_avg_run_msec_per_job'} ||= 0;
    return $self->{'_avg_run_msec_per_job'};
}

214
# Getter/setter for the average output msec per job. Any false value (undef, '', 0)
# is normalised to 0.
sub avg_output_msec_per_job {
    my $self = shift;
    $self->{'_avg_output_msec_per_job'} = shift if(@_);
    $self->{'_avg_output_msec_per_job'} ||= 0;
    return $self->{'_avg_output_msec_per_job'};
}

221

222
## other storable attributes:
223

224
225
226
227
228
sub last_update {                   # this method is called by the initial store() [at which point it returns undef]
    my ($self, @new_value) = @_;

    # Plain getter/setter: any supplied argument (even undef) replaces the stored timestamp.
    if (@new_value) {
        $self->{'_last_update'} = $new_value[0];
    }
    return $self->{'_last_update'};
}
229

230
# Reads or writes the last-update timestamp as an age in seconds relative to the local clock.
# When $value is defined, the stored timestamp becomes time() - $value.
# Always returns time() minus the stored timestamp.
sub seconds_since_last_update {     # this method is mostly used to convert between server time and local time
    my( $self, $value ) = @_;
    $self->{'_last_update'} = time() - $value if(defined($value));
    return time() - $self->{'_last_update'};
}

236
237
238
239
# Getter/setter for the sync_lock flag. No default is applied.
sub sync_lock {
    my $self = shift;
    $self->{'_sync_lock'} = shift if(@_);
    return $self->{'_sync_lock'};
}

242
243
244
245
246
247
248
249

# non-storable attributes and other helper-methods:


# Asks the adaptor to refresh this object. Returns the adaptor's result, or a false
# value (the missing adaptor itself) when no adaptor is attached.
sub refresh {
    my $self = shift;

    return $self->adaptor && $self->adaptor->refresh($self);
}

252
253
254
# Stores these stats through the adaptor's update_stats_and_monitor() and returns its result.
# Does nothing, and returns an empty list / undef, when no adaptor is attached.
sub update {
    my $self = shift;
    return unless($self->adaptor);
    $self->adaptor->update_stats_and_monitor($self);
}

258
259
260
261
262
# Persists a new status through the adaptor (keyed by analysis_id), then mirrors it
# in this object. When no adaptor is attached nothing happens, not even the local change.
sub update_status {
    my ($self, $status ) = @_;
    return unless($self->adaptor);
    $self->adaptor->update_status($self->analysis_id, $status);
    $self->status($status);
}

265
266
267
# Returns the Analysis object for this analysis_id. On first use it is fetched through
# the adaptor's database connection; the result is then cached in $self->{'_analysis'}.
# Throws if analysis_id is not set, or if there is no adaptor to fetch the analysis with.
sub get_analysis {
    my $self = shift;

    unless($self->{'_analysis'}) {
        unless($self->analysis_id) {
            throw("self->analysis_id undefined, please investigate");
        }
        unless($self->adaptor) {    # otherwise we would die with an opaque "Can't call method 'db' on an undefined value"
            throw("self->adaptor undefined, cannot fetch the analysis with dbID=".$self->analysis_id);
        }
        $self->{'_analysis'} = $self->adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($self->analysis_id);
    }
    return $self->{'_analysis'};
}

276
# Returns the number of jobs a worker should claim at once:
#   - a positive batch_size is returned as-is (batch_size() reads an undefined value as 1);
#   - a zero or negative batch_size requests dynamic estimation:
#       * if avg_msec_per_job is known, return ceil(min_batch_time() / avg_msec_per_job),
#         treating averages below 100 msec as 100 (caps the estimate at min_batch_time()/100);
#       * otherwise return -batch_size as a first guess, or 1 when batch_size is 0.
# Uses POSIX::ceil, so the POSIX module must be loaded.
sub get_or_estimate_batch_size {
    my $self = shift;

    if( (my $batch_size = $self->batch_size()) > 0 ) {        # set to positive or not set (and auto-initialized within $self->batch_size)

        return $batch_size;
                                                        # otherwise it is a request for dynamic estimation:
    } elsif( my $avg_msec_per_job = $self->avg_msec_per_job() ) {           # further estimations from collected stats

        $avg_msec_per_job = 100 if($avg_msec_per_job < 100);

        return POSIX::ceil( $self->min_batch_time() / $avg_msec_per_job );

    } else {        # first estimation when no stats are available (take -$batch_size as first guess, if not zero)
        return -$batch_size || 1;
    }
}
293

294

295
296
297
298
299
300
301
302
# Jobs that are in progress are not stored explicitly. They are whatever remains of
# total_job_count after removing the semaphored, ready, done and failed jobs.
sub inprogress_job_count {
    my $self = shift;

    my $remaining = $self->total_job_count;
    foreach my $other_state (qw(semaphored_job_count ready_job_count done_job_count failed_job_count)) {
        $remaining -= $self->$other_state;
    }
    return $remaining;
}
303

304

305
306
307
# Summarises the job counters. Returns a 3-element list:
#   1. a label such as "3r+5d=8": each non-zero counter followed by the first letter of its
#      state (s=semaphored, r=ready, i=inprogress, d=done, f=failed), joined by '+'.
#      "=<total>" is appended unless exactly one counter is non-zero;
#   2. the total job count;
#   3. a hashref mapping each counter method name to its value (zero values included).
sub job_count_breakout {
    my $self = shift;

    my @count_list = ();
    my %count_hash = ();
    my $total_job_count = $self->total_job_count();
    foreach my $count_method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        # every counter is recorded in %count_hash; only the non-zero ones go into the label
        if( my $count = $count_hash{$count_method} = $self->$count_method() ) {
            push @count_list, $count.substr($count_method,0,1);
        }
    }

    my $breakout_label = join('+', @count_list);
    $breakout_label .= '='.$total_job_count if(scalar(@count_list)!=1); # only provide a total if multiple or no categories available

    return ($breakout_label, $total_job_count, \%count_hash);
}


323
324
# Returns a one-line, human-readable summary of these stats: logic_name, analysis_id,
# status, the per-state job counters and their total, avg_msec_per_job, running/required
# worker counts, hive and analysis capacities ('-' when undefined) and the age of the
# last sync in seconds.
sub toString {
    my $self = shift @_;

    my $analysis = $self->get_analysis;

    my $output = sprintf("%-27s(%2d) %11s jobs(Sem:%d, Rdy:%d, InProg:%d, Done+Pass:%d, Fail:%d)=%d Ave_msec:%d, workers(Running:%d, Required:%d) ",
        $analysis->logic_name,
        $self->analysis_id,
        $self->status,

        $self->semaphored_job_count,
        $self->ready_job_count,
        $self->inprogress_job_count,
        $self->done_job_count,
        $self->failed_job_count,
        $self->total_job_count,

        $self->avg_msec_per_job,

        $self->num_running_workers,
        $self->num_required_workers,
    );

    $output .=  '  h.cap:'    .( defined($self->hive_capacity) ? $self->hive_capacity : '-' )
               .'  a.cap:'    .( defined($analysis->analysis_capacity) ? $analysis->analysis_capacity : '-')
               ."  (sync'd "  .$self->seconds_since_last_update." sec ago)";

    return $output;
}

353
354
355
356

# Evaluates the control rules that can block this analysis and updates its status.
# A rule is satisfied when its condition analysis is DONE, or when that analysis is
# allowed to be empty (can_be_empty) and has no jobs.
#   - any rule unsatisfied              -> status is (re)set to BLOCKED;
#   - all satisfied while BLOCKED       -> status is set to LOADING (triggers a sync).
# Returns 1 when all conditions are satisfied (including when there are no rules), else 0.
# Uses $self->adaptor->db, so an adaptor must be attached.
sub check_blocking_control_rules {
    my $self = shift;

    my $ctrl_rules = $self->adaptor->db->get_AnalysisCtrlRuleAdaptor->fetch_all_by_ctrled_analysis_id($self->analysis_id);

    my $all_conditions_satisfied = 1;

    if(scalar @$ctrl_rules) {    # there are blocking ctrl_rules to check

        foreach my $ctrl_rule (@$ctrl_rules) {
                # use this method because the condition_analysis objects can be
                # network distributed to a different database, so use its adaptor to get
                # the AnalysisStats object
            my $condition_analysis  = $ctrl_rule->condition_analysis;
            my $condition_stats     = $condition_analysis && $condition_analysis->stats;
            my $condition_status    = $condition_stats    && $condition_stats->status;
            my $condition_cbe       = $condition_analysis && $condition_analysis->can_be_empty;
            my $condition_tjc       = $condition_stats    && $condition_stats->total_job_count;

            my $this_condition_satisfied = ($condition_status eq 'DONE')
                        || ($condition_cbe && !$condition_tjc);             # probably safer than saying ($condition_status eq 'EMPTY') because of the sync order

            unless( $this_condition_satisfied ) {
                $all_conditions_satisfied = 0;
            }
        }

        if($all_conditions_satisfied) {
            if($self->status eq 'BLOCKED') {    # unblock, since all conditions are met
                $self->update_status('LOADING'); # trigger sync
            }
        } else {    # (re)block
            $self->update_status('BLOCKED');
        }
    }

    return $all_conditions_satisfied;
}


394
395
396
397
# Recomputes the status of a non-BLOCKED analysis from its job counters:
#   EMPTY       - no jobs at all;
#   DONE/FAILED - every job is done or failed. FAILED (with a warning banner on STDERR)
#                 when failed jobs exceed failed_job_tolerance percent of all jobs;
#   READY       - some jobs are ready and none is in progress;
#   ALL_CLAIMED - no job is ready;
#   WORKING     - some jobs are ready and some are in progress.
# A BLOCKED analysis is left untouched.
sub determine_status {
    my $self = shift;

    if($self->status ne 'BLOCKED') {
        if( !$self->total_job_count ) {

            $self->status('EMPTY');

        } elsif( $self->total_job_count == $self->done_job_count + $self->failed_job_count ) {   # all jobs of the analysis have been finished
            my $analysis = $self->get_analysis;
            # failed_job_tolerance is a percentage of the total number of jobs
            my $absolute_tolerance = $analysis->failed_job_tolerance * $self->total_job_count / 100.0;
            if ($self->failed_job_count > $absolute_tolerance) {
                $self->status('FAILED');
                warn       "\n##################################################\n";
                warn sprintf("##   ERROR: %-35s ##\n", $analysis->logic_name." failed!");
                warn sprintf("##     %d jobs failed (tolerance: %d (%3d%%)) ##\n", $self->failed_job_count, $absolute_tolerance, $analysis->failed_job_tolerance);
                warn         "##################################################\n\n";
            } else {
                $self->status('DONE');
            }

        } elsif( $self->ready_job_count && !$self->inprogress_job_count ) { # there are claimable jobs, but nothing actually running

            $self->status('READY');

        } elsif( !$self->ready_job_count ) {                                # there are no claimable jobs, possibly because some are semaphored

            $self->status('ALL_CLAIMED');

        } elsif( $self->inprogress_job_count ) {                            # claimable jobs exist and some are already running

            $self->status('WORKING');
        }
    }
}


1;  # a module loaded with use/require must return a true value