# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor
#
# Date of creation: 22.03.2004
# Original Creator : Jessica Severin <jessica@ebi.ac.uk>
#
# Copyright EMBL-EBI 2004
#
# You may distribute this module under the same terms as perl itself

# POD documentation - main docs before the code

=head1 NAME

Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor

=head1 SYNOPSIS

  $analysisJobAdaptor = $db_adaptor->get_AnalysisJobAdaptor;
  $analysisJobAdaptor = $analysisJob->adaptor;

=head1 DESCRIPTION

  Module to encapsulate all db access for persistent class AnalysisJob.
  There should be just one per application and database connection.

=head1 CONTACT

    Contact Jessica Severin on implementation/design detail: jessica@ebi.ac.uk
    Contact Ewan Birney on EnsEMBL in general: birney@sanger.ac.uk

=head1 APPENDIX

The rest of the documentation details each of the object methods. Internal methods are usually preceded with a _

=cut


# Let the code begin...

package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;

use strict;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::DBSQL::BaseAdaptor;
use Sys::Hostname;
use Data::UUID;

use Bio::EnsEMBL::Utils::Argument qw(rearrange);
use Bio::EnsEMBL::Utils::Exception qw(throw warning);

# Inherit the generic Ensembl adaptor machinery; presumably this is where the
# prepare() method called on $self throughout this module comes from.
our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);

###############################################################################
#
#  CLASS methods
#
###############################################################################

=head2 CreateNewJob

  Args       : -input_id     => string (required)
               -analysis     => Bio::EnsEMBL::Analysis (required; its dbID
                                and adaptor are used, so it must be stored)
               -input_job_id => int stored in input_analysis_job_id
                                (optional, default 0; presumably the job
                                that spawned this one)
               -block        => boolean; if true the row is inserted with
                                status and job_claim set to 'BLOCKED'
  Example    : my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
                 -input_id => $input_id, -analysis => $analysis);
  Description: Class method.  Inserts a new row into analysis_job with
               INSERT IGNORE (a row that would violate a unique key is
               skipped instead of raising an error) and returns its id.
  Returntype : int (mysql_insertid of the insert); undef when called
               without arguments
  Exceptions : thrown if input_id or analysis is missing, or if analysis is
               not a Bio::EnsEMBL::Analysis
  Caller     : general

=cut

sub CreateNewJob {
  my ($class, @args) = @_;

  # Kept as 'return undef' (not a bare return) so list-context callers still
  # receive exactly one element, as before.
  return undef unless(scalar @args);

  # Keep every name upper case: rearrange() upper-cases the caller's -KEY
  # names, and depending on the Ensembl version it may not normalise this
  # list, in which case a lower-case entry would never match.
  my ($input_id, $analysis, $input_analysis_job_id, $blocked) =
     rearrange([qw(INPUT_ID ANALYSIS INPUT_JOB_ID BLOCK)], @args);

  $input_analysis_job_id = 0 unless($input_analysis_job_id);
  throw("must define input_id") unless($input_id);
  throw("must define analysis") unless($analysis);
  throw("analysis must be [Bio::EnsEMBL::Analysis] not a [$analysis]")
    unless($analysis->isa('Bio::EnsEMBL::Analysis'));

  # Values are bound rather than interpolated: an input_id containing a
  # double quote used to break the statement.
  my $sql = "INSERT ignore into analysis_job ".
            " SET input_id=?, input_analysis_job_id=?, analysis_id=? ";
  $sql .= " ,status='BLOCKED', job_claim='BLOCKED'" if($blocked);

  my $dbc = $analysis->adaptor->db;
  my $sth = $dbc->prepare($sql);
  $sth->execute($input_id, $input_analysis_job_id, $analysis->dbID);
  my $dbID = $sth->{'mysql_insertid'};
  $sth->finish;

  return $dbID;
}

###############################################################################
#
#  INSTANCE methods
#
###############################################################################
=head2 fetch_by_dbID

  Arg [1]    : int $id
               the unique database identifier (analysis_job_id) of the job
               to be obtained
  Example    : $job = $adaptor->fetch_by_dbID(1234);
  Description: Returns the job stored in analysis_job under the id $id, or
               undef if there is no such row.
  Returntype : Bio::EnsEMBL::Hive::AnalysisJob
  Exceptions : thrown if $id is not defined
  Caller     : general

=cut

