Worker.pm 23.9 KB
Newer Older
Jessica Severin's avatar
Jessica Severin committed
1 2 3 4 5 6 7 8
#
# You may distribute this module under the same terms as perl itself
#
# POD documentation - main docs before the code

=pod 

=head1 NAME
9

10
  Bio::EnsEMBL::Hive::Worker
Jessica Severin's avatar
Jessica Severin committed
11 12

=head1 DESCRIPTION
13

14 15 16
  Object which encapsulates the details of how to find jobs, how to run those
  jobs, and then check the rules to create the next jobs in the chain.
  Essentially knows where to find data, how to process data, and where to
17
  put it when it is done (put in the next person's INBOX) so the next Worker
18 19 20 21 22 23 24 25 26
  in the chain can find data to work on.

  Hive based processing is a concept based on a more controlled version
  of an autonomous agent type system.  Each worker is not told what to do
  (like a centralized control system - like the current pipeline system)
  but rather queries a central database for jobs (give me jobs).

  Each worker is linked to an analysis_id, registers itself on creation
  into the Hive, creates a RunnableDB instance of the Analysis->module,
27
  gets relevant configuration information from the database, does its
28
  work, creates the next layer of analysis_job entries by interfacing to
29 30 31 32 33 34 35 36 37
  the DataflowRuleAdaptor to determine the analyses it needs to pass its
  output data to and creates jobs on the database of the next analysis.
  It repeats this cycle until it has lived its lifetime or until there are no
  more jobs left to process.
  The lifetime limit is a safety limit to prevent these from 'infecting'
  a system and sitting on a compute node for longer than is socially acceptable.
  This is primarily needed on compute resources like an LSF system where jobs
  are not preempted and run until they are done.

38
  The Queen's primary job is to create Workers to get the work done.
39 40 41 42 43 44
  As part of this, she is also responsible for summarizing the status of the
  analyses by querying the analysis_jobs, summarizing, and updating the
  analysis_stats table.  From this she is also responsible for monitoring and
  'unblocking' analyses via the analysis_ctrl_rules.
  The Queen is also responsible for freeing up jobs that were claimed by Workers
  that died unexpectedly so that other workers can take over the work.
45 46

  The Beekeeper is in charge of interfacing between the Queen and a compute resource
47
  or 'compute farm'.  Its job is to query Queens if they need any workers and to
48
  send the requested number of workers to open machines via the runWorker.pl script.
49 50
  It is also responsible for interfacing with the Queen to identify workers which died
  unexpectedly so that she can free the dead workers' unfinished jobs.
Jessica Severin's avatar
Jessica Severin committed
51 52 53

=head1 CONTACT

54
  Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
55

Jessica Severin's avatar
Jessica Severin committed
56
=head1 APPENDIX
57

58 59
  The rest of the documentation details each of the object methods.
  Internal methods are usually preceded with a _
Jessica Severin's avatar
Jessica Severin committed
60 61 62

=cut

63

Jessica Severin's avatar
Jessica Severin committed
64 65 66
package Bio::EnsEMBL::Hive::Worker;

use strict;
67 68
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
69
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
70
use POSIX;
Jessica Severin's avatar
Jessica Severin committed
71 72 73 74

use Bio::EnsEMBL::Analysis;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
75
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
76
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
Jessica Severin's avatar
Jessica Severin committed
77
use Bio::EnsEMBL::Hive::Extensions;
78
use Bio::EnsEMBL::Hive::Process;
Jessica Severin's avatar
Jessica Severin committed
79

80 81
use Bio::EnsEMBL::Hive::Utils ('dir_revhash');  # import dir_revhash

82 83 84
## $MIN_BATCH_TIME: minimum wall-clock time (in msec) a Worker keeps taking and
## running batches before it reports its statistics back to the hive.
## It is also used to derive batch_size automatically from avg_msec_per_job.
## 2 min * 60 sec/min * 1000 msec/sec = 120000 msec
my $MIN_BATCH_TIME = 120_000;
86

87 88 89 90 91
# Constructor: returns an empty hash-based object blessed into $class.
# Extra arguments are accepted but ignored; init() performs the remaining setup.
sub new {
  my $class = shift;
  my $self  = {};
  return bless($self, $class);
}
Jessica Severin's avatar
Jessica Severin committed
92 93 94

# Second-stage initialisation: installs a freshly restarted lifespan stopwatch
# that counts in seconds, and resets the debug level to 0.
# Returns $self so the call can be chained after new().
sub init {
  my $self = shift;

  my $stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
  $stopwatch->_unit(1);     # count in seconds (default is milliseconds)
  $stopwatch->restart;
  $self->lifespan_stopwatch($stopwatch);

  $self->debug(0);

  return $self;
}

105
# Get/set the Queen this Worker reports to.
sub queen {
  my ($self, @args) = @_;
  $self->{'_queen'} = $args[0] if @args;
  return $self->{'_queen'};
}
# Get/set the hive database adaptor (the object providing get_*Adaptor and dbc).
sub db {
  my $self = shift;
  if (@_) {
    $self->{'_db'} = $_[0];
  }
  return $self->{'_db'};
}
115 116 117 118 119
# Get/set the beekeeper value associated with this Worker (stored as given).
sub beekeeper {
  my ($self, @new_value) = @_;
  $self->{'_beekeeper'} = shift @new_value if @new_value;
  return $self->{'_beekeeper'};
}
120 121 122 123 124 125
# Get/set the debug level. An unset (or explicitly undef) level reads back as 0.
sub debug {
  my $self = shift;
  if (@_) {
    $self->{'_debug'} = shift;
  }
  unless (defined $self->{'_debug'}) {
    $self->{'_debug'} = 0;
  }
  return $self->{'_debug'};
}
126 127 128 129 130 131
# Get/set whether run_module_with_job() calls write_output (and autoflow) for jobs.
# Defaults to 1; an explicit undef is also reset to 1.
sub execute_writes {
  my ($self, @args) = @_;
  $self->{'_execute_writes'} = $args[0] if @args;
  $self->{'_execute_writes'} = 1 if !defined($self->{'_execute_writes'});
  return $self->{'_execute_writes'};
}
Jessica Severin's avatar
Jessica Severin committed
132 133

=head2 analysis
134

Jessica Severin's avatar
Jessica Severin committed
135 136 137 138 139 140 141
  Arg [1] : (optional) Bio::EnsEMBL::Analysis $value
  Title   :   analysis
  Usage   :   $value = $self->analysis;
              $self->analysis($analysis);
  Description: Get/Set analysis object of this Worker
  DefaultValue : undef
  Returntype : Bio::EnsEMBL::Analysis object
142

Jessica Severin's avatar
Jessica Severin committed
143 144 145 146 147 148 149
=cut

# Get/set the Bio::EnsEMBL::Analysis object this Worker is specialised for.
# An undefined argument leaves the current value unchanged.
# throw()s unless the new value is a blessed Bio::EnsEMBL::Analysis (or subclass).
sub analysis {
  my $self = shift;
  my $analysis = shift;

  if(defined($analysis)) {
    require Scalar::Util;
      # blessed() guard: ->isa on an unblessed reference would die with an unhelpful
      # "Can't call method" error, and a plain class-name string would pass the check.
    throw("analysis arg must be a [Bio::EnsEMBL::Analysis] not a [$analysis]")
       unless(Scalar::Util::blessed($analysis) and $analysis->isa('Bio::EnsEMBL::Analysis'));
    $self->{'_analysis'} = $analysis;
  }

  return $self->{'_analysis'};
}


=head2 life_span
160

Jessica Severin's avatar
Jessica Severin committed
161 162 163 164 165 166 167 168 169
  Arg [1] : (optional) integer $value (in seconds)
  Title   :   life_span
  Usage   :   $value = $self->life_span;
              $self->life_span($new_value);
  Description: Defines the maximum time a worker can live for. Workers are always
               allowed to complete the jobs they get, but whether they can
               do multiple rounds of work is limited by their life_span
  DefaultValue : 3600 (60 minutes)
  Returntype : integer scalar
170

Jessica Severin's avatar
Jessica Severin committed
171 172
=cut

173
# Get/set the maximum lifetime of this Worker in seconds (default 3600 = 60 minutes).
# Setting 0 removes the lifespan limit; an undefined argument keeps the current value.
sub life_span {
    my ($self, $value) = @_;

    return $self->{'_life_span'} = $value if defined $value;

    $self->{'_life_span'} = 60*60 unless defined $self->{'_life_span'};
    return $self->{'_life_span'};
}

184
# Get/set the Stopwatch that measures how long this Worker has been alive
# (init() installs one that counts in seconds).
sub lifespan_stopwatch {
    my ($self, @args) = @_;

    $self->{'_lifespan_stopwatch'} = $args[0] if scalar(@args);
    return $self->{'_lifespan_stopwatch'};
}

# Returns how long (per the lifespan stopwatch) the Worker has been alive if that
# exceeds life_span(), otherwise 0. A life_span of 0 means "no limit" and always gives 0.
sub life_span_limit_reached {
    my $self = shift @_;

    my $limit = $self->life_span();
    return 0 unless $limit;

    my $alive_for_secs = $self->lifespan_stopwatch->get_elapsed;
    return ($alive_for_secs > $limit) ? $alive_for_secs : 0;
}



207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
=head2 job_limit

  Title   :   job_limit
  Arg [1] :   (optional) integer $value
  Usage   :   $value = $self->job_limit;
              $self->job_limit($new_value);
  Description: Defines the maximum number of jobs a worker can process 
               before it needs to die. A worker 'dies' when either the 
               'life_span' or 'job_limit' is exceeded.
  DefaultValue : undef (relies on life_span to limit life of worker)
  Returntype : integer scalar

=cut

# Get/set the maximum number of jobs this Worker may complete before dying.
# undef (the default) means there is no job limit and only life_span applies.
sub job_limit {
  my $self = shift;
  if (@_) { $self->{'_job_limit'} = shift; }
  return $self->{'_job_limit'};
}

227
# Get/set the number of jobs this Worker has completed successfully (0 when unset).
sub work_done {
  my ($self, @args) = @_;

  $self->{'work_done'} = $args[0] if @args;
  return $self->{'work_done'} || 0;
}

236
# Increments the completed-jobs counter that work_done() reports.
sub more_work_done {
  my $self = shift @_;

  return $self->{'work_done'}++;
}

# Returns the number of jobs completed once a (true) job_limit has been reached,
# otherwise 0.
sub job_limit_reached {
    my $self = shift @_;

    my $limit = $self->job_limit;
    return 0 unless $limit;

    my $done = $self->work_done;
    return ($done >= $limit) ? $done : 0;
}

251 252 253 254 255 256 257 258 259 260 261
# Remembers whether the previous job was left incomplete (i.e. failed).
# run() uses this as a cheap worker-contamination heuristic: a Worker that fails two
# jobs in a row is made to die, without the user having to flag contamination.
# Ideally the error messages would be *aligned* (allowing numerical IDs to differ),
# but for now all errors are assumed to be identical.
sub prev_job_error {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_prev_job_error'} = $args[0];
    }
    return $self->{'_prev_job_error'};
}
262 263


