=pod

=head1 NAME

    Bio::EnsEMBL::Hive::Analysis

=head1 DESCRIPTION

    An Analysis object represents a "stage" of the Hive pipeline that groups together
    all jobs that share the same module and the same common parameters.

    Individual Jobs are said to "belong" to an Analysis.

    Control rules unblock when their condition Analyses are done.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2021] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=cut

36

37 38
package Bio::EnsEMBL::Hive::Analysis;

39
use sort 'stable';
40
use strict;
41
use warnings;
42

43
use Bio::EnsEMBL::Hive::Utils ('stringify', 'throw');
Matthieu Muffato's avatar
Matthieu Muffato committed
44
use Bio::EnsEMBL::Hive::AnalysisJob;
45
use Bio::EnsEMBL::Hive::GuestProcess;
Leo Gordon's avatar
Leo Gordon committed
46

47

48
use base ( 'Bio::EnsEMBL::Hive::Storable' );
49 50
 

51 52 53 54 55
# Overrides the default from the Cacheable parent: an Analysis is keyed by its 'logic_name'.
# Returns a fresh arrayref of key field names on every call.
sub unikey {
    my @key_fields = ('logic_name');

    return \@key_fields;
}


56 57 58 59 60 61 62
=head1 AUTOLOADED

    resource_class_id / resource_class

=cut


63
# Combined getter/setter for the '_logic_name' slot.
# When called with an argument (even undef), that value is stored first; the current value is returned.
sub logic_name {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_logic_name'} = $new_value[0];
    }

    return $self->{'_logic_name'};
}

69 70 71 72 73
# A useful synonym: hands all of its arguments over to logic_name() and returns whatever that returns.
sub name {
    my ($self, @args) = @_;

    return $self->logic_name(@args);
}
74 75 76


# Combined getter/setter for the '_module' slot (the name of the Runnable module of this Analysis).
sub module {
    my $self = shift;

    return $self->{'_module'} unless @_;    # no argument: plain getter

    return $self->{'_module'} = shift;      # store the new value and hand it back
}

Leo Gordon's avatar
Leo Gordon committed
82

83 84 85 86 87 88
# Combined getter/setter for the '_language' slot.
sub language {
    my ($self, @args) = @_;

    ($self->{'_language'}) = @args if scalar(@args);    # only the first argument is stored

    return $self->{'_language'};
}

89 90

# Combined getter/setter for the '_parameters' slot.
# A reference passed to the setter is converted with stringify() before being stored;
# a non-reference value is stored as it is.
sub parameters {
    my ($self, @args) = @_;

    return $self->{'_parameters'} unless @args;

    my $new_parameters = $args[0];
    $new_parameters = stringify($new_parameters) if ref($new_parameters);

    return $self->{'_parameters'} = $new_parameters;
}


100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
# Combined getter/setter for the '_comment' slot; an undefined value is replaced by (and stored as) ''.
sub comment {
    my ($self, @new_value) = @_;

    $self->{'_comment'} = $new_value[0] if @new_value;
    $self->{'_comment'} = ''            unless defined $self->{'_comment'};

    return $self->{'_comment'};
}


# Combined getter/setter for the '_tags' slot; an undefined value is replaced by (and stored as) ''.
sub tags {
    my $self = shift;

    if (@_) {
        $self->{'_tags'} = shift;
    }

    return $self->{'_tags'} //= '';
}


116 117 118
# Combined getter/setter for the '_failed_job_tolerance' slot; an undefined value is replaced by (and stored as) 0.
sub failed_job_tolerance {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_failed_job_tolerance'} = $new_value[0];
    }
    unless (defined $self->{'_failed_job_tolerance'}) {
        $self->{'_failed_job_tolerance'} = 0;
    }

    return $self->{'_failed_job_tolerance'};
}


