AnalysisJobAdaptor.pm 16.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
#
# Date of creation: 22.03.2004
# Original Creator : Jessica Severin <jessica@ebi.ac.uk>
#
# Copyright EMBL-EBI 2004
#
# You may distribute this module under the same terms as perl itself

# POD documentation - main docs before the code

=head1 NAME

  Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor

=head1 SYNOPSIS

  $analysisJobAdaptor = $db_adaptor->get_AnalysisJobAdaptor;
  $analysisJobAdaptor = $analysisJob->adaptor;

=head1 DESCRIPTION

  Module to encapsulate all db access for persistent class AnalysisJob.
  There should be just one per application and database connection.

=head1 CONTACT

  Contact Jessica Severin on implementation/design detail: jessica@ebi.ac.uk
  Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk

=head1 APPENDIX

  The rest of the documentation details each of the object methods.
  Internal methods are prefixed with an underscore (_).

=cut


# Let the code begin...

package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;

use strict;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::DBSQL::BaseAdaptor;
use Sys::Hostname;
use Data::UUID;

use Bio::EnsEMBL::Utils::Argument;   # provides rearrange(), used by CreateNewJob
use Bio::EnsEMBL::Utils::Exception;  # provides throw(), used throughout

# Inherit the generic adaptor plumbing: methods such as prepare() and db() are
# called on $self below but are not defined in this module.
our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);

55 56 57 58 59
###############################################################################
#
#  CLASS methods
#
###############################################################################
Jessica Severin's avatar
Jessica Severin committed
60

Jessica Severin's avatar
Jessica Severin committed
61 62 63 64 65 66 67 68 69 70
=head2 CreateNewJob

  Args       : -input_id     => string of input_id which will be passed to run the job
               -analysis     => Bio::EnsEMBL::Analysis object from a database
               -block        => int(0,1) set blocking state of job (default = 0)
               -input_job_id => (optional) analysis_job_id of job that is creating this
                                job.  Used purely for book keeping (defaults to 0).
  Example    : $analysis_job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
                                    -input_id => 'my input data',
                                    -analysis => $myAnalysis);
  Description: uses the analysis object to get the db connection from the adaptor to store a new
               job in a hive.  This is a class level method since it does not have any state.
               An input_id of 255 characters or more is stored in the analysis_data table and
               the job's input_id column holds '_ext_input_analysis_data_id <dbID>' instead.
               The job is inserted with status 'BLOCKED' when -block is true, else 'READY'.
               When a row was actually inserted, the corresponding analysis_stats row is also
               updated by incrementing total_job_count and unclaimed_job_count and flagging the
               incremental update by changing the status to 'LOADING' (that update only touches
               an analysis_stats row whose status is not 'BLOCKED').  If INSERT IGNORE skipped
               the row, or the insert failed, analysis_stats is left untouched.
  Returntype : int analysis_job_id on database analysis is from.
  Exceptions : thrown if either -input_id or -analysis are not properly defined
  Caller     : general

=cut

