Worker.pm 24.7 KB
Newer Older
Jessica Severin's avatar
Jessica Severin committed
1 2 3 4 5 6 7 8
#
# You may distribute this module under the same terms as perl itself
#
# POD documentation - main docs before the code

=pod 

=head1 NAME

  Bio::EnsEMBL::Hive::Worker

=head1 DESCRIPTION

  Object which encapsulates the details of how to find jobs, how to run those
  jobs, and then check the rules to create the next jobs in the chain.
  Essentially knows where to find data, how to process data, and where to
  put it when it is done (put in the next person's INBOX) so the next Worker
  in the chain can find data to work on.

  Hive based processing is a concept based on a more controlled version
  of an autonomous agent type system.  Each worker is not told what to do
  (like a centralized control system - like the current pipeline system)
  but rather queries a central database for jobs (give me jobs).

  Each worker is linked to an analysis_id, registers itself on creation
  into the Hive, creates a RunnableDB instance of the Analysis->module,
  gets relevant configuration information from the database, does its
  work, creates the next layer of job entries by interfacing to
  the DataflowRuleAdaptor to determine the analyses it needs to pass its
  output data to and creates jobs on the database of the next analysis.
  It repeats this cycle until it has lived its lifetime, has reached its job
  limit, or there are no more jobs left to process.
  The lifetime limit is a safety limit to prevent these from 'infecting'
  a system and sitting on a compute node for longer than is socially acceptable.
  This is primarily needed on compute resources like an LSF system where jobs
  are not preempted and run until they are done.

  The Queen's primary job is to create Workers to get the work done.
  As part of this, she is also responsible for summarizing the status of the
  analyses by querying the jobs, summarizing, and updating the
  analysis_stats table.  From this she is also responsible for monitoring and
  'unblocking' analyses via the analysis_ctrl_rules.
  The Queen is also responsible for freeing up jobs that were claimed by Workers
  that died unexpectedly so that other workers can take over the work.

  The Beekeeper is in charge of interfacing between the Queen and a compute resource
  or 'compute farm'.  Its job is to query Queens if they need any workers and to
  send the requested number of workers to open machines via the runWorker.pl script.
  It is also responsible for interfacing with the Queen to identify workers which died
  unexpectedly so that she can free the dead workers' unfinished jobs.

=head1 CONTACT

  Please contact the ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

  The rest of the documentation details each of the object methods.
  Internal methods are usually prefixed with an underscore (_).

=cut

package Bio::EnsEMBL::Hive::Worker;

use strict;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
use POSIX;
use File::Path ();
use Scalar::Util ();

use Bio::EnsEMBL::Analysis;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Extensions;
use Bio::EnsEMBL::Hive::Process;

use Bio::EnsEMBL::Hive::Utils::RedirectStack;
use Bio::EnsEMBL::Hive::Utils ('dir_revhash');  # import dir_revhash

# new
#   Constructor: returns an empty hash blessed into $class.
#   Any further arguments are accepted but ignored; the lifespan stopwatch and
#   the debug level are set up separately by init().
sub new {
    my $class = shift @_;

    my $self = {};
    bless $self, $class;

    return $self;
}

# init
#   Creates and starts the Worker's lifespan stopwatch, counting in seconds
#   (the Stopwatch default unit is milliseconds), stores it through
#   lifespan_stopwatch(), and resets the debug level to 0.
#   Returns $self so the call can be chained after new().
sub init {
  my ($self) = @_;

  my $clock = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
  $clock->_unit(1);         # seconds rather than milliseconds
  $clock->restart;
  $self->lifespan_stopwatch($clock);

  $self->debug(0);

  return $self;
}

# queen
#   Getter/setter for the Queen object this Worker reports to
#   (it is used for check-ins and for registering the Worker's death).
sub queen {
  my ($self, @value) = @_;
  $self->{'_queen'} = $value[0] if(@value);
  return $self->{'_queen'};
}
# db
#   Getter/setter for the hive database adaptor this Worker works against.
sub db {
  my $self = shift;
  if(scalar(@_)) {
    $self->{'_db'} = $_[0];
  }
  return $self->{'_db'};
}
# meadow_type
#   Getter/setter for the meadow type recorded on this Worker.
sub meadow_type {
  my ($self, @args) = @_;
  if(@args) { $self->{'_meadow_type'} = $args[0]; }
  return $self->{'_meadow_type'};
}
# meadow_name
#   Getter/setter for the meadow name recorded on this Worker.
sub meadow_name {
  my $self = shift @_;
  $self->{'_meadow_name'} = $_[0] if(scalar @_);
  return $self->{'_meadow_name'};
}
# debug
#   Getter/setter for the debug level. An unset (or undef) level reads as 0
#   and is stored as 0 from then on.
sub debug {
  my ($self, @args) = @_;
  $self->{'_debug'} = $args[0] if(@args);
  defined($self->{'_debug'}) or $self->{'_debug'} = 0;
  return $self->{'_debug'};
}
# execute_writes
#   Getter/setter for whether run_module_with_job() performs the WRITE_OUTPUT
#   stage. An unset (or undef) value reads as 1 (writes enabled).
sub execute_writes {
  my ($self, @args) = @_;
  if(@args) {
    $self->{'_execute_writes'} = $args[0];
  }
  unless(defined $self->{'_execute_writes'}) {
    $self->{'_execute_writes'} = 1;
  }
  return $self->{'_execute_writes'};
}

=head2 analysis

  Arg [1]     : (optional) Bio::EnsEMBL::Analysis $value
  Title       : analysis
  Usage       : $value = $self->analysis;
                $self->analysis($analysis);
  Description : Get/Set the analysis object of this Worker.
                An undefined argument behaves like a plain getter call.
                Anything that is not a blessed Bio::EnsEMBL::Analysis
                (plain strings, class-name strings, unblessed references)
                is rejected with throw().
  DefaultValue: undef
  Returntype  : Bio::EnsEMBL::Analysis object

=cut

sub analysis {
  my $self = shift;
  my $analysis = shift;

  if(defined($analysis)) {
        # blessed() must guard the ->isa() call: on an unblessed reference ->isa() dies
        # with "Can't call method ..." instead of the intended message, and a bare
        # class-name string such as 'Bio::EnsEMBL::Analysis' would pass isa() as a class method.
    throw("analysis arg must be a [Bio::EnsEMBL::Analysis] not a [$analysis]")
       unless(Scalar::Util::blessed($analysis) and $analysis->isa('Bio::EnsEMBL::Analysis'));
    $self->{'_analysis'} = $analysis;
  }

  return $self->{'_analysis'};
}


=head2 life_span

  Arg [1]     : (optional) integer $value (in seconds)
  Title       : life_span
  Usage       : $value = $self->life_span;
                $self->life_span($new_value);
  Description : Maximum time (in seconds) a Worker keeps taking on new work.
                Jobs already started are always allowed to finish; the limit
                is only checked between batches. Setting it to 0 disables the limit.
  DefaultValue: 3600 (60 minutes)
  Returntype  : integer scalar

=cut

sub life_span {
    my ($self, $value) = @_;

    return $self->{'_life_span'} = $value if defined $value;    # explicit set (0 means "no limit")

    $self->{'_life_span'} = 60*60 unless defined $self->{'_life_span'};   # lazy default: one hour
    return $self->{'_life_span'};
}

# lifespan_stopwatch
#   Getter/setter for the Stopwatch created by init(); its elapsed time is what
#   life_span_limit_reached() compares against life_span().
sub lifespan_stopwatch {
    my ($self, @new_stopwatch) = @_;

    $self->{'_lifespan_stopwatch'} = $new_stopwatch[0] if @new_stopwatch;

    return $self->{'_lifespan_stopwatch'};
}

# life_span_limit_reached
#   Returns how many seconds the Worker has been alive if that is strictly more
#   than life_span(); otherwise returns 0. A false life_span (0/undef) means
#   "no limit", so 0 is returned without consulting the stopwatch.
sub life_span_limit_reached {
    my $self = shift @_;

    my $limit = $self->life_span() or return 0;

    my $alive_for_secs = $self->lifespan_stopwatch->get_elapsed;
    return ($alive_for_secs > $limit) ? $alive_for_secs : 0;
}



=head2 job_limit

  Title       : job_limit
  Arg [1]     : (optional) integer $value
  Usage       : $value = $self->job_limit;
                $self->job_limit($new_value);
  Description : Maximum number of jobs this Worker may complete before it dies.
                A Worker dies as soon as either 'life_span' or 'job_limit'
                is exceeded. A false value (undef or 0) means "no job limit".
  DefaultValue: undef (relies on life_span to limit life of worker)
  Returntype  : integer scalar

=cut

sub job_limit {
  my ($self, @args) = @_;
  $self->{'_job_limit'} = $args[0] if(@args);
  return $self->{'_job_limit'};
}

# work_done
#   Getter/setter for the number of jobs this Worker has completed.
#   Any false stored value (including "never set") reads as 0.
sub work_done {
  my ($self, @args) = @_;

  $self->{'work_done'} = $args[0] if(scalar @args);

  my $done = $self->{'work_done'};
  return $done ? $done : 0;
}

# more_work_done
#   Bumps the completed-job counter by one. Returns the counter's value from
#   before the increment (0 when it had never been set).
sub more_work_done {
  my ($self) = @_;
  return $self->{'work_done'}++;
}

# next_batch_size
#   Number of jobs to claim in the next batch: the analysis' current
#   (possibly estimated) batch size, capped by the number of jobs still allowed
#   under job_limit when one is set.
#   Returns 0 once job_limit has been attained. It also returns 0 if work_done has
#   somehow overshot the limit; previously that case gave a negative count, which
#   run() treats as true and would pass on to grab_jobs_for_worker().
sub next_batch_size {
    my $self = shift @_;

    my $batch_size = $self->analysis->stats->get_or_estimate_batch_size();

    if(my $job_limit = $self->job_limit()) {               # if job_limit is set, it may influence batch_size
        my $jobs_to_do = $job_limit - $self->work_done();
        return 0 if($jobs_to_do <= 0);                      # limit attained (or overshot): never a negative count
        return $jobs_to_do if($jobs_to_do < $batch_size);
    }
    return $batch_size;
}


# job_limit_reached
#   Returns work_done() once it has reached a set (true) job_limit(); otherwise 0.
sub job_limit_reached {
    my $self = shift @_;

    my $limit = $self->job_limit;
    return 0 unless $limit;

    my $done = $self->work_done;
    return ($done >= $limit) ? $done : 0;
}

# prev_job_error
#   Getter/setter for whether the previous job run by this Worker failed.
#   run_one_batch() records each job's outcome here. If two jobs fail in a row,
#   the Worker assumes it is contaminated and dies, without the user having to say so.
#
#   Ideally the two error messages would be *aligned* (allowing e.g. numerical
#   IDs to differ) to recognise a genuinely repeated error; for now any two
#   consecutive failures are treated as the same error.
sub prev_job_error {
    my ($self, @args) = @_;

    if(@args) {
        $self->{'_prev_job_error'} = $args[0];
    }
    return $self->{'_prev_job_error'};
}

# dbID
#   Getter/setter for this Worker's worker_id.
#   Only a true value is stored; undef, 0 and '' leave the current id untouched.
sub dbID {
  my $self  = shift;
  my $value = shift;
  if($value) {
    $self->{'_worker_id'} = $value;
  }
  return $self->{'_worker_id'};
}

# host
#   Getter/setter for the host name; false values are ignored by the setter.
sub host {
  my ($self, $new_host) = @_;
  $new_host and $self->{'_host'} = $new_host;
  return $self->{'_host'};
}

# process_id
#   Getter/setter for the Worker's process id; false values are ignored by the setter.
sub process_id {
  my ($self, $pid) = @_;
  $self->{'_ppid'} = $pid if($pid);
  return $self->{'_ppid'};
}

# cause_of_death
#   Getter/setter for why the Worker is dying (e.g. 'NO_WORK', 'JOB_LIMIT',
#   'LIFESPAN', 'CONTAMINATED', 'HIVE_OVERLOAD'). Once set it cannot be cleared
#   through this method, because false values are ignored by the setter.
sub cause_of_death {
  my ($self, $cause) = @_;
  unless($cause) {
    return $self->{'_cause_of_death'};
  }
  $self->{'_cause_of_death'} = $cause;
  return $self->{'_cause_of_death'};
}

# status
#   Getter/setter for the Worker's current status (e.g. 'COMPILATION', 'RUN',
#   'READY'); false values are ignored by the setter.
sub status {
  my $self   = shift;
  my $status = shift;
  $self->{'_status'} = $status if $status;
  return $self->{'_status'};
}

# born
#   Getter/setter for the Worker's birth timestamp; false values are ignored by the setter.
sub born {
  my ($self, $when) = @_;
  if($when) { $self->{'_born'} = $when; }
  return $self->{'_born'};
}

# died
#   Getter/setter for the Worker's death timestamp; false values are ignored by the setter.
sub died {
  my ($self, $when) = @_;
  $when and $self->{'_died'} = $when;
  return $self->{'_died'};
}

# last_check_in
#   Getter/setter for the Worker's last check-in timestamp; false values are
#   ignored by the setter.
sub last_check_in {
  my $self = shift;
  my $when = shift;
  $self->{'_last_check_in'} = $when if($when);
  return $self->{'_last_check_in'};
}

# retry_throwing_jobs
#   Getter/setter for the default policy applied when a job throws: a true value
#   means the failed job may be retried. run_one_batch() falls back to this only
#   when the job itself did not define transient_error.
#   Reads as 1 while unset (or set to undef).
sub retry_throwing_jobs {
    my $self = shift @_;

    if(@_) {
        $self->{'_retry_throwing_jobs'} = shift @_;
    }

    my $stored = $self->{'_retry_throwing_jobs'};
    return 1 unless defined $stored;
    return $stored;
}

=head2 hive_output_dir

  Arg [1]     : (optional) string directory path
  Title       : hive_output_dir
  Usage       : $hive_output_dir = $self->hive_output_dir;
                $self->hive_output_dir($hive_output_dir);
  Description : getter/setter for the directory where STDOUT and STDERR of the hive will be redirected to.
                If it is "true", each worker will create its own subdirectory in it
                where each job will have its own .out and .err files.
  Returntype  : string

=cut

sub hive_output_dir {
    my ($self, @args) = @_;

    $self->{'_hive_output_dir'} = $args[0] if(@args);

    return $self->{'_hive_output_dir'};
}

# worker_output_dir
#   Getter/setter for the directory that receives this Worker's worker.out/.err
#   and the per-job .out/.err files.
#     - With an argument: uses that value as-is (hive_output_dir is ignored).
#     - Without one, while nothing true is stored yet: derives
#       "<hive_output_dir>/<dir_revhash(worker_id)>/worker_id_<worker_id>"
#       if hive_output_dir is set.
#   A true directory name is created (including parents) if needed, and the
#   method dies if that fails. A false value means "no redirection".
sub worker_output_dir {
    my $self = shift @_;

    if((my $worker_output_dir = $self->{'_worker_output_dir'}) and not @_) { # no need to set, just return:

        return $worker_output_dir;

    } else { # let's try to set first:

        if(@_) { # setter mode ignores hive_output_dir

            $worker_output_dir = shift @_;

        } elsif( my $hive_output_dir = $self->hive_output_dir ) {

            my $worker_id = $self->dbID;
            $worker_output_dir = join('/', $hive_output_dir, dir_revhash($worker_id), 'worker_id_'.$worker_id );
        }

        if($worker_output_dir) { # will not attempt to create if set to false
                # List-form system() bypasses the shell, so the path reaches mkdir intact
                # even if it contains spaces or shell metacharacters.
                # system() reports failure through $? (child status), and through $! only
                # when mkdir could not be started at all (then $? == -1).
            system('mkdir', '-p', $worker_output_dir) == 0
                or die "Could not create '$worker_output_dir' because: "
                     . ( ($? == -1) ? "could not run mkdir: $!" : 'mkdir exited with status '.($? >> 8) );
        }

        $self->{'_worker_output_dir'} = $worker_output_dir;
    }
    return $self->{'_worker_output_dir'};
}

# get_stdout_redirector
#   Returns the RedirectStack wrapping STDOUT, creating it on first use.
sub get_stdout_redirector {
    my ($self) = @_;

    unless($self->{_stdout_redirector}) {
        $self->{_stdout_redirector} = Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDOUT);
    }
    return $self->{_stdout_redirector};
}

