AnalysisStats.pm 14.8 KB
Newer Older
1
2
3
=pod

=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisStats

=head1 DESCRIPTION

    An object that maintains counters for jobs in different states. This data is used by the Scheduler.

=head1 LICENSE

    Copyright [1999-2016] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list (http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users) to discuss Hive-related questions or to be notified of our updates.

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually prefixed with an underscore (_).

=cut

35

36
37
38
package Bio::EnsEMBL::Hive::AnalysisStats;

use strict;

use POSIX ();                   # for POSIX::ceil()
use Scalar::Util ('weaken');

use Bio::EnsEMBL::Utils::Argument ('rearrange');
use Bio::EnsEMBL::Utils::Exception ('throw');
use Bio::EnsEMBL::Hive::Analysis;

use base ( 'Bio::EnsEMBL::Storable' );  # inherit dbID(), adaptor() and new() methods

47

48
49
50
51
52
53
54
    ## Minimum amount of time (in msec) that a worker should run before reporting back to the hive.
    ## get_or_estimate_batch_size() divides it by the average msec per job to size batches automatically.
sub min_batch_time {
    my $two_minutes_in_msec = 120_000;    # 2 * 60 * 1000
    return $two_minutes_in_msec;
}


55
# Constructor. Bio::EnsEMBL::Storable::new() handles the Storable part (dbID / adaptor);
# every other -named (or positional) argument listed below is then passed to the
# setter of the same name, in the listed order, but only when its value is defined.
sub new {
    my $class = shift;

    my $self = $class->SUPER::new( @_ );    # deal with Storable stuff

    my @attribute_names = qw(
        analysis_id batch_size hive_capacity status
        total_job_count semaphored_job_count ready_job_count done_job_count failed_job_count num_running_workers num_required_workers
        behaviour input_capacity output_capacity avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job
        seconds_since_last_update sync_lock
    );

    # Hand rearrange() a fresh copy of the names, in case it modifies the list it is given.
    my @values = rearrange( [ @attribute_names ], @_ );

    foreach my $idx ( 0 .. $#attribute_names ) {
        my $value = $values[$idx];
        next unless defined $value;

        my $setter = $attribute_names[$idx];
        $self->$setter($value);     # NB: seconds_since_last_update is an input_column_mapped field
    }

    return $self;
}
91

92

93
94
## pre-settable storable object's getters/setters:

95
# analysis_id is simply an alias for the Storable dbID() getter/setter:
# all arguments after the invocant are forwarded unchanged.
sub analysis_id {
    return shift->dbID(@_);
}

100
101
102
103
104
# Getter/setter for batch_size.
# Only an undefined value is replaced by the default of 1, so an explicit 0 (or a
# negative number) is kept; get_or_estimate_batch_size() treats such non-positive
# values as a request for dynamic estimation.
sub batch_size {
    my ($self, @new_value) = @_;

    ($self->{'_batch_size'}) = @new_value if @new_value;
    $self->{'_batch_size'} = 1 if !defined $self->{'_batch_size'};

    return $self->{'_batch_size'};
}

107
108
109
110
# Getter/setter for hive_capacity. No default is applied, so it stays undef until set.
sub hive_capacity {
    my ($self, @new_value) = @_;
    ($self->{'_hive_capacity'}) = @new_value if @new_value;
    return $self->{'_hive_capacity'};
}

113
# Getter/setter for the analysis status string (e.g. 'EMPTY', 'READY', 'BLOCKED', 'DONE',
# as assigned by determine_status() and check_blocking_control_rules()).
sub status {
    my ($self, @new_value) = @_;
    ($self->{'_status'}) = @new_value if @new_value;
    return $self->{'_status'};
}
118
119


120
## counters of jobs in different states:
121

122

123
# Getter/setter for the total number of jobs of this analysis (undef until set).
sub total_job_count {
    my ($self, @new_value) = @_;
    ($self->{'_total_job_count'}) = @new_value if @new_value;
    return $self->{'_total_job_count'};
}

129
# Getter/setter for the number of semaphored jobs (undef until set).
sub semaphored_job_count {
    my ($self, @new_value) = @_;
    ($self->{'_semaphored_job_count'}) = @new_value if @new_value;
    return $self->{'_semaphored_job_count'};
}

# Getter/setter for the number of jobs that are ready to be claimed (undef until set).
sub ready_job_count {
    my ($self, @new_value) = @_;
    ($self->{'_ready_job_count'}) = @new_value if @new_value;
    return $self->{'_ready_job_count'};
}

141
# Getter/setter for the number of finished (done) jobs (undef until set).
sub done_job_count {
    my ($self, @new_value) = @_;
    ($self->{'_done_job_count'}) = @new_value if @new_value;
    return $self->{'_done_job_count'};
}

