=pod

=head1 NAME

    Bio::EnsEMBL::Hive::Process

=head1 DESCRIPTION

    Abstract superclass.  Each Process makes up the individual building blocks
    of the system.  Instances of these processes are created in a hive workflow
    graph of Analysis entries that are linked together with dataflow and
    AnalysisCtrl rules.

    Instances of these Processes are created by the system as work is done.
    The newly created Process will have preset $self->db, $self->dbc, $self->input_id
    and several other variables.
    From this input and configuration data, each Process can then proceed to
    do something.  The flow of execution within a Process is:
        pre_cleanup() if($retry_count>0);   # clean up databases/filesystem before subsequent attempts
        fetch_input();                      # fetch the data from databases/filesystems
        run();                              # perform the main computation
        write_output();                     # record the results in databases/filesystems
        post_healthcheck();                 # check if we got the expected result (optional)
        post_cleanup();                     # destroy all non-trivial data structures after the job is done
    The developer can implement their own versions of
    pre_cleanup, fetch_input, run, write_output and post_cleanup to do what they need.
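
    As an illustration (a minimal sketch only; the package name, the parameter
    names and the computation itself are invented for this example), a typical
    subclass overrides some of these methods:

        package MyHive::RunnableDB::WordCount;

        use strict;
        use warnings;
        use base ('Bio::EnsEMBL::Hive::Process');

        sub fetch_input {               # slurp the file named by the 'filename' parameter
            my $self = shift;
            my $filename = $self->param_required('filename');
            open(my $fh, '<', $filename) or die "Cannot open '$filename': $!";
            local $/;                   # slurp mode
            $self->param('text', scalar <$fh>);
            close($fh);
        }

        sub run {                       # perform the (trivial) computation in memory
            my $self = shift;
            my @words = split(/\s+/, $self->param('text') // '');
            $self->param('word_count', scalar(@words));
        }

        sub write_output {              # flow the result out on branch #1
            my $self = shift;
            $self->dataflow_output_id( { 'word_count' => $self->param('word_count') }, 1 );
        }

        1;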

    The entire system is based around the concept of a workflow graph which
    can split and loop back on itself.  This is accomplished by dataflow
    rules (similar to Unix pipes) that connect one Process (or analysis) to others.
    Where a Unix command-line program can send output on its STDOUT and STDERR pipes,
    a hive Process has access to unlimited pipes referenced by numerical
    branch_codes.  This is accomplished within the Process via
    $self->dataflow_output_id(...);
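
    For instance (a sketch; the parameter names are invented), a Process can
    fan jobs out on one branch and send a summary event down another:

        # inside run() or write_output() of a Runnable:
        foreach my $chunk_name (@chunk_names) {
            $self->dataflow_output_id( { 'chunk' => $chunk_name }, 2 );         # fan out on branch #2
        }
        $self->dataflow_output_id( { 'n_chunks' => scalar(@chunk_names) } );    # default branch #1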
    The design philosophy is that each Process does its work and creates output,
    but it doesn't worry about where the input came from, or where its output
    goes.  If the system has dataflow pipes connected, then the output jobs
    have purpose; if not, the output work is thrown away.  The workflow graph
    'controls' the behaviour of the system, not the processes.  The processes just
    need to do their job.  The design of the workflow graph is based on the knowledge
    of what each Process does, so that the graph can be correctly constructed.
    The workflow graph can be constructed a priori, or can be constructed and
    modified by intelligent Processes as the system runs.


    The Hive is based on AI concepts and modeled on the social structure and
    behaviour of a honey bee hive.  So where a worker honey bee's purpose is
    (go find pollen, bring it back to the hive, drop it off, repeat), an ensembl-hive
    worker's purpose is (find a job, create a Process for that job, run it,
    drop off the output job(s), repeat).  While most workflow systems are based
    on 'smart' central controllers and external control of 'dumb' processes,
    the Hive is based on 'dumb' workflow graphs and a job kiosk, and 'smart' workers
    (autonomous agents) who are self-configuring and figure out for themselves what
    needs to be done, and then do it.  The workers are based around a set of
    emergent behaviour rules which allow a predictable system behaviour to emerge
    from what otherwise might appear at first glance to be a chaotic system.  There
    is an inherent asynchronous disconnect between one worker and the next.
    Jobs are simply 'posted' on a blackboard or kiosk within the hive
    database where other workers can find them.
    The emergent behaviour rules of a worker are:
    1) If a job is posted, someone needs to do it.
    2) Don't grab something that someone else is working on.
    3) Don't grab more than you can handle.
    4) If you grab a job, it needs to be finished correctly.
    5) Keep busy doing work.
    6) If you fail, do the best you can to report back.

    For further reading on the AI principles employed in this design see:
        http://en.wikipedia.org/wiki/Autonomous_Agent
        http://en.wikipedia.org/wiki/Emergence

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2019] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually preceded with a _

=cut


package Bio::EnsEMBL::Hive::Process;

use strict;
use warnings;

use File::Path qw(make_path remove_tree);
use JSON;
use Scalar::Util qw(looks_like_number);
use Time::HiRes qw(time);

use Bio::EnsEMBL::Hive::Utils ('stringify', 'go_figure_dbc', 'join_command_args', 'timeout');
use Bio::EnsEMBL::Hive::Utils::Stopwatch;


sub new {
    my $class = shift @_;

    my $self = bless {}, $class;

    return $self;
}


sub life_cycle {
    my ($self) = @_;

    my $job = $self->input_job();
    my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my %job_partial_timing = ();

    $job->incomplete(1);    # reinforce, in case the life_cycle is not run by a Worker
    $job->autoflow(1);

    eval {
        # Catch all the "warn" calls
        #$SIG{__WARN__} = sub { $self->warning(@_) };

        if( $self->can('pre_cleanup') and $job->retry_count()>0 ) {
            $self->enter_status('PRE_CLEANUP');
            $self->pre_cleanup;
        }

        # PRE_HEALTHCHECK can come here

        $self->enter_status('FETCH_INPUT');
        $partial_stopwatch->restart();
        $self->fetch_input;
        $job_partial_timing{'FETCH_INPUT'} = $partial_stopwatch->pause->get_elapsed;

        $self->enter_status('RUN');
        $partial_stopwatch->restart();
        $self->run;
        $job_partial_timing{'RUN'} = $partial_stopwatch->pause->get_elapsed;

        if($self->worker->execute_writes) {
            $self->enter_status('WRITE_OUTPUT');
            $partial_stopwatch->restart();
            $self->write_output;
            $job_partial_timing{'WRITE_OUTPUT'} = $partial_stopwatch->pause->get_elapsed;

            if( $self->can('post_healthcheck') ) {
                $self->enter_status('POST_HEALTHCHECK');
                $self->post_healthcheck;
            }
        } else {
            $self->say_with_header( ": *no* WRITE_OUTPUT requested, so there will be no AUTOFLOW" );
        }
    };

    # Restore the default handler
    #$SIG{__WARN__} = 'DEFAULT';

    if(my $life_cycle_msg = $@) {
        $job->died_somewhere( $job->incomplete );  # it will be OR'd inside
        Bio::EnsEMBL::Hive::Process::warning($self, $life_cycle_msg, $job->incomplete ? 'WORKER_ERROR' : 'INFO');     # in case the Runnable has redefined warning()
    }

    if( $self->can('post_cleanup') ) {   # may be run to clean up memory even after partially failed attempts
        eval {
            $job->incomplete(1);    # it could have been reset by a previous call to complete_early
            $self->enter_status('POST_CLEANUP');
            $self->post_cleanup;
        };
        if(my $post_cleanup_msg = $@) {
            $job->died_somewhere( $job->incomplete );  # it will be OR'd inside
            Bio::EnsEMBL::Hive::Process::warning($self, $post_cleanup_msg, $job->incomplete ? 'WORKER_ERROR' : 'INFO');   # in case the Runnable has redefined warning()
        }
    }

    unless( $job->died_somewhere ) {

        if( $self->execute_writes and $job->autoflow ) {    # AUTOFLOW doesn't have its own status so will have whatever previous state of the job
            $self->say_with_header( ': AUTOFLOW input->output' );
            $job->dataflow_output_id();
        }

        my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
        if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
            $job->transient_error(0);
            die "The group of semaphored jobs is incomplete ! Some fan jobs (coming from dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") are missing a job on their funnel. Check the order of your dataflow_output_id() calls.";
        }

        $job->incomplete(0);

        return \%job_partial_timing;
    }
}


sub say_with_header {
    my ($self, $msg, $important) = @_;

    $important //= $self->debug();

    if($important) {
        if(my $worker = $self->worker) {
            $worker->worker_say( $msg );
        } else {
            print "StandaloneJob $msg\n";
        }
    }
}


sub enter_status {
    my ($self, $status) = @_;

    my $job = $self->input_job();

    $job->set_and_update_status( $status );

    if(my $worker = $self->worker) {
        $worker->set_and_update_status( 'JOB_LIFECYCLE' );  # to ensure when_checked_in TIMESTAMP is updated
    }

    $self->say_with_header( '-> '.$status );
}


sub warning {
    my ($self, $msg, $message_class) = @_;

    $message_class = 'WORKER_ERROR' if $message_class && looks_like_number($message_class);
    $message_class ||= 'INFO';
    chomp $msg;

    $self->say_with_header( "$message_class : $msg", 1 );

    my $job = $self->input_job;
    my $worker = $self->worker;

    if(my $job_adaptor = ($job && $job->adaptor)) {
        $job_adaptor->db->get_LogMessageAdaptor()->store_job_message($job->dbID, $msg, $message_class);
    } elsif(my $worker_adaptor = ($worker && $worker->adaptor)) {
        $worker_adaptor->db->get_LogMessageAdaptor()->store_worker_message($worker, $msg, $message_class);
    }
}


##########################################
#
# methods subclasses should override 
# in order to give this process function
#
##########################################


=head2 param_defaults

    Title   :  param_defaults
    Function:  subclass can define defaults for all params used by the RunnableDB/Process

=cut

sub param_defaults {
    return {};
}


#
## Function: subclass can implement functions related to cleaning up the database/filesystem after the previous unsuccessful run.
#

# sub pre_cleanup {
#    my $self = shift;
#
#    return 1;
# }


=head2 fetch_input

    Title   :  fetch_input
    Function:  subclass can implement functions related to data fetching.
               Typical activities would be to parse $self->input_id .
               Subclasses may also want to fetch data from databases
               or from files within this function.

=cut

sub fetch_input {
    my $self = shift;

    return 1;
}


=head2 run

    Title   :  run
    Function:  subclass can implement functions related to process execution.
               Typical activities include running external programs or running
               algorithms by calling perl methods.  Process may also choose to
               parse results into memory if an external program was used.

=cut

sub run {
    my $self = shift;

    return 1;
}


=head2 write_output

    Title   :  write_output
    Function:  subclass can implement functions related to storing results.
               Typical activities include writing results into database tables
               or into files on a shared filesystem.

=cut

sub write_output {
    my $self = shift;

    return 1;
}


#
## Function:  subclass can implement functions related to cleaning up after running one job
#               (destroying non-trivial data structures in memory).
#

#sub post_cleanup {
#    my $self = shift;
#
#    return 1;
#}


######################################################
#
# methods that subclasses can use to get access
# to hive infrastructure
#
######################################################


=head2 worker

    Title   :   worker
    Usage   :   my $worker = $self->worker;
    Function:   returns the Worker object this Process is run by
    Returns :   Bio::EnsEMBL::Hive::Worker

=cut

sub worker {
    my $self = shift;

    $self->{'_worker'} = shift if(@_);
    return $self->{'_worker'};
}


sub execute_writes {
    my $self = shift;

    return $self->worker->execute_writes(@_);
}


=head2 db

    Title   :   db
    Usage   :   my $hiveDBA = $self->db;
    Function:   returns DBAdaptor to Hive database
    Returns :   Bio::EnsEMBL::Hive::DBSQL::DBAdaptor

=cut

sub db {
    my $self = shift;

    return $self->worker->adaptor && $self->worker->adaptor->db(@_);
}


=head2 dbc

    Title   :   dbc
    Usage   :   my $hiveDBConnection = $self->dbc;
    Function:   returns DBConnection to Hive database
    Returns :   Bio::EnsEMBL::Hive::DBSQL::DBConnection

=cut

sub dbc {
    my $self = shift;

    return $self->db && $self->db->dbc;
}


=head2 data_dbc

    Title   :   data_dbc
    Usage   :   my $data_dbc = $self->data_dbc;
    Function:   returns a Bio::EnsEMBL::Hive::DBSQL::DBConnection object (the "current" one by default, but can be set up otherwise)
    Returns :   Bio::EnsEMBL::Hive::DBSQL::DBConnection

=cut

sub data_dbc {
    my $self = shift @_;

    my $given_db_conn   = shift @_ || ($self->param_is_defined('db_conn') ? $self->param('db_conn') : $self);
    my $given_ref = ref( $given_db_conn );
    my $given_signature = ($given_ref eq 'ARRAY' or $given_ref eq 'HASH') ? stringify ( $given_db_conn ) : "$given_db_conn";

    if (!$self->param_is_defined('db_conn') and !$self->db and !$self->dbc) {
        # go_figure_dbc won't be able to create a DBConnection, so let's
        # just print a nicer error message
        $self->input_job->transient_error(0);
        throw('In standaloneJob mode, $self->data_dbc requires the -db_conn parameter to be defined on the command-line');
    }

    if( !$self->{'_cached_db_signature'} or ($self->{'_cached_db_signature'} ne $given_signature) ) {
        $self->{'_cached_db_signature'} = $given_signature;
        $self->{'_cached_data_dbc'} = go_figure_dbc( $given_db_conn );
    }

    return $self->{'_cached_data_dbc'};
}


=head2 run_system_command

    Title   :  run_system_command
    Arg[1]  :  (string or arrayref) Command to be run
    Arg[2]  :  (hashref, optional) Options, amongst:
                 - use_bash_pipefail: when enabled, a command with pipes will require all sides to succeed
                 - use_bash_errexit: when enabled, will stop at the first failure (otherwise commands such as "do_something_that_fails; do_something_that_succeeds" would return 0)
                 - timeout: the maximum number of seconds the command can run for. Will return the exit code -2 if the command has to be aborted
    Usage   :  my $return_code = $self->run_system_command('script.sh with many_arguments');   # Command as a single string
               my $return_code = $self->run_system_command(['script.sh', 'arg1', 'arg2']);     # Command as an array-ref
               my ($return_code, $stderr, $string_command) = $self->run_system_command(['script.sh', 'arg1', 'arg2']);     # Same in list-context. $string_command will be "script.sh arg1 arg2"
               my $return_code = $self->run_system_command('script1.sh with many_arguments | script2.sh', {'use_bash_pipefail' => 1});  # Command with pipes evaluated in a bash "pipefail" environment
    Function:  Runs a command given as a single string or an array-ref. The second argument is a hashref of options.
    Returns :  The return code in scalar context, or a triplet (return code, standard error, command) in list context

=cut

sub run_system_command {
    my ($self, $cmd, $options) = @_;

    # Capture::Tiny has weird behaviour if 'require'd instead of 'use'd
    # see, for example, http://www.perlmonks.org/?node_id=870439
    require Capture::Tiny;

    $options //= {};
    my ($join_needed, $flat_cmd) = join_command_args($cmd);
    my @cmd_to_run;

    my $need_bash = $options->{'use_bash_pipefail'} || $options->{'use_bash_errexit'};
    if ($need_bash) {
        @cmd_to_run = ('bash',
                       $options->{'use_bash_pipefail'} ? ('-o' => 'pipefail') : (),
                       $options->{'use_bash_errexit'}  ? ('-o' => 'errexit')  : (),
                       '-c' => $flat_cmd);
    } else {
        # Let's use the array if possible, it saves us from running a shell
        @cmd_to_run = $join_needed ? $flat_cmd : (ref($cmd) ? @$cmd : $cmd);
    }

    $self->say_with_header("Command given: " . stringify($cmd));
    $self->say_with_header("Command to run: " . stringify(\@cmd_to_run));

    $self->dbc and $self->dbc->disconnect_if_idle();    # release this connection for the duration of the system() call

    my $return_value;

    my $starttime = time() * 1000;
    my ($stdout, $stderr) = Capture::Tiny::tee(sub {
        $return_value = timeout( sub {system(@cmd_to_run)}, $options->{'timeout'} );
    });

    # FIXME: on LSF we could perhaps wait a little bit for the MEM/RUNLIMIT
    # to really kick in, so that we don't return the wrong diagnostic
    die sprintf("Could not run '%s', got %s\nSTDERR %s\n", $flat_cmd, $return_value, $stderr) if $return_value && $options->{die_on_failure};

    return ($return_value, $stderr, $flat_cmd, $stdout, time()*1000 - $starttime) if wantarray;
    return $return_value;
}


=head2 input_job

    Title   :  input_job
    Function:  Returns the AnalysisJob to be run by this process
               Subclasses should treat this as a read-only object.
    Returns :  Bio::EnsEMBL::Hive::AnalysisJob object

=cut

sub input_job {
    my $self = shift @_;

    if(@_) {
        if(my $job = $self->{'_input_job'} = shift) {
            throw("Not a Bio::EnsEMBL::Hive::AnalysisJob object") unless ($job->isa("Bio::EnsEMBL::Hive::AnalysisJob"));
        }
    }
    return $self->{'_input_job'};
}


# ##################### subroutines that link through to Job's methods #########################


sub input_id {
    my $self = shift;

    return $self->input_job->input_id(@_);
}

sub param {
    my $self = shift @_;

    return $self->input_job->param(@_);
}

sub param_required {
    my $self = shift @_;

    my $prev_transient_error = $self->input_job->transient_error(); # make a note of previously set transience status
    $self->input_job->transient_error(0);                           # make sure if we die in param_required it is not transient

    my $value = $self->input_job->param_required(@_);

    $self->input_job->transient_error($prev_transient_error);       # restore the previous transience status
    return $value;
}

sub param_exists {
    my $self = shift @_;

    return $self->input_job->param_exists(@_);
}

sub param_is_defined {
    my $self = shift @_;

    return $self->input_job->param_is_defined(@_);
}

sub param_substitute {
    my $self = shift @_;

    return $self->input_job->param_substitute(@_);
}

sub dataflow_output_id {
    my $self = shift @_;

    # Let's not spend time stringifying a large object if it's not going to be printed anyway
    $self->say_with_header('Dataflow on branch #' . ($_[1] // 1) . (defined $_[0] ? ' of ' . stringify($_[0]) : ' (no parameters -> input parameters repeated)')) if $self->debug;
    return $self->input_job->dataflow_output_id(@_);
}

=head2 dataflow_output_ids_from_json

    Title   :  dataflow_output_ids_from_json
    Arg[1]  :  File name
    Arg[2]  :  (optional) Branch number, defaults to 1 (see L<AnalysisJob::dataflow_output_id>)
    Function:  Wrapper around L<dataflow_output_id> that takes the output_ids from a JSON file.
               Each line in the JSON file is expected to be a complete JSON structure, which
               may be prefixed with a branch number
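               For example, these two lines (purely illustrative) would flow
               one event to branch #2 and one event to the default branch:

                   2 {"gene_id": 17}
                   {"gene_id": 18, "species": "human"}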

=cut

sub dataflow_output_ids_from_json {
    my ($self, $filename, $default_branch) = @_;

    my $json_formatter = JSON->new()->indent(0);
    my @output_job_ids;
    open(my $fh, '<', $filename) or die "Could not open '$filename' because: $!";
    while (my $l = $fh->getline()) {
        chomp $l;
        my $branch = $default_branch;
        my $json = $l;
        if ($l =~ /^(-?\d+)\s+(.*)$/) {
            $branch = $1;
            $json = $2;
        }
        my $hash = $json_formatter->decode($json);
        push @output_job_ids, @{ $self->dataflow_output_id($hash, $branch) };
    }
    close($fh);
    return \@output_job_ids;
}


sub throw {
    my $msg = pop @_;

    Bio::EnsEMBL::Hive::Utils::throw( $msg );   # this module doesn't import 'throw' to avoid namespace clash
}


=head2 complete_early

  Arg[1]      : (string) message
  Arg[2]      : (integer, optional) branch number
  Description : Ends the job with the given message, whilst marking the job as complete.
                Dataflows to the given branch right before, if a branch number is given,
                in which case the autoflow is disabled too.
  Returntype  : This function does not return
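
  Example     : A sketch (the 'skip_analysis' parameter is invented for this example):

                    if( $self->param('skip_analysis') ) {
                        $self->complete_early('Nothing to do for this species');
                    }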

=cut

sub complete_early {
    my ($self, $msg, $branch_code) = @_;

    if (defined $branch_code) {
        $self->dataflow_output_id(undef, $branch_code);
        $self->input_job->autoflow(0);
    }
    $self->input_job->incomplete(0);
    die $msg;
}


=head2 debug

    Title   :  debug
    Function:  Gets/sets flag for debug level. Set through Worker/runWorker.pl
               Subclasses should treat this as a read-only variable.
    Returns :  integer

=cut

sub debug {
    my $self = shift;

    return $self->worker->debug(@_);
}


=head2 worker_temp_directory

    Title   :  worker_temp_directory
    Function:  Returns a path to a directory on the local /tmp disk
               which the subclass can use as temporary file space.
               This directory is made the first time the function is called.
               It persists for as long as the worker is alive.  This allows
               multiple jobs run by the worker to potentially share temp data.
               For example the worker (which is a single Analysis) might need
               to dump a datafile which is needed by all jobs run through
               this analysis.  The process can first check the worker_temp_directory
               for the file and dump it if it is missing.  This way the first job
               run by the worker will do the dump, but subsequent jobs can reuse the
               file.
    Usage   :  $tmp_dir = $self->worker_temp_directory;
    Returns :  <string> path to a local (/tmp) directory

=cut

sub worker_temp_directory {
    my $self = shift @_;

    unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
        $self->{'_tmp_dir'} = $self->worker->temp_directory_name();
        make_path( $self->{'_tmp_dir'}, { mode => 0777 } );
        throw("unable to create a writable directory ".$self->{'_tmp_dir'}) unless(-w $self->{'_tmp_dir'});
    }
    return $self->{'_tmp_dir'};
}


=head2 cleanup_worker_temp_directory

    Title   :  cleanup_worker_temp_directory
    Function:  Cleans up the directory on the local /tmp disk that is used for the
               worker. It can be used to remove files left there by previous jobs.
    Usage   :  $self->cleanup_worker_temp_directory;

=cut

sub cleanup_worker_temp_directory {
    my $self = shift @_;

    if(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
        remove_tree($self->{'_tmp_dir'}, {error => undef});
    }
}


1;