# You may distribute this module under the same terms as perl itself #

=pod 

=head1 NAME

  Bio::EnsEMBL::Hive::Process

=head1 SYNOPSIS

  Abstract superclass.  Each Process makes up the individual building blocks 
  of the system.  Instances of these processes are created in a hive workflow 
  graph of Analysis entries that are linked together with dataflow and 
  AnalysisCtrl rules.
  
  Instances of these Processes are created by the system as work is done.
  The newly created Process will have preset $self->queen, $self->dbc, 
  $self->input_id, $self->analysis and several other variables. 
  From this input and configuration data, each Process can then proceed to 
  do something.  The flow of execution within a Process is:
    fetch_input();
    run();
    write_output();
    DESTROY
  The developer can implement their own versions of fetch_input, run, 
  write_output, and DESTROY to do what they need.  
  
  The entire system is based around the concept of a workflow graph which
  can split and loop back on itself.  This is accomplished by dataflow
  rules (or pipes) that connect one Process (or analysis) to others.
  Where a unix commandline program can send output on STDOUT STDERR pipes, 
  a hive Process has access to unlimited pipes referenced by numerical 
  branch_codes. This is accomplished within the Process via 
  $self->dataflow_output_id(...);  
  
  The design philosophy is that each Process does it's work and creates output, 
  but it doesn't worry about where the input came from, or where it's output 
  goes. If the system has dataflow pipes connected, then the output jobs 
  have purpose, if not the output work is thrown away.  The workflow graph 
  'controls' the behaviour of the system, not the processes.  The processes just 
  need to do their job.  The design of the workflow graph is based on the knowledge 
  of what each Process does so that the graph can be correctly constructed.
  The workflow graph can be constructed a priori or can be constructed and 
  modified by intelligent Processes as the system runs.
  
  
  The Hive is based on AI concepts and modeled on the social structure and 
  behaviour of a honey bee hive. So where a worker honey bee's purpose is
  (go find pollen, bring back to hive, drop off pollen, repeat), an ensembl-hive 
  worker's purpose is (find a job, create a Process for that job, run it,
  drop off output job(s), repeat).  While most workflow systems are based 
  on 'smart' central controllers and external control of 'dumb' processes, 
  the Hive is based on 'dumb' workflow graphs and job kiosk, and 'smart' workers 
  (autonomous agents) who are self configuring and figure out for themselves what 
  needs to be done, and then do it.  The workers are based around a set of 
  emergent behaviour rules which allow a predictible system behaviour to emerge 
  from what otherwise might appear at first glance to be a chaotic system. There 
  is an inherent asynchronous disconnect between one worker and the next.  
  Work (or jobs) are simply 'posted' on a blackboard or kiosk within the hive 
  database where other workers can find them.  
  The emergent behaviour rules of a worker are:
     1) If a job is posted, someone needs to do it.
     2) Don't grab something that someone else is working on
     3) Don't grab more than you can handle
     4) If you grab a job, it needs to be finished correctly
     5) Keep busy doing work
     6) If you fail, do the best you can to report back
  For further reading on the AI principles employed in this design see:
     http://en.wikipedia.org/wiki/Autonomous_Agent
     http://en.wikipedia.org/wiki/Emergence
  

=head1 CONTACT

  Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

  The rest of the documentation details each of the object methods. 
  Internal methods are usually preceded with a _

=cut

# NOTE(review): this file-scoped global appears unused anywhere in this
# module -- confirm against external consumers before removing.
my $g_hive_process_workdir;  # a global directory location for the process using this module

package Bio::EnsEMBL::Hive::Process;

use strict;
use Bio::EnsEMBL::Utils::Argument;      # provides rearrange()
use Bio::EnsEMBL::Utils::Exception;     # provides throw()
use Bio::EnsEMBL::Hive::AnalysisJob;

sub new {
  my ($class, @args) = @_;

  # Start from an empty, unconfigured instance; the Queen/Worker
  # machinery fills in queen, worker and input_job after construction.
  my $self = bless {}, $class;

  # Only the -analysis constructor parameter is recognised here.
  my ($analysis) = rearrange([qw( ANALYSIS )], @args);
  $self->analysis($analysis) if $analysis;

  return $self;
}


##########################################
#
# methods subclasses should override 
# in order to give this process function
#
##########################################