# Combined getter/setter for the '_max_retry_count' slot (no default is applied here).
sub max_retry_count {
    my ($self, @new_value) = @_;

    $self->{'_max_retry_count'} = $new_value[0] if @new_value;

    return $self->{'_max_retry_count'};
}


# Combined getter/setter for the '_can_be_empty' slot; an undefined value is replaced by (and stored as) 0.
sub can_be_empty {
    my $self = shift;

    $self->{'_can_be_empty'} = shift if @_;

    return $self->{'_can_be_empty'} //= 0;
}


# Combined getter/setter for the '_priority' slot; an undefined value is replaced by (and stored as) 0.
sub priority {
    my ($self, @new_value) = @_;

    $self->{'_priority'} = $new_value[0] if @new_value;

    if (!defined $self->{'_priority'}) {
        $self->{'_priority'} = 0;
    }

    return $self->{'_priority'};
}


147 148 149 150 151 152 153
# Combined getter/setter for the '_meadow_type' slot (no default is applied here).
sub meadow_type {
    my $self = shift;

    return $self->{'_meadow_type'} unless @_;

    return $self->{'_meadow_type'} = shift;
}


154 155 156 157 158 159
# Combined getter/setter for the '_analysis_capacity' slot (no default is applied here).
sub analysis_capacity {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_analysis_capacity'} = $new_value[0];
    }

    return $self->{'_analysis_capacity'};
}

160 161 162 163 164 165
# Combined getter/setter for the '_hive_capacity' slot (no default is applied here).
sub hive_capacity {
    my ($self, @args) = @_;

    ($self->{'_hive_capacity'}) = @args if scalar(@args);    # only the first argument is stored

    return $self->{'_hive_capacity'};
}

166 167 168
# Combined getter/setter for the '_batch_size' slot.
# The default of 1 is only applied when the value is undefined, so an explicit 0 stays 0.
sub batch_size {
    my ($self, @new_value) = @_;

    $self->{'_batch_size'} = $new_value[0] if @new_value;
    $self->{'_batch_size'} = 1             unless defined $self->{'_batch_size'};

    return $self->{'_batch_size'};
}
172

173 174 175 176 177 178
# Makes sure the Runnable behind this Analysis can be loaded and returns the name of the class to use.
#
# If the Analysis has a 'language', the wrapper for that language is run with ('compile', <module>)
# and 'Bio::EnsEMBL::Hive::GuestProcess' is returned. Otherwise the module is require'd as a Perl
# class, which must inherit from Bio::EnsEMBL::Hive::Process, and the module name itself is returned.
#
# Dies when:
#   - 'module' is not defined for this Analysis;
#   - the language wrapper cannot be run or reports a failure (the reason is included in the message);
#   - the module name is not a syntactically valid Perl package name (Perl branch only);
#   - the module cannot be loaded, does not inherit from Bio::EnsEMBL::Hive::Process,
#     or still defines the deprecated strict_hash_format() method.
sub get_compiled_module_name {
    my $self = shift;

    my $runnable_module_name = $self->module
        or die "Analysis '".$self->logic_name."' does not have its 'module' defined";

    if (my $language = $self->language) {
        my $wrapper = Bio::EnsEMBL::Hive::GuestProcess::_get_wrapper_for_language($language);
        if (system($wrapper, 'compile', $runnable_module_name)) {
            my ($child_status, $os_error) = ($?, "$!");     # capture both before anything can overwrite them
            my $reason = ($child_status == -1)  ? "the wrapper '$wrapper' could not be executed: $os_error"
                       : ($child_status & 127)  ? sprintf("the wrapper '%s' died with signal %d", $wrapper, $child_status & 127)
                       :                          sprintf("the wrapper '%s' exited with code %d", $wrapper, $child_status >> 8);
            die "The runnable module '$runnable_module_name' cannot be loaded or compiled:\n$reason\n";
        }
        return 'Bio::EnsEMBL::Hive::GuestProcess';
    }

        # The name is interpolated into a string-eval below, so anything that is not a plain
        # package name is refused here rather than being executed as Perl code:
    unless ($runnable_module_name =~ m{ \A [A-Za-z_][A-Za-z0-9_]* (?: :: [A-Za-z0-9_]+ )* \z }x) {
        die "The runnable module '$runnable_module_name' cannot be loaded or compiled:\nit is not a valid Perl package name\n";
    }

    eval "require $runnable_module_name";
    die "The runnable module '$runnable_module_name' cannot be loaded or compiled:\n$@" if($@);
    die "Problem accessing methods in '$runnable_module_name'. Please check that it inherits from Bio::EnsEMBL::Hive::Process and is named correctly.\n"
        unless($runnable_module_name->isa('Bio::EnsEMBL::Hive::Process'));

    die "DEPRECATED: the strict_hash_format() method is no longer supported in Runnables - the input_id() in '$runnable_module_name' has to be a hash now.\n"
        if($runnable_module_name->can('strict_hash_format'));

    return $runnable_module_name;
}