Leo Gordon's avatar
Leo Gordon committed
264
# Get/set this Worker's id. False values (undef, 0, '') are ignored by the setter.
sub worker_id {
  my $self  = shift;
  my $value = shift;
  if ($value) {
    $self->{'_worker_id'} = $value;
  }
  return $self->{'_worker_id'};
}

# Get/set the host this Worker runs on. False values are ignored by the setter.
sub host {
  my ($self, $value) = @_;
  $self->{'_host'} = $value unless !$value;
  return $self->{'_host'};
}

# Get/set the operating-system process id of this Worker (stored under '_ppid').
# False values are ignored by the setter.
sub process_id {
  my $self = shift;
  my ($pid) = @_;
  $self->{'_ppid'} = $pid if $pid;
  return $self->{'_ppid'};
}

# Get/set why this Worker stopped (run() uses e.g. 'NO_WORK', 'JOB_LIMIT',
# 'LIFESPAN', 'CONTAMINATED', 'HIVE_OVERLOAD'); a true value ends run()'s loops.
# False values are ignored by the setter.
sub cause_of_death {
  my ($self, $cause) = @_;
  if ($cause) {
    $self->{'_cause_of_death'} = $cause;
  }
  return $self->{'_cause_of_death'};
}

288 289 290 291 292 293
# Get/set the Worker's current status string (run() checks it for 'COMPILATION').
# False values are ignored by the setter.
sub status {
  my ($self, $new_status) = @_;
  $self->{'_status'} = $new_status if $new_status;
  return $self->{'_status'};
}

