=pod

=head1 NAME

    Bio::EnsEMBL::Hive::Worker

=head1 DESCRIPTION

    Object which encapsulates the details of how to find jobs, how to run those
    jobs, and then check the rules to create the next jobs in the chain.
    Essentially knows where to find data, how to process data, and where to
    put it when it is done (put in the next person's INBOX) so the next Worker
    in the chain can find data to work on.

    Hive based processing is a concept based on a more controlled version
    of an autonomous agent type system.  Each worker is not told what to do
    (like a centralized control system - like the current pipeline system)
    but rather queries a central database for jobs (give me jobs).

    Each worker is linked to an analysis_id, registers itself on creation
    into the Hive, creates a RunnableDB instance of the Analysis->module,
    gets relevant configuration information from the database, does its
    work, creates the next layer of job entries by interfacing to
    the DataflowRuleAdaptor to determine the analyses it needs to pass its
    output data to and creates jobs on the database of the next analysis.
    It repeats this cycle until it has lived its lifetime or until there are no
    more jobs left to process.
    The lifetime limit is a safety limit to prevent these from 'infecting'
    a system and sitting on a compute node for longer than is socially acceptable.
    This is primarily needed on compute resources like an LSF system where jobs
    are not preempted and run until they are done.

    The Queen's primary job is to create Workers to get the work done.
    As part of this, she is also responsible for summarizing the status of the
    analyses by querying the jobs, summarizing, and updating the
    analysis_stats table.  From this she is also responsible for monitoring and
    'unblocking' analyses via the analysis_ctrl_rules.
    The Queen is also responsible for freeing up jobs that were claimed by Workers
    that died unexpectedly so that other workers can take over the work.

    The Beekeeper is in charge of interfacing between the Queen and a compute resource
    or 'compute farm'.  Its job is to query Queens if they need any workers and to
    send the requested number of workers to open machines via the runWorker.pl script.
    It is also responsible for interfacing with the Queen to identify workers which died
    unexpectedly so that she can free the dead workers' unfinished jobs.

=head1 LICENSE

    Copyright [1999-2014] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please contact the ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut

package Bio::EnsEMBL::Hive::Worker;

use strict;
use POSIX;

use Bio::EnsEMBL::Utils::Argument ('rearrange');

use Bio::EnsEMBL::Hive::Analysis;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Extensions;
use Bio::EnsEMBL::Hive::Limiter;
use Bio::EnsEMBL::Hive::Process;
use Bio::EnsEMBL::Hive::DBSQL::AccumulatorAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Utils::RedirectStack;
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
use Bio::EnsEMBL::Hive::Utils ('stringify');

use base (  'Bio::EnsEMBL::Storable',       # inherit dbID(), adaptor() and new() methods
         );

# Constructor.
# Storable's own arguments are consumed by SUPER::new(); the Worker-specific named
# arguments listed in @attribute_names are parsed with rearrange() and each defined
# value is passed to the accessor of the same name (undefined ones are skipped).
sub new {
    my $class = shift @_;

    my $self = $class->SUPER::new( @_ );    # deal with Storable stuff

    my @attribute_names  = qw(analysis_id meadow_type meadow_name host process_id resource_class_id work_done status born last_check_in died cause_of_death log_dir);
    my @attribute_values = rearrange( [ @attribute_names ], @_ );

    foreach my $index (0 .. $#attribute_names) {
        my $value = $attribute_values[$index];
        next unless defined($value);

        my $setter = $attribute_names[$index];
        $self->$setter( $value );
    }

    return $self;
}


# Starts the Worker's lifespan stopwatch, counting in seconds rather than the
# Stopwatch's default milliseconds; life_span_limit_reached() reads it later.
# Returns $self.
sub init {
    my $self = shift;

    my $stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    $stopwatch->_unit(1);                                   # count in seconds
    $self->lifespan_stopwatch( $stopwatch->restart );       # restart() hands back the same stopwatch

    return $self;
}

## Storable object's getters/setters:


# Getter/setter for the dbID of the analysis this Worker is specialized to.
# Any supplied argument (including undef) overwrites the stored value.
sub analysis_id {
    my ($self, @args) = @_;

    $self->{'_analysis_id'} = $args[0] if scalar(@args);
    return $self->{'_analysis_id'};
}

# Getter/setter for the meadow type of this Worker.
sub meadow_type {
    my $self = shift @_;

    if (@_) {    # setter mode
        $self->{'_meadow_type'} = $_[0];
    }
    return $self->{'_meadow_type'};
}


# Getter/setter for the meadow name of this Worker.
sub meadow_name {
    my $self = shift @_;

    return $self->{'_meadow_name'} unless @_;    # plain getter

    $self->{'_meadow_name'} = shift @_;
    return $self->{'_meadow_name'};
}

# Getter/setter for the host this Worker runs on.
sub host {
    my $self = shift;

    @_ and $self->{'_host'} = shift;    # store a new value only when one is supplied
    return $self->{'_host'};
}


# Getter/setter for the process id of this Worker (as reported by its meadow).
sub process_id {
    my ($self, @new_value) = @_;

    if (scalar @new_value) {
        $self->{'_process_id'} = $new_value[0];
    }
    return $self->{'_process_id'};
}


# Getter/setter for the resource_class_id this Worker was submitted with.
sub resource_class_id {
    my $self = shift @_;

    return $self->{'_resource_class_id'} if !@_;
    return $self->{'_resource_class_id'} = $_[0];
}

# Getter/setter for the number of jobs this Worker has completed.
# Any false stored value (undef, '', 0) is reported as 0; nothing is stored by a read.
sub work_done {
    my ($self, @args) = @_;

    $self->{'_work_done'} = $args[0] if @args;
    return $self->{'_work_done'} ? $self->{'_work_done'} : 0;
}

# Getter/setter for the Worker's current status string.
sub status {
    my $self = shift @_;

    $self->{'_status'} = $_[0] if scalar(@_);
    return $self->{'_status'};
}

# Getter/setter for the Worker's 'born' timestamp.
sub born {
    my $self = shift @_;

    if (@_) {
        $self->{'_born'} = $_[0];
    }
    return $self->{'_born'};
}

# Getter/setter for the Worker's last check-in timestamp.
sub last_check_in {
    my $self = shift;

    @_ and $self->{'_last_check_in'} = shift;
    return $self->{'_last_check_in'};
}

# Getter/setter for the Worker's 'died' timestamp.
sub died {
    my ($self, @new_value) = @_;

    if (scalar @new_value) {
        $self->{'_died'} = $new_value[0];
    }
    return $self->{'_died'};
}

# Getter/setter for the reason the Worker stops (e.g. 'NO_WORK', 'JOB_LIMIT', 'LIFESPAN').
# run() passes undef explicitly to clear it before respecializing, so undef must be storable.
sub cause_of_death {
    my $self = shift @_;

    return $self->{'_cause_of_death'} if !@_;
    return $self->{'_cause_of_death'} = $_[0];
}

=head2 log_dir

  Arg [1] : (optional) string directory path
  Title   : log_dir
  Usage   : $worker_log_dir = $self->log_dir;
            $self->log_dir($worker_log_dir);
  Description: Storable getter/setter attribute for the directory where the worker's STDOUT and STDERR
               are redirected to (as worker.out and worker.err).
               In this directory each job will have its own .out and .err files.
  Returntype : string

=cut

sub log_dir {
    my $self = shift @_;

    return $self->{'_log_dir'} unless @_;
    return $self->{'_log_dir'} = shift @_;
}


## Non-Storable attributes:

# Getter/setter for the debug level. An unset (or explicitly undef) value is
# normalised to 0 and stored, so later reads see 0 as well.
sub debug {
    my ($self, @args) = @_;

    $self->{'_debug'} = $args[0] if @args;
    $self->{'_debug'} = 0 if !defined $self->{'_debug'};

    return $self->{'_debug'};
}


# Getter/setter for whether the Runnable may perform its writes; defaults to 1.
# An unset (or explicitly undef) value is replaced by 1 and stored.
sub execute_writes {
  my $self = shift;

  if (@_) {
    $self->{'_execute_writes'} = shift;
  }
  unless (defined $self->{'_execute_writes'}) {
    $self->{'_execute_writes'} = 1;
  }
  return $self->{'_execute_writes'};
}

# Getter/setter for an explicit batch of jobs; when set, run() executes only this batch.
sub special_batch {
  my ($self, @args) = @_;

  $self->{'_special_batch'} = $args[0] if scalar(@args);
  return $self->{'_special_batch'};
}

# Getter/setter for whether run() asks the Runnable to clean its worker temp directory; defaults to 1.
# An unset (or explicitly undef) value is replaced by 1 and stored.
sub perform_cleanup {
  my ($self, @args) = @_;

  $self->{'_perform_cleanup'} = $args[0] if @args;
  $self->{'_perform_cleanup'} = 1 if !defined($self->{'_perform_cleanup'});

  return $self->{'_perform_cleanup'};
}


# Setter/getter for the default behaviour when a job throws: should it be retried or not?
# When nothing (or undef) is stored, 1 (retry) is returned; that default is not stored.

sub retry_throwing_jobs {
    my $self = shift @_;

    if (@_) {
        $self->{'_retry_throwing_jobs'} = shift @_;
    }

    my $stored = $self->{'_retry_throwing_jobs'};
    return defined($stored) ? $stored : 1;
}


# Getter/setter for whether run() may respecialize this Worker after NO_WORK / HIVE_OVERLOAD.
sub can_respecialize {
    my $self = shift;

    @_ and $self->{'_can_respecialize'} = shift;
    return $self->{'_can_respecialize'};
}


=head2 analysis

  Arg [1] : (optional) Bio::EnsEMBL::Hive::Analysis $value
  Title   :   analysis
  Usage   :   $analysis = $self->analysis;
              $self->analysis($analysis);
  Description: Get/Set analysis object of this Worker.
               When no analysis object is cached, it is fetched lazily through the
               AnalysisAdaptor using analysis_id(); this dies if analysis_id is not set
               or if the fetch returns nothing.
  DefaultValue : undef
  Returntype : Bio::EnsEMBL::Hive::Analysis object

=cut

sub analysis {
    my $self = shift @_;

    if (@_) {    # setter mode
        $self->{'_analysis'} = shift @_;
        return $self->{'_analysis'};
    }

    return $self->{'_analysis'} if $self->{'_analysis'};    # already cached

    my $analysis_id = $self->analysis_id()
        or die "analysis_id not defined, could not fetch Hive::Analysis object";

    $self->{'_analysis'} = $self->adaptor->db->get_AnalysisAdaptor->fetch_by_dbID( $analysis_id )
        or die "Could not fetch analysis for analysis_id=$analysis_id";

    return $self->{'_analysis'};
}


=head2 life_span

  Arg [1] : (optional) integer $value (in seconds)
  Title   :   life_span
  Usage   :   $value = $self->life_span;
              $self->life_span($new_value);
  Description: Defines the maximum time a worker can live for. Workers are always
               allowed to complete the jobs they get, but whether they can
               do multiple rounds of work is limited by their life_span.
               A value of 0 removes the limit; passing undef keeps the current value.
  DefaultValue : 3600 (60 minutes)
  Returntype : integer scalar

=cut

sub life_span {
    my ($self, $value) = @_;

    # An explicit defined value always wins (0 is allowed and disables the limit);
    # otherwise keep what is stored, falling back to the 60-minute default on first access.
    $self->{'_life_span'} = defined($value)                  ? $value
                          : defined($self->{'_life_span'})   ? $self->{'_life_span'}
                          :                                    60*60;
    return $self->{'_life_span'};
}

# Getter/setter for the Stopwatch started by init(); life_span_limit_reached() reads its elapsed time.
sub lifespan_stopwatch {
    my ($self, @args) = @_;

    $self->{'_lifespan_stopwatch'} = shift @args if @args;
    return $self->{'_lifespan_stopwatch'};
}

# Returns how many seconds the Worker has been alive once that exceeds life_span(), otherwise 0.
# A false life_span (e.g. 0) disables the check entirely.
sub life_span_limit_reached {
    my $self = shift @_;

    my $limit = $self->life_span() or return 0;

    my $alive_for_secs = $self->lifespan_stopwatch->get_elapsed;
    return ( $alive_for_secs > $limit ) ? $alive_for_secs : 0;
}


=head2 job_limiter

  Title   :   job_limiter
  Arg [1] :   (optional) integer $value
  Usage   :   $limiter_obj = $self->job_limiter;
              $self->job_limiter($new_value);
  Description: The maximum number of jobs to be done by the Worker can be limited by the given number.
               Passing any argument (even undef) replaces the limiter with a new one; calling
               without arguments returns the existing limiter, creating one on first access.
               A worker 'dies' when either the 'life_span' or 'job_limit' is exceeded.
  DefaultValue : undef (relies on life_span to limit life of worker)
  Returntype : Hive::Limiter object

=cut

sub job_limiter {
    my $self = shift;

    my $must_rebuild = scalar(@_) || !defined($self->{'_job_limiter'});
    if ($must_rebuild) {
        $self->{'_job_limiter'} = Bio::EnsEMBL::Hive::Limiter->new("Total number of jobs this Worker is allowed to take", shift @_);
    }
    return $self->{'_job_limiter'};
}


# Records one more successfully completed job and adds its per-state timings
# (keys such as FETCH_INPUT / RUN / WRITE_OUTPUT) into the current interval's totals,
# which run() later reports via interval_update_work_done().
#
#   $job_partial_timing : hashref of state => time spent in that state; undef is treated as empty.
#
# keys() is used rather than each(): each() resumes from the hash's shared iterator, so a
# hash whose iterator was left mid-way by someone else would have some states silently skipped.
sub more_work_done {
    my ($self, $job_partial_timing) = @_;

    $self->{'_work_done'}++;

    my $timing = $job_partial_timing || {};
    foreach my $state (keys %$timing) {
        $self->{'_interval_partial_timing'}{$state} += $timing->{$state};
    }
}


# By maintaining this information we attempt to detect worker contamination without the user specifically telling us about it.
#
# Ideally we should be doing an *ALIGNMENT* of error messages (allowing for some numerical IDs to differ),
# but at the moment we assume all errors identical. If the worker failed two jobs in a row - let it die.
#
# run_one_batch() stores the previous job's incomplete() flag here after every job.

sub prev_job_error {
    my $self = shift @_;

    if (@_) {
        $self->{'_prev_job_error'} = shift @_;
    }
    return $self->{'_prev_job_error'};
}


# Getter/setter for the compiled Runnable (Process) object that executes this Worker's jobs.
sub runnable_object {
    my ($self, @args) = @_;

    $self->{'_runnable_object'} = $args[0] if scalar(@args);
    return $self->{'_runnable_object'};
}


# Returns this Worker's RedirectStack for STDOUT, creating it on first use.
sub get_stdout_redirector {
    my $self = shift;

    unless ( $self->{_stdout_redirector} ) {
        $self->{_stdout_redirector} = Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDOUT);
    }
    return $self->{_stdout_redirector};
}

