Worker.pm 29.1 KB
Newer Older
Jessica Severin's avatar
Jessica Severin committed
1 2 3
=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::Worker

=head1 DESCRIPTION

    Object which encapsulates the details of how to find jobs, how to run those
    jobs, and then check the rules to create the next jobs in the chain.
    Essentially knows where to find data, how to process data, and where to
    put it when it is done (put in the next person's INBOX) so the next Worker
    in the chain can find data to work on.

    Hive based processing is a concept based on a more controlled version
    of an autonomous agent type system.  Each worker is not told what to do
    (like a centralized control system - like the current pipeline system)
    but rather queries a central database for jobs (give me jobs).

    Each worker is linked to an analysis_id, registers itself on creation
    into the Hive, creates a RunnableDB instance of the Analysis->module,
    gets relevant configuration information from the database, does its
    work, creates the next layer of job entries by interfacing to
    the DataflowRuleAdaptor to determine the analyses it needs to pass its
    output data to and creates jobs on the database of the next analysis.
    It repeats this cycle until it has lived its lifetime or until there are no
    more jobs left to process.
    The lifetime limit is a safety limit to prevent these from 'infecting'
    a system and sitting on a compute node for longer than is socially acceptable.
    This is primarily needed on compute resources like an LSF system where jobs
    are not preempted and run until they are done.

    The Queen's primary job is to create Workers to get the work done.
    As part of this, she is also responsible for summarizing the status of the
    analyses by querying the jobs, summarizing, and updating the
    analysis_stats table.  From this she is also responsible for monitoring and
    'unblocking' analyses via the analysis_ctrl_rules.
    The Queen is also responsible for freeing up jobs that were claimed by Workers
    that died unexpectedly so that other workers can take over the work.

    The Beekeeper is in charge of interfacing between the Queen and a compute resource
    or 'compute farm'.  Its job is to query Queens if they need any workers and to
    send the requested number of workers to open machines via the runWorker.pl script.
    It is also responsible for interfacing with the Queen to identify workers which died
    unexpectedly so that she can free the dead workers' unfinished jobs.

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2021] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _
=cut

72

Jessica Severin's avatar
Jessica Severin committed
73 74 75
package Bio::EnsEMBL::Hive::Worker;

use strict;
76
use warnings;
77
use POSIX;
78 79 80 81

use Bio::EnsEMBL::Hive::Analysis;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Limiter;
82
use Bio::EnsEMBL::Hive::Process;
83
use Bio::EnsEMBL::Hive::DBSQL::AccumulatorAdaptor;
Jessica Severin's avatar
Jessica Severin committed
84
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
85
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
86
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
87
use Bio::EnsEMBL::Hive::Utils::RedirectStack;
88
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
89
use Bio::EnsEMBL::Hive::Utils ('stringify', 'throw');
90

91
use base ( 'Bio::EnsEMBL::Hive::Storable' );
Jessica Severin's avatar
Jessica Severin committed
92

93

94 95 96 97 98 99 100
=head1 AUTOLOADED

    resource_class_id / resource_class

=cut


# Initialises the Worker: creates the Stopwatch that measures the Worker's age
# (counted in seconds, as compared against life_span), starts it, stores it via
# lifespan_stopwatch() and returns the Worker itself.
sub init {
    my ($self) = @_;

    my $age_clock = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    $age_clock->_unit(1);       # seconds instead of the Stopwatch default (milliseconds)
    $age_clock->restart;
    $self->lifespan_stopwatch( $age_clock );

    return $self;
}

## Storable object's getters/setters:
#
# Plain accessors: when called with an argument they store it (undef included),
# and they always return the current value. work_done() returns 0 instead of undef.

sub meadow_type {
    my $self = $_[0];
    $self->{'_meadow_type'} = $_[1] if @_ > 1;
    return $self->{'_meadow_type'};
}


sub meadow_name {
    my $self = $_[0];
    $self->{'_meadow_name'} = $_[1] if @_ > 1;
    return $self->{'_meadow_name'};
}


sub host {
    my $self = $_[0];
    $self->{'_host'} = $_[1] if @_ > 1;
    return $self->{'_host'};
}


sub process_id {
    my $self = $_[0];
    $self->{'_process_id'} = $_[1] if @_ > 1;
    return $self->{'_process_id'};
}


