#
# You may distribute this module under the same terms as perl itself
#
# POD documentation - main docs before the code

=pod 

=head1 NAME

  Bio::EnsEMBL::Hive::Worker

=head1 DESCRIPTION

  Object which encapsulates the details of how to find jobs, how to run those
  jobs, and then check the rules to create the next jobs in the chain.
  Essentially knows where to find data, how to process data, and where to
  put it when it is done (put in the next person's INBOX) so the next Worker
  in the chain can find data to work on.

  Hive based processing is a concept based on a more controlled version
  of an autonomous agent type system.  Each worker is not told what to do
  (like a centralized control system - like the current pipeline system)
  but rather queries a central database for jobs (give me jobs).

  Each worker is linked to an analysis_id, registers itself on creation
  into the Hive, creates a RunnableDB instance of the Analysis->module,
  gets relevant configuration information from the database, does its
  work, creates the next layer of analysis_job entries by interfacing to
  the DataflowRuleAdaptor to determine the analyses it needs to pass its
  output data to and creates jobs on the database of the next analysis.
  It repeats this cycle until it has lived its lifetime, has reached its
  job limit, or until there are no more jobs left to process.
  The lifetime limit is a safety limit to prevent these from 'infecting'
  a system and sitting on a compute node for longer than is socially acceptable.
  This is primarily needed on compute resources like an LSF system where jobs
  are not preempted and run until they are done.

  The Queen's primary job is to create Workers to get the work done.
  As part of this, she is also responsible for summarizing the status of the
  analyses by querying the analysis_jobs, summarizing, and updating the
  analysis_stats table.  From this she is also responsible for monitoring and
  'unblocking' analyses via the analysis_ctrl_rules.
  The Queen is also responsible for freeing up jobs that were claimed by Workers
  that died unexpectedly so that other workers can take over the work.

  The Beekeeper is in charge of interfacing between the Queen and a compute resource
  or 'compute farm'.  Its job is to query Queens if they need any workers and to
  send the requested number of workers to open machines via the runWorker.pl script.
  It is also responsible for interfacing with the Queen to identify workers which died
  unexpectedly so that she can free the dead workers' unfinished jobs.

=head1 CONTACT

  Please contact the ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

  The rest of the documentation details each of the object methods.
  Internal methods are usually preceded with an underscore (_).

=cut

package Bio::EnsEMBL::Hive::Worker;

use strict;

use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
use File::Path qw(mkpath rmtree);
use Scalar::Util qw(blessed);
use Sys::Hostname;
use Time::HiRes qw(time);
use POSIX;

use Bio::EnsEMBL::Analysis;
use Bio::EnsEMBL::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor;
use Bio::EnsEMBL::Hive::Extensions;
use Bio::EnsEMBL::Hive::Process;

## Minimum amount of time, in milliseconds, that a worker should keep working
## before reporting back to the hive.  batch_size() also uses it to derive a
## batch size from the analysis' average job time when none is configured.
my $MIN_BATCH_TIME = 120_000;    # 2 minutes = 2*60*1000 msec

# Constructor.  Returns an empty Worker hash blessed into $class; any extra
# arguments are accepted but ignored (use init() and the accessors instead).
sub new {
  my $class = shift;
  my $self  = {};
  return bless($self, $class);
}

# Prepare a freshly constructed Worker: record its start time (read later by
# life_span_limit_reached) and switch debug output off.
# Returns $self so the call can be chained after new().
sub init {
  my ($self) = @_;

  $self->start_time(time());
  $self->debug(0);

  return $self;
}

# Get/set the Queen object this Worker reports to.
# Any argument (even undef) replaces the stored value.
sub queen {
  my ($self, @new_value) = @_;
  $self->{'_queen'} = $new_value[0] if (@new_value);
  return $self->{'_queen'};
}
# Get/set the database adaptor this Worker uses.
# Any argument (even undef) replaces the stored value.
sub db {
  my ($self, @new_value) = @_;
  $self->{'_db'} = $new_value[0] if (@new_value);
  return $self->{'_db'};
}

# Get/set the beekeeper value stored on this Worker.
# Any argument (even undef) replaces the stored value.
sub beekeeper {
  my ($self, @new_value) = @_;
  $self->{'_beekeeper'} = $new_value[0] if (@new_value);
  return $self->{'_beekeeper'};
}

# Get/set the debug level.  An undefined level (never set, or explicitly set
# to undef) reads back as 0.
sub debug {
  my ($self, @new_value) = @_;
  $self->{'_debug'} = $new_value[0] if (@new_value);
  if (!defined($self->{'_debug'})) {
    $self->{'_debug'} = 0;
  }
  return $self->{'_debug'};
}

# Get/set whether run_module_with_job calls write_output (and autoflow).
# An undefined flag (never set, or explicitly set to undef) reads back as 1.
sub execute_writes {
  my ($self, @new_value) = @_;
  $self->{'_execute_writes'} = $new_value[0] if (@new_value);
  if (!defined($self->{'_execute_writes'})) {
    $self->{'_execute_writes'} = 1;
  }
  return $self->{'_execute_writes'};
}

=head2 analysis

  Arg [1] : (optional) Bio::EnsEMBL::Analysis $value
  Title   :   analysis
  Usage   :   $value = $self->analysis;
              $self->analysis($analysis);
  Description: Get/Set analysis object of this Worker
  DefaultValue : undef
  Returntype : Bio::EnsEMBL::Analysis object
  Exceptions : throws if the argument is not a Bio::EnsEMBL::Analysis

=cut

# Get/set the Bio::EnsEMBL::Analysis this Worker processes.
# Passing a defined value that is not a blessed Bio::EnsEMBL::Analysis
# (or subclass) object throws; undef leaves the stored analysis unchanged.
sub analysis {
  my ($self, $analysis) = @_;

  if (defined($analysis)) {
    # blessed() guards the isa() call.  Without it an unblessed reference dies
    # with Perl's own "unblessed reference" error instead of the message below,
    # and a bare class-name string would pass the check and be stored.
    throw("analysis arg must be a [Bio::EnsEMBL::Analysis] not a [$analysis]")
      unless (blessed($analysis) and $analysis->isa('Bio::EnsEMBL::Analysis'));
    $self->{'_analysis'} = $analysis;
  }

  return $self->{'_analysis'};
}


=head2 life_span

  Arg [1] : (optional) integer $value (in seconds)
  Title   :   life_span
  Usage   :   $value = $self->life_span;
              $self->life_span($new_value);
  Description: Defines the maximum time a worker can live for. Workers are always
               allowed to complete the jobs they get, but whether they can
               do multiple rounds of work is limited by their life_span.
               A life_span of 0 disables the limit.
  DefaultValue : 3600 (60 minutes)
  Returntype : integer scalar

=cut

sub life_span { # default life_span = 60 minutes (3600 seconds)
  my $self  = shift;
  my $value = shift;

  if (defined($value)) {
    $self->{'_life_span'} = $value;
  } elsif (!defined($self->{'_life_span'})) {
    $self->{'_life_span'} = 60*60;
  }
  return $self->{'_life_span'};
}

# Get/set the moment this Worker started (set by init() from time()).
# Any argument (even undef) replaces the stored value.
sub start_time {
  my ($self, @args) = @_;
  $self->{'start_time'} = $args[0] if (scalar(@args));
  return $self->{'start_time'};
}

# Returns the number of seconds this Worker has been alive when that exceeds
# its life_span, otherwise 0.  A false life_span (0) disables the check.
sub life_span_limit_reached {
  my $self = shift @_;

  my $limit = $self->life_span();
  return 0 unless ($limit);

  my $alive_for_secs = time() - $self->start_time();
  return ($alive_for_secs > $limit) ? $alive_for_secs : 0;
}

=head2 job_limit

  Title   :   job_limit
  Arg [1] :   (optional) integer $value
  Usage   :   $value = $self->job_limit;
              $self->job_limit($new_value);
  Description: Defines the maximum number of jobs a worker can process 
               before it needs to die. A worker 'dies' when either the 
               'life_span' or 'job_limit' is exceeded.
  DefaultValue : undef (relies on life_span to limit life of worker)
  Returntype : integer scalar

=cut

sub job_limit {
  my ($self, @new_value) = @_;
  # Any argument (even undef) replaces the stored limit.
  $self->{'_job_limit'} = $new_value[0] if (@new_value);
  return $self->{'_job_limit'};
}

# Get/set the number of jobs this Worker has completed; reads as 0 until set.
sub work_done {
  my $self = shift @_;

  $self->{'work_done'} = shift @_ if (@_);

  my $done = $self->{'work_done'};
  return $done ? $done : 0;
}

# Count one more completed job.
# Post-increment: evaluates to the count *before* this job (0 the first time).
sub more_work_done {
  my ($self) = @_;
  return $self->{'work_done'}++;
}

# Returns the number of jobs done once it has reached job_limit, otherwise 0.
# A false job_limit (undef or 0) means "no limit".
sub job_limit_reached {
  my $self = shift @_;

  my $limit = $self->job_limit;
  return 0 unless ($limit);

  my $done = $self->work_done;
  return ($done >= $limit) ? $done : 0;
}

# Get/set this Worker's id.  False values (undef, 0, '') are ignored by the
# setter, so an id can never be cleared once assigned.
sub worker_id {
  my ($self, $value) = @_;
  if ($value) {
    $self->{'_worker_id'} = $value;
  }
  return $self->{'_worker_id'};
}

# Get/set the host name this Worker runs on; false values are ignored.
sub host {
  my ($self, $value) = @_;
  if ($value) {
    $self->{'_host'} = $value;
  }
  return $self->{'_host'};
}

# Get/set the process id recorded for this Worker; false values are ignored.
sub process_id {
  my ($self, $value) = @_;
  if ($value) {
    $self->{'_ppid'} = $value;
  }
  return $self->{'_ppid'};
}

# Get/set why this Worker stopped (run() sets e.g. 'NO_WORK', 'JOB_LIMIT',
# 'LIFESPAN', 'HIVE_OVERLOAD').  False values are ignored, so it cannot be
# cleared once set.
sub cause_of_death {
  my ($self, $value) = @_;
  if ($value) {
    $self->{'_cause_of_death'} = $value;
  }
  return $self->{'_cause_of_death'};
}

# Get/set this Worker's status string; false values are ignored.
sub status {
  my ($self, $value) = @_;
  if ($value) {
    $self->{'_status'} = $value;
  }
  return $self->{'_status'};
}

# Get/set the '_born' value (presumably a timestamp - verify against the
# Queen); false values are ignored.
sub born {
  my ($self, $value) = @_;
  if ($value) {
    $self->{'_born'} = $value;
  }
  return $self->{'_born'};
}

# Get/set the '_died' value (presumably a timestamp - verify against the
# Queen); false values are ignored.
sub died {
  my ($self, $value) = @_;
  if ($value) {
    $self->{'_died'} = $value;
  }
  return $self->{'_died'};
}

# Get/set the '_last_check_in' value (presumably a timestamp - verify
# against the Queen); false values are ignored.
sub last_check_in {
  my ($self, $value) = @_;
  if ($value) {
    $self->{'_last_check_in'} = $value;
  }
  return $self->{'_last_check_in'};
}

=head2 output_dir

  Arg [1] : (optional) string directory path
  Title   :   output_dir
  Usage   :   $value = $self->output_dir;
              $self->output_dir($new_value);
  Description: sets the directory where STDOUT and STDERR will be
               redirected to. The argument is only accepted if it is an
               existing directory; each worker then creates its own
               subdirectory inside it (<dir>/<2 hex digits>/worker_id<N>),
               where each analysis_job will get a .out and .err file
  Returntype : string

=cut

use Digest::MD5 qw(md5_hex);

# Get/set the directory that receives this Worker's log files.
# A new value is only accepted when it names an existing directory; the
# Worker then creates and stores its own subdirectory beneath it.
# Dies if that subdirectory cannot be created.
sub output_dir {
  my ($self, $outdir) = @_;

  if ($outdir and (-d $outdir)) {
    my $worker_id = $self->worker_id;
    # Bucket worker directories by the first two hex digits of md5(worker_id)
    # so that no single directory accumulates every worker's logs.
    my (@hex) = md5_hex($worker_id) =~ m/\G(..)/g;

    # If you want more than one level of directories, change $hex[0]
    # below into an array slice.  e.g @hex[0..1] for two levels.
    $outdir = join('/', $outdir, $hex[0], 'worker_id' . $worker_id);

    # mkpath() creates missing parents like `mkdir -p` but without a shell,
    # so paths containing spaces or shell metacharacters are handled
    # correctly.  It may croak on failure; the -d test reports either way.
    eval { mkpath($outdir) };
    die "Could not create $outdir\n" unless (-d $outdir);

    $self->{'_output_dir'} = $outdir;
  }
  return $self->{'_output_dir'};
}

# Get/set whether run() removes the per-process temp directory on exit.
# An undefined flag (never set, or explicitly set to undef) reads back as 1.
sub perform_global_cleanup {
  my ($self, @new_value) = @_;
  $self->{'_perform_global_cleanup'} = $new_value[0] if (@new_value);
  if (!defined($self->{'_perform_global_cleanup'})) {
    $self->{'_perform_global_cleanup'} = 1;
  }
  return $self->{'_perform_global_cleanup'};
}

# Print a short summary of this Worker (ids, host, pid and its limits) to
# the currently selected output handle.
sub print_worker {
  my $self = shift;

  print("WORKER: worker_id=", $self->worker_id,
        " analysis_id=(", $self->analysis->dbID, ")", $self->analysis->logic_name,
        " host=", $self->host,
        " pid=", $self->process_id,
        "\n");
  print("  batch_size = ", $self->batch_size, "\n");

  my $job_limit = $self->job_limit;
  print("  job_limit  = ", $job_limit, "\n") if (defined($job_limit));

  my $life_span = $self->life_span;
  print("  life_span  = ", $life_span, "\n") if (defined($life_span));

  my $output_dir = $self->output_dir;
  if ($output_dir) {
    print("  output_dir = ", $output_dir, "\n");
  } else {
    print("  output_dir = STDOUT/STDERR\n");
  }
}

# Return this process's scratch directory (/tmp/worker.<pid>/), creating it
# when it has not been made yet or has since disappeared.
# Throws if the directory still does not exist after mkdir.
sub worker_process_temp_directory {
  my $self = shift;

  my $existing = $self->{'_tmp_dir'};
  return $existing if (defined($existing) and (-e $existing));

  #create temp directory to hold fasta databases
  my $tmp_dir = "/tmp/worker.$$/";
  $self->{'_tmp_dir'} = $tmp_dir;
  mkdir($tmp_dir, 0777);
  throw("unable to create ".$tmp_dir) unless (-e $tmp_dir);

  return $tmp_dir;
}


# Recursively delete the scratch directory created by
# worker_process_temp_directory(), if one was ever assigned.
# Best-effort: failures are reported as warnings, never fatal.
sub cleanup_worker_process_temp_directory {
  my $self = shift;

  my $tmp_dir = $self->{'_tmp_dir'};
  return unless ($tmp_dir);

  # rmtree() removes the tree like `rm -r` without spawning a shell; it
  # carps (warns) about entries it cannot remove instead of dying.
  rmtree($tmp_dir);
}

###############################
#
# WORK section
#
###############################

=head2 batch_size

  Args    :   none
  Title   :   batch_size
  Usage   :   $value = $self->batch_size;
              $self->set_worker_batch_size($new_value);
  Description: Defines the number of jobs that should run in batch
               before querying the database for the next job batch.  Used by the
               Hive system to manage the number of workers needed to complete a
               particular job type.
               A value set with set_worker_batch_size() overrides the analysis'
               batch_size.  If the result is not positive and the analysis has an
               average job time, enough jobs to fill $MIN_BATCH_TIME are taken.
               The result is always at least 1 and never exceeds job_limit.
  DefaultValue : batch_size of analysis
  Returntype : integer scalar

=cut

# Override the batch size this Worker asks for, in place of the analysis'
# own batch_size (see batch_size()).  An undef argument is ignored.
sub set_worker_batch_size {
  my ($self, $batch_size) = @_;
  $self->{'_batch_size'} = $batch_size if (defined($batch_size));
}

# Number of jobs to claim per batch (see the POD above for the rules).
sub batch_size {
  my $self = shift;

  my $stats         = $self->analysis->stats;
  my $analysis_size = $stats->batch_size;
  my $override      = $self->{'_batch_size'};

  # A per-worker override wins over the analysis-wide setting.
  my $size = defined($override) ? $override : $analysis_size;

  # Non-positive means "derive it": take enough jobs to fill $MIN_BATCH_TIME.
  if (($size <= 0) and ($stats->avg_msec_per_job)) {
    $size = POSIX::ceil($MIN_BATCH_TIME / $stats->avg_msec_per_job);
  }

  # make sure we grab at least one job
  $size = 1 if ($size < 1);

  my $job_limit = $self->job_limit;
  $size = $job_limit if ($job_limit and ($job_limit < $size));

  return $size;
}

=head2 run

  Title   :   run
  Usage   :   $worker->run;
  Description: 
    This is a self looping autonomous function to process jobs.
    First, if output_dir is set, STDOUT/STDERR are redirected to the
    worker's log files; then looping commences.
    Looping consists of 
      1) claiming jobs,
      2) processing those jobs through an instance of the 'module class' of 
         the analysis assigned to this worker,  
      3) updating the analysis_job, analysis_stats, and hive tables to track the 
         progress of the job, the analysis and this worker.
    Looping stops when any one of these is met:
      1) there are no more jobs to process 
      2) job_limit is reached
      3) life_span has been reached
      4) the worker was told to run one specific job and has run it
      5) the analysis has more running workers than its hive_capacity allows
  Returntype : none