# get_stderr_redirector
#   Returns the RedirectStack wrapping STDERR, creating it on first use.
sub get_stderr_redirector {
    my ($self) = @_;

    unless($self->{_stderr_redirector}) {
        $self->{_stderr_redirector} = Bio::EnsEMBL::Hive::Utils::RedirectStack->new(\*STDERR);
    }
    return $self->{_stderr_redirector};
}


# perform_cleanup
#   Getter/setter for whether run() removes the worker's temp directory when the
#   Worker dies. An unset (or undef) value reads as 1 (cleanup on).
sub perform_cleanup {
  my ($self, @args) = @_;
  $self->{'_perform_cleanup'} = $args[0] if(@args);
  defined($self->{'_perform_cleanup'}) or $self->{'_perform_cleanup'} = 1;
  return $self->{'_perform_cleanup'};
}

# print_worker
#   Prints a summary of this Worker to the currently selected output handle:
#   worker and analysis identity, host and pid, then the batch size, job_limit
#   and life_span (each limit only if defined), and where output is being sent.
sub print_worker {
  my $self = shift;

  my $analysis = $self->analysis;

  print("WORKER: worker_id=",$self->dbID,
     " analysis_id=(",$analysis->dbID,")",$analysis->logic_name,
     " host=",$self->host,
     " pid=",$self->process_id,
     "\n");
  print("\tbatch_size = ", $analysis->stats->get_or_estimate_batch_size(),"\n");

  my $job_limit = $self->job_limit;
  print("\tjob_limit  = ", $job_limit,"\n") if(defined($job_limit));

  my $life_span = $self->life_span;
  print("\tlife_span  = ", $life_span,"\n") if(defined($life_span));

  my $worker_output_dir = $self->worker_output_dir;
  print( $worker_output_dir ? "\tworker_output_dir = $worker_output_dir\n" : "\tworker_output_dir = STDOUT/STDERR\n" );
}

