=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::AnalysisStats

=head1 DESCRIPTION

    An object that maintains counters for jobs in different states. This data is used by the Scheduler.

=head1 LICENSE

    Copyright [1999-2014] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut


package Bio::EnsEMBL::Hive::AnalysisStats;

use strict;
use POSIX ();
use Scalar::Util ('weaken');

use Bio::EnsEMBL::Utils::Argument ('rearrange');
use Bio::EnsEMBL::Utils::Exception ('throw');
use Bio::EnsEMBL::Hive::Analysis;

use base ( 'Bio::EnsEMBL::Storable' );  # inherit dbID(), adaptor() and new() methods



    ## Lower bound, in milliseconds, on how long a worker should keep running
    ## before it reports back to the hive. The value feeds the automatic
    ## batch_size estimation.
sub min_batch_time {
    my $minutes         = 2;
    my $msec_per_minute = 60 * 1000;

    return $minutes * $msec_per_minute;
}


# Constructor.
#
# Delegates to the parent class' new() first, then copies every argument that
# rearrange() recognises into the object through the accessor of the same name.
# Arguments that are absent or undef are skipped, so each accessor's own
# default (if it has one) stays in effect.
#
# Returns the newly created object.
sub new {
    my $class = shift;

    my $self = $class->SUPER::new( @_ );    # deal with Storable stuff

        # The order of this list is the order in which rearrange() hands the values back
        # and also the order in which the setters are invoked.
        # NB: seconds_since_last_update is an input_column_mapped field
    my @settable_fields = qw(
        analysis_id batch_size hive_capacity status
        total_job_count semaphored_job_count ready_job_count done_job_count failed_job_count num_running_workers num_required_workers
        behaviour input_capacity output_capacity avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job
        seconds_since_last_update sync_lock
    );

    my @values = rearrange( \@settable_fields, @_ );

    foreach my $i (0..$#settable_fields) {
        next unless(defined($values[$i]));

        my $setter = $settable_fields[$i];
        $self->$setter( $values[$i] );
    }

    return $self;
}


## pre-settable storable object's getters/setters:

# Getter/setter that is simply an alias for dbID(): any argument is passed
# straight through, and whatever dbID() returns is returned.
sub analysis_id {   # an alias
    my $self = shift;
    return $self->dbID(@_);
}

# Getter/setter for the batch size.
# An undefined value (never set, or explicitly set to undef) is replaced by 1;
# a defined value, including 0 or a negative number, is kept as is.
sub batch_size {
    my $self = shift;
    $self->{'_batch_size'} = shift if(@_);
    $self->{'_batch_size'} = 1 unless(defined($self->{'_batch_size'})); # only initialize when undefined, so if defined as 0 will stay 0
    return $self->{'_batch_size'};
}

# Getter/setter for hive_capacity. No default is applied: undef means "not set".
sub hive_capacity {
    my $self = shift;
    $self->{'_hive_capacity'} = shift if(@_);
    return $self->{'_hive_capacity'};
}

# Getter/setter for the in-memory status string.
# Only the object is changed here; nothing is written through the adaptor.
sub status {
    my $self = shift;
    $self->{'_status'} = shift if(@_);
    return $self->{'_status'};
}


## counters of jobs in different states:


# Getter/setter for the total number of jobs of this analysis.
sub total_job_count {
    my $self = shift;
    $self->{'_total_job_count'} = shift if(@_);
    return $self->{'_total_job_count'};
}

# Getter/setter for the number of semaphored jobs.
sub semaphored_job_count {
    my $self = shift;
    $self->{'_semaphored_job_count'} = shift if(@_);
    return $self->{'_semaphored_job_count'};
}

# Getter/setter for the number of jobs that are ready.
sub ready_job_count {
    my $self = shift;
    $self->{'_ready_job_count'} = shift if(@_);
    return $self->{'_ready_job_count'};
}

# Getter/setter for the number of jobs that are done.
sub done_job_count {
    my $self = shift;
    $self->{'_done_job_count'} = shift if(@_);
    return $self->{'_done_job_count'};
}

# Getter/setter for the number of failed jobs.
# Unlike the other job counters, an undefined value is replaced by 0.
sub failed_job_count {
    my $self = shift;
    $self->{'_failed_job_count'} = shift if(@_);
    $self->{'_failed_job_count'} = 0 unless(defined($self->{'_failed_job_count'}));
    return $self->{'_failed_job_count'};
}

# Getter/setter for the number of running workers.
sub num_running_workers {
    my $self = shift;
    $self->{'_num_running_workers'} = shift if(@_);
    return $self->{'_num_running_workers'};
}