=cut

sub run
{
  my $self = shift;
  my $specific_job = $self->_specific_job;

  if($self->output_dir()) {
    # Keep duplicates of the original streams so they can be restored at the end.
    open OLDOUT, ">&STDOUT";
    open OLDERR, ">&STDERR";
    # Open the worker-level logs (three-argument form, checked) *before*
    # closing STDOUT/STDERR.  If they cannot be created we die while the
    # original streams are still attached, instead of redirecting every later
    # message to a handle that failed to open.
    my $worker_out = $self->output_dir()."/worker.out";
    my $worker_err = $self->output_dir()."/worker.err";
    open(WORKER_STDOUT, '>', $worker_out) or die "Could not open $worker_out for writing: $!\n";
    open(WORKER_STDERR, '>', $worker_err) or die "Could not open $worker_err for writing: $!\n";
    close STDOUT;
    close STDERR;
    open STDOUT, ">&WORKER_STDOUT";
    open STDERR, ">&WORKER_STDERR";
  }
  $self->print_worker();

  $self->db->dbc->disconnect_when_inactive(0);

  do { # Worker's lifespan loop (ends only when the worker dies)
    my $batches_start = time() * 1000;
    my $batches_end = $batches_start;
    my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop
    $self->{fetch_time} = 0;
    $self->{run_time} = 0;
    $self->{write_time} = 0;

    do {    # Worker's "batches loop" exists to prevent logging the status too frequently.
            # If a batch took less than $MIN_BATCH_TIME to run, the Worker keeps taking&running more batches.

      my $jobs = $specific_job
        ? [ $self->queen->worker_reclaim_job($self,$specific_job) ]
        : $self->queen->worker_grab_jobs($self);

      $self->queen->worker_check_in($self); #will sync analysis_stats if needed

      $self->cause_of_death('NO_WORK') unless(scalar @{$jobs});

      if($self->debug) {
        $self->analysis->stats->print_stats;
        print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n");
      }

      foreach my $job (@{$jobs}) {
        $job->print_job if($self->debug); 

        $self->redirect_job_output($job);
        $self->run_module_with_job($job);
        $self->close_and_update_job_output($job);

        $self->queen->worker_register_job_done($self, $job);

        if(my $semaphored_job_id = $job->semaphored_job_id) {
            $job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );
        }

        $self->more_work_done;
      }
      $batches_end = time() * 1000;
      $jobs_done_by_batches_loop += scalar(@$jobs);

      if( $specific_job ) {
            # a worker started for one specific job stops after it
            $self->cause_of_death('JOB_LIMIT'); 
      } elsif( my $jobs_completed = $self->job_limit_reached()) {
            print "job_limit reached (completed $jobs_completed jobs)\n";
            $self->cause_of_death('JOB_LIMIT'); 
      } elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
            print "life_span limit reached (alive for $alive_for_secs secs)\n";
            $self->cause_of_death('LIFESPAN'); 
      }
    } while (!$self->cause_of_death and $batches_end-$batches_start < $MIN_BATCH_TIME);

        # The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
        # so they are not allowed to happen too frequently (not before $MIN_BATCH_TIME of work has been done)
        #
    $self->db->get_AnalysisStatsAdaptor->interval_update_work_done($self->analysis->dbID,
        $jobs_done_by_batches_loop, $batches_end-$batches_start, $self);

    # If the analysis has more running workers than its hive_capacity, try to
    # claim one "slot" back; only the worker whose UPDATE actually touched the
    # row retires, so concurrent workers do not all die at once.
    if (!$self->cause_of_death
    and $self->analysis->stats->hive_capacity >= 0
    and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity) {
      my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
                "WHERE num_running_workers > hive_capacity AND analysis_id = " . $self->analysis->stats->analysis_id;
      my $row_count = $self->queen->dbc->do($sql);
      if ($row_count == 1) {
        $self->cause_of_death('HIVE_OVERLOAD');
      }
    }
  } while (!$self->cause_of_death); # /Worker's lifespan loop

  $self->queen->dbc->do("UPDATE hive SET status = 'DEAD' WHERE worker_id = ".$self->worker_id);
  
  if($self->perform_global_cleanup) {
    #have runnable cleanup any global/process files/data it may have created
    $self->cleanup_worker_process_temp_directory;
  }

  $self->queen->register_worker_death($self);

  $self->analysis->stats->print_stats if($self->debug);

  printf("dbc %d disconnect cycles\n", $self->db->dbc->disconnect_count);
  print("total jobs completed : ", $self->work_done, "\n");
  
  if($self->output_dir()) {
    # Close the worker logs and put the original STDOUT/STDERR back.
    close STDOUT;
    close STDERR;
    close WORKER_STDOUT;
    close WORKER_STDERR;
    open STDOUT, ">&", \*OLDOUT;
    open STDERR, ">&", \*OLDERR;
  }
}