# worker_process_temp_directory
#   Returns a per-worker scratch directory "/tmp/worker_<user>.<worker_id>/".
#   It is created on first use, and again if it has since been removed.
#   <user> is $ENV{USER}. When that is unset or empty, the login name of the real
#   uid is used, and failing that the numeric uid (previously this produced
#   "/tmp/worker_.<id>/").
#   Throws (including mkdir's reason) unless the path ends up being a writable directory.
#   NOTE(review): the /tmp path is predictable; a pre-existing directory created
#   by another user passes these checks if it happens to be writable for us.
sub worker_process_temp_directory {
  my $self = shift;

  unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
    #create temp directory (originally intended to hold fasta databases)
    my $username = $ENV{'USER'};
    unless(defined($username) and length($username)) {
      $username = eval { scalar(getpwuid($<)) };    # eval: getpwuid is unimplemented on some platforms
      $username = $< unless(defined($username) and length($username));
    }
    my $worker_id = $self->dbID;
    my $tmp_dir   = "/tmp/worker_${username}.${worker_id}/";
    $self->{'_tmp_dir'} = $tmp_dir;

        # mkdir() legitimately fails when the directory already exists, so its error
        # is only reported if we did not end up with a writable directory.
    my $mkdir_error = mkdir($tmp_dir, 0777) ? '' : " ($!)";
    throw("unable to create a writable directory ".$tmp_dir.$mkdir_error) unless(-d $tmp_dir and -w $tmp_dir);
  }
  return $self->{'_tmp_dir'};
}