# Getter/setter for the number of required workers.
sub num_required_workers {      # NB: the meaning of this field is, again, "how many extra workers we need to add"
    my $self = shift;
    $self->{'_num_required_workers'} = shift if(@_);
    return $self->{'_num_required_workers'};
}


## dynamic hive_capacity mode attributes:


# Getter/setter for the behaviour attribute (one of the dynamic hive_capacity mode attributes).
sub behaviour {
    my $self = shift;
    $self->{'_behaviour'} = shift if(@_);
    return $self->{'_behaviour'};
}

# Getter/setter for the input_capacity attribute (one of the dynamic hive_capacity mode attributes).
sub input_capacity {
    my $self = shift;
    $self->{'_input_capacity'} = shift if(@_);
    return $self->{'_input_capacity'};
}

# Getter/setter for the output_capacity attribute (one of the dynamic hive_capacity mode attributes).
sub output_capacity {
    my $self = shift;
    $self->{'_output_capacity'} = shift if(@_);
    return $self->{'_output_capacity'};
}


## dynamic hive_capacity mode counters:


# Getter/setter for the average number of msec spent per job.
# Any false value (never set, undef, 0, empty string) is normalised to 0.
sub avg_msec_per_job {
    my $self = shift;
    $self->{'_avg_msec_per_job'} = shift if(@_);
    $self->{'_avg_msec_per_job'}=0 unless($self->{'_avg_msec_per_job'});
    return $self->{'_avg_msec_per_job'};
}

# Getter/setter for the average number of msec per job attributed to input.
# Any false value (never set, undef, 0, empty string) is normalised to 0.
sub avg_input_msec_per_job {
    my $self = shift;
    $self->{'_avg_input_msec_per_job'} = shift if(@_);
    $self->{'_avg_input_msec_per_job'}=0 unless($self->{'_avg_input_msec_per_job'});
    return $self->{'_avg_input_msec_per_job'};
}

# Getter/setter for the average number of msec per job attributed to running.
# Any false value (never set, undef, 0, empty string) is normalised to 0.
sub avg_run_msec_per_job {
    my $self = shift;
    $self->{'_avg_run_msec_per_job'} = shift if(@_);
    $self->{'_avg_run_msec_per_job'}=0 unless($self->{'_avg_run_msec_per_job'});
    return $self->{'_avg_run_msec_per_job'};
}

# Getter/setter for the average number of msec per job attributed to output.
# Any false value (never set, undef, 0, empty string) is normalised to 0.
sub avg_output_msec_per_job {
    my $self = shift;
    $self->{'_avg_output_msec_per_job'} = shift if(@_);
    $self->{'_avg_output_msec_per_job'}=0 unless($self->{'_avg_output_msec_per_job'});
    return $self->{'_avg_output_msec_per_job'};
}


## other storable attributes:

# Accessor for the '_last_update' slot: stores the first extra argument when
# one is supplied and always hands back the current content of the slot.
# The initial store() calls this method, and at that point the result is undef.
sub last_update {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_last_update'} = $new_value[0];
    }

    return $self->{'_last_update'};
}


# Getter/setter expressed as an age in seconds rather than as a timestamp.
#
# When a defined $value is given, the '_last_update' slot is set to (now - $value).
# The return value is always (now - '_last_update'); a slot that was never set
# counts as 0, so the current epoch time is returned in that case.
#
# The clock is sampled once per call, so setting N and reading back in the same
# call yields exactly N even if the wall clock ticks over in between.
sub seconds_since_last_update {     # this method is mostly used to convert between server time and local time
    my( $self, $value ) = @_;

    my $now = time();

    $self->{'_last_update'} = $now - $value if(defined($value));

    return $now - ($self->{'_last_update'} || 0);
}

# Getter/setter for the sync_lock attribute.
sub sync_lock {
    my $self = shift;
    $self->{'_sync_lock'} = shift if(@_);
    return $self->{'_sync_lock'};
}


# non-storable attributes and other helper-methods:


# Asks the adaptor to refresh this object.
# Returns the adaptor's refresh() result, or the (false) adaptor value itself
# when the object has no adaptor.
sub refresh {
    my $self = shift;

    my $adaptor = $self->adaptor;

    return $adaptor && $adaptor->refresh($self);
}

# Passes this object to the adaptor's update().
# Does nothing (and returns nothing) when the object has no adaptor;
# otherwise returns whatever the adaptor's update() returns.
sub update {
    my $self = shift;

    my $adaptor = $self->adaptor;
    return unless($adaptor);

    return $adaptor->update($self);
}

# Sends the new status to the adaptor (keyed by analysis_id) and then mirrors it
# in the in-memory object.
# Without an adaptor nothing happens at all: the in-memory status is left
# untouched and nothing is returned.
# Otherwise returns the value of the status() setter, i.e. the new status.
sub update_status {
    my ($self, $status ) = @_;

    my $adaptor = $self->adaptor;
    return unless($adaptor);

    $adaptor->update_status($self->analysis_id, $status);

    return $self->status($status);
}

