#
# You may distribute this module under the same terms as perl itself
#
# POD documentation - main docs before the code

=pod

=head1 NAME

  Bio::EnsEMBL::Hive::Worker

=head1 DESCRIPTION

  Object which encapsulates the details of how to find jobs, how to run those
  jobs, and then check the rules to create the next jobs in the chain.
  Essentially knows where to find data, how to process data, and where to
  put it when it is done (put in next person's INBOX) so the next Worker
  in the chain can find data to work on.

  Hive based processing is a concept based on a more controlled version
  of an autonomous agent type system.  Each worker is not told what to do
  (like a centralized control system - like the current pipeline system)
  but rather queries a central database for jobs (give me jobs).

  Each worker is linked to an analysis_id, registers itself on creation
  into the Hive, creates a RunnableDB instance of the Analysis->module,
  gets relevant configuration information from the database, does its
  work, creates the next layer of analysis_job entries by interfacing to
  the DataflowRuleAdaptor to determine the analyses it needs to pass its
  output data to and creates jobs on the database of the next analysis.
  It repeats this cycle until it has lived its lifetime or until there are no
  more jobs left to process.
  The lifetime limit is a safety limit to prevent these from 'infecting'
  a system and sitting on a compute node for longer than is socially acceptable.
  This is primarily needed on compute resources like an LSF system where jobs
  are not preempted and run until they are done.

  The Queen's primary job is to create Workers to get the work done.
  As part of this, she is also responsible for summarizing the status of the
  analyses by querying the analysis_jobs, summarizing, and updating the
  analysis_stats table.  From this she is also responsible for monitoring and
  'unblocking' analyses via the analysis_ctrl_rules.
  The Queen is also responsible for freeing up jobs that were claimed by Workers
  that died unexpectedly so that other workers can take over the work.

  The Beekeeper is in charge of interfacing between the Queen and a compute resource
  or 'compute farm'.  Its job is to query Queens if they need any workers and to
  send the requested number of workers to open machines via the runWorker.pl script.
  It is also responsible for interfacing with the Queen to identify workers which died
  unexpectedly so that she can free the dead workers' unfinished jobs.

=head1 CONTACT

  Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

  The rest of the documentation details each of the object methods.
  Internal methods are usually preceded with an underscore (_).

=cut

package Bio::EnsEMBL::Hive::Worker;

# Module preamble.  The blame-view residue (author banners and line-number runs)
# that was interleaved with these statements is not Perl and broke compilation.

use strict;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
use Sys::Hostname;
use Time::HiRes qw(time);
use POSIX;

use Bio::EnsEMBL::Analysis;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Extensions;
use Bio::EnsEMBL::Hive::Process;

use Bio::EnsEMBL::Hive::Utils ('dir_revhash');  # import dir_revhash

## Minimum amount of time in msec that a worker should run before reporting
## back to the hive. This is used when setting the batch_size automatically
## and as the minimum length of one round of batches in run().
## 120000 msec = 2 minutes
my $MIN_BATCH_TIME = 2*60*1000;

# Constructor: returns an empty Worker hash blessed into $class.
# Any further arguments are accepted but ignored; state is filled in
# later through init() and the accessors.
sub new {
  my $class = shift;
  return bless({}, $class);
}


# Initialises a freshly constructed worker: stamps its start time (the
# reference point used by life_span_limit_reached) and turns debugging off.
# Returns the worker itself so calls can be chained.
sub init {
  my ($self) = @_;

  $self->start_time( time() );
  $self->debug( 0 );

  return $self;
}

# Get/set the Queen object this worker reports to.
sub queen {
  my ($self, @new_queen) = @_;
  $self->{'_queen'} = $new_queen[0] if @new_queen;
  return $self->{'_queen'};
}
# Get/set the database adaptor this worker uses.
sub db {
  my $self = $_[0];
  if (@_ > 1) {
    $self->{'_db'} = $_[1];
  }
  return $self->{'_db'};
}

# Get/set the beekeeper associated with this worker.
sub beekeeper {
  my ($self, @args) = @_;
  if (@args) {
    $self->{'_beekeeper'} = shift @args;
  }
  return $self->{'_beekeeper'};
}

# Get/set the debug level; an unset (or explicitly undef) level reads as 0.
sub debug {
  my ($self, @level) = @_;
  $self->{'_debug'} = $level[0] if @level;
  $self->{'_debug'} = 0 if !defined($self->{'_debug'});
  return $self->{'_debug'};
}

# Get/set whether run_module_with_job() should call write_output on the
# process object.  An unset (or explicitly undef) flag reads as 1 (enabled).
sub execute_writes {
  my $self = shift;
  if (@_) {
    $self->{'_execute_writes'} = shift;
  }
  unless (defined $self->{'_execute_writes'}) {
    $self->{'_execute_writes'} = 1;
  }
  return $self->{'_execute_writes'};
}


=head2 analysis

  Arg [1] : (optional) Bio::EnsEMBL::Analysis $value
  Title   :   analysis
  Usage   :   $value = $self->analysis;
              $self->analysis($analysis);
  Description: Get/Set analysis object of this Worker.
               Passing undef (or nothing) leaves the current analysis unchanged.
  DefaultValue : undef
  Returntype : Bio::EnsEMBL::Analysis object
  Exceptions : throws if a defined argument is not a Bio::EnsEMBL::Analysis object

=cut

sub analysis {
  my $self = shift;
  my $analysis = shift;

  if(defined($analysis)) {
    # Check blessed() first: ->isa on an unblessed reference dies with a generic
    # "Can't call method" error instead of the informative throw() below, and
    # ->isa on a plain string treats it as a class name (so the string
    # 'Bio::EnsEMBL::Analysis' used to be accepted and stored as the analysis).
    require Scalar::Util;
    throw("analysis arg must be a [Bio::EnsEMBL::Analysis] not a [$analysis]")
       unless(Scalar::Util::blessed($analysis) and $analysis->isa('Bio::EnsEMBL::Analysis'));
    $self->{'_analysis'} = $analysis;
  }

  return $self->{'_analysis'};
}


=head2 life_span

  Arg [1] : (optional) integer $value (in seconds)
  Title   :   life_span
  Usage   :   $value = $self->life_span;
              $self->life_span($new_value);
  Description: Get/set the maximum number of seconds this worker should live.
               A worker always finishes the jobs it has already claimed; the
               life span only decides whether it may start another round of work.
               A false value (0) means "no limit" to life_span_limit_reached().
               Passing undef leaves the current value unchanged.
  DefaultValue : 3600 (60 minutes)
  Returntype : integer scalar

=cut

sub life_span { # default life_span = 60minutes
  my ($self, $value) = @_;

  if (defined $value) {
    $self->{'_life_span'} = $value;
  } elsif (!defined $self->{'_life_span'}) {
    $self->{'_life_span'} = 60*60;
  }

  return $self->{'_life_span'};
}

# Get/set the epoch time (seconds, possibly fractional) at which this worker started.
sub start_time {
    my ($self, @stamp) = @_;

    $self->{'start_time'} = $stamp[0] if @stamp;
    return $self->{'start_time'};
}

# Returns how many seconds the worker has been alive if that exceeds its
# life_span, otherwise 0.  A false life_span disables the check entirely.
sub life_span_limit_reached {
    my $self = shift @_;

    my $limit = $self->life_span() or return 0;

    my $alive_for_secs = time() - $self->start_time();
    return ($alive_for_secs > $limit) ? $alive_for_secs : 0;
}

=head2 job_limit

  Title   :   job_limit
  Arg [1] :   (optional) integer $value
  Usage   :   $value = $self->job_limit;
              $self->job_limit($new_value);
  Description: Get/set how many jobs this worker may complete before it dies.
               Together with 'life_span' this bounds the worker's lifetime;
               when unset, only 'life_span' applies.  Calling it with an
               explicit undef clears the limit.
  DefaultValue : undef (relies on life_span to limit life of worker)
  Returntype : integer scalar

=cut

sub job_limit {
  my ($self, @limit) = @_;
  if (@limit) {
    $self->{'_job_limit'} = $limit[0];
  }
  return $self->{'_job_limit'};
}

# Get/set the number of jobs this worker has completed; reads as 0 when unset.
sub work_done {
  my ($self, @count) = @_;

  if (@count) {
    $self->{'work_done'} = $count[0];
  }

  my $done = $self->{'work_done'};
  return $done ? $done : 0;
}

# Counts one more completed job.  Being a post-increment, the value returned
# is the count from before this call (0 on the very first call).
sub more_work_done {
  my ($self) = @_;
  return $self->{'work_done'}++;
}

# Returns the number of completed jobs once it has reached job_limit,
# otherwise 0.  Without a (true) job_limit the check never fires.
sub job_limit_reached {
    my $self = shift @_;

    my $limit = $self->job_limit or return 0;
    my $done  = $self->work_done;

    return ($done >= $limit) ? $done : 0;
}

# Get/set this worker's id.  Only true values are stored: undef, '' and 0
# leave the current id untouched.
sub worker_id {
  my $self   = shift;
  my $new_id = shift;
  if ($new_id) {
    $self->{'_worker_id'} = $new_id;
  }
  return $self->{'_worker_id'};
}

# Get/set the host name; false values are ignored.
sub host {
  my ($self, $new_host) = @_;
  $new_host and $self->{'_host'} = $new_host;
  return $self->{'_host'};
}

# Get/set the process id (stored under '_ppid'); false values are ignored.
sub process_id {
  my ($self, $pid) = @_;
  if ($pid) {
    $self->{'_ppid'} = $pid;
  }
  return $self->{'_ppid'};
}

# Get/set why this worker is dying (e.g. 'NO_WORK', 'JOB_LIMIT').  Once set,
# run() stops looping.  False values are ignored, so it cannot be cleared.
sub cause_of_death {
  my $self  = shift;
  my $cause = shift;
  $self->{'_cause_of_death'} = $cause if $cause;
  return $self->{'_cause_of_death'};
}

# Get/set the worker's status string; false values are ignored.
sub status {
  my ($self, @new_status) = @_;
  my $value = $new_status[0];
  if ($value) {
    $self->{'_status'} = $value;
  }
  return $self->{'_status'};
}

# Get/set when this worker was born; false values are ignored.
sub born {
  my ($self, $when) = @_;
  $self->{'_born'} = $when if $when;
  return $self->{'_born'};
}

# Get/set when this worker died; false values are ignored.
sub died {
  my $self = shift;
  my $when = shift;
  if ($when) { $self->{'_died'} = $when; }
  return $self->{'_died'};
}

# Get/set the time of this worker's last check-in; false values are ignored.
sub last_check_in {
  my ($self, $when) = @_;
  $self->{'_last_check_in'} = $when if $when;
  return $self->{'_last_check_in'};
}

=head2 hive_output_dir

  Arg [1] : (optional) string directory path
  Title   :   hive_output_dir
  Usage   :   $hive_output_dir = $self->hive_output_dir;
              $self->hive_output_dir($hive_output_dir);
  Description: getter/setter for the root directory that the hive's STDOUT and
               STDERR are redirected into.  When it holds a true value,
               worker_output_dir() derives a per-worker subdirectory beneath it,
               and each job then gets its own .out and .err files there.
  Returntype : string

=cut

sub hive_output_dir {
    my ($self, @new_dir) = @_;

    if (@new_dir) {
        $self->{'_hive_output_dir'} = $new_dir[0];
    }
    return $self->{'_hive_output_dir'};
}

# Get/set the directory that receives this worker's own output and the per-job
# .out/.err files.
#   - Getter: returns the cached directory if one is already set (and true).
#   - Setter ($dir given): uses $dir as-is and ignores hive_output_dir.
#   - Otherwise derives <hive_output_dir>/<dir_revhash(worker_id)>/worker_id_<worker_id>
#     when hive_output_dir is true.
# A true directory is created (mkdir -p) before being cached; a false one
# (undef/'') is cached without touching the filesystem.
# Dies if the directory cannot be created.
sub worker_output_dir {
    my $self = shift @_;

    if((my $worker_output_dir = $self->{'_worker_output_dir'}) and not @_) { # no need to set, just return:

        return $worker_output_dir;

    } else { # let's try to set first:

        if(@_) { # setter mode ignores hive_output_dir

            $worker_output_dir = shift @_;

        } elsif( my $hive_output_dir = $self->hive_output_dir ) {

            my $worker_id = $self->worker_id();

            $worker_output_dir = join('/', $hive_output_dir, dir_revhash($worker_id), 'worker_id_'.$worker_id );
        }

        if($worker_output_dir) { # will not attempt to create if set to false
            # List-form system() bypasses the shell, so spaces, quotes or other
            # shell metacharacters in the path are passed to mkdir verbatim.
            # system() reports failure through $? (and sets $! only when mkdir
            # could not be started at all), so report whichever applies.
            if( system('mkdir', '-p', $worker_output_dir) != 0 ) {
                my $reason = ($? == -1)
                    ? "could not run mkdir: $!"
                    : 'mkdir -p exited with status '.($? >> 8);
                die "Could not create '$worker_output_dir' because: $reason";
            }
        }

        $self->{'_worker_output_dir'} = $worker_output_dir;
    }
    return $self->{'_worker_output_dir'};
}

# Get/set whether run() should remove the worker's temp directory when the
# worker dies.  An unset (or explicitly undef) flag reads as 1 (clean up).
sub perform_global_cleanup {
  my ($self, @flag) = @_;
  $self->{'_perform_global_cleanup'} = $flag[0] if(@flag);
  defined($self->{'_perform_global_cleanup'}) or $self->{'_perform_global_cleanup'} = 1;
  return $self->{'_perform_global_cleanup'};
}

# Prints a short human-readable summary of this worker to the currently
# selected output handle.
sub print_worker {
  my $self = shift;

  # Identity: the worker, the analysis it serves, and where it runs.
  print('WORKER: worker_id=', $self->worker_id,
        ' analysis_id=(', $self->analysis->dbID, ')', $self->analysis->logic_name,
        ' host=', $self->host,
        ' pid=', $self->process_id,
        "\n");

  # Work-size parameters; the two limits are shown only when defined.
  print('  batch_size = ', $self->batch_size, "\n");
  foreach my $limit ([ 'job_limit', '  job_limit  = ' ], [ 'life_span', '  life_span  = ' ]) {
    my ($getter, $label) = @{$limit};
    next unless defined($self->$getter);
    print($label, $self->$getter, "\n");
  }

  # Where the worker's own output ends up.
  my $output_dir = $self->worker_output_dir;
  print('  worker_output_dir = ' . ($output_dir || 'STDOUT/STDERR') . "\n");
}


# Returns this process's scratch directory, "/tmp/worker.<pid>/", creating it
# on first use (or again if it has since disappeared).  Throws if it cannot
# be created.
sub worker_process_temp_directory {
  my $self = shift;

  my $cached = $self->{'_tmp_dir'};
  return $cached if(defined($cached) and -e $cached);

  #create temp directory to hold fasta databases
  my $tmp_dir = "/tmp/worker.$$/";
  $self->{'_tmp_dir'} = $tmp_dir;
  mkdir($tmp_dir, 0777);
  throw("unable to create ".$tmp_dir) unless(-e $tmp_dir);

  return $tmp_dir;
}


# Recursively removes the scratch directory created by
# worker_process_temp_directory(), if one was ever recorded.
# Best-effort: a failure of rm is not reported, as before.
sub cleanup_worker_process_temp_directory {
  my $self = shift;
  if(my $tmp_dir = $self->{'_tmp_dir'}) {
    # List-form system() bypasses the shell, so a path containing spaces or
    # shell metacharacters is removed as one argument instead of being split
    # (or re-interpreted) by /bin/sh.
    system('rm', '-r', $tmp_dir);
  }
}

###############################
#
# WORK section
#
###############################

=head2 batch_size

  Arg [1] :   (optional) integer $new_value
  Title   :   batch_size
  Usage   :   $value = $self->batch_size;
              $self->batch_size($new_value);
  Description: Defines the number of jobs that should run in batch
               before querying the database for the next job batch.  Used by the
               Hive system to manage the number of workers needed to complete a
               particular job type.
               A worker-specific value (given here or via set_worker_batch_size)
               overrides the analysis' batch_size.  If the resulting value is not
               positive and the analysis has an avg_msec_per_job, the batch is
               sized to take about $MIN_BATCH_TIME msec.  The result is never
               less than 1 and never more than job_limit (when one is set).
  DefaultValue : batch_size of analysis
  Returntype : integer scalar

=cut

# Sets a worker-specific batch size that overrides the analysis' batch_size.
# undef is ignored, so the current setting is kept.
sub set_worker_batch_size {
  my $self = shift;
  my $batch_size = shift;
  if(defined($batch_size)) {
    $self->{'_batch_size'} = $batch_size;
  }
}

sub batch_size {
  my $self = shift;

  # Setter form, as documented in the POD above.  Any argument used to be
  # silently ignored, so "$worker->batch_size(10)" had no effect.
  $self->set_worker_batch_size(shift) if(@_);

  my $stats = $self->analysis->stats;
  my $batch_size = $stats->batch_size;
  if(defined($self->{'_batch_size'})) {
    $batch_size = $self->{'_batch_size'};
  }

  if(($batch_size <= 0) and ($stats->avg_msec_per_job)) {
    $batch_size = POSIX::ceil($MIN_BATCH_TIME / $stats->avg_msec_per_job); # num jobs in $MIN_BATCH_TIME msecs
  }
  $batch_size = 1 if($batch_size < 1); # make sure we grab at least one job

  if($self->job_limit and ($self->job_limit < $batch_size)) {
    $batch_size = $self->job_limit;
  }
  return $batch_size;
}

=head2 run

  Title   :   run
  Usage   :   $worker->run;
  Description: 
    This is a self looping autonomous function to process jobs.
    First all STDOUT/STDERR is redirected, then looping commences.
    Looping consists of 
      1) claiming jobs,
      2) processing those jobs through an instance of the 'module class' of 
         the analysis assigned to this worker,  
      3) updating the analysis_job, analysis_stats, and hive tables to track the 
         progress of the job, the analysis and this worker.
    Looping stops when any one of these are met:
      1) there are no more jobs to process 
      2) job_limit is reached
      3) life_span has been reached.
    The worker also dies when a job error is lethal ('CONTAMINATED') or when the
    analysis runs more workers than its hive_capacity ('HIVE_OVERLOAD').
  Returntype : none
  Exceptions : dies if worker.out/worker.err cannot be opened in worker_output_dir