# Number of jobs this Worker has completed successfully (0 when never set).
sub work_done {
    my $self = $_[0];
    $self->{'_work_done'} = $_[1] if @_ > 1;
    return $self->{'_work_done'} || 0;
}


sub status {
    my $self = $_[0];
    $self->{'_status'} = $_[1] if @_ > 1;
    return $self->{'_status'};
}


sub born {
    my $self = $_[0];
    $self->{'_born'} = $_[1] if @_ > 1;
    return $self->{'_born'};
}


sub last_check_in {
    my $self = $_[0];
    $self->{'_last_check_in'} = $_[1] if @_ > 1;
    return $self->{'_last_check_in'};
}


sub died {
    my $self = $_[0];
    $self->{'_died'} = $_[1] if @_ > 1;
    return $self->{'_died'};
}


# Reason the Worker stopped (e.g. 'NO_WORK', 'JOB_LIMIT', 'LIFESPAN', 'CONTAMINATED',
# 'HIVE_OVERLOAD', 'SEE_MSG'); undef while the Worker is alive.
sub cause_of_death {
    my $self = $_[0];
    $self->{'_cause_of_death'} = $_[1] if @_ > 1;
    return $self->{'_cause_of_death'};
}


=head2 log_dir

  Arg [1]     : (optional) string directory path
  Title       : log_dir
  Usage       : $worker_log_dir = $self->log_dir;
                $self->log_dir($worker_log_dir);
  Description : Storable getter/setter for the directory that the Worker's STDOUT and STDERR
                are redirected to. Each job's output goes to its own .out and .err files
                in this directory.
  Returntype  : string

=cut

sub log_dir {
    my $self = $_[0];
    $self->{'_log_dir'} = $_[1] if @_ > 1;
    return $self->{'_log_dir'};
}


## Non-Storable attributes:

# Getter/setter for the Role the Worker is currently specialized into.
# Setting it (undef included) announces, via worker_say(), leaving the old Role's
# analysis and entering the new Role's analysis, whenever those analyses exist.
sub current_role {
    my ($self, @args) = @_;

    if (@args) {
        my $old_role = $self->{'_current_role'};
        my $new_role = $args[0];

        my $old_analysis = $old_role && $old_role->analysis;
        if ($old_analysis) {
            $self->worker_say( "unspecializing from ".$old_analysis->logic_name.'('.$old_analysis->dbID.')' );
        }

        my $new_analysis = $new_role && $new_role->analysis;
        if ($new_analysis) {
            $self->worker_say( "specializing to ".$new_analysis->logic_name.'('.$new_analysis->dbID.')' );
        }

        $self->{'_current_role'} = $new_role;
    }

    return $self->{'_current_role'};
}


# Debug flag; defaults to (and stores) 0.
sub debug {
    my $self = $_[0];
    $self->{'_debug'} = $_[1] if @_ > 1;
    $self->{'_debug'} //= 0;
    return $self->{'_debug'};
}


# Whether the Runnable may write to the database (handed to it when it is compiled);
# defaults to (and stores) 1.
sub execute_writes {
    my $self = $_[0];
    $self->{'_execute_writes'} = $_[1] if @_ > 1;
    $self->{'_execute_writes'} //= 1;
    return $self->{'_execute_writes'};
}


# Optional list of jobs that run() processes instead of claiming jobs from the database.
sub special_batch {
    my $self = $_[0];
    $self->{'_special_batch'} = $_[1] if @_ > 1;
    return $self->{'_special_batch'};
}


# Whether run() asks the Runnable to clean up its worker temp directory at the end;
# defaults to (and stores) 1.
sub perform_cleanup {
    my $self = $_[0];
    $self->{'_perform_cleanup'} = $_[1] if @_ > 1;
    $self->{'_perform_cleanup'} //= 1;
    return $self->{'_perform_cleanup'};
}


# Default behaviour when a job throws and the job itself does not set transient_error:
# should it be retried or not? Defaults to 1 (the default is not stored).
sub retry_throwing_jobs {
    my $self = $_[0];
    $self->{'_retry_throwing_jobs'} = $_[1] if @_ > 1;
    return $self->{'_retry_throwing_jobs'} // 1;
}