Jessica Severin's avatar
Jessica Severin committed
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
# Get/set the 'born' value of this Worker (presumably a creation timestamp supplied
# by the Queen - TODO confirm). False values are ignored by the setter.
sub born {
  my $self = shift;
  my $when = shift;
  $self->{'_born'} = $when if $when;
  return $self->{'_born'};
}

# Get/set the 'died' value of this Worker (presumably a death timestamp - TODO confirm).
# False values are ignored by the setter.
sub died {
  my ($self, $when) = @_;
  if ($when) {
    $self->{'_died'} = $when;
  }
  return $self->{'_died'};
}

# Get/set the 'last_check_in' value of this Worker (presumably the time of its last
# check-in with the Queen - TODO confirm). False values are ignored by the setter.
sub last_check_in {
  my ($self, @args) = @_;
  my $value = $args[0];
  $self->{'_last_check_in'} = $value if $value;
  return $self->{'_last_check_in'};
}

312 313 314 315 316 317 318 319
# Getter/setter for the default behaviour when a job throws without stating whether
# the failure is transient: should it be retried (true) or not (false)?
# run() falls back to this value when $job->transient_error is undefined.
# Reads as 0 when unset.
#
# Any *defined* value is stored, so the flag can be switched back off with 0;
# calling with undef (or with no argument) is a pure getter.
sub retry_throwing_jobs {
  my( $self, $value ) = @_;
  $self->{'_retry_throwing_jobs'} = $value if(defined($value));
  return $self->{'_retry_throwing_jobs'} || 0;
}