sub CreateNewJob {
  my ($class, @args) = @_;

  return undef unless(scalar @args);

  # Every option name is listed in upper case.  rearrange() matches these
  # names against the caller's '-option' keys after upper-casing them
  # (NOTE(review): true of the Bio::EnsEMBL::Utils::Argument versions we know;
  # confirm against the installed one).  The former lower-case 'input_job_id'
  # entry could therefore leave -input_job_id unmatched.
  my ($input_id, $analysis, $prev_analysis_job_id, $blocked) =
     rearrange([qw(INPUT_ID ANALYSIS INPUT_JOB_ID BLOCK)], @args);

  $prev_analysis_job_id=0 unless($prev_analysis_job_id);
  throw("must define input_id") unless($input_id);
  throw("must define analysis") unless($analysis);
  throw("analysis must be [Bio::EnsEMBL::Analysis] not a [$analysis]")
    unless($analysis->isa('Bio::EnsEMBL::Analysis'));
  throw("analysis must have adaptor connected to database")
    unless($analysis->adaptor and $analysis->adaptor->db);

  # Long input_ids are moved into analysis_data and referenced by dbID;
  # _objs_from_sth swaps the reference back when jobs are fetched.
  if(length($input_id) >= 255) {
    my $input_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($input_id);
    $input_id = "_ext_input_analysis_data_id $input_data_id";
  }

  my $sql = q{INSERT ignore into analysis_job 
              (input_id, prev_analysis_job_id,analysis_id,status)
              VALUES (?,?,?,?)};

  my $status ='READY';
  $status = 'BLOCKED' if($blocked);

  my $dbc = $analysis->adaptor->db->dbc;
  my $sth = $dbc->prepare($sql);
  # DBI's execute() returns the affected-row count: '0E0' (zero but true)
  # when INSERT IGNORE skipped the row, undef on failure.
  my $rows_inserted = $sth->execute($input_id, $prev_analysis_job_id, $analysis->dbID, $status);
  my $dbID = $sth->{'mysql_insertid'};
  $sth->finish;

  # Only count jobs that were really created ('0E0' != 0 is false; a driver
  # reporting -1 for "unknown" still gets its stats updated).
  if(defined($rows_inserted) and $rows_inserted != 0) {
    my $stats_sth = $dbc->prepare("UPDATE analysis_stats SET ".
                                  "total_job_count=total_job_count+1 ".
                                  ",unclaimed_job_count=unclaimed_job_count+1 ".
                                  ",status='LOADING' ".
                                  "WHERE status!='BLOCKED' and analysis_id=?");
    $stats_sth->execute($analysis->dbID);
    $stats_sth->finish;
  }

  return $dbID;
}

###############################################################################
#
#  INSTANCE methods
#
###############################################################################
Jessica Severin's avatar
Jessica Severin committed
130

131
=head2 fetch_by_dbID

  Arg [1]    : int $id
               the unique database identifier (analysis_job_id) of the job to fetch
  Example    : $job = $adaptor->fetch_by_dbID(1234);
  Description: Returns the AnalysisJob whose analysis_job_id is $id, or undef
               when no such row exists.
  Returntype : Bio::EnsEMBL::Hive::AnalysisJob
  Exceptions : thrown if $id is not defined
  Caller     : general

=cut

sub fetch_by_dbID {
  my ($self, $id) = @_;

  throw("fetch_by_dbID must have an id") unless defined $id;

  # Primary table is the first [name, alias] pair; its key column is
  # '<name>_id', giving e.g. 'a.analysis_job_id = 1234'.
  my ($table, $alias) = @{ ($self->_tables)[0] };
  my $jobs = $self->_generic_fetch("${alias}.${table}_id = $id");

  return $jobs->[0];
}

162

Jessica Severin's avatar
Jessica Severin committed
163
=head2 fetch_by_claim_analysis

  Arg [1]    : string $claim (the UUID used to claim jobs)
  Arg [2]    : int $analysis_id
  Example    : $jobs = $adaptor->fetch_by_claim_analysis('c6658fde-64ab-4088-8526-2e960bd5dd60',208);
  Description: Returns the jobs of analysis $analysis_id that are in status
               'CLAIMED' under the claim $claim.
  Returntype : reference to list of Bio::EnsEMBL::Hive::AnalysisJob objects
  Exceptions : thrown if $claim or $analysis_id is false
  Caller     : general

=cut

sub fetch_by_claim_analysis {
  my ($self, $claim, $analysis_id) = @_;

  $claim       or throw("fetch_by_claim_analysis must have claim ID");
  $analysis_id or throw("fetch_by_claim_analysis must have analysis_id");

  my @conditions = (
    "a.status='CLAIMED'",
    "a.job_claim='$claim'",
    "a.analysis_id='$analysis_id'",
  );

  return $self->_generic_fetch(join(' AND ', @conditions));
}


=head2 fetch_all

  Arg        : None
  Example    : $all_jobs = $adaptor->fetch_all;
  Description: fetches all jobs from database (unfiltered, in the order given
               by _final_clause)
  Returntype : reference to list of Bio::EnsEMBL::Hive::AnalysisJob objects
  Exceptions : none thrown here
  Caller     : general

=cut

sub fetch_all {
  my ($self) = @_;

  # no constraint and no joins: every analysis_job row
  return $self->_generic_fetch();
}