# cleanup_worker_process_temp_directory
#   Removes the per-worker scratch directory and everything in it, if
#   worker_process_temp_directory() ever assigned one and it still exists.
#   Uses File::Path::rmtree instead of the shell string "rm -r $dir". The shell
#   form would split a directory name containing spaces or metacharacters into
#   several (possibly unrelated) paths to delete.
#   As with the old "rm -r", a failure is not fatal; File::Path reports
#   problems as warnings.
sub cleanup_worker_process_temp_directory {
  my $self = shift;
  if(my $tmp_dir = $self->{'_tmp_dir'}) {
    File::Path::rmtree($tmp_dir) if(-e $tmp_dir);
  }
}

###############################
#
# WORK section
#
###############################


=head2 run

  Title   :   run
  Usage   :   $worker->run;
  Description: 
    This is a self looping autonomous function to process jobs.
    First all STDOUT/STDERR is redirected (if worker_output_dir is set), then looping commences.
    Looping consists of 
      1) claiming jobs,
      2) processing those jobs through an instance of the 'module class' of 
         the analysis assigned to this worker,  
      3) updating the job, analysis_stats, and hive tables to track the 
         progress of the job, the analysis and this worker.
    Looping stops when any one of these are met:
      1) there are no more jobs to process 
      2) job_limit is reached
      3) life_span has been reached
      4) the worker is judged to be contaminated
      5) the analysis has more running workers than its (non-negative) hive_capacity.
  Returntype : none

