=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::Process

=head1 DESCRIPTION

    Abstract superclass.  Each Process makes up one of the individual building
    blocks of the system.  Instances of these processes are created in a hive
    workflow graph of Analysis entries that are linked together with dataflow
    and AnalysisCtrl rules.
  
    Instances of these Processes are created by the system as work is done.
    The newly created Process will have preset $self->db, $self->dbc, 
    $self->input_id, $self->analysis and several other variables. 
    From this input and configuration data, each Process can then proceed to 
    do something.  The flow of execution within a Process is:
        pre_cleanup() if($retry_count>0);   # clean up databases/filesystem before subsequent attempts
        fetch_input();                      # fetch the data from databases/filesystems
        run();                              # perform the main computation 
        write_output();                     # record the results in databases/filesystems
        post_cleanup();                     # destroy all non-trivial data structures after the job is done
    The developer can implement their own versions of
    pre_cleanup, fetch_input, run, write_output, and post_cleanup to do what they need.  

    The entire system is based around the concept of a workflow graph which
    can split and loop back on itself.  This is accomplished by dataflow
    rules (similar to Unix pipes) that connect one Process (or analysis) to others.
    Where a Unix command-line program can send output only on its STDOUT and
    STDERR pipes, a hive Process has access to an unlimited number of pipes
    referenced by numerical branch_codes.  This is accomplished within the
    Process via $self->dataflow_output_id(...);
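
    As an illustration (a sketch, not a module shipped with the system;
    the package and parameter names here are made up), a minimal Process
    subclass implementing the main life-cycle hooks and flowing its
    result out on branch #2 could look like:

        package Bio::EnsEMBL::Hive::RunnableDB::SketchExample;

        use strict;
        use warnings;
        use base ('Bio::EnsEMBL::Hive::Process');

        sub fetch_input {               # fetch the data from databases/filesystems
            my $self = shift;
            $self->param('numbers', [ 1, 2, 3 ]);       # hypothetical parameter
        }

        sub run {                       # perform the main computation
            my $self = shift;
            my $sum = 0;
            $sum += $_ for @{ $self->param('numbers') };
            $self->param('sum', $sum);
        }

        sub write_output {              # record the results / flow them downstream
            my $self = shift;
            $self->dataflow_output_id( { 'sum' => $self->param('sum') }, 2 );
        }

        1;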
  
    The design philosophy is that each Process does its work and creates output, 
    but it doesn't worry about where the input came from, or where its output 
    goes.  If the system has dataflow pipes connected, then the output jobs
    have a purpose; if not, the output work is thrown away.  The workflow graph
    'controls' the behaviour of the system, not the processes.  The processes just 
    need to do their job.  The design of the workflow graph is based on the knowledge 
    of what each Process does so that the graph can be correctly constructed.
    The workflow graph can be constructed a priori or can be constructed and 
    modified by intelligent Processes as the system runs.


    The Hive is based on AI concepts and modeled on the social structure and 
    behaviour of a honey bee hive. So where a worker honey bee's purpose is
    (go find pollen, bring back to hive, drop off pollen, repeat), an ensembl-hive 
    worker's purpose is (find a job, create a Process for that job, run it,
    drop off output job(s), repeat).  While most workflow systems are based 
    on 'smart' central controllers and external control of 'dumb' processes, 
    the Hive is based on 'dumb' workflow graphs, a job kiosk, and 'smart' workers 
    (autonomous agents) who are self configuring and figure out for themselves what 
    needs to be done, and then do it.  The workers are based around a set of 
    emergent behaviour rules which allow a predictable system behaviour to emerge 
    from what otherwise might appear at first glance to be a chaotic system. There 
    is an inherent asynchronous disconnect between one worker and the next.  
    Work (or jobs) are simply 'posted' on a blackboard or kiosk within the hive 
    database where other workers can find them.  
    The emergent behaviour rules of a worker are:
    1) If a job is posted, someone needs to do it.
    2) Don't grab something that someone else is working on.
    3) Don't grab more than you can handle.
    4) If you grab a job, it needs to be finished correctly.
    5) Keep busy doing work.
    6) If you fail, do the best you can to report back.

    For further reading on the AI principles employed in this design see:
        http://en.wikipedia.org/wiki/Autonomous_Agent
        http://en.wikipedia.org/wiki/Emergence

=head1 LICENSE

    Copyright [1999-2014] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

    The rest of the documentation details each of the object methods. 
    Internal method names are usually prefixed with an underscore (_).

=cut


package Bio::EnsEMBL::Hive::Process;

use strict;
use warnings;

use Bio::EnsEMBL::Registry;
use Bio::EnsEMBL::Utils::Argument ('rearrange');
use Bio::EnsEMBL::Utils::Exception ('throw');

use Bio::EnsEMBL::Hive::DBSQL::DBConnection;
use Bio::EnsEMBL::Hive::Utils ('stringify', 'go_figure_dbc');
use Bio::EnsEMBL::Hive::Utils::Stopwatch;

use base ('Bio::EnsEMBL::Utils::Exception');   # provide these methods for deriving classes


sub new {
    my ($class, @args) = @_;

    my $self = bless {}, $class;

    my ($analysis) = rearrange([qw( ANALYSIS )], @args);
    $self->analysis($analysis) if($analysis);

    return $self;
}


sub life_cycle {
    my ($self, $worker) = @_;

    my $job = $self->input_job();
    my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my %job_partial_timing = ();

    $job->autoflow(1);

    if( $self->can('pre_cleanup') and $job->retry_count()>0 ) {
        $self->enter_status('PRE_CLEANUP');
        $self->pre_cleanup;
    }

    $self->enter_status('FETCH_INPUT');
    $partial_stopwatch->restart();
    $self->fetch_input;
    $job_partial_timing{'FETCH_INPUT'} = $partial_stopwatch->get_elapsed();

    $self->enter_status('RUN');
    $partial_stopwatch->restart();
    $self->run;
    $job_partial_timing{'RUN'} = $partial_stopwatch->get_elapsed();

    if($self->execute_writes) {
        $self->enter_status('WRITE_OUTPUT');
        $partial_stopwatch->restart();
        $self->write_output;
        $job_partial_timing{'WRITE_OUTPUT'} = $partial_stopwatch->get_elapsed();

        if( $job->autoflow ) {
            print STDERR "\njob ".$job->dbID." : AUTOFLOW input->output\n" if($self->debug);
            $job->dataflow_output_id();
        }
    } else {
        print STDERR "\n!!! *no* WRITE_OUTPUT requested, so there will be no AUTOFLOW\n" if($self->debug); 
    }

    if( $self->can('post_cleanup') ) {   # Todo: may need to run it after the eval, to clean up the memory even after partially failed attempts?
        $self->enter_status('POST_CLEANUP');
        $self->post_cleanup;
    }

    my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
    if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
        $job->transient_error(0);
        die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") has never been dataflown";
    }

    return \%job_partial_timing;
}


sub enter_status {
    my ($self, $status) = @_;

    my $job = $self->input_job();

    $job->update_status( $status );

    my $status_msg  = 'Job '.$job->dbID.' : '.$status;

    if(my $worker = $self->worker) {
        $worker->enter_status( $status, $status_msg );
    } elsif($self->debug) {
        print STDERR "Standalone $status_msg\n";
    }
}


##########################################
#
# methods subclasses should override 
# in order to give this Process its function
#
##########################################

=head2 strict_hash_format

    Title   :  strict_hash_format
    Function:  if a subclass wants more flexibility in parsing job.input_id and analysis.parameters,
               it should redefine this method to return 0

=cut

sub strict_hash_format {
    return 1;
}


=head2 param_defaults

    Title   :  param_defaults
    Function:  subclass can define defaults for all params used by the RunnableDB/Process
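
    For example, a subclass might declare (with hypothetical parameter names):

        sub param_defaults {
            return {
                'batch_size'    => 100,     # hypothetical: rows processed per batch
                'take_time'     => 0,       # hypothetical: seconds to sleep per job
            };
        }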

=cut

sub param_defaults {
    return {};
}


=head2 pre_cleanup

    Title   :  pre_cleanup
    Function:  subclass can implement functions related to cleaning up the database/filesystem after a previous unsuccessful run.

=cut

# sub pre_cleanup {
#    my $self = shift;
#
#    return 1;
# }


=head2 fetch_input

    Title   :  fetch_input
    Function:  subclass can implement functions related to data fetching.
               Typical activities would be to parse $self->input_id and read
               configuration information from $self->analysis.  Subclasses
               may also want to fetch data from databases or from files 
               within this function.
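
    A sketch of a typical override (the 'source_table' parameter and the
    query are invented for illustration):

        sub fetch_input {
            my $self = shift;

            my $table = $self->param_required('source_table');      # hypothetical parameter
            my $sth   = $self->data_dbc->prepare("SELECT count(*) FROM $table");
            $sth->execute();
            my ($count) = $sth->fetchrow_array();
            $sth->finish();

            $self->param('row_count', $count);      # stash it for run()/write_output()
        }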

=cut

sub fetch_input {
    my $self = shift;

    return 1;
}


=head2 run

    Title   :  run
    Function:  subclass can implement functions related to process execution.
               Typical activities include running external programs or running
               algorithms by calling perl methods.  Process may also choose to
               parse results into memory if an external program was used.

=cut

sub run {
    my $self = shift;

    return 1;
}


=head2 write_output

    Title   :  write_output
    Function:  subclass can implement functions related to storing results.
               Typical activities include writing results into database tables
               or into files on a shared filesystem.

=cut

sub write_output {
    my $self = shift;

    return 1;
}


=head2 post_cleanup

    Title   :  post_cleanup
    Function:  subclass can implement functions related to cleaning up after running one job
               (destroying non-trivial data structures in memory).

=cut

#sub post_cleanup {
#    my $self = shift;
#
#    return 1;
#}


######################################################
#
# methods that subclasses can use to get access
# to hive infrastructure
#
######################################################


=head2 worker

    Title   :   worker
    Usage   :   my $worker = $self->worker;
    Function:   returns the Worker object this Process is run by
    Returns :   Bio::EnsEMBL::Hive::Worker

=cut

sub worker {
    my $self = shift;

    $self->{'_worker'} = shift if(@_);
    return $self->{'_worker'};
}


=head2 execute_writes

    Title   :   execute_writes
    Usage   :   $self->execute_writes( 1 );
    Function:   getter/setter for whether we want the 'write_output' method to be run
    Returns :   boolean

=cut

sub execute_writes {
    my $self = shift;

    $self->{'_execute_writes'} = shift if(@_);
    return $self->{'_execute_writes'};
}


=head2 db

    Title   :   db
    Usage   :   my $hiveDBA = $self->db;
    Function:   returns DBAdaptor to Hive database
    Returns :   Bio::EnsEMBL::Hive::DBSQL::DBAdaptor

=cut

sub db {
    my $self = shift;

    $self->{'_db'} = shift if(@_);
    return $self->{'_db'};
}


=head2 dbc

    Title   :   dbc
    Usage   :   my $hiveDBConnection = $self->dbc;
    Function:   returns DBConnection to Hive database
    Returns :   Bio::EnsEMBL::Hive::DBSQL::DBConnection

=cut

sub dbc {
    my $self = shift;

    return $self->db && $self->db->dbc;
}


=head2 data_dbc

    Title   :   data_dbc
    Usage   :   my $data_dbc = $self->data_dbc;
    Function:   returns a Bio::EnsEMBL::Hive::DBSQL::DBConnection object (the "current" one by default, but can be set up otherwise)
    Returns :   Bio::EnsEMBL::Hive::DBSQL::DBConnection
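
    For example (the database URL and table are hypothetical):

        my $dbc = $self->data_dbc();    # the current hive database by default

        # If the job or analysis defines a 'db_conn' parameter, e.g.
        #   'mysql://user@host:3306/external_db'
        # then data_dbc() will connect there instead (and cache the connection):
        $self->data_dbc->do('CREATE TABLE IF NOT EXISTS sketch_table (id INT)');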

=cut

sub data_dbc {
    my $self = shift @_;

    my $given_db_conn   = shift @_ || ($self->param_is_defined('db_conn') ? $self->param('db_conn') : $self);
    my $given_ref = ref( $given_db_conn );
    my $given_signature = ($given_ref eq 'ARRAY' or $given_ref eq 'HASH') ? stringify ( $given_db_conn ) : "$given_db_conn";

    if( !$self->{'_cached_db_signature'} or ($self->{'_cached_db_signature'} ne $given_signature) ) {
        $self->{'_cached_db_signature'} = $given_signature;
        $self->{'_cached_data_dbc'} = go_figure_dbc( $given_db_conn );
    }

    return $self->{'_cached_data_dbc'};
}


=head2 analysis

    Title   :  analysis
    Usage   :  $self->analysis;
    Function:  Returns the Analysis object associated with this
               instance of the Process.
    Returns :  Bio::EnsEMBL::Hive::Analysis object

=cut

sub analysis {
  my ($self, $analysis) = @_;

  if($analysis) {
    throw("Not a Bio::EnsEMBL::Hive::Analysis object")
      unless ($analysis->isa("Bio::EnsEMBL::Hive::Analysis"));
    $self->{'_analysis'} = $analysis;
  }
  return $self->{'_analysis'};
}

=head2 input_job

    Title   :  input_job
    Function:  Returns the AnalysisJob to be run by this process.
               Subclasses should treat this as a read-only object.
    Returns :  Bio::EnsEMBL::Hive::AnalysisJob object

=cut

sub input_job {
  my( $self, $job ) = @_;
  if($job) {
    throw("Not a Bio::EnsEMBL::Hive::AnalysisJob object")
        unless ($job->isa("Bio::EnsEMBL::Hive::AnalysisJob"));
    $self->{'_input_job'} = $job;
  }
  return $self->{'_input_job'};
}


# ##################### subroutines that link through to Job's methods #########################

sub input_id {
    my $self = shift;

    return $self->input_job->input_id(@_);
}

sub param {
    my $self = shift @_;

    return $self->input_job->param(@_);
}

sub param_required {
    my $self = shift @_;

    my $prev_transient_error = $self->input_job->transient_error(); # make a note of previously set transience status
    $self->input_job->transient_error(0);                           # make sure if we die in param_required it is not transient

    my $value = $self->input_job->param_required(@_);

    $self->input_job->transient_error($prev_transient_error);       # restore the previous transience status
    return $value;
}

sub param_is_defined {
    my $self = shift @_;

    return $self->input_job->param_is_defined(@_);
}

sub param_substitute {
    my $self = shift @_;

    return $self->input_job->param_substitute(@_);
}

sub warning {
    my $self = shift @_;

    return $self->input_job->warning(@_);
}

sub dataflow_output_id {
    my $self = shift @_;

    return $self->input_job->dataflow_output_id(@_);
}


=head2 debug

    Title   :  debug
    Function:  Gets/sets flag for debug level. Set through Worker/runWorker.pl
               Subclasses should treat as a read_only variable.
    Returns :  integer

=cut

sub debug {
    my $self = shift;

    $self->{'_debug'} = shift if(@_);
    $self->{'_debug'} = 0 unless(defined($self->{'_debug'}));
    return $self->{'_debug'};
}


=head2 worker_temp_directory

    Title   :  worker_temp_directory
    Function:  Returns a path to a directory on the local /tmp disk 
               which the subclass can use as temporary file space.
               This directory is made the first time the function is called.
               It persists for as long as the worker is alive.  This allows
               multiple jobs run by the worker to potentially share temp data.
               For example, the worker (which runs a single Analysis) might need
               to dump a data file which is needed by all jobs run through
               this analysis.  The process can first check the worker_temp_directory
               for the file and dump it if it is missing.  This way the first job
               run by the worker will do the dump, but subsequent jobs can reuse the
               file.
    Usage   :  $tmp_dir = $self->worker_temp_directory;
    Returns :  <string> path to a local (/tmp) directory 
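
    A sketch of the dump-once pattern described above (the file name and
    the helper method are hypothetical):

        my $blast_db = $self->worker_temp_directory . 'proteins.fasta';     # hypothetical file
        unless(-e $blast_db) {
            $self->dump_fasta_to( $blast_db );      # hypothetical helper in the subclass
        }
        # subsequent jobs run by the same worker will find and reuse the file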

=cut

sub worker_temp_directory {
    my $self = shift @_;

    unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
        my $username = $ENV{'USER'};
        my $worker_id = $self->worker ? $self->worker->dbID : 'standalone';
        $self->{'_tmp_dir'} = "/tmp/worker_${username}.${worker_id}/";
        mkdir($self->{'_tmp_dir'}, 0777);
        throw("unable to create a writable directory ".$self->{'_tmp_dir'}) unless(-w $self->{'_tmp_dir'});
    }
    return $self->{'_tmp_dir'};
}


sub cleanup_worker_temp_directory {
    my $self = shift @_;

    if($self->{'_tmp_dir'} and (-e $self->{'_tmp_dir'}) ) {
        my $cmd = "rm -r ". $self->{'_tmp_dir'};
        system($cmd);
    }
}


1;