202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
=head2 fetch_all_failed_jobs

  Arg [1]    : (optional) int $analysis_id
  Example    : $failed_jobs = $adaptor->fetch_all_failed_jobs;
               $failed_jobs = $adaptor->fetch_all_failed_jobs($analysis->dbID);
  Description: Returns all jobs whose status is 'FAILED'.  When a true
               $analysis_id is given, only that analysis's jobs are returned.
  Returntype : reference to list of Bio::EnsEMBL::Hive::AnalysisJob objects
  Exceptions : none
  Caller     : user processes

=cut

sub fetch_all_failed_jobs {
  my ($self, $analysis_id) = @_;

  my @filters = ("a.status='FAILED'");
  push @filters, "a.analysis_id=$analysis_id" if $analysis_id;

  return $self->_generic_fetch(join(' AND ', @filters));
}
Jessica Severin's avatar
Jessica Severin committed
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276


#
# INTERNAL METHODS
#
###################

# Runs "SELECT <_columns> FROM <_tables>" with optional filtering and returns
# whatever _objs_from_sth builds from the executed statement handle.
#   $constraint : (optional) SQL condition placed in the WHERE clause
#   $join       : (optional) ref to a list of [$table, $condition, \@extra_columns]
#                 entries, where $table is a [name, alias] pair like those from
#                 _tables.  The table is added (and its condition ANDed into the
#                 constraint) only when both $table and $condition are true;
#                 extra columns are appended whenever given.
# _default_where_clause is ANDed in and _final_clause is appended last.
sub _generic_fetch {
  my ($self, $constraint, $join) = @_;

  my @from        = $self->_tables;
  my $select_list = join(', ', $self->_columns());

  foreach my $join_spec (@{ $join || [] }) {
    my ($join_table, $join_condition, $join_columns) = @{$join_spec};

    if ($join_table && $join_condition) {
      push @from, $join_table;
      $constraint = $constraint ? "$constraint AND $join_condition"
                                : " $join_condition";
    }
    $select_list .= ', ' . join(', ', @{$join_columns}) if $join_columns;
  }

  # 'table1 t1, table2 t2'
  my $from_clause = join(', ', map { join(' ', @{$_}) } @from);

  my $default_where = $self->_default_where_clause;
  my $final_clause  = $self->_final_clause;

  # Keep only the filters that are set; the padding reproduces the spacing
  # ' WHERE c  AND d ' / ' WHERE x ' of the SQL this method has always built.
  my @filters = grep { $_ } ($constraint, $default_where);

  my $sql = "SELECT $select_list FROM $from_clause";
  $sql .= ' WHERE ' . join('  AND ', @filters) . ' ' if @filters;
  $sql .= " $final_clause";

  my $sth = $self->prepare($sql);
  $sth->execute;

  return $self->_objs_from_sth($sth);
}

282

Jessica Severin's avatar
Jessica Severin committed
283 284 285
# The single table this adaptor reads, as a [table name, alias] pair.  The
# alias 'a' is what _columns and the fetch constraints refer to.
sub _tables {
  my ($self) = @_;

  my $analysis_job_table = [ 'analysis_job', 'a' ];
  return ($analysis_job_table);
}

289

Jessica Severin's avatar
Jessica Severin committed
290 291 292 293
# SELECT list used by _generic_fetch.  Every column is qualified with the 'a'
# alias from _tables.  _objs_from_sth reads the values back by bare,
# lower-cased column name (prev_analysis_job_id is selected but not copied
# onto the job objects there).
sub _columns {
  my ($self) = @_;

  return (
    'a.analysis_job_id',
    'a.prev_analysis_job_id',
    'a.analysis_id',
    'a.input_id',
    'a.job_claim',
    'a.hive_id',
    'a.status',
    'a.retry_count',
    'a.completed',
    'a.branch_code',
    'a.runtime_msec',
    'a.query_count',
  );
}

308 309
# Extra SQL condition that _generic_fetch ANDs into every query.  An empty
# string means this adaptor adds none.
sub _default_where_clause {
  my ($self) = @_;
  my $extra_condition = '';
  return $extra_condition;
}


# Trailing SQL that _generic_fetch appends to every query: jobs come back in
# ascending retry_count order.
sub _final_clause {
  my ($self) = @_;
  my $ordering = 'ORDER BY retry_count';
  return $ordering;
}

319