199 200
# Returns a fresh hashref with a single key, 'logic_name', set to this Analysis' logic_name.
sub url_query_params {
    my $self = shift;

    my %query_params = (
        'logic_name' => $self->logic_name,
    );

    return \%query_params;
}


208
# The display name of an Analysis is simply its logic_name.
sub display_name {
    my $self = shift;

    return $self->logic_name;
}


214 215 216 217
=head2 stats

  Arg [1]    : none
  Example    : $stats = $analysis->stats;
  Description: returns the AnalysisStats object of this Analysis, looked up in the 'AnalysisStats' collection of its hive_pipeline.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats object
  Exceptions : none
  Caller     : general

=cut

# Looks up, in the 'AnalysisStats' collection of our pipeline, the one entry whose 'analysis' is this object.
sub stats {
    my ($self) = @_;

    my $pipeline         = $self->hive_pipeline;
    my $stats_collection = $pipeline->collection_of( 'AnalysisStats' );

    return $stats_collection->find_one_by('analysis', $self);
}


232 233 234 235 236 237 238 239 240 241 242
# --------------------------------- dispatch the following calls directly to our Stats: ---------------------------------------

# Dispatched directly to our Stats object: all arguments are passed on to its status() method.
sub status {
    my ($self, @args) = @_;

    my $stats = $self->stats;

    return $stats->status(@args);
}

# ------------------------------------------------------------------------------------------------------------------------------


243
# Combined getter/setter for the '_jobs_collection' slot.
# Whenever the stored value is false (unset included), a new empty arrayref is stored and returned instead.
sub jobs_collection {
    my ($self, @replacement) = @_;

    $self->{'_jobs_collection'} = $replacement[0] if @replacement;

    unless ($self->{'_jobs_collection'}) {
        $self->{'_jobs_collection'} = [];
    }

    return $self->{'_jobs_collection'};
}


# Finds, in the 'AnalysisCtrlRule' collection of our pipeline, all entries whose 'ctrled_analysis' is this object.
sub control_rules_collection {
    my ($self) = @_;

    my $pipeline        = $self->hive_pipeline;
    my $all_ctrl_rules  = $pipeline->collection_of( 'AnalysisCtrlRule' );

    return $all_ctrl_rules->find_all_by('ctrled_analysis', $self);
}


# Finds, in the 'DataflowRule' collection of our pipeline, all entries whose 'from_analysis' is this object.
sub dataflow_rules_collection {
    my ($self) = @_;

    my $pipeline    = $self->hive_pipeline;
    my $all_df_rules = $pipeline->collection_of( 'DataflowRule' );

    return $all_df_rules->find_all_by('from_analysis', $self);
}


266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
=head2 get_grouped_dataflow_rules

  Args       : none
  Example    : $groups = $analysis->get_grouped_dataflow_rules;
  Description: returns a listref of pairs, where the first element is a separate dfr or a funnel, and the second element is a listref of semaphored fan dfrs
  Returntype : listref