=cut

sub run {
  my $self = shift;

    # The summary is printed once to the original STDOUT and, if output is redirected,
    # once more into worker.out.
  $self->print_worker();
  if( my $worker_output_dir = $self->worker_output_dir ) {
    $self->get_stdout_redirector->push( $worker_output_dir.'/worker.out' );
    $self->get_stderr_redirector->push( $worker_output_dir.'/worker.err' );
    $self->print_worker();
  }

    # presumably: keep the DB connection open while idle for the Worker's whole life - confirm in DBConnection
  $self->db->dbc->disconnect_when_inactive(0);

  my $min_batch_time    = $self->analysis->stats->min_batch_time();
  my $job_adaptor       = $self->db->get_AnalysisJobAdaptor;

  do { # Worker's lifespan loop (ends only when the worker dies)
        # Fresh stopwatches per outer iteration; run_module_with_job() accumulates the
        # fetch/run/write times on the three stored in $self.
    my $batches_stopwatch           = Bio::EnsEMBL::Hive::Utils::Stopwatch->new()->restart();
    $self->{'fetching_stopwatch'}   = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    $self->{'running_stopwatch'}    = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    $self->{'writing_stopwatch'}    = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my $jobs_done_by_batches_loop   = 0; # by all iterations of internal loop

    if( my $specific_job = $self->_specific_job() ) {
            # A Worker given one specific job runs just that job and then dies.
        $jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->reclaim_job_for_worker($self, $specific_job) );
        $self->cause_of_death('JOB_LIMIT'); 
    } else {    # a proper "BATCHES" loop

            # NOTE(review): if min_batch_time <= 0 this while loop never executes, so the
            # outer do-loop would spin without claiming jobs until HIVE_OVERLOAD - confirm
            # min_batch_time is always positive.
        while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {

                # Jobs still claimed (and not finished) by this worker at this point mean control
                # escaped run_one_batch() - see the message below.
            if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_worker_id( $self->dbID ) }) ) {
                my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
                warn "$msg";
                $self->cause_of_death('CONTAMINATED'); 
                $job_adaptor->release_undone_jobs_from_worker($self, $msg);
            } else {
                    # next_batch_size() returns 0 once job_limit has been attained
                if(my $how_many_this_batch = $self->next_batch_size()) {
                    $jobs_done_by_batches_loop += $self->run_one_batch( $job_adaptor->grab_jobs_for_worker( $self, $how_many_this_batch ) );
                }

                if( my $jobs_completed = $self->job_limit_reached()) {
                    print "job_limit reached ($jobs_completed jobs completed)\n";
                    $self->cause_of_death('JOB_LIMIT'); 
                } elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
                    print "life_span limit reached (alive for $alive_for_secs secs)\n";
                    $self->cause_of_death('LIFESPAN'); 
                }
            }
        }
    }

        # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
        # so they are not allowed to happen too frequently (not before $min_batch_time of work has been done)
        #
    if($jobs_done_by_batches_loop) {

        $self->db->get_AnalysisStatsAdaptor->interval_update_work_done(
            $self->analysis->dbID,
            $jobs_done_by_batches_loop,
            $batches_stopwatch->get_elapsed,
            $self->{'fetching_stopwatch'}->get_elapsed,
            $self->{'running_stopwatch'}->get_elapsed,
            $self->{'writing_stopwatch'}->get_elapsed,
        );
    }

        # A negative hive_capacity disables this check.
    if (!$self->cause_of_death
    and 0 <= $self->analysis->stats->hive_capacity
    and $self->analysis->stats->hive_capacity < $self->analysis->stats->num_running_workers
    ) {
        $self->cause_of_death('HIVE_OVERLOAD');
    }

  } while (!$self->cause_of_death); # /Worker's lifespan loop

  if($self->perform_cleanup) {
    #have runnable cleanup any global/process files/data it may have created
    $self->cleanup_worker_process_temp_directory;
  }

  $self->queen->register_worker_death($self);

  $self->analysis->stats->print_stats if($self->debug);

  printf("dbc %d disconnect cycles\n", $self->db->dbc->disconnect_count);
  print("total jobs completed : ", $self->work_done, "\n");
  
    # undo the worker.out/worker.err redirection pushed at the start
  if( $self->worker_output_dir() ) {
    $self->get_stdout_redirector->pop();
    $self->get_stderr_redirector->pop();
  }
}