Jessica Severin's avatar
Jessica Severin committed
320 321 322 323 324 325 326
# Builds one Bio::EnsEMBL::Hive::AnalysisJob per row of an executed statement
# handle whose columns are those of _columns (matched by lower-cased name).
# Each job gets this adaptor attached.  An input_id that is a reference into
# analysis_data (see CreateNewJob) is replaced by the stored data.
# Finishes $sth and returns a reference to the list of jobs.
sub _objs_from_sth {
  my ($self, $sth) = @_;

  # Bind every result column straight into %column, keyed by lower-cased name.
  my %column;
  $sth->bind_columns( \( @column{ @{$sth->{NAME_lc} } } ));

  my @jobs = ();

  while ($sth->fetch()) {
    my $job = Bio::EnsEMBL::Hive::AnalysisJob->new;

    $job->dbID($column{'analysis_job_id'});
    $job->analysis_id($column{'analysis_id'});
    $job->input_id($column{'input_id'});
    $job->job_claim($column{'job_claim'});
    $job->hive_id($column{'hive_id'});
    $job->status($column{'status'});
    $job->retry_count($column{'retry_count'});
    $job->completed($column{'completed'});
    $job->branch_code($column{'branch_code'});
    $job->runtime_msec($column{'runtime_msec'});
    $job->query_count($column{'query_count'});
    $job->adaptor($self);

    # CreateNewJob stores over-long input_ids in analysis_data and leaves
    # exactly '_ext_input_analysis_data_id <dbID>' in this column.  The match
    # is anchored so an ordinary input_id that merely contains that text is
    # not mistaken for a reference.
    if($column{'input_id'} =~ /\A_ext_input_analysis_data_id (\d+)\z/) {
      $job->input_id($self->db->get_AnalysisDataAdaptor->fetch_by_dbID($1));
    }

    push @jobs, $job;
  }
  $sth->finish;

  return \@jobs;
}

356

Jessica Severin's avatar
Jessica Severin committed
357 358 359 360 361 362
#
# STORE / UPDATE METHODS
#
################

=head2 update_status

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    : $job->status('RUN'); $adaptor->update_status($job);
  Description: writes $job->status to analysis_job.status for the row whose
               analysis_job_id is $job->dbID.  When the status is 'DONE' it also
               sets completed to now() and stores the job's branch_code,
               runtime_msec and query_count.
               All values are sent as bind parameters, so an undefined
               branch_code/runtime_msec/query_count is written as NULL instead
               of producing malformed SQL.
  Returntype : none
  Exceptions : none thrown here
  Caller     : general

=cut

sub update_status {
  my ($self,$job) = @_;

  # Accessors are forced into scalar context so an accessor returning an
  # empty list cannot shift the bind values out of step with the '?'s.
  my @assignments = ('status=?');
  my @bind_values = (scalar($job->status));

  if($job->status eq 'DONE') {
    push @assignments, 'completed=now()', 'branch_code=?', 'runtime_msec=?', 'query_count=?';
    push @bind_values, scalar($job->branch_code), scalar($job->runtime_msec), scalar($job->query_count);
  }

  my $sql = "UPDATE analysis_job SET " . join(',', @assignments) . " WHERE analysis_job_id=?";
  push @bind_values, scalar($job->dbID);

  my $sth = $self->prepare($sql);
  $sth->execute(@bind_values);
  $sth->finish;
}

389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405
=head2 reclaim_job

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    : $adaptor->reclaim_job($job);
  Description: gives $job a freshly generated Data::UUID claim string (set on
               the object via job_claim).  Then writes status 'CLAIMED', that
               claim and the job's hive_id to the job's analysis_job row.
  Returntype : none
  Exceptions : none thrown here
  Caller     : (not determined from this module)

=cut

sub reclaim_job {
  my ($self, $job) = @_;

  my $uuid_gen = Data::UUID->new;
  my $raw_uuid = $uuid_gen->create();
  $job->job_claim($uuid_gen->to_string($raw_uuid));

  my $sth = $self->prepare(
    "UPDATE analysis_job SET status='CLAIMED', job_claim=?, hive_id=? WHERE analysis_job_id=?");
  $sth->execute($job->job_claim, $job->hive_id, $job->dbID);
  $sth->finish;
}