# Returns this Worker's RedirectStack for STDERR, creating it on first use.
sub get_stderr_redirector {
    my $self = shift;

    unless ( $self->{_stderr_redirector} ) {
        $self->{_stderr_redirector} = Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDERR);
    }
    return $self->{_stderr_redirector};
}


# Prints "Worker <dbID> [ <logic_name>(<analysis_id>) ] <msg>" to the currently selected
# output handle; the analysis part is left empty for an unspecialized Worker.
sub worker_say {
    my ($self, $msg) = @_;

    my $worker_id     = $self->dbID();
    my $analysis_name = '';
    if ( my $analysis_id = $self->analysis_id ) {
        $analysis_name = $self->analysis->logic_name . "($analysis_id)";
    }

    print join(' ', 'Worker', $worker_id, '[', $analysis_name, ']', $msg) . "\n";
}


# One-line, comma-separated summary of the Worker's state, used for logging.
# Analysis name and batch size are shown only once the Worker is specialized.
sub toString {
    my $self = shift @_;

    my $analysis_id = $self->analysis_id;

    my @fields;
    push @fields, 'analysis='.( $analysis_id ? $self->analysis->logic_name."($analysis_id)" : 'UNSPECIALIZED' );
    push @fields, 'resource_class_id='.( $self->resource_class_id || 'NULL' );
    push @fields, 'meadow='.$self->meadow_type.'/'.$self->meadow_name;
    push @fields, 'process='.$self->process_id.'@'.$self->host;
    push @fields, 'last_check_in='.$self->last_check_in;
    push @fields, 'batch_size='.( $analysis_id ? $self->analysis->stats->get_or_estimate_batch_size() : 'UNSPECIALIZED' );
    push @fields, 'job_limit='.( $self->job_limiter->available_capacity() || 'NONE' );
    push @fields, 'life_span='.( $self->life_span || 'UNLIM' );
    push @fields, 'worker_log_dir='.( $self->log_dir || 'STDOUT/STDERR' );

    return join(', ', @fields);
}