=head2 fetch_input

    Title   :  fetch_input
    Function:  sublcass can implement functions related to data fetching.
               Typical acivities would be to parse $self->input_id and read
               configuration information from $self->analysis.  Subclasses
               may also want to fetch data from databases or from files 
               within this function.

=cut

sub fetch_input {
  my ($self) = @_;
  # Base-class no-op: subclasses override this to parse input_id,
  # read analysis configuration and pre-load any required data.
  return 1;
}

=head2 run

    Title   :  run
    Function:  sublcass can implement functions related to process execution.
               Typical activities include running external programs or running
               algorithms by calling perl methods.  Process may also choose to
               parse results into memory if an external program was used.

=cut

sub run {
  my ($self) = @_;
  # Base-class no-op: subclasses override this to perform the actual
  # work (run external programs, execute algorithms, parse results).
  return 1;
}

=head2 write_output

    Title   :  write_output
    Function:  sublcass can implement functions related to storing results.
               Typical activities including writing results into database tables
               or into files on a shared filesystem.
               
=cut

sub write_output {
  my ($self) = @_;
  # Base-class no-op: subclasses override this to persist results
  # (database tables, files on shared storage, ...).
  return 1;
}

=head2 DESTROY

    Title   :  DESTROY
    Function:  sublcass can implement functions related to cleanup and release.
               Typical activities includes freeing datastructures or 
	       closing files. 

=cut

sub DESTROY {
  my $self = shift;
  # Chain up to a superclass destructor only when one actually exists;
  # an unconditional SUPER::DESTROY call would die if none is defined.
  if ($self->can("SUPER::DESTROY")) {
    $self->SUPER::DESTROY;
  }
}


######################################################
#
# methods that subclasses can use to get access
# to hive infrastructure
#
######################################################


=head2 queen

    Title   :   queen
    Usage   :   my $hiveDBA = $self->queen;
    Function:   returns the 'Queen' this Process was created by
    Returns :   Bio::EnsEMBL::Hive::Queen

=cut

sub queen {
  my ($self, @new_value) = @_;
  # Combined getter/setter for the Queen that created this Process.
  $self->{'_queen'} = $new_value[0] if @new_value;
  return $self->{'_queen'};
}

sub worker {
  my ($self, @new_value) = @_;
  # Combined getter/setter for the Worker currently running this Process.
  $self->{'_worker'} = $new_value[0] if @new_value;
  return $self->{'_worker'};
}

=head2 db

    Title   :   db
    Usage   :   my $hiveDBA = $self->db;
    Function:   returns DBAdaptor to Hive database
    Returns :   Bio::EnsEMBL::Hive::DBSQL::DBAdaptor

=cut

sub db {
  my ($self) = @_;
  # Delegate to the Queen; without one there is no database adaptor.
  my $queen = $self->queen or return undef;
  return $queen->db;
}

=head2 dbc

    Title   :   dbc
    Usage   :   my $hiveDBConnection = $self->dbc;
    Function:   returns DBConnection to Hive database
    Returns :   Bio::EnsEMBL::DBSQL::DBConnection

=cut

sub dbc {
  my ($self) = @_;
  # Delegate to the Queen; without one there is no database connection.
  my $queen = $self->queen or return undef;
  return $queen->dbc;
}

=head2 analysis

    Title   :  analysis
    Usage   :  $self->analysis;
    Function:  Returns the Analysis object associated with this
               instance of the Process.
    Returns :  Bio::EnsEMBL::Analysis object

=cut

sub analysis {
  my ($self, $analysis) = @_;

  # Setter path: only genuine Analysis objects may be attached.
  if ($analysis) {
    $analysis->isa("Bio::EnsEMBL::Analysis")
      or throw("Not a Bio::EnsEMBL::Analysis object");
    $self->{'_analysis'} = $analysis;
  }

  return $self->{'_analysis'};
}

=head2 input_job

    Title   :  input_job
    Function:  Returns the AnalysisJob to be run by this process.
               Subclasses should treat this as a read_only object.
    Returns :  Bio::EnsEMBL::Hive::AnalysisJob object

=cut

sub input_job {
  my ($self, $job) = @_;

  # Setter path: only genuine AnalysisJob objects may be attached.
  if ($job) {
    $job->isa("Bio::EnsEMBL::Hive::AnalysisJob")
        or throw("Not a Bio::EnsEMBL::Hive::AnalysisJob object");
    $self->{'_input_job'} = $job;
  }

  return $self->{'_input_job'};
}