Jessica Severin's avatar
Jessica Severin committed
406
=head2 store_out_files

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    : $adaptor->store_out_files($job);
  Description: records the job's stdout_file and/or stderr_file paths (whichever
               are set) in analysis_job_file as rows of type 'STDOUT' / 'STDERR'
               (INSERT IGNORE).  Does nothing if $job is false or has neither
               path set.  Paths are sent as bind parameters, so paths containing
               quotes are stored intact.
               NOTE(review): an earlier description said "if files are non-zero
               size"; no size check happens here, so any such filtering must be
               done by the caller.
  Returntype : none
  Exceptions : none thrown here
  Caller     : Bio::EnsEMBL::Hive::Worker

=cut

sub store_out_files {
  my ($self,$job) = @_;

  return unless($job and ($job->stdout_file or $job->stderr_file));

  # One '(?,?,?)' row per path that is set, with matching bind values.
  # Accessors are forced into scalar context so the values stay in step.
  my (@row_placeholders, @bind_values);
  foreach my $output (['STDOUT', scalar($job->stdout_file)],
                      ['STDERR', scalar($job->stderr_file)]) {
    my ($type, $path) = @{$output};
    next unless($path);
    push @row_placeholders, '(?,?,?)';
    push @bind_values, scalar($job->dbID), $type, $path;
  }

  my $sql = "INSERT ignore INTO analysis_job_file (analysis_job_id, type, path) VALUES "
          . join(', ', @row_placeholders);

  my $sth = $self->prepare($sql);
  $sth->execute(@bind_values);
  $sth->finish;
}


=head2 claim_jobs_for_worker

  Arg [1]    : Bio::EnsEMBL::Hive::Worker $worker
  Example    : $claim = $adaptor->claim_jobs_for_worker($worker);
  Description: Generates a new Data::UUID claim string.  In one UPDATE, it then
               marks up to $worker->batch_size unclaimed 'READY' jobs of the
               worker's analysis as 'CLAIMED', setting job_claim to the new claim
               and hive_id to the worker's hive_id.  The claim is presumably used
               afterwards with fetch_by_claim_analysis, which filters on the same
               job_claim value.
  Returntype : string, the claim UUID
  Exceptions : thrown if $worker is not defined
  Caller     : (not determined from this module)

=cut

sub claim_jobs_for_worker {
  my ($self, $worker) = @_;

  throw("must define worker") unless($worker);

  my $uuid_gen = Data::UUID->new;
  my $raw_uuid = $uuid_gen->create();
  my $claim    = $uuid_gen->to_string($raw_uuid);

  my $hive_id     = $worker->hive_id;
  my $analysis_id = $worker->analysis->dbID;
  my $batch_size  = $worker->batch_size;

  my $sql = "UPDATE analysis_job SET job_claim='$claim'"
          . " , hive_id='$hive_id'"
          . " , status='CLAIMED'"
          . " WHERE job_claim='' and status='READY'"
          . " AND analysis_id='$analysis_id'"
          . " LIMIT $batch_size";

  my $sth = $self->prepare($sql);
  $sth->execute();
  $sth->finish;

  return $claim;
}

460

461 462 463 464 465 466 467 468 469 470 471 472 473 474
=head2 reset_dead_jobs_for_worker

  Arg [1]    : Bio::EnsEMBL::Hive::Worker object
  Arg [2]    : (optional) int $max_retry_count, default 7
  Example    : $adaptor->reset_dead_jobs_for_worker($dead_worker);
  Description: If a worker has died some of its jobs need to be reset back to 'READY'
               so they can be rerun.  Only jobs whose hive_id is the worker's
               hive_id are touched.
               Jobs in state CLAIMED are simply reset back to READY.
               If a job was in a 'working' state (GET_INPUT, RUN, WRITE_OUTPUT)
               the retry_count is incremented and the status set back to READY.
               If its retry_count was already >= $max_retry_count the job is instead
               set to 'FAILED' (retry_count is still incremented) and not rerun again.
  Returntype : none
  Exceptions : $worker must be defined
  Caller     : Bio::EnsEMBL::Hive::Queen

=cut