=cut

# Groups this Analysis' dataflow rules and returns a listref of triples
#   [ $rule_or_funnel, \@semaphored_fan_rules, \@targets_of_rule_or_funnel ]
# ordered by the number of fan rules in the group and then by the branch_code of the group's main rule.
# Throws if any target of a semaphored fan rule or of its funnel rule does not point at an Analysis.
sub get_grouped_dataflow_rules {
    my ($self) = @_;

        # The hash is keyed by stringified rule references, which cannot be turned back into objects,
        # so each value also carries the rule itself as its first element.
    my %set_of_groups;
        # Keys in insertion order (a plain hash would not remember it):
    my @ordered_keys;

    my $all_dataflow_rules = $self->dataflow_rules_collection;

        # Fan rules are visited first, so that every funnel is registered as a group
        # before it is met again as an ordinary rule of the collection:
    my @fan_rules       = grep {     $_->funnel_dataflow_rule } @$all_dataflow_rules;
    my @remaining_rules = grep { not $_->funnel_dataflow_rule } @$all_dataflow_rules;

    for my $dfr (@fan_rules, @remaining_rules) {

        my $df_targets = $dfr->get_my_targets;
        my $funnel_dfr = $dfr->funnel_dataflow_rule;

        if ($funnel_dfr) {
            if (not $set_of_groups{$funnel_dfr}) {      # the type check and the registration happen once per funnel
                my $funnel_targets = $funnel_dfr->get_my_targets;
                for my $funnel_target (@$funnel_targets) {
                    next if $funnel_target->to_analysis->isa('Bio::EnsEMBL::Hive::Analysis');
                    throw("Each conditional branch of a semaphored funnel rule must point at an Analysis");
                }
                push @ordered_keys, $funnel_dfr;
                $set_of_groups{$funnel_dfr} = [$funnel_dfr, [], $funnel_targets];
            }
            my $this_group = $set_of_groups{$funnel_dfr};

            for my $df_target (@$df_targets) {
                next if $df_target->to_analysis->isa('Bio::EnsEMBL::Hive::Analysis');
                throw("Each conditional branch of a semaphored fan rule must point at an Analysis");
            }
            push @{ $this_group->[1] }, $dfr;
        }
        elsif (not $set_of_groups{$dfr}) {
            push @ordered_keys, $dfr;
            $set_of_groups{$dfr} = [$dfr, [], $df_targets];
        }
    }

    my @sorted_rules = sort {
        my ($group_a, $group_b) = ($set_of_groups{$a}, $set_of_groups{$b});

        scalar(@{ $group_a->[1] }) <=> scalar(@{ $group_b->[1] })
            || $group_a->[0]->branch_code <=> $group_b->[0]->branch_code
    } @ordered_keys;

    my @groups = map { $set_of_groups{$_} } @sorted_rules;

    return \@groups;
}


319 320 321 322 323
# Returns a hashref { branch_code => [ dataflow rules ] } built from dataflow_rules_collection().
# The structure is built on the first call and kept in '_dataflow_rules_by_branch' for later calls.
sub dataflow_rules_by_branch {
    my ($self) = @_;

    return $self->{'_dataflow_rules_by_branch'} if $self->{'_dataflow_rules_by_branch'};

    my %rules_of_branch;

    for my $df_rule (@{ $self->dataflow_rules_collection }) {
        my $same_branch_rules = ( $rules_of_branch{ $df_rule->branch_code } ||= [] );

            # semaphored fan rules go to the front, so they come before the other rules of the same branch_code
        if ($df_rule->funnel_dataflow_rule) {
            unshift @$same_branch_rules, $df_rule;
        }
        else {
            push @$same_branch_rules, $df_rule;
        }
    }

    return $self->{'_dataflow_rules_by_branch'} = \%rules_of_branch;
}