sub fetch_by_dbID {
  my ($self, $id) = @_;

  # Use the imported throw() like the other methods of this module, rather
  # than assuming the base adaptor provides a throw() method.
  throw("fetch_by_dbID must have an id") unless(defined $id);

  my @tabs = $self->_tables;
  my ($name, $syn) = @{$tabs[0]};

  # construct a constraint like 'a.analysis_job_id = 1234'
  my $constraint = "${syn}.${name}_id = $id";

  # return the first element of the _generic_fetch list (undef if it is empty)
  my ($obj) = @{$self->_generic_fetch($constraint)};
  return $obj;
}

=head2 fetch_by_claim_analysis

  Arg [1]    : string $claim (the UUID used to claim jobs)
  Arg [2]    : int $analysis_id
  Example    : $jobs = $adaptor->fetch_by_claim_analysis('c6658fde-64ab-4088-8526-2e960bd5dd60',208);
  Description: Returns the jobs of analysis $analysis_id whose job_claim is
               $claim
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisJob
  Exceptions : thrown if $claim or $analysis_id is false (undefined, empty
               or 0)
  Caller     : general

=cut

sub fetch_by_claim_analysis {
  my ($self, $claim, $analysis_id) = @_;

  # module-level throw(), consistent with the rest of this adaptor
  throw("fetch_by_claim_analysis must have claim ID") unless($claim);
  throw("fetch_by_claim_analysis must have analysis_id") unless($analysis_id);

  # 'a' is the analysis_job alias declared by _tables
  my $constraint = "a.job_claim='$claim' AND a.analysis_id='$analysis_id'";
  return $self->_generic_fetch($constraint);
}


=head2 fetch_all

  Arg        : None
  Example    : my $all_jobs = $adaptor->fetch_all;
  Description: Fetches every job in analysis_job (no constraint is passed
               to _generic_fetch)
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisJob
  Exceptions : none raised here
  Caller     : general

=cut

sub fetch_all {
  my ($self) = @_;

  # no constraint and no joins: the whole table
  my $all_jobs = $self->_generic_fetch();
  return $all_jobs;
}



#
# INTERNAL METHODS
#
###################

=head2 _generic_fetch

  Arg [1]    : (optional) string $constraint
               An SQL query constraint (i.e. part of the WHERE clause)
  Arg [2]    : (optional) listref $join
               Each element is [ [$table, $alias], $condition, [@extra_columns] ].
               The table is added to the FROM list and $condition is ANDed
               into the WHERE clause only when both are given; any
               @extra_columns are appended to the SELECT list.
  Example    : $jobs = $self->_generic_fetch("a.status='READY'");
  Description: Builds a SELECT from _tables/_columns (plus any joins),
               combines the constraint, the join conditions and
               _default_where_clause with AND, appends _final_clause,
               executes it and converts the rows with _objs_from_sth.
               Every WHERE fragment is parenthesised, so a constraint that
               contains OR cannot swallow the other conditions.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisJob
  Exceptions : none raised here
  Caller     : the fetch_* methods of this adaptor

=cut

sub _generic_fetch {
  my ($self, $constraint, $join) = @_;

  my @tables  = $self->_tables;
  my @columns = $self->_columns;
  my @where;
  push @where, $constraint if ($constraint);

  foreach my $single_join (@{ $join || [] }) {
    my ($table, $condition, $extra_columns) = @{$single_join};
    if ($table && $condition) {
      push @tables, $table;
      push @where, $condition;
    }
    # an empty extra-column list no longer leaves a dangling comma
    push @columns, @{$extra_columns} if ($extra_columns);
  }

  my $default_where = $self->_default_where_clause;
  push @where, $default_where if ($default_where);

  # table entries are [name, alias] pairs => 'analysis_job a, other o'
  my $sql = 'SELECT ' . join(', ', @columns)
          . ' FROM '  . join(', ', map { join(' ', @$_) } @tables);
  $sql .= ' WHERE ' . join(' AND ', map { "($_)" } @where) if (@where);

  # append additional clauses (e.g. ORDER BY) which may have been defined
  $sql .= ' ' . $self->_final_clause;

  my $sth = $self->prepare($sql);
  $sth->execute;

  return $self->_objs_from_sth($sth);
}

sub _tables {
  my ($self) = @_;

  # The only FROM entry, as a [table name, alias] pair.  The alias 'a' is the
  # prefix used by _columns and by the constraints the fetch_* methods build.
  my $job_table = [ 'analysis_job', 'a' ];
  return ($job_table);
}