# Returns the Analysis object whose dbID equals this object's analysis_id.
# The object is fetched through the adaptor's database on first use and
# cached in the '_analysis' slot afterwards.
#
# Throws if it has to fetch but analysis_id is not set, or if there is no
# adaptor to fetch through.
sub get_analysis {
    my $self = shift;

    unless($self->{'_analysis'}) {
        unless($self->analysis_id) {
            throw("self->analysis_id undefined, please investigate");
        }

            # without this check a detached object would die with an opaque "Can't call method db on undefined value"
        my $adaptor = $self->adaptor
            or throw("cannot fetch the Analysis with dbID=".$self->analysis_id." because this AnalysisStats object has no adaptor");

        $self->{'_analysis'} = $adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($self->analysis_id);
    }

    return $self->{'_analysis'};
}

# Returns the batch size to use.
#
#   * a positive batch_size() is returned as is;
#   * otherwise (zero or negative means "estimate dynamically"), if an average
#     job runtime is known, returns the number of jobs needed to fill
#     min_batch_time(), rounded up, treating jobs faster than 100 msec as 100 msec;
#   * otherwise, with no runtime stats, returns -batch_size as a first guess,
#     or 1 if batch_size is zero.
#
# Relies on the POSIX module being loaded for POSIX::ceil().
sub get_or_estimate_batch_size {
    my $self = shift;

    my $batch_size = $self->batch_size();       # never undef: batch_size() auto-initializes to 1

    if( $batch_size > 0 ) {                     # set to positive or not set

        return $batch_size;
                                                # otherwise it is a request for dynamic estimation:
    } elsif( my $avg_msec_per_job = $self->avg_msec_per_job() ) {           # further estimations from collected stats

        $avg_msec_per_job = 100 if($avg_msec_per_job<100);

        return POSIX::ceil( $self->min_batch_time() / $avg_msec_per_job );

    } else {        # first estimation when no stats are available (take -$batch_size as first guess, if not zero)
        return -$batch_size || 1;
    }
}


# Derived counter: whatever remains of total_job_count once the semaphored,
# ready, done and failed counts have been subtracted from it, in that order.
sub inprogress_job_count {
    my $self = shift;

    my $remaining = $self->total_job_count;

    foreach my $counted_elsewhere (qw(semaphored_job_count ready_job_count done_job_count failed_job_count)) {
        $remaining -= $self->$counted_elsewhere();
    }

    return $remaining;
}


# Builds a compact label that breaks the job total down by state.
#
# Every non-zero counter contributes "<count><first letter of the counter name>"
# (s=semaphored, r=ready, i=inprogress, d=done, f=failed); the parts are joined
# with '+'. "=<total>" is appended unless exactly one category is present,
# e.g. "5r+3i+2d=10", "4r" or "=0".
#
# Returns a list: (label, total_job_count, hashref of counter name => value).
# The hashref contains all five counters, including the zero ones.
sub job_count_breakout {
    my $self = shift;

    my @count_list = ();
    my %count_hash = ();
    my $total_job_count = $self->total_job_count();

    foreach my $count_method (qw(semaphored_job_count ready_job_count inprogress_job_count done_job_count failed_job_count)) {
        my $count = $self->$count_method();
        $count_hash{$count_method} = $count;

        if( $count ) {
            push @count_list, $count.substr($count_method,0,1);
        }
    }

    my $breakout_label = join('+', @count_list);
    $breakout_label .= '='.$total_job_count if(scalar(@count_list)!=1); # only provide a total if multiple or no categories available

    return ($breakout_label, $total_job_count, \%count_hash);
}


# Returns a one-line, human-readable summary of this object: logic_name and
# analysis_id, status, the job counters, average msec per job, worker counts,
# hive/analysis capacities ('-' when undefined) and the age of the last update.
#
# Calls get_analysis(), so it needs either a cached Analysis or a working adaptor.
sub toString {
    my $self = shift @_;

    my $analysis = $self->get_analysis;

        # NB: the label text (including the spelling of "Reqired") is kept exactly as it has always been printed
    my $output = sprintf("%-27s(%2d) %11s jobs(Sem:%d, Rdy:%d, InProg:%d, Done+Pass:%d, Fail:%d)=%d Ave_msec:%d, workers(Running:%d, Reqired:%d) ",
        $analysis->logic_name,
        $self->analysis_id,

        $self->status,

        $self->semaphored_job_count,
        $self->ready_job_count,
        $self->inprogress_job_count,
        $self->done_job_count,
        $self->failed_job_count,
        $self->total_job_count,

        $self->avg_msec_per_job,

        $self->num_running_workers,
        $self->num_required_workers,
    );
    $output .=  '  h.cap:'    .( defined($self->hive_capacity) ? $self->hive_capacity : '-' )
               .'  a.cap:'    .( defined($analysis->analysis_capacity) ? $analysis->analysis_capacity : '-')
               ."  (sync'd "  .$self->seconds_since_last_update." sec ago)";

    return $output;
}