340
# Creates the Jobs of this Analysis for the given output_ids of one dataflow rule.
#
#   $output_ids_for_this_rule   : listref of input_ids for the Jobs to be created
#   $emitting_job               : the Job that is flowing the data
#   $same_db_dataflow           : if true, the id stacks and the dbID of the emitting Job are inherited
#   $push_emitting_job_on_stack : if true (and $same_db_dataflow), the emitting Job's dbID may be appended to the stacks
#   $df_rule                    : the dataflow rule being followed
#
# Three cases, decided by $df_rule and by the emitting Job's fan_cache:
#   - $df_rule has a funnel      : the new Jobs are only parked in the fan_cache (nothing is stored);
#   - fan Jobs wait for $df_rule : exactly one funnel Job is stored together with the parked fan Jobs;
#   - otherwise                  : the new Jobs are stored directly.
# Returns a listref of the ids handed back by the job adaptor (empty in the first case).
# Dies, after calling transient_error(0) on the emitting Job, if a funnel is asked for other than 1 Job.
sub dataflow {
    my ( $self, $output_ids_for_this_rule, $emitting_job, $same_db_dataflow, $push_emitting_job_on_stack, $df_rule ) = @_;

    my ($param_id_stack, $accu_id_stack, $emitting_job_id) = ('', '', undef);

    if ($same_db_dataflow) {
        $param_id_stack     = $emitting_job->param_id_stack;
        $accu_id_stack      = $emitting_job->accu_id_stack;
        $emitting_job_id    = $emitting_job->dbID;

        if ($push_emitting_job_on_stack) {
            my $input_id    = $emitting_job->input_id;
            my $accu_hash   = $emitting_job->accu_hash;

                # the parent joins the param_id_stack if it had non-trivial extra parameters
            if ($input_id and ($input_id ne '{}')) {
                my $stack_so_far = $param_id_stack ? $param_id_stack.',' : '';
                $param_id_stack  = $stack_so_far.$emitting_job_id;
            }
                # the parent joins the accu_id_stack if it had its "own" accumulator
            if (scalar(keys %$accu_hash)) {
                my $stack_so_far = $accu_id_stack ? $accu_id_stack.',' : '';
                $accu_id_stack   = $stack_so_far.$emitting_job_id;
            }
        }
    }

        # Constructor arguments shared by every Job created below.
        # Although we may not cache jobs, a new Job has to "belong" to the same pipeline as its Analysis.
    my @shared_job_args = (
        'prev_job'          => $emitting_job,
        'analysis'          => $self,
        'hive_pipeline'     => $self->hive_pipeline,
        'param_id_stack'    => $param_id_stack,
        'accu_id_stack'     => $accu_id_stack,
    );

    my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;

        # Case 1: members of a semaphored fan have to wait in the cache until the funnel is created
        #         (their controlled_semaphore can only be set once that semaphore has been stored).
    if (my $funnel_dataflow_rule = $df_rule->funnel_dataflow_rule) {
        my $fan_cache_this_branch = ( $emitting_job->fan_cache->{"$funnel_dataflow_rule"} ||= [] );

        my @parked_fan_jobs = map { Bio::EnsEMBL::Hive::AnalysisJob->new(
                                        @shared_job_args,
                                        'input_id'  => $_,
                                    ) } @$output_ids_for_this_rule;
        push @$fan_cache_this_branch, @parked_fan_jobs;

        return [];
    }

        # Either a semaphored funnel or a non-semaphored dataflow; the cache entry is cleared at the same time:
    my $fan_jobs = delete $emitting_job->fan_cache->{"$df_rule"};

        # Case 2: non-semaphored dataflow (but potentially propagating any existing semaphores)
    unless ($fan_jobs && @$fan_jobs) {
        my @non_semaphored_jobs = map { Bio::EnsEMBL::Hive::AnalysisJob->new(
                                            @shared_job_args,
                                            'input_id'              => $_,
                                            'controlled_semaphore'  => $emitting_job->controlled_semaphore,     # propagate parent's semaphore if any
                                        ) } @$output_ids_for_this_rule;

            # NB: $job_adaptor happens to belong to the @non_semaphored_jobs, but not necessarily to the $emitting_job
        my $stored_job_ids = $job_adaptor->store_jobs_and_adjust_counters( \@non_semaphored_jobs, 0, $emitting_job_id);

        return [ @$stored_job_ids ];
    }

        # Case 3: a semaphored funnel, which must be exactly one Job
    my $requested_funnel_jobs = scalar(@$output_ids_for_this_rule);
    if ($requested_funnel_jobs != 1) {
        $emitting_job->transient_error(0);
        die "Asked to dataflow into $requested_funnel_jobs funnel jobs instead of 1";
    }

    my $funnel_job = Bio::EnsEMBL::Hive::AnalysisJob->new(
        @shared_job_args,
        'input_id'          => $output_ids_for_this_rule->[0],
        'status'            => 'SEMAPHORED',
    );

        # NB: $job_adaptor happens to belong to the $funnel_job, but not necessarily to $fan_jobs or $emitting_job
    my (undef, $funnel_job_id, @fan_job_ids) = $job_adaptor->store_a_semaphored_group_of_jobs( $funnel_job, $fan_jobs, $emitting_job );

    return [ $funnel_job_id, @fan_job_ids ];
}