# Whether run() may respecialize the Worker into another analysis after NO_WORK or
# HIVE_OVERLOAD instead of letting it die.
sub can_respecialize {
    my $self = $_[0];
    $self->{'_can_respecialize'} = $_[1] if @_ > 1;
    return $self->{'_can_respecialize'};
}


=head2 life_span

  Arg [1]      : (optional) integer $value (in seconds)
  Title        : life_span
  Usage        : $value = $self->life_span;
                 $self->life_span($new_value);
  Description  : Defines the maximum time a worker can live for. Workers are always
                 allowed to complete the jobs they get, but whether they can do
                 multiple rounds of work is limited by their life_span.
                 Setting it to 0 disables the limit.
  DefaultValue : 3600 (60 minutes)
  Returntype   : integer scalar

=cut

sub life_span {
    my ($self, $value) = @_;

    if (defined $value) {
        $self->{'_life_span'} = $value;         # 0 is accepted and means "no limit"
    }
    else {
        $self->{'_life_span'} //= 60 * 60;      # default: one hour
    }

    return $self->{'_life_span'};
}

# Getter/setter for the Stopwatch (created by init()) that measures this Worker's age.
sub lifespan_stopwatch {
    my ($self, @args) = @_;

    $self->{'_lifespan_stopwatch'} = $args[0] if @args;
    return $self->{'_lifespan_stopwatch'};
}

# Returns the Worker's age (as reported by lifespan_stopwatch) once it exceeds
# life_span, and 0 otherwise. A life_span of 0 means "no limit", so the stopwatch is
# not even consulted in that case.
sub life_span_limit_reached {
    my ($self) = @_;

    my $limit = $self->life_span();
    return 0 unless $limit;

    my $age = $self->lifespan_stopwatch->get_elapsed;
    return ($age > $limit) ? $age : 0;
}


=head2 job_limiter

  Title        : job_limiter
  Arg [1]      : (optional) integer $value - maximum number of jobs this Worker may take
  Usage        : $limiter_obj = $self->job_limiter;
                 $self->job_limiter($new_value);
  Description  : The maximum number of jobs to be done by the Worker can be limited by the given number.
                 A worker 'dies' when either the 'life_span' or 'job_limit' is exceeded.
                 Passing a value always builds a new Limiter; without one, a Limiter with an
                 undef limit is built lazily on first access.
  DefaultValue : undef (relies on life_span to limit life of worker)
  Returntype   : Bio::EnsEMBL::Hive::Limiter object

=cut

sub job_limiter {
    my ($self, @args) = @_;

    if ( @args or not defined $self->{'_job_limiter'} ) {
        $self->{'_job_limiter'} = Bio::EnsEMBL::Hive::Limiter->new( "Total number of jobs this Worker is allowed to take", $args[0] );
    }

    return $self->{'_job_limiter'};
}


# Records one more successfully completed job and adds its per-state timings
# (state name => elapsed time, e.g. the FETCH_INPUT / RUN / WRITE_OUTPUT entries that
# run() reads back) into the current interval's running totals.
#
# $job_partial_timing : hashref of state => time for this job; undef is tolerated
#                       and simply contributes nothing.
sub more_work_done {
    my ($self, $job_partial_timing) = @_;

    $self->{'_work_done'}++;

    # keys() starts from a freshly reset iterator, whereas each() would resume wherever an
    # earlier, interrupted iteration over the same hash stopped and silently skip states.
    my $timings = $job_partial_timing || {};
    foreach my $state (keys %$timings) {
        $self->{'_interval_partial_timing'}{$state} += $timings->{$state};
    }
}


# Remembers whether the previous job died. run_one_batch() uses it to detect Worker
# contamination without the user having to tell us about it explicitly.
#
# Ideally the two error messages would be *aligned* (allowing e.g. numerical IDs to differ),
# but for now all errors are assumed to be identical: if the Worker fails two jobs in a
# row, it is told to die.

sub prev_job_error {
    my $self = $_[0];
    $self->{'_prev_job_error'} = $_[1] if @_ > 1;
    return $self->{'_prev_job_error'};
}

# Getter/setter for the compiled Runnable instance that this Worker runs its jobs through.
sub runnable_object {
    my $self = $_[0];
    $self->{'_runnable_object'} = $_[1] if @_ > 1;
    return $self->{'_runnable_object'};
}