# Run one claimed job through a fresh instance of the analysis' runnable:
# fetch_input, run and (when execute_writes is on) write_output, each timed
# into $self->{fetch_time}/{run_time}/{write_time}.  Records the job's query
# count and runtime, and autoflows the input job when the Process asks for it.
# Returns 1 when the job was run, 0 when there is no runnable or the job is
# missing or claimed by another worker.
sub run_module_with_job {
  my ($self, $job) = @_;

  my $runObj = $self->analysis->process;
  return 0 unless ($runObj);
  return 0 unless ($job and ($job->worker_id eq $self->worker_id));

  my $init_time = time() * 1000;
  $self->queen->dbc->query_count(0);

  # Hand the job to the runnable: Hive Processes get the job object and the
  # hive context, anything else only gets the input_id and the db.
  if ($runObj->isa("Bio::EnsEMBL::Hive::Process")) {
    $runObj->input_job($job);
    $runObj->queen($self->queen);
    $runObj->worker($self);
    $runObj->debug($self->debug);
  } else {
    $runObj->input_id($job->input_id);
    $runObj->db($self->db);
  }

  # Fetched but otherwise unused.
  my $analysis_stats = $self->analysis->stats;

  # One phase: announce $status on worker and job, call $method on the
  # runnable and add its wall time (msec) to $self->{$time_key}.
  my $run_phase = sub {
    my ($status, $method, $time_key) = @_;
    $self->enter_status($status);
    $job->update_status($status);
    print("\n$status\n") if ($self->debug);
    my $phase_start = time() * 1000;
    $runObj->$method;
    $self->{$time_key} += time() * 1000 - $phase_start;
  };

  $run_phase->('GET_INPUT', 'fetch_input', 'fetch_time');
  $run_phase->('RUN',       'run',         'run_time');

  if ($self->execute_writes) {
    $run_phase->('WRITE_OUTPUT', 'write_output', 'write_time');
  } else {
    print("\n\n!!!! NOT write_output\n\n\n") if ($self->debug);
  }

  $self->enter_status("READY");

  $job->query_count($self->queen->dbc->query_count);
  $job->runtime_msec(time() * 1000 - $init_time);

  my $autoflow = ($runObj->isa("Bio::EnsEMBL::Hive::Process") and $runObj->autoflow_inputjob
                  and $self->execute_writes);
  if ($autoflow) {
    printf("AUTOFLOW input->output\n") if ($self->debug);
    $self->queen->flow_output_job($job);
  }

  return 1;
}