147
# Getter/setter for the number of failed jobs.
# Unlike most counters, an undefined value is normalized to 0.
sub failed_job_count {
    my ($self, @new_value) = @_;

    ($self->{'_failed_job_count'}) = @new_value if @new_value;
    $self->{'_failed_job_count'} = 0 if !defined $self->{'_failed_job_count'};

    return $self->{'_failed_job_count'};
}

# Getter/setter for the number of workers currently running this analysis (undef until set).
sub num_running_workers {
    my ($self, @new_value) = @_;
    ($self->{'_num_running_workers'}) = @new_value if @new_value;
    return $self->{'_num_running_workers'};
}

160
# Getter/setter for num_required_workers.
# NB: the meaning of this field is, again, "how many extra workers we need to add".
sub num_required_workers {
    my ($self, @new_value) = @_;
    ($self->{'_num_required_workers'}) = @new_value if @new_value;
    return $self->{'_num_required_workers'};
}


## dynamic hive_capacity mode attributes:


170
# Getter/setter for the 'behaviour' attribute (part of the dynamic hive_capacity mode); undef until set.
sub behaviour {
    my ($self, @new_value) = @_;
    ($self->{'_behaviour'}) = @new_value if @new_value;
    return $self->{'_behaviour'};
}

# Getter/setter for input_capacity (dynamic hive_capacity mode attribute); undef until set.
sub input_capacity {
    my ($self, @new_value) = @_;
    ($self->{'_input_capacity'}) = @new_value if @new_value;
    return $self->{'_input_capacity'};
}

# Getter/setter for output_capacity (dynamic hive_capacity mode attribute); undef until set.
sub output_capacity {
    my ($self, @new_value) = @_;
    ($self->{'_output_capacity'}) = @new_value if @new_value;
    return $self->{'_output_capacity'};
}

188

189
## dynamic hive_capacity mode counters:
190

191
192

# Getter/setter for the average msec per job.
# Any false value (undef, '', 0) is normalized to 0.
sub avg_msec_per_job {
    my ($self, @new_value) = @_;
    ($self->{'_avg_msec_per_job'}) = @new_value if @new_value;
    $self->{'_avg_msec_per_job'} ||= 0;
    return $self->{'_avg_msec_per_job'};
}

199
# Getter/setter for the average input-phase msec per job.
# Any false value (undef, '', 0) is normalized to 0.
sub avg_input_msec_per_job {
    my ($self, @new_value) = @_;
    ($self->{'_avg_input_msec_per_job'}) = @new_value if @new_value;
    $self->{'_avg_input_msec_per_job'} ||= 0;
    return $self->{'_avg_input_msec_per_job'};
}

206
# Getter/setter for the average run-phase msec per job.
# Any false value (undef, '', 0) is normalized to 0.
sub avg_run_msec_per_job {
    my ($self, @new_value) = @_;
    ($self->{'_avg_run_msec_per_job'}) = @new_value if @new_value;
    $self->{'_avg_run_msec_per_job'} ||= 0;
    return $self->{'_avg_run_msec_per_job'};
}

213
# Getter/setter for the average output-phase msec per job.
# Any false value (undef, '', 0) is normalized to 0.
sub avg_output_msec_per_job {
    my ($self, @new_value) = @_;
    ($self->{'_avg_output_msec_per_job'}) = @new_value if @new_value;
    $self->{'_avg_output_msec_per_job'} ||= 0;
    return $self->{'_avg_output_msec_per_job'};
}

220

221
## other storable attributes:
222

223
224
225
226
227
# Getter/setter for last_update.
# It is called by the initial store(), at which point it still returns undef.
sub last_update {
    my ($self, @new_value) = @_;
    ($self->{'_last_update'}) = @new_value if @new_value;
    return $self->{'_last_update'};
}
228

229
# Getter/setter expressed as an age in seconds, mostly used to convert between
# server time and local time.
# Setting N stores the local epoch time N seconds ago in _last_update. Reading
# returns the seconds elapsed since then by the local clock.
sub seconds_since_last_update {
    my $self          = shift;
    my ($seconds_ago) = @_;

    if ( defined $seconds_ago ) {
        $self->{'_last_update'} = time() - $seconds_ago;
    }

    return time() - $self->{'_last_update'};
}

235
236
237
238
# Getter/setter for the sync_lock flag (undef until set).
sub sync_lock {
    my ($self, @new_value) = @_;
    ($self->{'_sync_lock'}) = @new_value if @new_value;
    return $self->{'_sync_lock'};
}

241
242
243
244
245
246
247
248

# non-storable attributes and other helper-methods:


# Delegates to the adaptor's refresh($self) and returns its result.
# Without an adaptor, the (false) adaptor value itself is returned, so list
# context still receives a single element.
sub refresh {
    my $self = shift;

    my $adaptor = $self->adaptor;
    return $adaptor ? $adaptor->refresh($self) : $adaptor;
}