###############################
#
# WORK section
#
###############################


=head2 run

  Title   :   run
  Arg [1] :   (optional) arrayref $specialization_arglist - its elements are passed on to the
              adaptor's specialize_new_worker(); when given, the Worker will not respecialize.
  Usage   :   $worker->run;
  Description: 
    This is a self looping autonomous function to process jobs.
    First STDOUT/STDERR is redirected into log_dir (if one is set), then looping commences.
    Looping consists of 
      1) claiming jobs,
      2) processing those jobs through an instance of the 'module class' of 
         the analysis assigned to this worker,  
      3) updating the job, analysis_stats, and hive tables to track the 
         progress of the job, the analysis and this worker.
    Looping stops when any one of these are met:
      1) there are no more jobs to process (unless the Worker can respecialize)
      2) job_limit is reached (or the special_batch has been run)
      3) life_span has been reached
      4) the Worker is contaminated, or the hive/analysis is overloaded
         (unless the Worker can respecialize).
  Returntype : none

=cut

sub run {
    my ($self, $specialization_arglist) = @_;

    if( my $worker_log_dir = $self->log_dir ) {
        $self->get_stdout_redirector->push( $worker_log_dir.'/worker.out' );
        $self->get_stderr_redirector->push( $worker_log_dir.'/worker.err' );
    }

    my $min_batch_time  = Bio::EnsEMBL::Hive::AnalysisStats::min_batch_time();
    my $job_adaptor     = $self->adaptor->db->get_AnalysisJobAdaptor;

    print "\n"; # to clear beekeeper's prompt in case output is not logged
    $self->worker_say( $self->toString() );
    $self->specialize_and_compile_wrapper( $specialization_arglist );

    while (!$self->cause_of_death) {  # Worker's lifespan loop (ends only when the worker dies for any reason)

        my $batches_stopwatch           = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
        my $jobs_done_by_batches_loop   = 0; # by all iterations of internal loop
        $self->{'_interval_partial_timing'} = {};

        if( my $special_batch = $self->special_batch() ) {
            $jobs_done_by_batches_loop += $self->run_one_batch( $special_batch );
            $self->cause_of_death('JOB_LIMIT');
        } else {    # a proper "BATCHES" loop

            while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {

                if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_worker_id( $self->dbID ) }) ) {
                    my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
                    $self->worker_say( $msg );
                    $self->cause_of_death('CONTAMINATED');
                    $job_adaptor->release_undone_jobs_from_worker($self, $msg);

                } elsif( $self->job_limiter->reached()) {
                    $self->worker_say( "job_limit reached (".$self->work_done." jobs completed)" );
                    $self->cause_of_death('JOB_LIMIT');

                } elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
                    $self->worker_say( "life_span limit reached (alive for $alive_for_secs secs)" );
                    $self->cause_of_death('LIFESPAN');

                } else {
                    my $desired_batch_size = $self->analysis->stats->get_or_estimate_batch_size();
                    $desired_batch_size = $self->job_limiter->preliminary_offer( $desired_batch_size );

                    my $workers_rank = $self->adaptor->get_workers_rank( $self );
                    my $actual_batch = $job_adaptor->grab_jobs_for_worker( $self, $desired_batch_size, $workers_rank );
                    if(scalar(@$actual_batch)) {
                        my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
                        $jobs_done_by_batches_loop += $jobs_done_by_this_batch;
                        $self->job_limiter->final_decision( $jobs_done_by_this_batch );
                    } else {
                        $self->cause_of_death('NO_WORK');
                    }
                }
            }
        }

        # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
        # so they are not allowed to happen too frequently (not before $min_batch_time of work has been done)
        #
        if($jobs_done_by_batches_loop) {

            $self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_work_done(
                $self->analysis->dbID,
                $jobs_done_by_batches_loop,
                $batches_stopwatch->get_elapsed,
                $self->{'_interval_partial_timing'}{'FETCH_INPUT'}  || 0,
                $self->{'_interval_partial_timing'}{'RUN'}          || 0,
                $self->{'_interval_partial_timing'}{'WRITE_OUTPUT'} || 0,
            );
        }

        # A mechanism whereby workers can be caused to exit even if they were doing fine:
        if (!$self->cause_of_death) {
            my $stats = $self->analysis->stats;     # make sure it is fresh from the DB
            if( defined($stats->hive_capacity) && (0 <= $stats->hive_capacity) && ($self->adaptor->get_hive_current_load >= 1.1)
             or defined($self->analysis->analysis_capacity) && (0 <= $self->analysis->analysis_capacity) && ($self->analysis->analysis_capacity < $stats->num_running_workers)
            ) {
                $self->cause_of_death('HIVE_OVERLOAD');
            }
        }

        if( $self->cause_of_death() =~ /^(NO_WORK|HIVE_OVERLOAD)$/ ) {
            if( $self->cause_of_death() eq 'NO_WORK') {
                $self->adaptor->db->get_AnalysisStatsAdaptor->update_status($self->analysis_id, 'ALL_CLAIMED');
            }

            # Only a Worker that was not given an explicit specialization may switch to another analysis:
            if( $self->can_respecialize and !$specialization_arglist ) {
                $self->cause_of_death(undef);
                $self->adaptor->db->get_AnalysisStatsAdaptor->decrease_running_workers($self->analysis->dbID);  # FIXME: tidy up this counting of active roles
                $self->specialize_and_compile_wrapper();
            }
        }

    }     # /Worker's lifespan loop

    # have runnable clean up any global/process files/data it may have created
    if($self->perform_cleanup) {
        if(my $runnable_object = $self->runnable_object()) {    # the temp_directory is actually kept in the Process object:
            $runnable_object->cleanup_worker_temp_directory();
        }
    }

    $self->adaptor->register_worker_death($self, 1);

    if($self->debug) {
        $self->worker_say( 'AnalysisStats : '.$self->analysis->stats->toString ) if($self->analysis_id());
        $self->worker_say( 'dbc '.$self->adaptor->db->dbc->disconnect_count. ' disconnect cycles' );
    }

    $self->worker_say( "Having completed ".$self->work_done." jobs the Worker exits : ".$self->cause_of_death  );

    if( $self->log_dir ) {
        $self->get_stdout_redirector->pop();
        $self->get_stderr_redirector->pop();
    }
}