=head2 autoflow_inputjob

    Title   :  autoflow_inputjob
    Function:  Gets/sets flag for whether the input_job should
               be automatically dataflowed on branch code 1 when the
               job completes.  If the subclass manually sends a job along
               branch 1 with dataflow_output_id, the autoflow will be turned off.
    Returns :  boolean (1/0/undef)

=cut

sub autoflow_inputjob {
  my ($self, @new_value) = @_;
  # Getter/setter for the autoflow flag; defaults to ON (1) until
  # something (e.g. an explicit branch-1 dataflow) turns it off.
  $self->{'_autoflow_inputjob'} = $new_value[0] if @new_value;
  $self->{'_autoflow_inputjob'} = 1 unless defined $self->{'_autoflow_inputjob'};
  return $self->{'_autoflow_inputjob'};
}

=head2 dataflow_output_id

    Title        :  dataflow_output_id
    Arg[1](req)  :  <string> $output_id 
    Arg[2](opt)  :  <int> $branch_code (optional, defaults to 1)
    Usage        :  $self->dataflow_output_id($output_id, $branch_code);
    Function:  
      If Process needs to create jobs, this allows it to have jobs 
      created and flowed through the dataflow rules of the workflow graph.
      This 'output_id' becomes the 'input_id' of the newly created job at
      the ends of the dataflow pipes.  The optional 'branch_code' determines
      which dataflow pipe(s) to flow the job through.

=cut

sub dataflow_output_id {
  my ($self, $output_id, $branch_code, $blocked) = @_;

  # Nothing to flow without an output_id or an attached analysis.
  return unless($output_id);
  return unless($self->analysis);

  $branch_code = 1 unless(defined($branch_code));

  # Dataflow works by doing a transform from this process to the next.
  # The job starts out 'attached' to this process, hence the analysis_id,
  # branch_code and dbID are all relative to the starting point.  The
  # dataflow process transforms the job to a different analysis_id and
  # moves the dbID to the previous_analysis_job_id.
  my $job = Bio::EnsEMBL::Hive::AnalysisJob->new;   # was indirect 'new Class' syntax
  $job->input_id($output_id);
  $job->analysis_id($self->analysis->dbID);
  $job->branch_code($branch_code);
  $job->dbID($self->input_job->dbID);
  $job->status( $blocked ? 'BLOCKED' : 'READY' );

  # If the process uses branch_code 1 explicitly, turn off the
  # automatic dataflow of the input job on that same branch.
  $self->autoflow_inputjob(0) if($branch_code == 1);

  return $self->queen->flow_output_job($job);
}

=head2 debug

    Title   :  debug
    Function:  Gets/sets flag for debug level. Set through Worker/runWorker.pl
               Subclasses should treat as a read_only variable.
    Returns :  integer

=cut

sub debug {
  my ($self, @new_value) = @_;
  # Getter/setter for the debug level; defaults to 0 (quiet).
  # Set via Worker/runWorker.pl -- subclasses should treat as read-only.
  $self->{'_debug'} = $new_value[0] if @new_value;
  $self->{'_debug'} = 0 unless defined $self->{'_debug'};
  return $self->{'_debug'};
}


=head2 encode_hash

    Title   :  encode_hash
    Arg[1]  :  <reference to perl hash> $hash_ref 
    Function:  Simple convenience method which take a reference to a perl hash and
               returns a string which is perl code which can be converted back into
               the hash with an eval statement.  Treats all values in hash as strings
               so it will not properly encode complex data into perl.
    Usage   :  $hash_string = $self->encode_hash($has_ref);
               $hash_ref2 = eval($hash_string);
    Returns :  <string> perl code

=cut