# run_one_batch
#   Runs each job in the arrayref $jobs through this Worker's analysis, one at a time.
#   For every job it:
#     - switches STDOUT/STDERR into the job's own files (if worker_output_dir is set);
#     - compiles the analysis module and runs it inside an eval, recording any
#       thrown message via the JobMessageAdaptor;
#     - on failure, releases the job for a possible retry and kills the Worker
#       ('CONTAMINATED') after a compilation error, two failed jobs in a row, or
#       when the job declares itself lethal_for_worker;
#     - on success, marks the job DONE and decreases its semaphored job's counter.
#   An empty $jobs sets cause_of_death to 'NO_WORK'.
#   Returns the number of jobs that completed successfully in this call.
sub run_one_batch {
    my ($self, $jobs) = @_;

    my $jobs_done_here = 0;

    my $max_retry_count = $self->analysis->stats->max_retry_count();  # a constant (as the Worker is already specialized by the Queen) needed later for retrying jobs

    $self->queen->check_in_worker( $self );
    $self->queen->safe_synchronize_AnalysisStats($self->analysis->stats);

    $self->cause_of_death('NO_WORK') unless(scalar @{$jobs});

    if($self->debug) {
        $self->analysis->stats->print_stats;
        print "claimed ".scalar(@{$jobs})." jobs to process\n";
    }

    foreach my $job (@{$jobs}) {
        $job->print_job if($self->debug); 

        my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();

        $self->start_job_output_redirection($job);  # switch logging into job's STDERR
        eval {  # capture any throw/die
                # Mark the job incomplete *before* anything can fail. run_module_with_job() only
                # sets this flag after compilation and param_init. Without this line, a die in
                # enter_status('COMPILATION'), analysis->process or param_init would leave
                # incomplete() at whatever value the job arrived with. The failed job could then
                # be marked DONE, and the 'COMPILATION' contamination check below never reached.
            $job->incomplete(1);

            $self->enter_status('COMPILATION', $job);       # ToDo: when Runnables are ready, switch to compiling once per batch (saves time)
            my $runnable_db = $self->analysis->process or die "Unknown compilation error";

            $self->db->dbc->query_count(0);
            $job_stopwatch->restart();

            $self->run_module_with_job($runnable_db, $job);
        };
        my $msg_thrown          = $@;

        $job->runtime_msec( $job_stopwatch->get_elapsed );
        $job->query_count( $self->db->dbc->query_count );

        my $job_id              = $job->dbID();
        my $job_completion_line = "\njob $job_id : complete\n";

        if($msg_thrown) {   # record the message - whether it was a success or failure:
            my $job_status_at_the_moment = $job->status();
            my $action = $job->incomplete ? 'died' : 'exited';
            $job_completion_line = "\njob $job_id : $action in status '$job_status_at_the_moment' for the following reason: $msg_thrown\n";
            $self->db->get_JobMessageAdaptor()->register_message($job_id, $msg_thrown, $job->incomplete );
        }

        print STDERR $job_completion_line if($self->worker_output_dir and ($self->debug or $job->incomplete));  # one copy goes to the job's STDERR
        $self->stop_job_output_redirection($job);                                                               # and then we switch back to worker's STDERR
        print STDERR $job_completion_line;                                                                      # one copy goes to the worker's STDERR

        if($job->incomplete) {
                # If the job specifically said what to do next, respect that last wish.
                # Otherwise follow the default behaviour set by the beekeeper in $worker:
                #
            my $may_retry = defined($job->transient_error) ? $job->transient_error : $self->retry_throwing_jobs;

            $job->adaptor->release_and_age_job( $job->dbID, $max_retry_count, $may_retry );

            if($self->status eq 'COMPILATION'       # if it failed to compile, there is no point in continuing as the code WILL be broken
            or $self->prev_job_error                # a bit of AI: if the previous job failed as well, it is LIKELY that we have contamination
            or $job->lethal_for_worker ) {          # trust the job's expert knowledge
                my $reason = ($self->status eq 'COMPILATION') ? 'compilation error'
                           : $self->prev_job_error            ? 'two failed jobs in a row'
                           :                                    'suggested by job itself';
                warn "Job's error has contaminated the Worker ($reason), so the Worker will now die\n";
                $self->cause_of_death('CONTAMINATED');
                return $jobs_done_here;
            }
        } else {    # job successfully completed:
            $self->more_work_done;
            $jobs_done_here++;
            $job->update_status('DONE');

            if(my $semaphored_job_id = $job->semaphored_job_id) {
                $job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );    # step-unblock the semaphore
            }
        }

        $self->prev_job_error( $job->incomplete );
        $self->enter_status('READY');
    }

    return $jobs_done_here;
}