# Lazily created RedirectStack objects wrapping the process' STDOUT and STDERR.
# Worker-level and job-level log redirection is done by pushing onto / popping from them.
sub get_stdout_redirector {
    my ($self) = @_;

    $self->{_stdout_redirector} ||= Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDOUT);
    return $self->{_stdout_redirector};
}

sub get_stderr_redirector {
    my ($self) = @_;

    $self->{_stderr_redirector} ||= Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDERR);
    return $self->{_stderr_redirector};
}


# Prints one line to the currently selected output handle:
#   "Worker <dbID> [ <context> ] <msg>"
# where <context> is UNSPECIALIZED, or "Role <role dbID> , <logic_name>(<analysis_id>)"
# followed by ", Job <job dbID>" while the Runnable holds an input job.
sub worker_say {
    my ($self, $msg) = @_;

    my $worker_id = $self->dbID();
    my $role      = $self->current_role;
    my $runnable  = $self->runnable_object;
    my $job_id    = $runnable && $runnable->input_job && $runnable->input_job->dbID;

    my $context = 'UNSPECIALIZED';
    if ($role) {
        $context  = 'Role '.$role->dbID.' , '.$role->analysis->logic_name.'('.$role->analysis_id.')';
        $context .= ", Job $job_id" if $job_id;
    }

    print "Worker $worker_id [ $context ] $msg\n";
}


# One-line, comma-separated summary of the Worker's settings (used e.g. for the
# start-up message printed by run()). When $include_analysis is true the summary
# starts with the analysis the Worker is specialized into, or UNSPECIALIZED.
#
# Returns: string
sub toString {
    my ($self, $include_analysis) = @_;

    my $current_role = $self->current_role;

    return join(', ',
            $include_analysis ? ( 'analysis='.($current_role ? $current_role->analysis->logic_name.'('.$current_role->analysis_id.')' : 'UNSPECIALIZED') ) : (),
            'resource_class_id='.($self->resource_class_id // 'NULL'),
            'meadow='.$self->meadow_type.'/'.$self->meadow_name,
            'process='.$self->process_id.'@'.$self->host,
            'last_check_in='.($self->last_check_in // 'NEVER'),
            'batch_size='.($current_role ? $current_role->analysis->stats->get_or_estimate_batch_size() : 'UNSPECIALIZED'),
            'job_limit='.($self->job_limiter->available_capacity() // 'NONE'),
                # life_span() never returns undef (it defaults to 3600); 0 is what means
                # "no limit" (see life_span_limit_reached), so test truth, not definedness:
            'life_span='.($self->life_span || 'UNLIM'),
            'worker_log_dir='.($self->log_dir // 'STDOUT/STDERR'),
    );
}


###############################
#
# WORK section
#
###############################


=head2 run

  Title   :   run
  Arg [1] :   (optional) hashref of specialization arguments, handed over to the
              adaptor's specialize_worker() (e.g. '-analyses_pattern')
  Usage   :   $worker->run;
  Description:
    Self-looping, autonomous job-processing routine.
    If log_dir is set, STDOUT/STDERR are first redirected into worker.out / worker.err
    in that directory; then the Worker specializes and starts looping.
    Each pass of the loop consists of
      1) claiming jobs,
      2) processing those jobs through an instance of the 'module class' of
         the analysis assigned to this worker,
      3) updating the job, analysis_stats, and hive tables to track the
         progress of the job, the analysis and this worker.
    Looping stops as soon as the Worker has a cause of death, e.g. when
      1) there are no more jobs to process (NO_WORK),
      2) job_limit is reached (JOB_LIMIT),
      3) life_span has been reached (LIFESPAN),
      4) the Worker is contaminated or the hive / the analysis is over capacity
         (CONTAMINATED, HIVE_OVERLOAD).
    After NO_WORK or HIVE_OVERLOAD, a Worker that can_respecialize and that was not
    pinned to one analysis name tries to specialize into another analysis instead.
  Returntype : none

=cut