# Record that this Worker entered $status by delegating to its Queen;
# returns whatever the Queen returns.
sub enter_status {
  my ($self, $status) = @_;
  my $queen = $self->queen;
  return $queen->enter_status($self, $status);
}

# Point STDOUT/STDERR at per-job log files (<output_dir>/job_<dbID>.out/.err)
# and register the file names through the job's adaptor.
# Does nothing when there is no output_dir, no job, or the job has no adaptor.
sub redirect_job_output
{
  my $self = shift;
  my $job  = shift;

  my $outdir = $self->output_dir();
  return unless($outdir);
  return unless($job);
  return unless($job->adaptor);

  $job->stdout_file($outdir . "/job_".$job->dbID.".out");
  $job->stderr_file($outdir . "/job_".$job->dbID.".err");

  # Three-argument open: the file name is used verbatim, never parsed for a
  # mode.  If a job log cannot be created, keep the stream on the worker-level
  # log (opened by run()) rather than leaving it closed, and clear the file
  # name so no non-existent file is stored for the job.
  close STDOUT;
  unless (open(STDOUT, '>', $job->stdout_file)) {
    my $reason = $!;
    open STDOUT, ">&WORKER_STDOUT";
    warn "Could not open ".$job->stdout_file." for writing: $reason\n";
    $job->stdout_file('');
  }

  close STDERR;
  unless (open(STDERR, '>', $job->stderr_file)) {
    my $reason = $!;
    open STDERR, ">&WORKER_STDERR";
    warn "Could not open ".$job->stderr_file." for writing: $reason\n";
    $job->stderr_file('');
  }

  # the adaptor was checked above
  $job->adaptor->store_out_files($job);
}