# Two phases, each guarded by eval:
#   1) (re)specialization: asks the adaptor's specialize_new_worker() to bind this Worker to an
#      analysis, passing the elements of $specialization_arglist (an optional arrayref), and logs
#      whether this was a first specialization or a respecialization;
#   2) compilation (skipped if phase 1 set a cause_of_death): builds the analysis' Runnable via
#      analysis->process, wires it to this Worker and stores it in runnable_object().
# A failure in either phase is reported via worker_say() and the LogMessageAdaptor and sets
# cause_of_death('SEE_MSG') unless a more specific cause was already set.
# The return value is not meaningful.
sub specialize_and_compile_wrapper {
    my ($self, $specialization_arglist) = @_;

    eval {
        $self->enter_status('SPECIALIZATION');
        my $respecialization_from = $self->analysis_id && $self->analysis->logic_name.'('.$self->analysis_id.')';
        $self->adaptor->specialize_new_worker( $self, $specialization_arglist ? @$specialization_arglist : () );
        my $specialization_to = $self->analysis->logic_name.'('.$self->analysis_id.')';
        if($respecialization_from) {
            my $msg = "respecializing from $respecialization_from to $specialization_to";
            $self->worker_say( $msg );
            $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self->dbID, $msg, 0 );
        } else {
            $self->worker_say( "specializing to $specialization_to" );
        }
        1;
    } or do {
        my $msg = $@;
        chomp $msg;
        $self->worker_say( "[re]specialization failed:\t$msg" );
        $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self->dbID, $msg, 1 );

        $self->cause_of_death('SEE_MSG') unless($self->cause_of_death());   # some specific causes could have been set prior to die "...";
    };

    if( !$self->cause_of_death() ) {
        eval {
            $self->enter_status('COMPILATION');
            my $runnable_object = $self->analysis->process or die "Unknown compilation error";
            $runnable_object->db( $self->adaptor->db );
            $runnable_object->worker( $self );
            $runnable_object->debug( $self->debug );
            $runnable_object->execute_writes( $self->execute_writes );

            $self->runnable_object( $runnable_object );
            $self->enter_status('READY');

            $self->adaptor->db->dbc->disconnect_when_inactive(0);
            1;
        } or do {
            my $msg = $@;
            chomp $msg;     # same treatment as a specialization failure: worker_say() appends its own newline
            $self->worker_say( "runnable '".$self->analysis->module."' compilation failed :\t$msg" );
            $self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self->dbID, $msg, 1 );

            $self->cause_of_death('SEE_MSG') unless($self->cause_of_death());   # some specific causes could have been set prior to die "...";
        };
    }
}