sub run {
    my ($self, $specialization_arghash) = @_;

    if ( my $log_dir = $self->log_dir ) {
        $self->get_stdout_redirector->push( $log_dir.'/worker.out' );
        $self->get_stderr_redirector->push( $log_dir.'/worker.err' );
    }

    my $min_batch_time = Bio::EnsEMBL::Hive::AnalysisStats::min_batch_time();
    my $job_adaptor    = $self->adaptor->db->get_AnalysisJobAdaptor;

    print "\n";     # clears the beekeeper's prompt in case the output is not logged
    $self->worker_say( $self->toString() );
    $self->specialize_and_compile_wrapper( $specialization_arghash );

    # Worker's lifespan loop: ends only once the Worker has a cause of death
    until ( $self->cause_of_death ) {

        my $interval_stopwatch    = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
        my $jobs_done_in_interval = 0;      # summed over all the batches of this interval
        $self->{'_interval_partial_timing'} = {};

        if ( my $batch = $self->special_batch() ) {
            # run_one_batch() consumes the list, so its length has to be taken beforehand
            my $batch_length = scalar(@$batch);
            $jobs_done_in_interval += $self->run_one_batch( $batch );
            $self->cause_of_death( ($jobs_done_in_interval == $batch_length) ? 'JOB_LIMIT' : 'CONTAMINATED' );
        }
        else {
            # Regular mode: keep claiming and running batches for at least $min_batch_time.
            # Every "next BATCH" below follows the setting of a cause of death, so the
            # loop condition then fails and the loop ends.
            BATCH: while ( !$self->cause_of_death && $interval_stopwatch->get_elapsed() < $min_batch_time ) {
                my $role = $self->current_role;

                if ( @{ $job_adaptor->fetch_all_incomplete_jobs_by_role_id( $role->dbID ) } ) {
                    my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
                    $self->worker_say( $msg );
                    $self->cause_of_death('CONTAMINATED');
                    $job_adaptor->release_undone_jobs_from_role($role, $msg);
                    next BATCH;
                }

                if ( $self->job_limiter->reached() ) {
                    $self->worker_say( "job_limit reached (".$self->work_done." jobs completed)" );
                    $self->cause_of_death('JOB_LIMIT');
                    next BATCH;
                }

                if ( my $alive_for_secs = $self->life_span_limit_reached() ) {
                    $self->worker_say( "life_span limit reached (alive for $alive_for_secs secs)" );
                    $self->cause_of_death('LIFESPAN');
                    next BATCH;
                }

                my $batch_size = $role->analysis->stats->get_or_estimate_batch_size();
                # preliminary_offer() also returns a "hit the limit" flag, which is not used yet
                ($batch_size) = $self->job_limiter->preliminary_offer( $batch_size );

                my $claimed_jobs = $job_adaptor->grab_jobs_for_role( $role, $batch_size );
                unless ( @$claimed_jobs ) {
                    $self->cause_of_death('NO_WORK');
                    next BATCH;
                }

                $self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_claim( $self->current_role->analysis->dbID, scalar(@$claimed_jobs) );
                my $done_in_this_batch = $self->run_one_batch( $claimed_jobs );
                $jobs_done_in_interval += $done_in_this_batch;
                $self->job_limiter->final_decision( $done_in_this_batch );
            }
        }

        # Updating analysis_stats is resource-expensive (all Workers hammer the same database
        # and tables), so it only happens once per interval, i.e. not before $min_batch_time
        # of work has been done.
        if ( $jobs_done_in_interval ) {
            my $timing = $self->{'_interval_partial_timing'};
            $self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_work_done(
                $self->current_role->analysis->dbID,
                $jobs_done_in_interval,
                $interval_stopwatch->get_elapsed,
                $timing->{'FETCH_INPUT'}  || 0,
                $timing->{'RUN'}          || 0,
                $timing->{'WRITE_OUTPUT'} || 0,
            );
        }

        # Even a Worker that is doing fine is asked to exit when the hive as a whole,
        # or its own analysis, runs over capacity:
        if ( !$self->cause_of_death ) {
            my $analysis = $self->current_role->analysis;
            my $stats    = $analysis->stats;    # NOTE(review): expected to be fresh from the DB here - confirm stats() re-reads it
            my $over_capacity =
                   ( defined($stats->hive_capacity) && ($stats->hive_capacity >= 0)
                     && ($self->adaptor->db->get_RoleAdaptor->get_hive_current_load() >= 1.1) )
                || ( defined($analysis->analysis_capacity) && ($analysis->analysis_capacity >= 0)
                     && ($analysis->analysis_capacity < $stats->num_running_workers) );
            $self->cause_of_death('HIVE_OVERLOAD') if $over_capacity;
        }

        my $cod = $self->cause_of_death() || '';

        if ( $cod eq 'NO_WORK' ) {
            $self->adaptor->db->get_AnalysisStatsAdaptor->update_status( $self->current_role->analysis_id, 'ALL_CLAIMED' );
        }

        # Respecialize instead of dying, unless the Worker was pinned to a single analysis by name
        if (     $cod =~ /^(NO_WORK|HIVE_OVERLOAD)$/
             and $self->can_respecialize
             and not ( $specialization_arghash->{'-analyses_pattern'} and $specialization_arghash->{'-analyses_pattern'} =~ /^\w+$/ ) ) {
            my $previous_role = $self->current_role;
            $self->adaptor->db->get_RoleAdaptor->finalize_role( $previous_role, 1 );
            $self->current_role( undef );
            $self->cause_of_death( undef );
            $self->specialize_and_compile_wrapper( $specialization_arghash, $previous_role->analysis );
        }
    }

    # Have the Runnable clean up any global/process files/data it may have created
    # (the temp directory is kept in the Process object).
    if ( $self->perform_cleanup ) {
        my $runnable = $self->runnable_object();
        $runnable->cleanup_worker_temp_directory() if $runnable;
    }

    # 2nd argument ("self_burial"): 0 for a CONTAMINATED Worker, 1 otherwise; it controls
    # whether the current (unfinished) batch needs to be released.
    $self->adaptor->register_worker_death( $self, ( $self->cause_of_death eq 'CONTAMINATED' ) ? 0 : 1 );

    if ( $self->debug ) {
        if ( $self->current_role ) {
            $self->worker_say( 'AnalysisStats : '.$self->current_role->analysis->stats->toString );
        }
        $self->worker_say( 'dbc '.$self->adaptor->db->dbc->disconnect_count. ' disconnect cycles' );
    }

    $self->worker_say( "Having completed ".$self->work_done." jobs the Worker exits : ".$self->cause_of_death );

    if ( $self->log_dir ) {
        $self->get_stdout_redirector->pop();
        $self->get_stderr_redirector->pop();
    }
}