251
252
253
# Pushes this object's counters through the adaptor's update_stats_and_monitor($self)
# and returns whatever that returns. It is a no-op returning an empty list (undef in
# scalar context) when there is no adaptor.
sub update {
    my $self = shift;

    my $adaptor = $self->adaptor or return;
    return $adaptor->update_stats_and_monitor($self);
}

257
258
259
260
261
# Persists a new status through the adaptor (keyed by analysis_id), then mirrors it
# in this object and returns the new status.
# When there is no adaptor, nothing happens, including no local change.
sub update_status {
    my ($self, $status) = @_;

    my $adaptor = $self->adaptor or return;

    $adaptor->update_status($self->analysis_id, $status);
    return $self->status($status);
}

264
265
266
# Returns the Analysis this stats object belongs to. The first call fetches it by
# analysis_id through the adaptor's database and caches it in the object; later
# calls return the cached value while it is true.
# Throws if analysis_id is not set.
sub get_analysis {
    my $self = shift;

    return $self->{'_analysis'} if $self->{'_analysis'};

    throw("self->analysis_id undefined, please investigate") if !$self->analysis_id;

    my $analysis_adaptor = $self->adaptor->db->get_AnalysisAdaptor;
    return $self->{'_analysis'} = $analysis_adaptor->fetch_by_dbID($self->analysis_id);
}

275
# Returns the batch size workers should use for this analysis.
#   * A positive batch_size is returned as-is. An unset batch_size is auto-initialized
#     to 1 by batch_size().
#   * Otherwise, 0 or a negative value requests dynamic estimation:
#       - if avg_msec_per_job is known, the result is ceil(min_batch_time / avg_msec_per_job).
#         The average is clamped to at least 100 msec, which caps the estimate.
#       - if no stats are available yet, -batch_size is the first guess, or 1 when batch_size is 0.
# NB: relies on POSIX being loaded (use POSIX () at the top of the file) for POSIX::ceil().
sub get_or_estimate_batch_size {
    my $self = shift;

    my $batch_size = $self->batch_size();

    if ( $batch_size > 0 ) {
        return $batch_size;
    }

    if ( my $avg_msec_per_job = $self->avg_msec_per_job() ) {     # estimate from collected stats
        $avg_msec_per_job = 100 if $avg_msec_per_job < 100;

        return POSIX::ceil( $self->min_batch_time() / $avg_msec_per_job );
    }

    return -$batch_size || 1;     # first estimation when no stats are available
}
292

293

294
295
296
297
298
299
300
301
# Number of jobs currently in progress, derived rather than stored:
# total_job_count minus the semaphored, ready, done and failed counts.
sub inprogress_job_count {
    my $self = shift;

    my $in_progress = $self->total_job_count;
    foreach my $counter (qw(semaphored_job_count ready_job_count done_job_count failed_job_count)) {
        $in_progress -= $self->$counter;
    }

    return $in_progress;
}
302

303

304
305
306
# Summarizes the job counters.
# Returns ($label, $total_job_count, \%count_of), where:
#   $label     joins every non-zero counter as <count><first letter of its name>
#              (s/r/i/d/f) with '+', and appends '=<total>' unless exactly one counter
#              is non-zero, e.g. '3r+2i+5d=10', '4r' or '=0';
#   %count_of  maps each counter method name to its value, zeros included.
sub job_count_breakout {
    my $self = shift;

    my $total_job_count = $self->total_job_count();
    my @labelled_counts;
    my %count_of;

    foreach my $method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        my $count = $count_of{$method} = $self->$method();
        next unless $count;

        push @labelled_counts, $count . substr($method, 0, 1);
    }

    my $label = join '+', @labelled_counts;
    $label .= '=' . $total_job_count unless @labelled_counts == 1;   # a lone category needs no total

    return ($label, $total_job_count, \%count_of);
}


322
323
# Returns a one-line human-readable summary of this analysis' stats. It contains:
#   * logic_name, analysis_id and status;
#   * the job counters and the average msec per job (printed as an integer);
#   * the running and required worker counts;
#   * hive and analysis capacities ('-' when undefined);
#   * how many seconds ago the stats were last synchronized.
sub toString {
    my $self = shift @_;

    my $analysis = $self->get_analysis;

    my $output = sprintf("%-27s(%2d) %11s jobs(Sem:%d, Rdy:%d, InProg:%d, Done+Pass:%d, Fail:%d)=%d Ave_msec:%d, workers(Running:%d, Required:%d) ",
        $analysis->logic_name,
        $self->analysis_id,
        $self->status,

        $self->semaphored_job_count,
        $self->ready_job_count,
        $self->inprogress_job_count,
        $self->done_job_count,
        $self->failed_job_count,
        $self->total_job_count,

        $self->avg_msec_per_job,

        $self->num_running_workers,
        $self->num_required_workers,
    );

    my $hive_capacity     = $self->hive_capacity;
    my $analysis_capacity = $analysis->analysis_capacity;

    $output .=  '  h.cap:'    .( defined($hive_capacity)     ? $hive_capacity     : '-' )
               .'  a.cap:'    .( defined($analysis_capacity) ? $analysis_capacity : '-' )
               ."  (sync'd "  .$self->seconds_since_last_update." sec ago)";

    return $output;
}