# Runs every job of the given batch (arrayref, consumed with shift) through the compiled Runnable.
# For each job:
#   - builds its parameters and calls the Runnable's life_cycle() inside an eval;
#   - records runtime, query count and any thrown message;
#   - on failure, releases/ages the job for a possible retry;
#   - on success, counts the job, marks it DONE and decrements any semaphore it feeds.
# Stops early with cause_of_death('CONTAMINATED') when two jobs fail in a row or a job declares
# itself lethal for the Worker.
# Returns the number of jobs successfully completed in this batch.
sub run_one_batch {
    my ($self, $jobs) = @_;

    my $jobs_done_here = 0;

    my $hive_use_param_stack    = $self->adaptor->db->hive_use_param_stack();
    my $accu_adaptor            = $self->adaptor->db->get_AccumulatorAdaptor;
    my $max_retry_count         = $self->analysis->max_retry_count();  # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs

    $self->adaptor->check_in_worker( $self );
    $self->adaptor->safe_synchronize_AnalysisStats($self->analysis->stats);

    if($self->debug) {
        $self->worker_say( 'AnalysisStats : '.$self->analysis->stats->toString );
        $self->worker_say( 'claimed '.scalar(@{$jobs}).' jobs to process' );
    }

    my $job_partial_timing;

    ONE_BATCH: while(my $job = shift @$jobs) {         # to make sure jobs go out of scope without undue delay

        my $job_id = $job->dbID();
        $self->worker_say( $job->toString ) if($self->debug);

        my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
        $job_partial_timing = {};

        $self->start_job_output_redirection($job);  # switch logging into job's STDERR
        eval {  # capture any throw/die
            $job->incomplete(1);

            $job->accu_hash( $accu_adaptor->fetch_structures_for_job_ids( $job_id )->{ $job_id } );

            my $runnable_object = $self->runnable_object();

            $self->adaptor->db->dbc->query_count(0);
            $job_stopwatch->restart();

            # Parameter sources, presumably in increasing order of precedence for param_init() - confirm there.
            my @params_precedence = (
                $runnable_object->param_defaults(),
                $self->adaptor->db->get_MetaContainer->get_param_hash(),
                $self->analysis->parameters(),
            );

            if( $hive_use_param_stack ) {
                my $input_ids_hash      = $job->adaptor->fetch_input_ids_for_job_ids( $job->param_id_stack, 2, 0 );     # input_ids have lower precedence (FOR EACH ID)
                my $accu_hash           = $accu_adaptor->fetch_structures_for_job_ids( $job->accu_id_stack, 2, 1 );     # accus have higher precedence (FOR EACH ID)
                my %input_id_accu_hash  = ( %$input_ids_hash, %$accu_hash );
                push @params_precedence, @input_id_accu_hash{ sort { $a <=> $b } keys %input_id_accu_hash }; # take a slice. Mmm...
            }
            push @params_precedence, $job->input_id(), $job->accu_hash();

            $job->param_init( $runnable_object->strict_hash_format(), @params_precedence );

            $self->worker_say( "Job $job_id unsubstituted_params= ".stringify($job->{'_unsubstituted_param_hash'}) ) if($self->debug());

            $runnable_object->input_job( $job );    # "take" the job
            $job_partial_timing = $runnable_object->life_cycle();
            $runnable_object->input_job( undef );   # release an extra reference to the job

            $job->incomplete(0);
        };
        my $msg_thrown          = $@;

        # If anything above died after the Runnable "took" the job, the in-eval release was skipped.
        # Release the Runnable's reference here too, so a failed job does not outlive its iteration.
        if( my $runnable_object = $self->runnable_object() ) {
            $runnable_object->input_job( undef );
        }

        $job->runtime_msec( $job_stopwatch->get_elapsed );  # whether successful or not
        $job->query_count( $self->adaptor->db->dbc->query_count );

        my $job_completion_line = "Job $job_id : complete";

        if($msg_thrown) {   # record the message - whether it was a success or failure:
            my $job_status_at_the_moment = $job->status();
            my $action = $job->incomplete ? 'died' : 'exited';
            $job_completion_line = "Job $job_id : $action in status '$job_status_at_the_moment' for the following reason: $msg_thrown";
            $self->adaptor->db->get_LogMessageAdaptor()->store_job_message($job_id, $msg_thrown, $job->incomplete );
        }

        print STDERR "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->incomplete));      # one copy goes to the job's STDERR
        $self->stop_job_output_redirection($job);                                                               # and then we switch back to worker's STDERR
        $self->worker_say( $job_completion_line );                                                              # one copy goes to the worker's own output (worker_say prints to the selected handle, normally STDOUT)

        if($job->incomplete) {
            # If the job specifically said what to do next, respect that last wish.
            # Otherwise follow the default behaviour set by the beekeeper in $worker:
            #
            my $may_retry = defined($job->transient_error) ? $job->transient_error : $self->retry_throwing_jobs;

            $job->adaptor->release_and_age_job( $job_id, $max_retry_count, $may_retry, $job->runtime_msec );

            if( $self->prev_job_error                # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
             or $job->lethal_for_worker ) {          # trust the job's expert knowledge
                my $reason = $self->prev_job_error            ? 'two failed jobs in a row'
                           :                                    'suggested by job itself';
                $self->worker_say( "Job's error has contaminated the Worker ($reason), so the Worker will now die" );
                $self->cause_of_death('CONTAMINATED');
                last ONE_BATCH;
            }
        } else {    # job successfully completed:
            $self->more_work_done( $job_partial_timing );
            $jobs_done_here++;
            $job->update_status('DONE');

            if(my $semaphored_job_id = $job->semaphored_job_id) {
                my $dbc = $self->adaptor->db->dbc;
                $dbc->do( "SELECT 1 FROM job WHERE job_id=$semaphored_job_id FOR UPDATE" ) if($dbc->driver ne 'sqlite');

                $job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );    # step-unblock the semaphore
            }

            if($job->lethal_for_worker) {
                $self->worker_say( "The Job, although complete, wants the Worker to die" );
                $self->cause_of_death('CONTAMINATED');
                last ONE_BATCH;
            }
        }

        $self->prev_job_error( $job->incomplete );
        $self->enter_status('READY');
    } # /while(my $job = shift @$jobs)

    return $jobs_done_here;
}