320
=head2 hive_output_dir
321

Jessica Severin's avatar
Jessica Severin committed
322
  Arg [1] : (optional) string directory path
323 324 325 326 327 328
  Title   :   hive_output_dir
  Usage   :   $hive_output_dir = $self->hive_output_dir;
              $self->hive_output_dir($hive_output_dir);
  Description: getter/setter for the directory where STDOUT and STDERR of the hive will be redirected to.
          If it is set (true), each worker will create its own subdirectory in it
          where each analysis_job will have its own .out and .err files.
Jessica Severin's avatar
Jessica Severin committed
329
  Returntype : string
330

Jessica Severin's avatar
Jessica Severin committed
331 332
=cut

333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356
# Get/set the hive-wide output directory; worker_output_dir() derives the
# per-Worker subdirectory from it.
sub hive_output_dir {
    my ($self, @args) = @_;

    if (@args) {
        $self->{'_hive_output_dir'} = shift @args;
    }
    return $self->{'_hive_output_dir'};
}

# Get/set this Worker's private output directory, creating it on disk when needed.
#
# Getter: returns the cached directory if one is set. Otherwise, if hive_output_dir()
#   is true, builds <hive_output_dir>/<dir_revhash(worker_id)>/worker_id_<worker_id>,
#   creates it and caches it. Returns a false value when no directory applies.
# Setter: stores the given path (creating it if it is true), ignoring hive_output_dir.
# Dies if the directory cannot be created.
sub worker_output_dir {
    my $self = shift @_;

    if((my $worker_output_dir = $self->{'_worker_output_dir'}) and not @_) { # no need to set, just return:

        return $worker_output_dir;

    } else { # let's try to set first:

        if(@_) { # setter mode ignores hive_output_dir

            $worker_output_dir = shift @_;

        } elsif( my $hive_output_dir = $self->hive_output_dir ) {

            my $worker_id = $self->worker_id();

            $worker_output_dir = join('/', $hive_output_dir, dir_revhash($worker_id), 'worker_id_'.$worker_id );
        }

        if($worker_output_dir) { # will not attempt to create if set to false
                # List-form system() bypasses the shell, so the path reaches mkdir as a
                # single argument even if it contains spaces or shell metacharacters.
            if( system('mkdir', '-p', $worker_output_dir) != 0 ) {
                    # $? == -1 means mkdir could not be started (reason in $!);
                    # otherwise mkdir ran and reported failure via its exit status.
                my $reason = ($? == -1) ? "could not run mkdir: $!" : 'mkdir exited with status '.($? >> 8);
                die "Could not create '$worker_output_dir' because: $reason";
            }
        }

        $self->{'_worker_output_dir'} = $worker_output_dir;
    }
    return $self->{'_worker_output_dir'};
}

369

370 371 372 373 374 375 376
# Get/set whether run() removes the Worker's temp directory when it finishes.
# Defaults to 1; an explicit undef is also reset to 1.
sub perform_global_cleanup {
  my ($self, @args) = @_;
  if (@args) {
    $self->{'_perform_global_cleanup'} = $args[0];
  }
  if (!defined $self->{'_perform_global_cleanup'}) {
    $self->{'_perform_global_cleanup'} = 1;
  }
  return $self->{'_perform_global_cleanup'};
}

Jessica Severin's avatar
Jessica Severin committed
377 378
# Prints a summary of this Worker (ids, host, pid, batch_size, limits and
# output location) to the currently selected STDOUT.
# Note: calling worker_output_dir() may create that directory as a side effect.
sub print_worker {
  my $self = shift;

  my $analysis = $self->analysis;
  print("WORKER: worker_id=", $self->worker_id,
        " analysis_id=(", $analysis->dbID, ")", $analysis->logic_name,
        " host=", $self->host,
        " pid=", $self->process_id,
        "\n");
  print("  batch_size = ", $self->batch_size, "\n");
  print("  job_limit  = ", $self->job_limit, "\n") if defined($self->job_limit);
  print("  life_span  = ", $self->life_span, "\n") if defined($self->life_span);

  my $worker_output_dir = $self->worker_output_dir;
  my $destination       = $worker_output_dir ? $worker_output_dir : 'STDOUT/STDERR';
  print("  worker_output_dir = $destination\n");
}