# Specializes the Worker (the adaptor's specialize_worker() does the actual work, driven by
# $specialization_arghash), then compiles and configures an instance of the Runnable module
# of the analysis it specialized into, and stores it via runnable_object().
#
# Neither step lets an exception escape. A failure is:
#   - reported through worker_say(),
#   - stored with the LogMessageAdaptor,
#   - turned into a cause of death ('SEE_MSG', unless a more specific cause was set
#     before the die).
# A specialization failure whose cause of death is 'NO_ROLE' is stored as a non-error message.
#
# $prev_analysis : passed by run() when respecializing (the previous Role's analysis).
#                  NOTE(review): it is currently not used here.
sub specialize_and_compile_wrapper {
    my ($self, $specialization_arghash, $prev_analysis) = @_;

    eval {
        $self->enter_status('SPECIALIZATION');
        $self->adaptor->specialize_worker( $self, $specialization_arghash );
        1;
    } or do {
        # $@ can be empty if it was clobbered before we get here; still report something useful
        my $msg = $@ || 'unknown error (the exception was lost)';
        chomp $msg;
        $self->worker_say( "specialization failed:\t$msg" );

        $self->cause_of_death('SEE_MSG') unless($self->cause_of_death());   # some specific causes could have been set prior to die "...";

        my $is_error = $self->cause_of_death() ne 'NO_ROLE';
        $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, $is_error );
    };

    if( !$self->cause_of_death() ) {
        eval {
            $self->enter_status('COMPILATION');

            my $runnable_object = $self->current_role->analysis->get_compiled_module_name->new()
                or die "Unknown compilation error";

            $runnable_object->db( $self->adaptor->db );
            $runnable_object->worker( $self );
            $runnable_object->debug( $self->debug );
            $runnable_object->execute_writes( $self->execute_writes );

            $self->runnable_object( $runnable_object );
            $self->enter_status('READY');

            $self->adaptor->db->dbc->disconnect_when_inactive(0);
            1;
        } or do {
            # chomp just like the specialization branch above, so that neither the console
            # line nor the stored log message ends with a stray newline
            my $msg = $@ || 'unknown error (the exception was lost)';
            chomp $msg;
            $self->worker_say( "runnable '".$self->current_role->analysis->module."' compilation failed :\t$msg" );
            $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self, $msg, 1 );

            $self->cause_of_death('SEE_MSG') unless($self->cause_of_death());   # some specific causes could have been set prior to die "...";
        };
    }
}