sub encode_hash {
  my $self = shift;
  my $hash_ref = shift;

  # Empty string for a missing/false hashref, matching old behaviour.
  return "" unless($hash_ref);

  # Build a perl-source representation of the hash; keys are sorted so
  # the output is deterministic.  Values are treated as plain strings.
  my $hash_string = "{";
  foreach my $key (sort keys %{$hash_ref}) {
    next unless defined($hash_ref->{$key});
    # Escape backslashes and single quotes so the generated code
    # survives the documented eval() round-trip even when keys or
    # values contain them (the old code produced broken perl here).
    my ($ekey, $evalue) = map {
      my $copy = $_;
      $copy =~ s/([\\'])/\\$1/g;
      $copy;
    } ($key, $hash_ref->{$key});
    $hash_string .= "'$ekey'=>'$evalue',";
  }
  $hash_string .= "}";

  return $hash_string;
}

=head2 worker_temp_directory

    Title   :  worker_temp_directory
    Function:  Returns a path to a directory on the local /tmp disk 
               which the subclass can use as temporary file space.
               This directory is made the first time the function is called.
               It persists for as long as the worker is alive.  This allows
               multiple jobs run by the worker to potentially share temp data.
               For example the worker (which is a single Analysis) might need
               to dump a datafile file which is needed by all jobs run through 
               this analysis.  The process can first check the worker_temp_directory
               for the file and dump it if it is missing.  This way the first job
               run by the worker will do the dump, but subsequent jobs can reuse the 
               file.
    Usage   :  $tmp_dir = $self->worker_temp_directory;
    Returns :  <string> path to a local (/tmp) directory 

=cut

sub worker_temp_directory {
  my $self = shift;
  # The temp directory belongs to the Worker so that consecutive jobs
  # run by the same worker can share files; without a worker there is
  # no directory to offer.
  return undef unless($self->worker);
  return $self->worker->worker_process_temp_directory;
}

#################################################
#
# methods to make porting from RunnableDB easier
#
#################################################

sub input_id {
  my ($self) = @_;
  # RunnableDB-compatibility shim: input_id lives on the input job.
  my $job = $self->input_job or return '';
  return $job->input_id;
}

sub parameters {
  my ($self) = @_;
  # RunnableDB-compatibility shim: parameters live on the analysis.
  my $analysis = $self->analysis or return '';
  return $analysis->parameters;
}

=head2 runnable

    Title   :   runnable
    Usage   :   $self->runnable($arg)
    Function:   Sets a runnable for this RunnableDB
    Returns :   arrayref of Bio::EnsEMBL::Analysis::Runnable
    Args    :   Bio::EnsEMBL::Analysis::Runnable

=cut


sub runnable {
  my ($self, $arg) = @_;

  # Lazily initialise the list of attached runnables.
  $self->{'runnable'} = [] unless defined($self->{'runnable'});

  if (defined($arg)) {
    if ($arg->isa("Bio::EnsEMBL::Analysis::Runnable")) {
      push(@{$self->{'runnable'}}, $arg);
    } else {
      # was '&throw(...)': the &-call syntax is discouraged as it
      # bypasses prototypes and can leak the caller's @_.
      throw("[$arg] is not a Bio::EnsEMBL::Analysis::Runnable");
    }
  }

  return $self->{'runnable'};
}

=head2 output

    Title   :   output
    Usage   :   $self->output()
    Function:   
    Returns :   Array of Bio::EnsEMBL::FeaturePair
    Args    :   None

=cut

sub output {
  my ($self) = @_;

  # Harvest (once) the output of every attached runnable; the result is
  # cached so repeated calls do not re-collect.
  unless (defined $self->{'output'}) {
    $self->{'output'} = [];
    foreach my $r (@{$self->runnable}) {
      push(@{$self->{'output'}}, @{$r->output});
    }
  }

  return @{$self->{'output'}};
}
=head2 check_if_exit_cleanly

    Title   :   check_if_exit_cleanly
    Usage   :   $self->check_if_exit_cleanly()
    Function:   Check if we want to exit or kill it cleanly at the
                runnable level
    Returns :   None
    Args    :   None

=cut

sub check_if_exit_cleanly {
  my $self = shift;

  my $id = $self->input_job->dbID;

  # The honeycomb directory is where 'relegate' marker files are posted
  # to request that specific jobs (or all of them) be killed or exited.
  my $honeycomb_dir = $self->{'honeycomb_dir'};
  # Guard: the old code ran s/// on undef (warning) and then probed
  # root-level "/relegate.*" paths when no directory was configured.
  return undef unless defined $honeycomb_dir;
  $honeycomb_dir =~ s/\/$//;    # tolerate a trailing slash

  my $not_allowed  = $honeycomb_dir . "/" . "relegate." . $id;
  my $exit_cleanly = $honeycomb_dir . "/" . "relegate.all";
  if (-e $not_allowed) {
    # This specific job was marked for killing: fail it and abort.
    $self->update_status('FAILED');
    throw("This job has been relegated to be killed - $id\n");
  } elsif (-e $exit_cleanly) {
    # Global marker: put the job back to READY and exit the worker.
    $self->update_status('READY');
    throw("This job has been relegated to be exited - $id\n");
  }
  return undef;
}

1;