sub _columns {
  my ($self) = @_;

  # analysis_job columns selected by _generic_fetch, in SELECT order; each is
  # qualified with the 'a' alias declared by _tables.
  my @bare_names = qw(
    analysis_job_id
    input_analysis_job_id
    analysis_id
    input_id
    job_claim
    hive_id
    status
    retry_count
    completed
    branch_code
  );
  my @qualified = map { "a.$_" } @bare_names;

  # An array slice returns the whole list in list context and its last
  # element (not a count) in scalar context.
  return @qualified[0 .. $#qualified];
}

sub _objs_from_sth {
  my ($self, $sth) = @_;

  # Turns every row of an executed statement handle into a
  # Bio::EnsEMBL::Hive::AnalysisJob attached to this adaptor, finishes the
  # handle, and returns a listref of the jobs (empty listref if no rows).

  # Bind each result column to the %column entry named after it (lower-cased)
  # so that every fetch() refreshes %column in place.
  my %column;
  $sth->bind_columns( \( @column{ @{ $sth->{NAME_lc} } } ) );

  my @jobs = ();

  while ($sth->fetch()) {
    # Class-method call; the indirect-object form 'new Class' is ambiguous
    # to the parser and discouraged.
    my $job = Bio::EnsEMBL::Hive::AnalysisJob->new;

    $job->dbID($column{'analysis_job_id'});
    $job->analysis_id($column{'analysis_id'});
    $job->input_id($column{'input_id'});
    $job->job_claim($column{'job_claim'});
    $job->hive_id($column{'hive_id'});
    $job->status($column{'status'});
    $job->retry_count($column{'retry_count'});
    $job->completed($column{'completed'});
    $job->branch_code($column{'branch_code'});
    $job->adaptor($self);
    # NOTE(review): _columns also selects input_analysis_job_id, which is not
    # copied onto the job here -- confirm whether AnalysisJob should carry it.

    push @jobs, $job;
  }
  $sth->finish;

  return \@jobs;
}

sub _default_where_clause {
  my ($self) = @_;

  # No filter is applied to every query by default; an empty (false) string
  # means no extra WHERE condition is added.
  my $no_filter = '';
  return $no_filter;
}

sub _final_clause {
  my $self = shift;

  # Trailing SQL appended to every generic fetch: rows come back in ascending
  # retry_count order, i.e. the least-retried jobs first.
  # (Stray blame-view residue that sat inside this body and broke
  # compilation has been removed.)
  return 'ORDER BY retry_count';
}


#
# STORE / UPDATE METHODS
#
################

=head2 update_status

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    : $job->status('DONE'); $adaptor->update_status($job);
  Description: Writes $job->status to the job's analysis_job row.  When the
               status is 'DONE' it also sets completed=now() and stores
               $job->branch_code.
  Returntype : none meaningful
  Exceptions : thrown if $job is not defined
  Caller     : general

=cut

sub update_status {
  my ($self, $job) = @_;

  throw("must define job") unless($job);

  # All values go through placeholders so DBI does the quoting; previously
  # an undefined branch_code produced the invalid fragment "branch_code= WHERE".
  my $sql  = "UPDATE analysis_job SET status=?";
  my @bind = ($job->status);
  if ($job->status eq 'DONE') {
    $sql .= ", completed=now(), branch_code=?";
    push @bind, $job->branch_code;
  }
  $sql .= " WHERE analysis_job_id=?";
  push @bind, $job->dbID;

  my $sth = $self->prepare($sql);
  $sth->execute(@bind);
  $sth->finish;
}

=head2 store_out_files

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisJob $job
  Example    : $adaptor->store_out_files($job);
  Description: Records the job's stdout_file and/or stderr_file paths in
               analysis_job_file (one 'STDOUT' / 'STDERR' row per path that
               is set, via INSERT IGNORE).  Does nothing if $job is undefined
               or neither path is set.
  Returntype : none meaningful
  Exceptions : none raised here
  Caller     : general

=cut

sub store_out_files {
  my ($self, $job) = @_;

  return unless($job);

  my @rows;
  push @rows, ['STDOUT', $job->stdout_file] if($job->stdout_file);
  push @rows, ['STDERR', $job->stderr_file] if($job->stderr_file);
  return unless(@rows);

  # One "(?, ?, ?)" tuple per row.  Binding the values keeps a quote inside
  # a file path from breaking the statement.
  my $sql = "INSERT ignore INTO analysis_job_file (analysis_job_id, type, path) VALUES "
          . join(', ', ('(?, ?, ?)') x scalar(@rows));

  my $sth = $self->prepare($sql);
  $sth->execute(map { ($job->dbID, @$_) } @rows);
  $sth->finish;
}


=head2 claim_jobs_for_worker

  Arg [1]    : Bio::EnsEMBL::Hive::Worker $worker
  Example    : my $claim = $adaptor->claim_jobs_for_worker($worker);
  Description: Tags up to $worker->batch_size unclaimed (job_claim='') jobs
               of the worker's analysis with a freshly generated UUID, sets
               their hive_id to the worker's and their status to 'CLAIMED'.
               Prints a progress line to STDOUT.
  Returntype : string, the UUID claim (e.g. for fetch_by_claim_analysis)
  Exceptions : thrown if $worker is not defined
  Caller     : general

=cut

sub claim_jobs_for_worker {
  my ($self, $worker) = @_;

  throw("must define worker") unless($worker);

  # class-method calls instead of the indirect 'new Data::UUID'
  my $ug    = Data::UUID->new;
  my $uuid  = $ug->create();
  my $claim = $ug->to_string($uuid);
  print("claiming jobs for hive_id=", $worker->hive_id, " with uuid $claim\n");

  # The claim/hive_id/analysis values are bound.  LIMIT stays interpolated
  # because DBD::mysql may quote a bound LIMIT value, which MySQL rejects.
  my $sql = "UPDATE analysis_job SET job_claim=?".
            " , hive_id=?".
            " , status='CLAIMED'".
            " WHERE job_claim='' ".
            " AND analysis_id=?".
            " LIMIT " . $worker->batch_size;

  my $sth = $self->prepare($sql);
  $sth->execute($claim, $worker->hive_id, $worker->analysis->dbID);
  $sth->finish;

  return $claim;
}


=head2 reset_dead_jobs_for_worker

  Arg [1]    : Bio::EnsEMBL::Hive::Worker $worker
  Arg [2]    : (optional) int $max_retry_count, default 5
  Example    : $adaptor->reset_dead_jobs_for_worker($worker);
  Description: Releases the jobs still held by $worker (matched on its
               hive_id):
                 - CLAIMED jobs go back to READY, retry_count untouched;
                 - jobs in GET_INPUT/RUN/WRITE_OUTPUT with retry_count below
                   $max_retry_count go back to READY with retry_count+1;
                 - jobs in those states with retry_count at or above
                   $max_retry_count are marked FAILED (retry_count+1).
  Returntype : none meaningful
  Exceptions : thrown if $worker is not defined
  Caller     : general

=cut

sub reset_dead_jobs_for_worker {
  my ($self, $worker, $max_retry_count) = @_;
  throw("must define worker") unless($worker);
  $max_retry_count = 5 unless(defined $max_retry_count);

  my $hive_id     = $worker->hive_id;
  my $in_progress = "status in ('GET_INPUT','RUN','WRITE_OUTPUT')";

  # prepare / execute with bind values / finish one statement
  my $run_update = sub {
    my ($sql, @bind) = @_;
    my $sth = $self->prepare($sql);
    $sth->execute(@bind);
    $sth->finish;
  };

  # All three updates filter on hive_id; an index on analysis_job.hive_id
  # was added to keep them fast.

  # 1. claimed but never started: no retry_count increment needed
  $run_update->("UPDATE analysis_job SET job_claim='', hive_id=0, status='READY'".
                " WHERE status='CLAIMED' AND hive_id=?",
                $hive_id);

  # 2. interrupted while running and retries remain: back to READY
  $run_update->("UPDATE analysis_job SET job_claim='', hive_id=0, status='READY'".
                " ,retry_count=retry_count+1".
                " WHERE $in_progress AND retry_count<? AND hive_id=?",
                $max_retry_count, $hive_id);

  # 3. interrupted while running with retries exhausted: FAILED
  $run_update->("UPDATE analysis_job SET status='FAILED'".
                " ,retry_count=retry_count+1".
                " WHERE $in_progress AND retry_count>=? AND hive_id=?",
                $max_retry_count, $hive_id);
}


1;