# Runs the claimed jobs in @$jobs one after the other. The list is consumed with shift so
# that each job goes out of scope without undue delay.
# For every job:
#   - redirect its output (when log_dir is set),
#   - build its parameters,
#   - run the Runnable's life cycle,
#   - record the outcome.
# A failed job is released (and possibly retried). Two failures in a row, or a job flagged
# lethal_for_worker, mark the Worker as CONTAMINATED and end the batch early.
# A completed job is set to DONE and its semaphored job (if any) is step-unblocked.
# Returns the number of jobs that completed successfully.
sub run_one_batch {
    my ($self, $jobs) = @_;

    my $completed_count = 0;

    my $role            = $self->current_role;
    my $use_param_stack = $self->adaptor->db->hive_use_param_stack();
    my $accu_adaptor    = $self->adaptor->db->get_AccumulatorAdaptor;
    my $max_retry_count = $role->analysis->max_retry_count();     # constant here: the Worker is already specialized; needed for retrying jobs

    $self->adaptor->check_in_worker( $self );
    $self->adaptor->safe_synchronize_AnalysisStats( $role->analysis->stats );

    if ( $self->debug ) {
        $self->worker_say( 'AnalysisStats : ' . $role->analysis->stats->toString );
        $self->worker_say( 'claimed '.scalar(@{$jobs}).' jobs to process' );
    }

    ONE_BATCH: while ( my $job = shift @$jobs ) {
        my $job_id = $job->dbID();
        $self->worker_say( $job->toString ) if $self->debug;

        my $stopwatch      = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
        my $partial_timing = {};

        $self->start_job_output_redirection($job);     # logging now goes into the job's own files
        eval {  # capture any throw/die
            my $runnable = $self->runnable_object();
            $runnable->input_job( $job );              # the Runnable "takes" the job

            $job->incomplete(1);

            $job->accu_hash( $accu_adaptor->fetch_structures_for_job_ids( $job_id )->{ $job_id } );

            $self->adaptor->db->dbc->query_count(0);
            $stopwatch->restart();

            # parameter sources, from lowest to highest precedence:
            my @param_sources = (
                $runnable->param_defaults(),
                $self->adaptor->db->get_PipelineWideParametersAdaptor->fetch_param_hash(),
                $role->analysis->parameters(),
            );

            if ( $use_param_stack ) {
                # input_ids have lower precedence and accus higher precedence (for each id);
                # the merged entries are appended in numeric order of their keys
                my $input_ids_by_key = $job->adaptor->fetch_input_ids_for_job_ids( $job->param_id_stack, 2, 0 );
                my $accus_by_key     = $accu_adaptor->fetch_structures_for_job_ids( $job->accu_id_stack, 2, 1 );
                my %stacked          = ( %$input_ids_by_key, %$accus_by_key );
                push @param_sources, map { $stacked{$_} } sort { $a <=> $b } keys %stacked;
            }
            push @param_sources, $job->input_id(), $job->accu_hash();

            $job->param_init( $runnable->strict_hash_format(), @param_sources );

            $self->worker_say( "Job $job_id unsubstituted_params= ".stringify($job->{'_unsubstituted_param_hash'}) ) if $self->debug();

            $partial_timing = $runnable->life_cycle();
        };
        if ( my $error = $@ ) {
            $job->died_somewhere( $job->incomplete );  # it will be OR'd inside
            $self->runnable_object->warning( $error, $job->incomplete );
        }

        # Whether or not the job completed successfully:
        $self->runnable_object->input_job( undef );    # release the Runnable's extra reference to the job
        $job->runtime_msec( $stopwatch->get_elapsed );
        $job->query_count( $self->adaptor->db->dbc->query_count );

        my $completion_line = "Job $job_id : ". ($job->died_somewhere ? 'died' : 'complete' );

        print STDERR "\n$completion_line\n" if $self->log_dir and ( $self->debug or $job->died_somewhere );  # one copy goes to the job's STDERR
        $self->stop_job_output_redirection($job);                                                           # then switch back to the Worker's STDERR
        $self->worker_say( $completion_line );                                                              # one copy goes to the Worker's output

        $self->current_role->register_attempt( ! $job->died_somewhere );

        if ( $job->died_somewhere ) {
            # If the job specifically said what to do next, respect that last wish;
            # otherwise follow the default behaviour set for this Worker.
            my $may_retry = $job->transient_error() // $self->retry_throwing_jobs;

            $job->adaptor->release_and_age_job( $job_id, $max_retry_count, $may_retry, $job->runtime_msec );

            # If the previous job failed as well, it is LIKELY that the Worker is contaminated;
            # a job may also declare itself lethal for the Worker.
            if ( $self->prev_job_error or $job->lethal_for_worker ) {
                my $reason = $self->prev_job_error ? 'two failed jobs in a row' : 'suggested by job itself';
                $self->worker_say( "Job's error has contaminated the Worker ($reason), so the Worker will now die" );
                $self->cause_of_death('CONTAMINATED');
                last ONE_BATCH;
            }
        }
        else {
            $self->more_work_done( $partial_timing );
            $completed_count++;
            $job->set_and_update_status('DONE');

            if ( my $semaphored_job_id = $job->semaphored_job_id ) {
                my $dbc = $self->adaptor->db->dbc;
                # row-lock the semaphored job before decreasing its counter (skipped for SQLite)
                $dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if $dbc->driver ne 'sqlite';
                $job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );    # step-unblock the semaphore
            }

            if ( $job->lethal_for_worker ) {
                $self->worker_say( "The Job, although complete, wants the Worker to die" );
                $self->cause_of_death('CONTAMINATED');
                last ONE_BATCH;
            }
        }

        $self->prev_job_error( $job->died_somewhere );
        $self->enter_status('READY');
    }

    return $completed_count;
}