=cut

sub run
{
  my $self = shift;
  my $specific_job = $self->_specific_job;

  if( my $worker_output_dir = $self->worker_output_dir ) {
    # Keep duplicates of the original streams so they can be restored on exit.
    # WORKER_STDOUT/WORKER_STDERR must stay package-level bareword handles:
    # stop_job_output_redirection() re-points STDOUT/STDERR at them after every job.
    open(OLDOUT, '>&', \*STDOUT) or die "Could not duplicate STDOUT: $!";
    open(OLDERR, '>&', \*STDERR) or die "Could not duplicate STDERR: $!";
    # Open (and check) the worker-level logs BEFORE closing the real streams,
    # so that a failure is still reported somewhere visible.
    open(WORKER_STDOUT, '>', "${worker_output_dir}/worker.out")
        or die "Could not open '${worker_output_dir}/worker.out' for writing: $!";
    open(WORKER_STDERR, '>', "${worker_output_dir}/worker.err")
        or die "Could not open '${worker_output_dir}/worker.err' for writing: $!";
    close STDOUT;
    close STDERR;
    open STDOUT, '>&', \*WORKER_STDOUT;
    open STDERR, '>&', \*WORKER_STDERR;
  }
  $self->print_worker();

  $self->db->dbc->disconnect_when_inactive(0);

  do { # Worker's lifespan loop (ends only when the worker dies)
    my $batches_start = time() * 1000;
    my $batches_end = $batches_start;
    $self->{fetch_time} = 0;
    $self->{run_time} = 0;
    $self->{write_time} = 0;

    BATCHES: {  # note: in order to label a do{} loop you have to enclose it in an extra block
    do {    # Worker's "batches loop" exists to prevent logging the status too frequently.
            # If a batch took less than $MIN_BATCH_TIME to run, the Worker keeps taking&running more batches.

      # undef entries are dropped in case worker_reclaim_job() returns nothing
      # useful: an undef job would otherwise crash below, outside the eval.
      my $jobs = $specific_job
        ? [ grep { defined $_ } $self->queen->worker_reclaim_job($self,$specific_job) ]
        : $self->queen->worker_grab_jobs($self);

      $self->queen->worker_check_in($self); #will sync analysis_stats if needed

      $self->cause_of_death('NO_WORK') unless(scalar @{$jobs});

      if($self->debug) {
        $self->analysis->stats->print_stats;
        print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n");
      }

      foreach my $job (@{$jobs}) {
        $job->print_job if($self->debug); 

        $self->start_job_output_redirection($job);
        eval {  # capture any death event
            $self->run_module_with_job($job);
        };
        my $error_msg = $@;
        $self->stop_job_output_redirection($job);

        if($error_msg) {
            my $job_id               = $job->dbID();
            my $job_status_when_died = $job->status();
            warn "Job with id=$job_id died in status '$job_status_when_died' for the following reason: $error_msg\n";
            $self->db()->get_JobErrorAdaptor()->register_error($job_id, $error_msg);
            $job->update_status('FAILED');

            if($job->lethal) {    # either a compilation error or other job-sanctioned contamination
                warn "Job's error has contaminated the Worker, so the Worker will now die\n";
                $self->cause_of_death('CONTAMINATED');
                last BATCHES;
            }
        } else {
            if(my $semaphored_job_id = $job->semaphored_job_id) {
                $job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );    # step-unblock the semaphore after job is (successfully) done
            }
            $self->more_work_done;
        }
      }
      $batches_end = time() * 1000;

      if( $specific_job ) {
            $self->cause_of_death('JOB_LIMIT'); 
      } elsif( my $jobs_completed = $self->job_limit_reached()) {
            print "job_limit reached (completed $jobs_completed jobs)\n";
            $self->cause_of_death('JOB_LIMIT'); 
      } elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
            print "life_span limit reached (alive for $alive_for_secs secs)\n";
            $self->cause_of_death('LIFESPAN'); 
      }
    } while (!$self->cause_of_death and $batches_end-$batches_start < $MIN_BATCH_TIME);
    } # this is the extra block enclosing a labelled do{} loop

    # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
    # so they are not allowed to happen too frequently (not before $MIN_BATCH_TIME of work has been done)
    #
    $self->db->get_AnalysisStatsAdaptor->interval_update_work_done($self->analysis->dbID,
        $self->work_done, $batches_end-$batches_start, $self);

    if (!$self->cause_of_death
    and $self->analysis->stats->hive_capacity >= 0
    and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity) {
      my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
                "WHERE num_running_workers > hive_capacity AND analysis_id = " . $self->analysis->stats->analysis_id;
      my $row_count = $self->queen->dbc->do($sql);
      if ($row_count == 1) {
        $self->cause_of_death('HIVE_OVERLOAD');
      }
    }
  } while (!$self->cause_of_death); # /Worker's lifespan loop

  $self->queen->dbc->do("UPDATE hive SET status = 'DEAD' WHERE worker_id = ".$self->worker_id);

  if($self->perform_global_cleanup) {
    #have runnable cleanup any global/process files/data it may have created
    $self->cleanup_worker_process_temp_directory;
  }

  $self->queen->register_worker_death($self);

  $self->analysis->stats->print_stats if($self->debug);

  printf("dbc %d disconnect cycles\n", $self->db->dbc->disconnect_count);
  print("total jobs completed : ", $self->work_done, "\n");

  if( $self->worker_output_dir() ) {
    close STDOUT;
    close STDERR;
    close WORKER_STDOUT;
    close WORKER_STDERR;
    open STDOUT, ">&", \*OLDOUT;
    open STDERR, ">&", \*OLDERR;
    # the saved duplicates are no longer needed once the streams are restored
    close OLDOUT;
    close OLDERR;
  }
}