424 425 426
# One-line description of the Analysis:
#   Analysis[<dbID>]: <display_name>->(<module>[ (<language>)], <parameters>, <resource class name>)
# with '' for a missing dbID, 'no_module' for a missing module, '{}' for missing parameters
# and 'no_rc' when there is no resource class.
sub toString {
    my ($self) = @_;

    my $dbid_text    = $self->dbID // '';
    my $display_name = $self->display_name;

    my $module_text  = $self->module // 'no_module';
    if ($self->language) {
        $module_text .= sprintf(' (%s)', $self->language);
    }

    my $params_text  = $self->parameters // '{}';
    my $rc_text      = $self->resource_class ? $self->resource_class->name : 'no_rc';

    my $details      = join(', ', $module_text, $params_text, $rc_text);

    return 'Analysis['.$dbid_text.']: '.$display_name.'->('.$details.')';
}

430 431

# Prints (to the currently selected filehandle) this Analysis as a node of a text diagram, followed by
# everything reachable from it through its grouped dataflow rules.
#
#   $ref_pipeline  : passed on to relative_display_name() of every node that gets printed
#   $prefix        : text put at the start of every continuation line of this node
#   $seen_analyses : hashref counting visits per Analysis; an Analysis that was already visited
#                    is only printed as a "(name)" back-reference and is not expanded again
#
# NB: the prefix of the node's own label is always printed by the previous level.
sub print_diagram_node {
    my ($self, $ref_pipeline, $prefix, $seen_analyses) = @_;

    my $already_seen = $seen_analyses->{$self}++;
    my $own_label    = $self->relative_display_name($ref_pipeline);

    if ($already_seen) {
        print "(".$own_label.")\n";
        return;
    }

    print $own_label."\n";

    my $groups      = $self->get_grouped_dataflow_rules;
    my $last_group  = $#$groups;
    my $is_backbone = (scalar(@$groups) == 1);      # 'the only group'

    for my $group_idx (0 .. $last_group) {

        my ($funnel_dfr, $fan_dfrs, $df_targets) = @{ $groups->[$group_idx] };

        my $this_funnel_offset = '';

            # with more than one group (no single backbone), a semaphored group is drawn offset:
        if (!$is_backbone && scalar(@$fan_dfrs)) {
            print $prefix."\n";
            print $prefix." ╘════╤══╗\n";   # " └────┬──┐\n";
            $this_funnel_offset = ($group_idx < $last_group) ? '' : '     ';    # non-last vs last group
        }

        my $indent   = $prefix.$this_funnel_offset;
        my $last_fan = $#$fan_dfrs;

            # the dependent fan rules are shown one by one:
        for my $fan_idx (0 .. $last_fan) {
            my $fan_dfr     = $fan_dfrs->[$fan_idx];
            my $fan_branch  = $fan_dfr->branch_code;

            print $indent." │  ║\n";
            print $indent." │  ║\n";
            print $indent." │  ║#$fan_branch\n";

            my $fan_df_targets  = $fan_dfr->get_my_targets;
            my $last_fan_target = $#$fan_df_targets;

            for my $fan_target_idx (0 .. $last_fan_target) {
                my $fan_target = $fan_df_targets->[$fan_target_idx];

                print $indent." │  ║\n";

                    # a choice is labelled when there are several targets or when this one is conditional
                if ((scalar(@$fan_df_targets) != 1) || defined($fan_target->on_condition)) {
                    my $on_condition = $fan_target->on_condition;
                    if (defined($on_condition)) {
                        print $indent." │  ║ WHEN $on_condition\n";
                    }
                    else {
                        print $indent." │  ║ ELSE\n";
                    }
                }
                print $indent.' │├─╚═> ';

                my $more_fans_follow = ($fan_idx < $last_fan) || ($fan_target_idx < $last_fan_target);
                my $next_fan_or_condition_offset = $more_fans_follow ? ' │  ║   ' : '';
                my $fan_child_prefix = $indent.$next_fan_or_condition_offset;

                if (my $template = $fan_target->input_id_template) {
                    print "$template\n";
                    print $fan_child_prefix."\n";
                    print $fan_child_prefix." V\n";
                    print $fan_child_prefix;
                }

                $fan_target->to_analysis->print_diagram_node($ref_pipeline, $fan_child_prefix, $seen_analyses );
            }
        }

        my $funnel_branch = $funnel_dfr->branch_code;

        print $indent."\n";
        print $indent."\n";
        print $indent." │#$funnel_branch\n";

        my $last_target = $#$df_targets;

            # the targets of the funnel (or of the plain rule) come last:
        for my $target_idx (0 .. $last_target) {
            my $df_target = $df_targets->[$target_idx];

            print $indent."\n";

            my $funnel_choice = (scalar(@$df_targets) != 1) || defined($df_target->on_condition);

            if ($funnel_choice) {
                my $on_condition = $df_target->on_condition;
                if (defined($on_condition)) {
                    print $indent." │ WHEN $on_condition\n";
                }
                else {
                    print $indent." │ ELSE\n";
                }
            }

            my $next_funnel_or_condition_offset = '';

            if ($funnel_choice || !($is_backbone || $this_funnel_offset)) {
                print $indent.' └─▻ ';      # fork to the right
                $next_funnel_or_condition_offset = ($group_idx < $last_group || $target_idx < $last_target) ? '' : '    ';
            }
            else {                          # 'the only group' (backbone) or a semaphore funnel without a choice ...
                print $indent." V\n";       # ... gets a vertical arrow
                print $indent;
            }

            my $child_prefix = $indent.$next_funnel_or_condition_offset;

            if (my $template = $df_target->input_id_template) {
                print "$template\n";
                print $child_prefix."\n";
                print $child_prefix." V\n";
                print $child_prefix;
            }

            my $target = $df_target->to_analysis;
            if ($target->can('print_diagram_node')) {
                $target->print_diagram_node($ref_pipeline, $child_prefix, $seen_analyses );
            }
            elsif ($target->isa('Bio::EnsEMBL::Hive::NakedTable')) {
                print '[[ '.$target->relative_display_name($ref_pipeline)." ]]\n";
            }
            elsif ($target->isa('Bio::EnsEMBL::Hive::Accumulator')) {
                print '<<-- '.$target->relative_display_name($ref_pipeline)."\n";
            }
        }
    }
}

541 542
1;