# Undo redirect_job_output: point STDOUT/STDERR back at the worker-level logs,
# delete per-job log files that stayed empty (clearing their names on the
# job) and store the final file names through the job's adaptor.
# Does nothing when there is no job, no output_dir, or no adaptor.
sub close_and_update_job_output
{
  my ($self, $job) = @_;

  return unless ($job and $self->output_dir and $job->adaptor);

  # Re-opening STDOUT/STDERR onto the worker logs closes, and therefore
  # flushes, the per-job files.
  open STDOUT, '>&', \*WORKER_STDOUT;
  open STDERR, '>&', \*WORKER_STDERR;

  foreach my $accessor ('stdout_file', 'stderr_file') {
    my $path = $job->$accessor;
    next unless (-z $path);
    unlink $path;
    $job->$accessor('');
  }

  $job->adaptor->store_out_files($job) if ($job->adaptor);
}

# Does not seem to be used anywhere?
#
# Prints the host name and the number of "processor" entries found in
# /proc/cpuinfo ('unknown' when that file cannot be read).  Always returns 1.
sub check_system_load {
  my $self = shift;

  my $host = hostname;

  # Count the lines in Perl instead of shelling out to grep: no extra
  # process, and no trailing newline leaking into the printed message.
  my $numCpus = 'unknown';
  if (open(my $cpuinfo, '<', '/proc/cpuinfo')) {
    $numCpus = grep { /^process/ } <$cpuinfo>;
    close($cpuinfo);
  }
  print("host: $host  cpus:$numCpus\n");

  return 1;  #everything ok
}

# Get/set the one job this Worker was told to run.  When set, run() reclaims
# it through the Queen instead of grabbing jobs, and stops after that batch.
# Any argument (even undef) replaces the stored value.
sub _specific_job {
  my ($self, @new_value) = @_;
  $self->{'_specific_job'} = $new_value[0] if (@new_value);
  return $self->{'_specific_job'};
}

1;