# Runs one job through a fresh instance of the analysis' process module:
# COMPILATION -> GET_INPUT (fetch_input) -> RUN (run) -> WRITE_OUTPUT
# (write_output, only if execute_writes) -> job DONE / worker READY.
# Accumulates per-phase wall-clock time (msec) in $self->{fetch_time},
# $self->{run_time} and $self->{write_time}, and records the job's query
# count and total runtime.  Any exception propagates to the caller.
sub run_module_with_job {
  my ($self, $job) = @_;

  # Moves the worker, then the job, into $status.
  my $set_status = sub {
    my $status = shift;
    $self->enter_status($status);
    $job->update_status($status);
  };

  # Runs $code and adds its wall-clock duration in msec to $self->{$slot}.
  my $timed = sub {
    my ($slot, $code) = @_;
    my $phase_start = time() * 1000;
    $code->();
    my $phase_end = time() * 1000;
    $self->{$slot} += $phase_end - $phase_start;
  };

  $set_status->('COMPILATION');
  $job->lethal(1);  # a death while compiling the module kills the Worker as well
  my $runObj = $self->analysis->process or die "Unknown compilation error";
  $job->lethal(0);  # from here on a failure only affects this job

  my $native_hive_process = $runObj->isa("Bio::EnsEMBL::Hive::Process");

  my $init_time = time() * 1000;
  $self->queen->dbc->query_count(0);

  # Native hive processes get the whole job plus worker context;
  # anything else only gets the input_id and the database handle.
  if($native_hive_process) {
    $runObj->input_job($job);
    $runObj->queen($self->queen);
    $runObj->worker($self);
    $runObj->debug($self->debug);
  } else {
    $runObj->input_id($job->input_id);
    $runObj->db($self->db);
  }

  my $analysis_stats = $self->analysis->stats;   # NOTE(review): value unused here; call kept as-is

  $set_status->('GET_INPUT');
  print("\nGET_INPUT\n") if($self->debug);
  $timed->('fetch_time', sub { $runObj->fetch_input });

  $set_status->('RUN');
  print("\nRUN\n") if($self->debug);
  $timed->('run_time', sub { $runObj->run });

  if($self->execute_writes) {
    $set_status->('WRITE_OUTPUT');
    print("\nWRITE_OUTPUT\n") if($self->debug);
    $timed->('write_time', sub { $runObj->write_output });

    if( $native_hive_process and $runObj->autoflow_inputjob ) {
      printf("AUTOFLOW input->output\n") if($self->debug);
      $runObj->dataflow_output_id();
    }
  } else {
    print("\n\n!!!! NOT write_output\n\n\n") if($self->debug);
  }

  $job->query_count($self->queen->dbc->query_count);
  $job->runtime_msec(time()*1000 - $init_time);

  $job->update_status('DONE');
  $self->enter_status('READY');
}