394 395 396 397 398 399

# Returns (creating it on first use) a per-Worker scratch directory named
# "/tmp/worker_<user>.<worker_id>/".
# throw()s unless the directory ends up existing, writable AND owned by the current
# user. The path is only remembered (and later removed by
# cleanup_worker_process_temp_directory) once those checks pass.
sub worker_process_temp_directory {
  my $self = shift;

  unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
    # create temp directory to hold fasta databases

      # $ENV{USER} is not set in every batch environment; fall back to the
      # login name of the real uid.
    my $username = defined($ENV{'USER'}) ? $ENV{'USER'} : scalar(getpwuid($<));
    $username = '' unless(defined($username));
    my $worker_id = $self->worker_id();

    my $tmp_dir = "/tmp/worker_${username}.${worker_id}/";
    mkdir($tmp_dir, 0777);

      # The name is predictable and lives in a shared directory: if mkdir failed because
      # somebody else created the path first, -O rejects it rather than writing into it.
    throw("unable to create a writable directory owned by the current user: ".$tmp_dir)
      unless(-d $tmp_dir and -O _ and -w _);

    $self->{'_tmp_dir'} = $tmp_dir;
  }
  return $self->{'_tmp_dir'};
}


# Recursively removes the scratch directory recorded by worker_process_temp_directory(),
# if any. Best effort: a failing rm is not treated as an error.
sub cleanup_worker_process_temp_directory {
  my $self = shift;
  if(my $tmp_dir = $self->{'_tmp_dir'}) {
      # List-form system() does not go through the shell, so the path is passed to rm
      # as exactly one argument (no word splitting or glob expansion of the path).
    system('rm', '-r', $tmp_dir);
  }
}

Jessica Severin's avatar
Jessica Severin committed
418 419 420 421 422
###############################
#
# WORK section
#
###############################
423

Jessica Severin's avatar
Jessica Severin committed
424
=head2 batch_size
425

426
  Args    :   none
Jessica Severin's avatar
Jessica Severin committed
427 428 429 430 431 432 433
  Title   :   batch_size
  Usage   :   $value = $self->batch_size;
              $self->set_worker_batch_size($new_value);
  Description: Defines the number of jobs that should run in batch
               before querying the database for the next job batch.  Used by the
               Hive system to manage the number of workers needed to complete a
               particular job type.
434
  DefaultValue : batch_size of analysis
Jessica Severin's avatar
Jessica Severin committed
435
  Returntype : integer scalar
436

Jessica Severin's avatar
Jessica Severin committed
437 438
=cut

439 440 441 442 443 444 445 446
# Overrides the analysis' batch_size for this Worker; an undef argument is ignored.
sub set_worker_batch_size {
  my ($self, $batch_size) = @_;
  $self->{'_batch_size'} = $batch_size if defined $batch_size;
}

Jessica Severin's avatar
Jessica Severin committed
447 448
# Returns the number of jobs to claim per batch:
#   1. the Worker override (set_worker_batch_size) if defined, else the analysis' batch_size;
#   2. if that is <= 0 and the analysis reports an avg_msec_per_job, enough jobs to fill
#      $MIN_BATCH_TIME msec (rounded up);
#   3. never less than 1, and never more than a (true) job_limit.
sub batch_size {
  my $self = shift;

  my $stats               = $self->analysis->stats;
  my $analysis_batch_size = $stats->batch_size;
  my $batch_size = defined($self->{'_batch_size'}) ? $self->{'_batch_size'} : $analysis_batch_size;

  if ($batch_size <= 0) {
    my $avg_msec_per_job = $stats->avg_msec_per_job;
    $batch_size = POSIX::ceil($MIN_BATCH_TIME / $avg_msec_per_job) if $avg_msec_per_job;
  }

  $batch_size = 1 if $batch_size < 1;    # always grab at least one job

  my $job_limit = $self->job_limit;
  $batch_size = $job_limit if ($job_limit and $job_limit < $batch_size);

  return $batch_size;
}


468 469 470 471 472 473 474 475 476
=head2 run

  Title   :   run
  Usage   :   $worker->run;
  Description: 
    This is a self looping autonomous function to process jobs.
    First all STDOUT/STDERR is redirected, then looping commences.
    Looping consists of 
      1) claiming jobs,
477 478
      2) processing those jobs through an instance of the 'module class' of 
         the analysis assigned to this worker,
479 480 481 482 483 484 485 486 487 488
      3) updating the analysis_job, analysis_stats, and hive tables to track the 
         progress of the job, the analysis and this worker.
    Looping stops when any one of these are met:
      1) there are no more jobs to process
      2) job_limit is reached
      3) life_span has been reached.
  Returntype : none