# run_module_with_job
#   Runs a single job through an already-compiled RunnableDB ($runnable_db):
#     1. attaches the job, the hive db, this Worker and the debug level to the runnable;
#     2. calls param_init on the job with, in this order: the runnable's
#        strict_hash_format and param_defaults, the meta-table param hash, the
#        analysis parameters and the job's input_id;
#     3. flags the job incomplete and autoflowing, then runs fetch_input, run and,
#        only when execute_writes is on, write_output plus the default
#        input->output autoflow. Each stage is timed on the matching worker
#        stopwatch created in run();
#     4. dies if the job still has cached semaphored fans whose funnel job was never
#        dataflown; otherwise flags the job complete.
#   Exceptions from any stage propagate to the caller (run_one_batch wraps this in eval).
sub run_module_with_job {
    my ($self, $runnable_db, $job) = @_;

        # Time one stage on the stopwatch stored under $self->{$watch_key}.
        # If the stage dies the stopwatch is left running; the caller's eval deals with it.
    my $timed = sub {
        my ($watch_key, $stage) = @_;
        $self->{$watch_key}->continue();
        $stage->();
        $self->{$watch_key}->pause();
    };

    $runnable_db->input_job( $job );
    $runnable_db->db( $self->db );
    $runnable_db->worker( $self );
    $runnable_db->debug( $self->debug );

    $job->param_init(
        $runnable_db->strict_hash_format(),
        $runnable_db->param_defaults(),
        $self->db->get_MetaContainer->get_param_hash(),
        $self->analysis->parameters(),
        $job->input_id(),
    );
    $job->incomplete(1);
    $job->autoflow(1);

    $self->enter_status('GET_INPUT', $job);
    $timed->('fetching_stopwatch', sub { $runnable_db->fetch_input });

    $self->enter_status('RUN', $job);
    $timed->('running_stopwatch', sub { $runnable_db->run });

    if( !$self->execute_writes ) {
        print STDERR "\n!!! *no* WRITE_OUTPUT requested, so there will be no AUTOFLOW\n" if($self->debug); 
    } else {
        $self->enter_status('WRITE_OUTPUT', $job);
        $timed->('writing_stopwatch', sub { $runnable_db->write_output });

        if( $job->autoflow ) {
            print STDERR "\njob ".$job->dbID." : AUTOFLOW input->output\n" if($self->debug);
            $job->dataflow_output_id();
        }
    }

        # Any fan still cached here never had its semaphored funnel job dataflown.
    my @unflown_funnel_rule_ids = keys %{ $job->fan_cache };
    if( @unflown_funnel_rule_ids ) {
        $job->transient_error(0);
        die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@unflown_funnel_rule_ids).") has never been dataflown";
    }

    $job->incomplete(0);
}