# Delegates the worker's status change to its Queen and returns her answer.
sub enter_status {
  my $self   = shift;
  my $status = shift;
  my $queen  = $self->queen;
  return $queen->enter_status($self, $status);
}

# When a worker_output_dir is set, points STDOUT/STDERR at per-job files
# <worker_output_dir>/job_id_<dbID>.out/.err and records them via the job
# adaptor.  Does nothing without a job, without a job adaptor, or without a
# worker_output_dir.
# If a per-job file cannot be opened, that stream falls back to the
# worker-level handle (WORKER_STDOUT/WORKER_STDERR, opened in run()), a
# warning is emitted, and the job's file name is recorded as '' (the same
# value stop_job_output_redirection() uses for "no file").
sub start_job_output_redirection {
    my $self = shift;
    my $job  = shift or return;

    my $job_adaptor = $job->adaptor or return;

    if( my $worker_output_dir = $self->worker_output_dir ) {

        my $file_stem = $worker_output_dir . '/job_id_' . $job->dbID;
        $job->stdout_file( $file_stem . '.out' );
        $job->stderr_file( $file_stem . '.err' );

        close STDOUT;
        unless( open(STDOUT, '>', $job->stdout_file) ) {
            my $reason = $!;
            open STDOUT, '>&', \*WORKER_STDOUT;    # keep the job's output in worker.out
            warn "Could not open '".$job->stdout_file."' ($reason), job STDOUT goes to worker.out instead\n";
            $job->stdout_file('');
        }

        close STDERR;
        unless( open(STDERR, '>', $job->stderr_file) ) {
            my $reason = $!;
            open STDERR, '>&', \*WORKER_STDERR;    # restore a usable STDERR before warning
            warn "Could not open '".$job->stderr_file."' ($reason), job STDERR goes to worker.err instead\n";
            $job->stderr_file('');
        }

        $job_adaptor->store_out_files($job);
    }
}

