=pod 

=head1 NAME

    Bio::EnsEMBL::Hive::Process

=head1 DESCRIPTION

    Abstract superclass.  Processes make up the individual building blocks 
    of the system.  Instances of these Processes are created in a hive workflow 
    graph of Analysis entries that are linked together with dataflow and 
    AnalysisCtrl rules.
    Instances of these Processes are created by the system as work is done.
    The newly created Process will have preset $self->db, $self->dbc, 
    $self->input_id, $self->analysis and several other variables. 
    From this input and configuration data, each Process can then proceed to 
    do something.  The flow of execution within a Process is:
        pre_cleanup() if($retry_count>0);   # clean up databases/filesystem before subsequent attempts
        fetch_input();                      # fetch the data from databases/filesystems
        run();                              # perform the main computation 
        write_output();                     # record the results in databases/filesystems
        post_cleanup();                     # destroy all non-trivial data structures after the job is done
    The developer can implement their own versions of
    pre_cleanup, fetch_input, run, write_output, and post_cleanup to do what they need.  
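
    A minimal sketch of such a subclass (the package name and the parameter
    names here are illustrative assumptions, not part of the API):

        package MySequenceLength;
        use base ('Bio::EnsEMBL::Hive::Process');

        sub fetch_input {       # die early if the required parameter is missing
            my $self = shift;
            $self->param_required('sequence');
        }

        sub run {               # perform the (toy) computation in memory
            my $self = shift;
            $self->param('seq_length', length( $self->param('sequence') ));
        }

        sub write_output {      # flow the result out on branch #1
            my $self = shift;
            $self->dataflow_output_id( { 'seq_length' => $self->param('seq_length') }, 1 );
        }

        1;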

    The entire system is based around the concept of a workflow graph which
    can split and loop back on itself.  This is accomplished by dataflow
    rules (similar to Unix pipes) that connect one Process (or analysis) to others.
    Where a Unix command-line program can send output on its STDOUT and STDERR 
    pipes, a hive Process has access to an unlimited number of pipes referenced 
    by numerical branch_codes. This is accomplished within the Process via 
    $self->dataflow_output_id(...);  
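
    For example, a Process can flow a new job out on a non-default branch
    (the parameter name 'chunk_name' is an illustrative assumption):

        $self->dataflow_output_id( { 'chunk_name' => $chunk_name }, 2 );   # branch #2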
    The design philosophy is that each Process does its work and creates output, 
    but it doesn't worry about where the input came from, or where its output 
    goes. If the system has dataflow pipes connected, then the output jobs 
    have a purpose; if not, the output is thrown away.  The workflow graph 
    'controls' the behaviour of the system, not the processes.  The processes just 
    need to do their job.  The design of the workflow graph is based on the knowledge 
    of what each Process does so that the graph can be correctly constructed.
    The workflow graph can be constructed a priori or can be constructed and 
    modified by intelligent Processes as the system runs.


    The Hive is based on AI concepts and modeled on the social structure and 
    behaviour of a honey bee hive. So where a worker honey bee's purpose is
    (go find pollen, bring back to hive, drop off pollen, repeat), an ensembl-hive 
    worker's purpose is (find a job, create a Process for that job, run it,
    drop off output job(s), repeat).  While most workflow systems are based 
    on 'smart' central controllers and external control of 'dumb' processes, 
    the Hive is based on a 'dumb' workflow graph and job kiosk, and 'smart' workers 
    (autonomous agents) who are self-configuring and figure out for themselves what 
    needs to be done, and then do it.  The workers are based around a set of 
    emergent behaviour rules which allow a predictable system behaviour to emerge 
    from what otherwise might appear at first glance to be a chaotic system. There 
    is an inherent asynchronous disconnect between one worker and the next.  
    Work (in the form of jobs) is simply 'posted' on a blackboard or kiosk within 
    the hive database where other workers can find it.  
    The emergent behaviour rules of a worker are:
    1) If a job is posted, someone needs to do it.
    2) Don't grab something that someone else is working on.
    3) Don't grab more than you can handle.
    4) If you grab a job, it needs to be finished correctly.
    5) Keep busy doing work.
    6) If you fail, do the best you can to report back.

    For further reading on the AI principles employed in this design see:
        http://en.wikipedia.org/wiki/Autonomous_Agent
        http://en.wikipedia.org/wiki/Emergence

=head1 LICENSE

    Copyright [1999-2015] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute
    Copyright [2016-2018] EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please subscribe to the Hive mailing list:  http://listserver.ebi.ac.uk/mailman/listinfo/ehive-users  to discuss Hive-related questions or to be notified of our updates

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually prefixed with an underscore (_).

=cut

package Bio::EnsEMBL::Hive::Process;

use strict;
use warnings;

use Bio::EnsEMBL::Registry;
use Bio::EnsEMBL::Utils::Argument ('rearrange');
use Bio::EnsEMBL::Utils::Exception ('throw');

use Bio::EnsEMBL::Hive::DBSQL::DBConnection;
use Bio::EnsEMBL::Hive::Utils ('stringify', 'go_figure_dbc');
use Bio::EnsEMBL::Hive::Utils::Stopwatch;

use base ('Bio::EnsEMBL::Utils::Exception');   # provide these methods for deriving classes


sub new {
    my ($class, @args) = @_;

    my $self = bless {}, $class;

    my ($analysis) = rearrange([qw( ANALYSIS )], @args);
    $self->analysis($analysis) if($analysis);

    return $self;
}


sub life_cycle {
    my ($self, $worker) = @_;

    my $job = $self->input_job();
    my $partial_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
    my %job_partial_timing = ();

    $job->autoflow(1);

    if( $self->can('pre_cleanup') and $job->retry_count()>0 ) {
        $self->enter_status('PRE_CLEANUP');
        $self->pre_cleanup;
    }

    $self->enter_status('FETCH_INPUT');
    $partial_stopwatch->restart();
    $self->fetch_input;
    $job_partial_timing{'FETCH_INPUT'} = $partial_stopwatch->pause->get_elapsed;

    $self->enter_status('RUN');
    $partial_stopwatch->restart();
    $self->run;
    $job_partial_timing{'RUN'} = $partial_stopwatch->pause->get_elapsed;

    if($self->execute_writes) {
        $self->enter_status('WRITE_OUTPUT');
        $partial_stopwatch->restart();
        $self->write_output;
        $job_partial_timing{'WRITE_OUTPUT'} = $partial_stopwatch->pause->get_elapsed;

        if( $job->autoflow ) {
            print STDERR "\njob ".$job->dbID." : AUTOFLOW input->output\n" if($self->debug);
            $job->dataflow_output_id();
        }
    } else {
        print STDERR "\n!!! *no* WRITE_OUTPUT requested, so there will be no AUTOFLOW\n" if($self->debug); 
    }

    if( $self->can('post_cleanup') ) {   # Todo: may need to run it after the eval, to clean up the memory even after partially failed attempts?
        $self->enter_status('POST_CLEANUP');
        $self->post_cleanup;
    }

    my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
    if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
        $job->transient_error(0);
        die "There are cached semaphored fans for which a funnel job (dataflow_rule_id(s) ".join(',',@zombie_funnel_dataflow_rule_ids).") has never been dataflown";
    }

    return \%job_partial_timing;
}


sub enter_status {
    my ($self, $status) = @_;

    my $job = $self->input_job();

    $job->update_status( $status );

    my $status_msg  = 'Job '.$job->dbID.' : '.$status;

    if(my $worker = $self->worker) {
        $worker->enter_status( $status, $status_msg );
    } elsif($self->debug) {
        print STDERR "Standalone$status_msg\n";
192
    }
193 194
}


##########################################
#
# methods subclasses should override 
# in order to give this process function
#
##########################################

=head2 strict_hash_format

    Title   :  strict_hash_format
    Function:  if a subclass wants more flexibility in parsing job.input_id and analysis.parameters,
               it should redefine this method to return 0

=cut

sub strict_hash_format {
    return 1;
}
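
# For example, a subclass that needs more flexible input_id parsing could
# override this to:
#
#     sub strict_hash_format { return 0; }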


=head2 param_defaults

    Title   :  param_defaults
    Function:  subclasses can define defaults for all params used by the RunnableDB/Process

=cut

sub param_defaults {
    return {};
}
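
# For example, a subclass could declare defaults that individual jobs can
# override via their input_id (the parameter names are illustrative):
#
#     sub param_defaults {
#         return {
#             'min_length' => 100,
#             'verbose'    => 0,
#         };
#     }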


#
## Function: subclasses can implement functions related to cleaning up the database/filesystem after the previous unsuccessful run.
#

# sub pre_cleanup {
#    my $self = shift;
#
#    return 1;
# }


=head2 fetch_input

    Title   :  fetch_input
    Function:  subclasses can implement functions related to data fetching.
               Typical activities would be to parse $self->input_id and read
               configuration information from $self->analysis.  Subclasses
               may also want to fetch data from databases or from files 
               within this function.

=cut

sub fetch_input {
    my $self = shift;

    return 1;
}

=head2 run

    Title   :  run
    Function:  subclasses can implement functions related to process execution.
               Typical activities include running external programs or running
               algorithms by calling perl methods.  The Process may also choose to
               parse results into memory if an external program was used.

=cut

sub run {
    my $self = shift;

    return 1;
}

=head2 write_output

    Title   :  write_output
    Function:  subclasses can implement functions related to storing results.
               Typical activities include writing results into database tables
               or into files on a shared filesystem.
               
=cut

sub write_output {
    my $self = shift;

    return 1;
}


#
## Function:  subclasses can implement functions related to cleaning up after running one job
#               (destroying non-trivial data structures in memory).
#

#sub post_cleanup {
#    my $self = shift;
#
#    return 1;
#}


######################################################
#
# methods that subclasses can use to get access
# to hive infrastructure
#
######################################################


=head2 worker

    Title   :   worker
    Usage   :   my $worker = $self->worker;
    Function:   returns the Worker object this Process is run by
    Returns :   Bio::EnsEMBL::Hive::Worker

=cut

sub worker {
    my $self = shift;

    $self->{'_worker'} = shift if(@_);
    return $self->{'_worker'};
}

=head2 execute_writes

    Title   :   execute_writes
    Usage   :   $self->execute_writes( 1 );
    Function:   getter/setter for whether we want the 'write_output' method to be run
    Returns :   boolean

=cut

sub execute_writes {
    my $self = shift;

    $self->{'_execute_writes'} = shift if(@_);
    return $self->{'_execute_writes'};
}


=head2 db

    Title   :   db
    Usage   :   my $hiveDBA = $self->db;
    Function:   returns DBAdaptor to Hive database
    Returns :   Bio::EnsEMBL::Hive::DBSQL::DBAdaptor

=cut

sub db {
    my $self = shift;

    $self->{'_db'} = shift if(@_);
    return $self->{'_db'};
}

=head2 dbc

    Title   :   dbc
    Usage   :   my $hiveDBConnection = $self->dbc;
    Function:   returns DBConnection to Hive database
    Returns :   Bio::EnsEMBL::Hive::DBSQL::DBConnection

=cut

sub dbc {
    my $self = shift;

    return $self->db && $self->db->dbc;
}


=head2 data_dbc

    Title   :   data_dbc
    Usage   :   my $data_dbc = $self->data_dbc;
    Function:   returns a Bio::EnsEMBL::Hive::DBSQL::DBConnection object (the "current" one by default, but can be set up otherwise)
    Returns :   Bio::EnsEMBL::Hive::DBSQL::DBConnection
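
    Example :   a sketch of querying the "current" data database (the table
                name here is an illustrative assumption):

                    my $sth = $self->data_dbc->prepare('SELECT count(*) FROM my_table');
                    $sth->execute();
                    my ($count) = $sth->fetchrow_array();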

=cut

sub data_dbc {
    my $self = shift @_;

    my $given_db_conn   = shift @_ || ($self->param_is_defined('db_conn') ? $self->param('db_conn') : $self);
    my $given_ref = ref( $given_db_conn );
    my $given_signature = ($given_ref eq 'ARRAY' or $given_ref eq 'HASH') ? stringify ( $given_db_conn ) : "$given_db_conn";
    if( !$self->{'_cached_db_signature'} or ($self->{'_cached_db_signature'} ne $given_signature) ) {
        $self->{'_cached_db_signature'} = $given_signature;
        $self->{'_cached_data_dbc'} = go_figure_dbc( $given_db_conn );
    }

    return $self->{'_cached_data_dbc'};
}

=head2 analysis

    Title   :  analysis
    Usage   :  $self->analysis;
    Function:  Returns the Analysis object associated with this
               instance of the Process.
    Returns :  Bio::EnsEMBL::Hive::Analysis object

=cut

sub analysis {
  my ($self, $analysis) = @_;

  if($analysis) {
    throw("Not a Bio::EnsEMBL::Hive::Analysis object")
      unless ($analysis->isa("Bio::EnsEMBL::Hive::Analysis"));
    $self->{'_analysis'} = $analysis;
  }
  return $self->{'_analysis'};
}

=head2 input_job

    Title   :  input_job
    Function:  Returns the AnalysisJob to be run by this process.
               Subclasses should treat this as a read-only object.
    Returns :  Bio::EnsEMBL::Hive::AnalysisJob object

=cut

sub input_job {
  my( $self, $job ) = @_;
  if($job) {
    throw("Not a Bio::EnsEMBL::Hive::AnalysisJob object")
        unless ($job->isa("Bio::EnsEMBL::Hive::AnalysisJob"));
    $self->{'_input_job'} = $job;
  }
  return $self->{'_input_job'};
}


# ##################### subroutines that link through to Job's methods #########################
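
# These simply delegate to the current input_job, so inside a subclass one can
# write, for example (the parameter names are illustrative):
#
#     my $species = $self->param_required('species');   # dies if not set
#     my $colour  = $self->param_is_defined('colour') ? $self->param('colour') : 'pink';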
sub input_id {
    my $self = shift;

    return $self->input_job->input_id(@_);
}

sub param {
    my $self = shift @_;

    return $self->input_job->param(@_);
}

sub param_required {
    my $self = shift @_;

    my $prev_transient_error = $self->input_job->transient_error(); # make a note of previously set transience status
    $self->input_job->transient_error(0);                           # make sure if we die in param_required it is not transient

    my $value = $self->input_job->param_required(@_);

    $self->input_job->transient_error($prev_transient_error);       # restore the previous transience status
    return $value;
}

sub param_is_defined {
    my $self = shift @_;

    return $self->input_job->param_is_defined(@_);
}

sub param_substitute {
    my $self = shift @_;

    return $self->input_job->param_substitute(@_);
}
sub warning {
    my $self = shift @_;

    return $self->input_job->warning(@_);
}

sub dataflow_output_id {
    my $self = shift @_;

    return $self->input_job->dataflow_output_id(@_);
}

=head2 debug

    Title   :  debug
    Function:  Gets/sets flag for debug level. Set through Worker/runWorker.pl
               Subclasses should treat this as a read-only variable.
    Returns :  integer

=cut

sub debug {
    my $self = shift;

    $self->{'_debug'} = shift if(@_);
    $self->{'_debug'} = 0 unless(defined($self->{'_debug'}));
    return $self->{'_debug'};
}


=head2 worker_temp_directory

    Title   :  worker_temp_directory
    Function:  Returns a path to a directory on the local /tmp disk 
               which the subclass can use as temporary file space.
               This directory is made the first time the function is called.
               It persists for as long as the worker is alive.  This allows
               multiple jobs run by the worker to potentially share temp data.
               For example, the worker (which specialises in a single Analysis) might
               need to dump a datafile which is needed by all jobs run through
               this analysis.  The process can first check the worker_temp_directory
               for the file and dump it if it is missing.  This way the first job
               run by the worker will do the dump, but subsequent jobs can reuse the
               file.
    Usage   :  $tmp_dir = $self->worker_temp_directory;
    Returns :  <string> path to a local (/tmp) directory 
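
    Example :  a sketch of the shared-dump pattern described above (the file
               name and the dump routine are illustrative assumptions):

                   my $datafile = $self->worker_temp_directory . 'common.dat';
                   unless(-e $datafile) {
                       $self->dump_common_datafile($datafile);   # hypothetical helper
                   }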

=cut

sub worker_temp_directory {
    my $self = shift @_;

    unless(defined($self->{'_tmp_dir'}) and (-e $self->{'_tmp_dir'})) {
        $self->{'_tmp_dir'} = $self->worker_temp_directory_name();
        mkdir($self->{'_tmp_dir'}, 0777);
        throw("unable to create a writable directory ".$self->{'_tmp_dir'}) unless(-w $self->{'_tmp_dir'});
    }
    return $self->{'_tmp_dir'};
}

sub worker_temp_directory_name {
    my $self = shift @_;

    my $username = $ENV{'USER'};
    my $worker_id = $self->worker ? $self->worker->dbID : "standalone.$$";
    return "/tmp/worker_${username}.${worker_id}/";
}


=head2 cleanup_worker_temp_directory

    Title   :  cleanup_worker_temp_directory
    Function:  Cleans up the directory on the local /tmp disk that is used for the
               worker. It can be used to remove files left there by previous jobs.
    Usage   :  $self->cleanup_worker_temp_directory;

=cut

sub cleanup_worker_temp_directory {
    my $self = shift @_;

    my $tmp_dir = $self->worker_temp_directory_name();
    if(-e $tmp_dir) {
        my $cmd = "rm -r $tmp_dir";
        system($cmd);
    }
}


1;