# enter_status
#   Moves the Worker (and the given job, if any) into $status.
#   In debug mode, first prints the transition to STDERR. Then updates the job's
#   status, records the status on the Worker and checks the Worker in with the Queen.
sub enter_status {
    my ($self, $status, $job) = @_;

    print STDERR "\n". ($job ? 'job '.$job->dbID : 'worker'). " : $status\n" if($self->debug);

    $job->update_status( $status ) if($job);

    $self->status( $status );
    $self->queen->check_in_worker( $self );
}


# start_job_output_redirection
#   If this Worker has a worker_output_dir, pushes STDOUT/STDERR redirections to
#   "<worker_output_dir>/job_id_<job_id>_<retry_count>.out" / ".err". Each path is
#   set on the job via stdout_file()/stderr_file(), and the setter's return value
#   is what gets pushed. When the job has an adaptor, the paths are also stored
#   via store_out_files(). Without a worker_output_dir this is a no-op.
#   The former unused third parameter ($worker_output_dir) is no longer unpacked.
#   It was shadowed by the lexical declared in the if() below; any extra argument
#   a caller passes is still ignored.
sub start_job_output_redirection {
    my ($self, $job) = @_;

    if(my $worker_output_dir = $self->worker_output_dir) {
        my $path_prefix = $worker_output_dir . '/job_id_' . $job->dbID . '_' . $job->retry_count;
        $self->get_stdout_redirector->push( $job->stdout_file( $path_prefix . '.out' ) );
        $self->get_stderr_redirector->push( $job->stderr_file( $path_prefix . '.err' ) );

        if(my $job_adaptor = $job->adaptor) {
            $job_adaptor->store_out_files($job);
        }
    }
}


# stop_job_output_redirection
#   Counterpart of start_job_output_redirection(). When a worker_output_dir is in
#   use, it first pops the job's STDOUT/STDERR redirections, restoring the
#   Worker's own streams. It then deletes the job's .out/.err files if they are
#   empty, or unconditionally when the job completed and debug is off, and clears
#   the matching job attribute. Finally it stores the resulting paths via the
#   job adaptor, if the job has one.
sub stop_job_output_redirection {
    my ($self, $job) = @_;

    if($self->worker_output_dir) {
        $self->get_stdout_redirector->pop();
        $self->get_stderr_redirector->pop();

            # keep non-empty files only for failed jobs or when debugging
        my $force_cleanup = !($self->debug || $job->incomplete);

        foreach my $file_accessor ('stdout_file', 'stderr_file') {
            my $path = $job->$file_accessor;
            next unless($force_cleanup or -z $path);

            warn "Deleting '".$path."' file\n";
            unlink $path;
            $job->$file_accessor(undef);
        }

        if(my $job_adaptor = $job->adaptor) {
            $job_adaptor->store_out_files($job);
        }
    }
}


# _specific_job
#   Getter/setter for a single job this Worker was told to run. When it is set,
#   run() reclaims and runs just that job and then dies with 'JOB_LIMIT'.
sub _specific_job {
  my ($self, @args) = @_;
  if(@args) {
    $self->{'_specific_job'} = $args[0];
  }
  return $self->{'_specific_job'};
}


# A Perl module loaded via use/require must finish by evaluating to a true value.
1;