# Switch the worker to a new status and check it in through its adaptor.
#
#   $status - new status value, stored via $self->status()
#   $msg    - optional text echoed with worker_say() when debugging is on;
#             if false, ": $status" is echoed instead
#
# Returns whatever $self->adaptor->check_in_worker() returns.
sub enter_status {
    my ($self, $status, $msg) = @_;

    # Only chatty in debug mode; fall back to a terse ": STATUS" line.
    $self->worker_say( $msg || ": $status" ) if $self->debug;

    $self->status( $status );

    return $self->adaptor->check_in_worker( $self );
}
Jessica Severin's avatar
Jessica Severin committed
794

795

796
# Start sending the job's STDOUT/STDERR into per-attempt files under the
# worker's log directory. This does nothing when $self->log_dir is not set.
#
# The files are named <log_dir>/job_id_<dbID>_<retry_count>.out and .err.
# Their paths are recorded on the job via stdout_file()/stderr_file() and then
# pushed onto the worker's redirectors. If the job has an adaptor, the new
# paths are saved through $job_adaptor->store_out_files($job).
sub start_job_output_redirection {
    my ($self, $job) = @_;

    my $worker_log_dir = $self->log_dir
        or return;

    my $file_prefix = $worker_log_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count;

    # [ redirector getter on $self, file accessor on $job, file extension ]
    my @streams = (
        [ 'get_stdout_redirector', 'stdout_file', 'out' ],
        [ 'get_stderr_redirector', 'stderr_file', 'err' ],
    );

    foreach my $stream (@streams) {
        my ($redirector_getter, $file_accessor, $extension) = @$stream;

        # The accessor stores the path on the job and hands it back.
        my $log_file = $job->$file_accessor( "$file_prefix.$extension" );
        $self->$redirector_getter->push( $log_file );
    }

    my $job_adaptor = $job->adaptor;
    $job_adaptor->store_out_files($job) if $job_adaptor;
}