=cut

Leo Gordon's avatar
Leo Gordon committed
489
# Main loop of the Worker.
#  - If worker_output_dir() is set, redirects STDOUT/STDERR into worker.out/worker.err there.
#  - Repeatedly claims batches of jobs (or reclaims the single _specific_job), runs each
#    through run_module_with_job(), records thrown messages, releases/ages failed jobs,
#    and updates the analysis statistics at most once per $MIN_BATCH_TIME msec.
#  - Stops once cause_of_death is set (NO_WORK, JOB_LIMIT, LIFESPAN, CONTAMINATED or
#    HIVE_OVERLOAD), then cleans up, reports its death to the Queen and restores
#    the original STDOUT/STDERR.
# Dies before processing any job if the worker-level log files cannot be opened.
sub run {
  my $self = shift;
  my $specific_job = $self->_specific_job;

  if( my $worker_output_dir = $self->worker_output_dir ) {
      # Open the Worker's own log files first, while the original STDOUT/STDERR are
      # still intact: an unchecked failure here would leave both streams closed and
      # silently drop all later output.
    open(WORKER_STDOUT, '>', "${worker_output_dir}/worker.out")
        or die "Could not open '${worker_output_dir}/worker.out' for writing: $!";
    open(WORKER_STDERR, '>', "${worker_output_dir}/worker.err")
        or die "Could not open '${worker_output_dir}/worker.err' for writing: $!";

      # Keep duplicates of the original streams; they are restored at the end of run().
    open(OLDOUT, '>&', \*STDOUT) or die "Could not duplicate STDOUT: $!";
    open(OLDERR, '>&', \*STDERR) or die "Could not duplicate STDERR: $!";

    close STDOUT;
    open(STDOUT, '>&', \*WORKER_STDOUT) or die "Could not redirect STDOUT: $!";
    close STDERR;
    open(STDERR, '>&', \*WORKER_STDERR) or die "Could not redirect STDERR: $!";
  }
  $self->print_worker();

  $self->db->dbc->disconnect_when_inactive(0);

  my $max_retry_count = $self->analysis->stats->max_retry_count();  # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs

  do { # Worker's lifespan loop (ends only when the worker dies)
    my $batches_stopwatch           = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
    $self->{'fetching_stopwatch'}   = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    $self->{'running_stopwatch'}    = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    $self->{'writing_stopwatch'}    = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my $jobs_done_by_batches_loop   = 0; # by all iterations of internal loop

    BATCHES: {  # note: in order to label a do{} loop you have to enclose it in an extra block
    do {    # Worker's "batches loop" exists to prevent logging the status too frequently.
            # If a batch took less than $MIN_BATCH_TIME to run, the Worker keeps taking&running more batches.

      my $jobs = $specific_job
        ? [ $self->queen->worker_reclaim_job($self,$specific_job) ]
        : $self->db->get_AnalysisJobAdaptor->grab_jobs_for_worker( $self );

      $self->queen->worker_check_in($self); #will sync analysis_stats if needed

      $self->cause_of_death('NO_WORK') unless(scalar @{$jobs});

      if($self->debug) {
        $self->analysis->stats->print_stats;
        print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n");
      }

      foreach my $job (@{$jobs}) {
        $job->print_job if($self->debug);

        $self->start_job_output_redirection($job);
        eval {  # capture any throw/die
            $self->run_module_with_job($job);
        };
        my $msg_thrown = $@;
        $self->stop_job_output_redirection($job);

        if($msg_thrown) {   # record the message - whether it was a success or failure:
            my $job_id                   = $job->dbID();
            my $job_status_at_the_moment = $job->status();
            warn "Job with id=$job_id died in status '$job_status_at_the_moment' for the following reason: $msg_thrown\n";
            $self->db()->get_JobMessageAdaptor()->register_message($job_id, $msg_thrown, $job->incomplete );
        }

        if($job->incomplete) {
                # If the job specifically said what to do next, respect that last wish.
                # Otherwise follow the default behaviour set by the beekeeper in $worker:
                #
            my $may_retry = defined($job->transient_error) ? $job->transient_error : $self->retry_throwing_jobs;

            $job->adaptor->release_and_age_job( $job->dbID, $max_retry_count, $may_retry );

            if($self->status eq 'COMPILATION'       # if it failed to compile, there is no point in continuing as the code WILL be broken
            or $self->prev_job_error                # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
            or $job->lethal_for_worker ) {          # trust the job's expert knowledge
                my $reason = ($self->status eq 'COMPILATION') ? 'compilation error'
                           : $self->prev_job_error           ? 'two failed jobs in a row'
                           :                                   'suggested by job itself';
                warn "Job's error has contaminated the Worker ($reason), so the Worker will now die\n";
                $self->cause_of_death('CONTAMINATED');
                last BATCHES;
            }
        } else {    # job successfully completed:

            if(my $semaphored_job_id = $job->semaphored_job_id) {
                $job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );    # step-unblock the semaphore
            }
            $self->more_work_done;
            $jobs_done_by_batches_loop++;
            $job->update_status('DONE');
        }

        $self->prev_job_error( $job->incomplete );
        $self->enter_status('READY');
      }

      if( $specific_job ) {
            $self->cause_of_death('JOB_LIMIT');
      } elsif( my $jobs_completed = $self->job_limit_reached()) {
            print "job_limit reached (completed $jobs_completed jobs)\n";
            $self->cause_of_death('JOB_LIMIT');
      } elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
            print "life_span limit reached (alive for $alive_for_secs secs)\n";
            $self->cause_of_death('LIFESPAN');
      }
    } while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $MIN_BATCH_TIME);
    } # this is the extra block enclosing a labelled do{} loop

        # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
        # so they are not allowed to happen too frequently (not before $MIN_BATCH_TIME of work has been done)
        #
    $self->db->get_AnalysisStatsAdaptor->interval_update_work_done(
        $self->analysis->dbID,
        $jobs_done_by_batches_loop,
        $batches_stopwatch->get_elapsed,
        $self->{'fetching_stopwatch'}->get_elapsed,
        $self->{'running_stopwatch'}->get_elapsed,
        $self->{'writing_stopwatch'}->get_elapsed,
    );

    if (!$self->cause_of_death
    and $self->analysis->stats->hive_capacity >= 0
    and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity
    and $self->analysis->stats->adaptor->decrease_running_workers_on_hive_overload( $self->analysis->dbID ) # careful with order, this operation has side-effect
    ) {
        $self->cause_of_death('HIVE_OVERLOAD');
    }
  } while (!$self->cause_of_death); # /Worker's lifespan loop

  if($self->perform_global_cleanup) {
    #have runnable cleanup any global/process files/data it may have created
    $self->cleanup_worker_process_temp_directory;
  }

  $self->queen->register_worker_death($self);

  $self->analysis->stats->print_stats if($self->debug);

  printf("dbc %d disconnect cycles\n", $self->db->dbc->disconnect_count);
  print("total jobs completed : ", $self->work_done, "\n");

  if( $self->worker_output_dir() ) {
    close STDOUT;
    close STDERR;
    close WORKER_STDOUT;
    close WORKER_STDERR;
    open STDOUT, ">&", \*OLDOUT;
    open STDERR, ">&", \*OLDERR;
      # the saved duplicates are no longer needed once the originals are restored
    close OLDOUT;
    close OLDERR;
  }
}