# Undoes start_job_output_redirection(): re-points STDOUT/STDERR at the
# worker-level handles, deletes per-job output files that stayed empty
# (recording '' for them), and stores the final file names via the job
# adaptor.  Does nothing without a job, a job adaptor or a worker_output_dir.
sub stop_job_output_redirection {
    my ($self, $job) = @_;
    return unless $job;

    my $job_adaptor = $job->adaptor;
    return unless $job_adaptor;

    return unless $self->worker_output_dir;

    # the following flushes $job->stderr_file and $job->stdout_file
    open STDOUT, ">&WORKER_STDOUT";
    open STDERR, ">&WORKER_STDERR";

    # Empty output files carry no information: delete them and record ''.
    foreach my $file_accessor (qw(stdout_file stderr_file)) {
        my $path = $job->$file_accessor;
        next unless -z $path;
        unlink $path;
        $job->$file_accessor('');
    }

    $job_adaptor->store_out_files($job);
}

# Does not seem to be used anywhere?
#
# Prints the host name and the number of "process..." lines in /proc/cpuinfo
# (one "processor" entry per CPU on Linux; 0 where the file is absent).
# No load threshold is actually checked: it always returns 1 ("everything ok").
sub check_system_load {
  my $self = shift;

  my $host = hostname();

  # Read /proc/cpuinfo directly instead of shelling out to grep. The backtick
  # result kept its trailing newline (printing a blank line) and leaked grep's
  # error message on hosts without /proc.
  my $numCpus = 0;
  if(open(my $cpuinfo_fh, '<', '/proc/cpuinfo')) {
    $numCpus = grep { /^process/ } <$cpuinfo_fh>;
    close($cpuinfo_fh);
  }
  print("host: $host  cpus:$numCpus\n");

  return 1;  #everything ok
}

# Get/set the one specific job this worker was asked to (re)run; when set,
# run() reclaims that job instead of grabbing new ones.
sub _specific_job {
  my ($self, @job) = @_;
  $self->{'_specific_job'} = $job[0] if(@job);
  return $self->{'_specific_job'};
}

1;