# Sets the Worker's status and, if the Worker has an adaptor, checks it in
# (check_in_worker) so that the change is recorded.
sub set_and_update_status {
    my $self   = shift @_;
    my $status = shift @_;

    $self->status( $status );

    my $adaptor = $self->adaptor;
    $adaptor->check_in_worker( $self ) if $adaptor;
}


# Switches the Worker to $status (via set_and_update_status), announcing the
# transition with worker_say() when debugging.
sub enter_status {
    my $self   = shift @_;
    my $status = shift @_;

    $self->worker_say( '-> '.$status ) if $self->debug;
    $self->set_and_update_status( $status );
}


# When log_dir is set:
#   - redirects STDOUT/STDERR into per-job files "job_id_<dbID>_<retry_count>.out" / ".err"
#     inside log_dir,
#   - records those file names on the job,
#   - stores them through the job's adaptor (if it has one).
sub start_job_output_redirection {
    my ($self, $job) = @_;

    if ( my $log_dir = $self->log_dir ) {
        my $file_stem = $log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count;

        $self->get_stdout_redirector->push( $job->stdout_file( $file_stem . '.out' ) );
        $self->get_stderr_redirector->push( $job->stderr_file( $file_stem . '.err' ) );

        my $job_adaptor = $job->adaptor;
        $job_adaptor->store_out_files($job) if $job_adaptor;
    }
}


# Undoes start_job_output_redirection(). When log_dir is set it:
#   - pops the job's redirection off the Worker's STDOUT/STDERR stacks,
#   - deletes each of the job's .out/.err files that is empty, or all of them unless
#     debugging or the job died,
#   - forgets the names of deleted files and stores the remaining names through the
#     job's adaptor.
sub stop_job_output_redirection {
    my ($self, $job) = @_;

    if ( $self->log_dir ) {
        $self->get_stdout_redirector->pop();
        $self->get_stderr_redirector->pop();

        # non-empty job logs are only worth keeping when debugging or when the job died
        my $force_cleanup = !( $self->debug || $job->died_somewhere );

        foreach my $file_accessor ( 'stdout_file', 'stderr_file' ) {
            if ( $force_cleanup or -z $job->$file_accessor ) {
                $self->worker_say( "Deleting '".$job->$file_accessor."' file" );
                unlink $job->$file_accessor;
                $job->$file_accessor(undef);
            }
        }

        if ( my $job_adaptor = $job->adaptor ) {
            $job_adaptor->store_out_files($job);
        }
    }
}


1;