638 639
# Runs one job through an instance of the analysis' module:
# compile -> param_init -> fetch_input -> run -> (write_output + autoflow, if execute_writes).
# Updates both the Worker's and the job's status before each phase, and accumulates
# the time spent in each phase in the Worker's fetching/running/writing stopwatches.
# The job is marked incomplete(1) up front and only set to incomplete(0) after every
# phase succeeded; any exception propagates to the caller (run() wraps this in eval).
sub run_module_with_job {
  my ($self, $job) = @_;

  $job->incomplete(1);
  $job->autoflow(1);

  $self->enter_status('COMPILATION');
  $job->update_status('COMPILATION');
  my $runObj = $self->analysis->process or die "Unknown compilation error";

  my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
  $self->queen->dbc->query_count(0);

  #pass the input_id from the job into the Process object
  if( $runObj->isa('Bio::EnsEMBL::Hive::Process') ) {
    $runObj->input_job($job);
    $runObj->queen($self->queen);
    $runObj->worker($self);
    $runObj->debug($self->debug);

    $job->param_init( $runObj->strict_hash_format(), $runObj->param_defaults(), $self->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), $job->input_id() );

  } else {
    $runObj->input_id($job->input_id);
    $runObj->db($self->db);

    $job->param_init( 0, $self->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), $job->input_id() ); # Well, why not?
  }

  $self->enter_status('GET_INPUT');
  $job->update_status('GET_INPUT');
  print("\nGET_INPUT\n") if($self->debug);

  $self->_run_timed_phase( 'fetching_stopwatch', sub { $runObj->fetch_input } );

  $self->enter_status('RUN');
  $job->update_status('RUN');
  print("\nRUN\n") if($self->debug);

  $self->_run_timed_phase( 'running_stopwatch', sub { $runObj->run } );

  if($self->execute_writes) {
    $self->enter_status('WRITE_OUTPUT');
    $job->update_status('WRITE_OUTPUT');
    print("\nWRITE_OUTPUT\n") if($self->debug);

    $self->_run_timed_phase( 'writing_stopwatch', sub { $runObj->write_output } );

    if( $job->autoflow ) {
      printf("AUTOFLOW input->output\n") if($self->debug);
      $job->dataflow_output_id();
    }
  } else {
    print("\n\n!!!! NOT write_output\n\n\n") if($self->debug);
  }

  $job->query_count($self->queen->dbc->query_count);
  $job->runtime_msec( $job_stopwatch->get_elapsed );

  $job->incomplete(0);
}