810
# Undo the per-job output redirection set up by start_job_output_redirection()
# and discard log files that are not worth keeping. This does nothing when
# $self->log_dir is not set.
#
# If debugging is off and the job completed, both files are deleted.
# Otherwise, only files that exist and are empty are deleted.
# A deleted file's path is cleared on the job. If the job has an adaptor,
# the resulting file paths are saved through $job_adaptor->store_out_files($job).
sub stop_job_output_redirection {
    my ($self, $job) = @_;

    return unless $self->log_dir;

    $self->get_stdout_redirector->pop();
    $self->get_stderr_redirector->pop();

    # Keep the files for inspection only in debug mode or when the job failed.
    my $force_cleanup = !($self->debug || $job->incomplete);

    _discard_job_output_file( $self, $job, 'stdout_file', $force_cleanup );
    _discard_job_output_file( $self, $job, 'stderr_file', $force_cleanup );

    if(my $job_adaptor = $job->adaptor) {
        $job_adaptor->store_out_files($job);
    }
}

# Private helper for stop_job_output_redirection().
# Deletes the file recorded in $job->$file_accessor if cleanup is forced or the
# file is empty, and then clears that accessor on the job. It skips streams with
# no recorded file. Previously, such streams caused `-z`/`unlink` to be called on
# undef and logged a bogus "Deleting '' file" line.
sub _discard_job_output_file {
    my ($self, $job, $file_accessor, $force_cleanup) = @_;

    my $path = $job->$file_accessor;
    return unless defined $path;

    if($force_cleanup or -z $path) {
        $self->worker_say( "Deleting '$path' file" );
        unlink $path;
        $job->$file_accessor(undef);
    }
}

836

Jessica Severin's avatar
Jessica Severin committed
837
1;  # a Perl module file must end by returning a true value to `require`/`use`