352
353
354
355

# Re-evaluates the control rules that can block this analysis.
# A rule is satisfied when its condition analysis is DONE, or when that analysis may
# be empty (can_be_empty) and currently has no jobs.
#   * If any rule is unsatisfied, the analysis is (re)BLOCKED via update_status().
#   * If all rules are satisfied and it is currently BLOCKED, it is moved to LOADING
#     to trigger a sync.
#   * If there are no rules, the status is left untouched.
# Returns 1 when all rules are satisfied (or there are none), 0 otherwise.
sub check_blocking_control_rules {
    my $self = shift;

    my $ctrl_rule_adaptor = $self->adaptor->db->get_AnalysisCtrlRuleAdaptor;
    my $ctrl_rules        = $ctrl_rule_adaptor->fetch_all_by_ctrled_analysis_id($self->analysis_id);

    my $all_conditions_satisfied = 1;

    return $all_conditions_satisfied unless scalar @$ctrl_rules;    # no blocking ctrl_rules to check

    foreach my $rule (@$ctrl_rules) {
        # The condition analysis may live in a different (network-distributed) database,
        # so its stats are reached through the analysis object and its own adaptor.
        my $cond_analysis = $rule->condition_analysis;
        my $cond_stats    = $cond_analysis && $cond_analysis->stats;
        my $cond_status   = $cond_stats    && $cond_stats->status;
        my $cond_cbe      = $cond_analysis && $cond_analysis->can_be_empty;
        my $cond_tjc      = $cond_stats    && $cond_stats->total_job_count;

        # can_be_empty + no jobs is probably safer than testing for an 'EMPTY' status,
        # because of the sync order.
        my $rule_satisfied = ( $cond_status eq 'DONE' ) || ( $cond_cbe && !$cond_tjc );

        $all_conditions_satisfied = 0 if !$rule_satisfied;
    }

    if ( !$all_conditions_satisfied ) {
        $self->update_status('BLOCKED');        # (re)block
    } elsif ( $self->status eq 'BLOCKED' ) {
        $self->update_status('LOADING');        # unblock, since all conditions are met; triggers a sync
    }

    return $all_conditions_satisfied;
}


393
394
395
396
# Recomputes the status from the job counters. A BLOCKED status is left untouched.
#   EMPTY       - there are no jobs at all
#   DONE/FAILED - every job is done or failed. It is FAILED, with a warning banner,
#                 when failed jobs exceed failed_job_tolerance percent of all jobs.
#   ALL_CLAIMED - no job is ready to be claimed (possibly because some are semaphored)
#   READY       - some jobs are ready and none is in progress
#   WORKING     - some jobs are ready and some are in progress
sub determine_status {
    my $self = shift;

    if ( $self->status ne 'BLOCKED' ) {
        my $total_jobs = $self->total_job_count;

        if ( !$total_jobs ) {
            $self->status('EMPTY');

        } elsif ( $total_jobs == $self->done_job_count + $self->failed_job_count ) {    # all jobs of the analysis have been finished
            my $analysis      = $self->get_analysis;
            my $tolerance_pct = $analysis->failed_job_tolerance;
            my $max_failed    = $tolerance_pct * $total_jobs / 100.0;
            my $failed_jobs   = $self->failed_job_count;

            if ( $failed_jobs > $max_failed ) {
                $self->status('FAILED');
                warn       "\n##################################################\n";
                warn sprintf("##   ERROR: %-35s ##\n", $analysis->logic_name." failed!");
                warn sprintf("##     %d jobs failed (tolerance: %d (%3d%%)) ##\n", $failed_jobs, $max_failed, $tolerance_pct);
                warn         "##################################################\n\n";
            } else {
                $self->status('DONE');
            }

        } elsif ( !$self->ready_job_count ) {          # no claimable jobs, possibly because some are semaphored
            $self->status('ALL_CLAIMED');

        } elsif ( !$self->inprogress_job_count ) {     # claimable jobs, but nothing actually running
            $self->status('READY');

        } else {                                       # claimable jobs and some already in progress
            $self->status('WORKING');
        }
    }
}


429
1;