# Runs one phase of a job inside the Worker stopwatch stored under $stopwatch_key:
# resumes it, executes $code, and pauses it again EVEN IF $code dies (the exception
# is then re-thrown unchanged), so a failing job cannot leave the stopwatch running
# and inflate the phase timings reported by run().
# NOTE(review): assumes Stopwatch->continue() resumes and ->pause() stops accumulating time.
sub _run_timed_phase {
  my ($self, $stopwatch_key, $code) = @_;

  my $stopwatch = $self->{$stopwatch_key};
  $stopwatch->continue();
  my $succeeded = eval { $code->(); 1 };
  my $error     = $@;
  $stopwatch->pause();
  die $error unless $succeeded;
  return;
}

706 707 708 709
# Records a Worker state change by delegating to the Queen's enter_status;
# returns whatever the Queen returns.
sub enter_status {
  my $self   = shift;
  my $status = shift;
  my $queen  = $self->queen;
  return $queen->enter_status($self, $status);
}
Jessica Severin's avatar
Jessica Severin committed
710

711
# When worker_output_dir() is set, points STDOUT/STDERR at per-job files
# "<worker_output_dir>/job_id_<dbID>.out|.err" and stores the file names via the
# job adaptor. No-op without a job, without an adaptor, or without a worker_output_dir.
# If a per-job file cannot be opened, that stream falls back to the Worker's own log,
# a warning is emitted and the job's file name is recorded as ''.
sub start_job_output_redirection {
    my $self = shift;
    my $job  = shift or return;

    my $job_adaptor = $job->adaptor or return;

    if( my $worker_output_dir = $self->worker_output_dir ) {

        $job->stdout_file( $worker_output_dir . '/job_id_' . $job->dbID . '.out' );
        $job->stderr_file( $worker_output_dir . '/job_id_' . $job->dbID . '.err' );

            # 3-arg open treats the file name literally; on failure we must not leave
            # STDOUT closed, or everything the job prints would be silently lost.
        close STDOUT;
        unless( open(STDOUT, '>', $job->stdout_file) ) {
            my $reason = $!;
            open(STDOUT, '>&', \*WORKER_STDOUT);
            warn "Could not redirect STDOUT to '".$job->stdout_file."' ($reason), using the worker's log instead\n";
            $job->stdout_file('');
        }

        close STDERR;
        unless( open(STDERR, '>', $job->stderr_file) ) {
            my $reason = $!;
            open(STDERR, '>&', \*WORKER_STDERR);
            warn "Could not redirect STDERR to '".$job->stderr_file."' ($reason), using the worker's log instead\n";
            $job->stderr_file('');
        }

        $job_adaptor->store_out_files($job);
    }
}


733
# Undoes start_job_output_redirection(): points STDOUT/STDERR back at the Worker's
# own log handles, deletes per-job output files that stayed empty (recording '' as
# their name instead) and stores the final names via the job adaptor.
# No-op without a job, without an adaptor, or without a worker_output_dir.
sub stop_job_output_redirection {
    my ($self, $job) = @_;
    return unless $job;

    my $job_adaptor = $job->adaptor;
    return unless $job_adaptor;

    if( $self->worker_output_dir ) {

            # re-opening the standard handles closes (and so flushes) the per-job files
        open(STDOUT, '>&', \*WORKER_STDOUT);
        open(STDERR, '>&', \*WORKER_STDERR);

        foreach my $accessor ('stdout_file', 'stderr_file') {
            my $file = $job->$accessor;
            if(-z $file) {
                unlink $file;
                $job->$accessor('');
            }
        }

        $job_adaptor->store_out_files($job);
    }
}

758 759 760 761 762
# Get/set the job this Worker was asked to run specifically instead of claiming
# jobs from the queue. When set, run() reclaims it via the Queen and then stops
# with cause_of_death 'JOB_LIMIT'.
sub _specific_job {
  my ($self, @args) = @_;
  $self->{'_specific_job'} = $args[0] if @args;
  return $self->{'_specific_job'};
}


1;