sub reset_dead_jobs_for_worker {
  my ($self, $worker, $max_retry_count) = @_;
  throw("must define worker") unless($worker);
  $max_retry_count = 7 unless(defined $max_retry_count);

  my $hive_id = $worker->hive_id;

  # Runs one UPDATE with bind values and returns the handle's finish() result,
  # which keeps the method's return value what it used to be.
  my $run_update = sub {
    my ($sql, @bind) = @_;
    my $sth = $self->prepare($sql);
    $sth->execute(@bind);
    return $sth->finish;
  };

  # All three statements filter on hive_id; the hive_id index on analysis_job
  # is what keeps them fast.
  # NOTE: an older comment here reported that an UPDATE selecting on status and
  # hive_id took ~4 seconds per worker, while a select followed by an update on
  # analysis_job_id returned almost instantly.  The statements below do filter
  # on status and hive_id directly and rely on that index.

  # 1. claimed but never started: back to READY, no retry_count increment
  $run_update->("UPDATE analysis_job SET job_claim='', status='READY'".
                " WHERE status='CLAIMED'".
                " AND hive_id=?",
                $hive_id);

  # 2. interrupted while working, retries left: back to READY, count the retry
  $run_update->("UPDATE analysis_job SET job_claim='', status='READY'".
                " ,retry_count=retry_count+1".
                " WHERE status in ('GET_INPUT','RUN','WRITE_OUTPUT')".
                " AND retry_count<?".
                " AND hive_id=?",
                $max_retry_count, $hive_id);

  # 3. interrupted while working, retries exhausted: mark FAILED
  return $run_update->("UPDATE analysis_job SET status='FAILED'".
                       " ,retry_count=retry_count+1".
                       " WHERE status in ('GET_INPUT','RUN','WRITE_OUTPUT')".
                       " AND retry_count>=?".
                       " AND hive_id=?",
                       $max_retry_count, $hive_id);
}

519 520 521 522 523 524 525 526 527 528
=head2 reset_job_by_dbID

  Arg [1]    : int $analysis_job_id
  Example    : $adaptor->reset_job_by_dbID(1234);
  Description: Forces a job to be reset to 'READY' so it can be run again,
               whatever its current status (a previously 'BLOCKED' job becomes
               READY too).  Its hive_id and retry_count are set to 0 and its
               job_claim is cleared.
  Returntype : none
  Exceptions : thrown if $analysis_job_id is false (undef, 0 or '')
  Caller     : user process

=cut

sub reset_job_by_dbID {
  my ($self, $analysis_job_id) = @_;

  throw("must define job") unless($analysis_job_id);

  my $reset_sth = $self->prepare(
    "UPDATE analysis_job SET hive_id=0, retry_count=0, job_claim='', status='READY' WHERE analysis_job_id=?");
  $reset_sth->execute($analysis_job_id);
  $reset_sth->finish;
}


545 546 547 548 549
=head2 reset_all_jobs_for_analysis_id

  Arg [1]    : int $analysis_id
  Example    : $adaptor->reset_all_jobs_for_analysis_id($analysis->dbID);
  Description: Resets all jobs of the analysis that are not BLOCKED back to READY
               (clearing their job_claim) so they can be rerun, then sets the
               analysis' stats status to 'LOADING' through the AnalysisStatsAdaptor.
               Needed if an analysis/process modifies the dataflow rules as the
               system runs.  The jobs that are flowed 'from' will need to be reset so
               that the output data can be flowed through the new rule.
               If one is designing a system based on a need to change rules mid-process
               it is best to make sure such 'from' analyses that need to be reset are 'Dummy'
               types so that they can 'hold' the output from the previous step and not require
               the system to actually redo processing.
  Returntype : whatever AnalysisStatsAdaptor::update_status returns
  Exceptions : thrown if $analysis_id is false
  Caller     : user RunnableDB subclasses which build dataflow rules on the fly

=cut

sub reset_all_jobs_for_analysis_id {
  my ($self, $analysis_id) = @_;

  throw("must define analysis_id") unless($analysis_id);

  my $reset_sth = $self->prepare(
    "UPDATE analysis_job SET job_claim='', status='READY' WHERE status!='BLOCKED' and analysis_id=?");
  $reset_sth->execute($analysis_id);
  $reset_sth->finish;

  my $stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
  return $stats_adaptor->update_status($analysis_id, 'LOADING');
}


Jessica Severin's avatar
Jessica Severin committed
578 579 580
1;