# Evaluates the control rules that gate this analysis and (un)blocks it accordingly.
#
# A rule's condition counts as satisfied when the condition analysis' stats
# report the status 'DONE', or when that analysis can_be_empty and has no jobs.
# A condition whose analysis, stats or status cannot be obtained is unsatisfied.
#
#   * any condition unsatisfied                -> this analysis is (re)set to 'BLOCKED';
#   * all satisfied and currently 'BLOCKED'    -> this analysis is set to 'LOADING';
#   * no control rules at all                  -> nothing is updated.
#
# Returns 1 when every condition is satisfied (or there are no rules), 0 otherwise.
sub check_blocking_control_rules {
    my $self = shift;

    my $ctrl_rules = $self->adaptor->db->get_AnalysisCtrlRuleAdaptor->fetch_all_by_ctrled_analysis_id($self->analysis_id);

    my $all_conditions_satisfied = 1;

    if(scalar @$ctrl_rules) {    # there are blocking ctrl_rules to check

        foreach my $ctrl_rule (@$ctrl_rules) {
                #use this method because the condition_analysis objects can be
                #network distributed to a different database so use its adaptor to get
                #the AnalysisStats object
            my $condition_analysis  = $ctrl_rule->condition_analysis;
            my $condition_stats     = $condition_analysis && $condition_analysis->stats;
            my $condition_status    = $condition_stats    && $condition_stats->status;
            my $condition_cbe       = $condition_analysis && $condition_analysis->can_be_empty;
            my $condition_tjc       = $condition_stats    && $condition_stats->total_job_count;

                # the status may legitimately be unavailable, so never compare an undefined value as a string
            my $this_condition_satisfied = (defined($condition_status) && ($condition_status eq 'DONE'))
                        || ($condition_cbe && !$condition_tjc);             # probably safer than saying ($condition_status eq 'EMPTY') because of the sync order

            unless( $this_condition_satisfied ) {
                $all_conditions_satisfied = 0;
            }
        }

        if($all_conditions_satisfied) {
            my $current_status = $self->status;

            if(defined($current_status) && ($current_status eq 'BLOCKED')) {    # unblock, since all conditions are met
                $self->update_status('LOADING'); # trigger sync
            }
        } else {    # (re)block
            $self->update_status('BLOCKED');
        }
    }

    return $all_conditions_satisfied;
}


# Recomputes the in-memory status from the job counters (nothing is written
# through the adaptor here). A 'BLOCKED' analysis is left untouched. Otherwise:
#
#   * no jobs at all                              -> 'EMPTY'
#   * every job is either done or failed          -> 'FAILED' if the number of failed jobs exceeds
#                                                    failed_job_tolerance percent of the total
#                                                    (an error banner is printed to STDOUT), else 'DONE'
#   * ready jobs and nothing in progress          -> 'READY'
#   * no ready jobs                               -> 'ALL_CLAIMED'
#   * ready jobs and some in progress             -> 'WORKING'
sub determine_status {
    my $self = shift;

    if($self->status ne 'BLOCKED') {
        if( !$self->total_job_count ) {

            $self->status('EMPTY');

        } elsif( $self->total_job_count == $self->done_job_count + $self->failed_job_count ) {   # all jobs of the analysis have been finished
            my $analysis = $self->get_analysis;
            my $absolute_tolerance = $analysis->failed_job_tolerance * $self->total_job_count / 100.0;  # tolerance is a percentage of the total
            if ($self->failed_job_count > $absolute_tolerance) {
                $self->status('FAILED');
                print "\n##################################################\n";
                printf("##   ERROR: %-35s ##\n", $analysis->logic_name." failed!");
                printf("##     %d jobs failed (tolerance: %d (%3d%%)) ##\n", $self->failed_job_count, $absolute_tolerance, $analysis->failed_job_tolerance);
                print "##################################################\n\n";
            } else {
                $self->status('DONE');
            }
        } elsif( $self->ready_job_count && !$self->inprogress_job_count ) { # there are claimable jobs, but nothing actually running

            $self->status('READY');

        } elsif( !$self->ready_job_count ) {                                # there are no claimable jobs, possibly because some are semaphored

            $self->status('ALL_CLAIMED');

        } elsif( $self->inprogress_job_count ) {

            $self->status('WORKING');
        }
    }
}


1;