# AnalysisStatsAdaptor.pm
# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
#
# Date of creation: 22.03.2004
# Original Creator : Jessica Severin <jessica@ebi.ac.uk>
#
# Copyright EMBL-EBI 2004
#
# You may distribute this module under the same terms as perl itself

=pod

=head1 NAME

  Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor

=head1 SYNOPSIS

  $analysisStatsAdaptor = $db_adaptor->get_AnalysisStatsAdaptor;
  $analysisStatsAdaptor = $analysisStats->adaptor;

=head1 DESCRIPTION

  Module to encapsulate all database access for the persistent class AnalysisStats.
  There should be just one instance per application and database connection.

=head1 CONTACT

  Please contact the ehive-users@ebi.ac.uk mailing list with questions or suggestions.

=head1 APPENDIX

  The rest of the documentation details each of the object methods.
  Internal methods are usually preceded by an underscore (_).

=cut



package Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;

use strict;
42

43
use Bio::EnsEMBL::Utils::Argument;
44 45
use Bio::EnsEMBL::Utils::Exception ('throw');
use Bio::EnsEMBL::Hive::AnalysisStats;
46

47
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
48 49


=head2 fetch_by_analysis_id

  Arg [1]    : int $id
               the analysis_id whose analysis_stats row is to be fetched
  Example    : $stats = $adaptor->fetch_by_analysis_id(1234);
  Description: Returns the AnalysisStats object stored in the database
               for the analysis with analysis_id $id.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : thrown if $id is not defined, if it is not a non-negative
               integer, or if no analysis_stats row exists for it
  Caller     : general

=cut

sub fetch_by_analysis_id {
  my ($self, $id) = @_;

  unless(defined $id) {
    throw("fetch_by_analysis_id must have an id");
  }

  # $id is interpolated into the SQL constraint below, so only accept a plain
  # integer; anything else would produce broken (or injected) SQL.
  unless($id =~ /\A\d+\z/) {
    throw("fetch_by_analysis_id expects an integer analysis_id, got '$id'");
  }

  # analysis_id is the key of analysis_stats, so take the first (only) element:
  my ($obj) = @{ $self->_generic_fetch("ast.analysis_id = $id") };

  unless(defined $obj) {
    throw("unable to fetch analysis_stats for analysis_id = $id\n");
  }

  return $obj;
}


=head2 fetch_all_by_suitability_rc_id_meadow_type

  Arg [1]    : (optional) int $resource_class_id
               if true, only analyses of this resource class are considered
  Arg [2]    : (optional) string $meadow_type
               if true, only analyses whose meadow_type is NULL or equal to it are considered
  Example    : $stats_list = $adaptor->fetch_all_by_suitability_rc_id_meadow_type($rc_id, 'LSF');
  Description: Returns the AnalysisStats of analyses that a new Worker could work on.
               The list starts with the analyses that clearly have work to do
               (num_required_workers>0 and status READY or WORKING), ordered by
               descending a.priority with random tie-breaking, followed by those
               that may have work after a sync (status LOADING, BLOCKED, ALL_CLAIMED
               or SYNCHING), ordered by last_update.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : none thrown directly

=cut

sub fetch_all_by_suitability_rc_id_meadow_type {
    my ($self, $resource_class_id, $meadow_type) = @_;

    my $join_condition = " ast.analysis_id=a.analysis_id ";
    $join_condition .= "AND a.resource_class_id=$resource_class_id " if $resource_class_id;
    if ($meadow_type) {
            # let the driver quote the meadow name, so it can never break (or inject into) the SQL:
        my $quoted_meadow_type = $self->dbc->db_handle->quote($meadow_type);
        $join_condition .= "AND (a.meadow_type IS NULL OR a.meadow_type=$quoted_meadow_type) ";
    }

    my $join            = [[ ['analysis_base', 'a'], $join_condition ]];
    my $random_function = ($self->dbc->driver eq 'sqlite') ? 'RANDOM()' : 'RAND()';

        # the ones that clearly have work to do:
        #
    my $primary_results = $self->_generic_fetch(
        "ast.num_required_workers>0 AND ast.status in ('READY', 'WORKING')" ,
        $join ,
        "ORDER BY a.priority DESC, $random_function",
    );

        # the ones that may have work to do after a sync:
        #
    my $secondary_results = $self->_generic_fetch(
        "ast.status in ('LOADING', 'BLOCKED', 'ALL_CLAIMED', 'SYNCHING')" ,
        $join ,
        'ORDER BY last_update',     # FIXME: could mix in a.priority if sync is not too expensive?
    );

    return [ @$primary_results, @$secondary_results ];
}

=head2 refresh

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats $stats
  Description: Re-reads the analysis_stats row for $stats->analysis_id and overwrites
               the contents of $stats with it, so every existing reference to $stats
               sees the reloaded values.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats - the very same object, reloaded
  Exceptions : whatever fetch_by_analysis_id throws (e.g. when the row is missing)

=cut

sub refresh {
    my $self  = shift;
    my $stats = shift;

    # Load a fresh copy, then replace the caller's object contents in place
    # (rather than returning the new object) so that outstanding references stay valid.
    my $reloaded = $self->fetch_by_analysis_id( $stats->analysis_id );
    %{$stats} = %{$reloaded};

    return $stats;
}


################
#
# STORE / UPDATE METHODS
#
################

=head2 store

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats $stats
  Description: Inserts a new analysis_stats row holding the analysis_id, batch_size,
               hive_capacity and status of $stats, then attaches this adaptor to $stats.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats - the same object

=cut

sub store {
    my ($self, $stats) = @_;

    my $insert_sth = $self->prepare("INSERT INTO analysis_stats (analysis_id, batch_size, hive_capacity, status) VALUES (?, ?, ?, ?)");

    # one bound value per placeholder, in column order:
    my @bind_values = map { $stats->$_() } qw(analysis_id batch_size hive_capacity status);
    $insert_sth->execute(@bind_values);
    $insert_sth->finish;

    $stats->adaptor( $self );
    return $stats;
}


=head2 update

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Example    : $analysisStatsAdaptor->update($stats);
  Description: Writes the state held in $stats back to its analysis_stats row.
               For 'DYNAMIC' analyses hive_capacity is first moved halfway towards
               the largest value that input_capacity/output_capacity allow, given
               the measured average timings.
               The job counters and num_running_workers are only written when the
               database does not maintain them with triggers (num_running_workers is
               then recounted via the Queen first).
               last_update is set to the current time, sync_lock is reset to '0',
               and a timestamped copy of the row is appended to analysis_stats_monitor.
               All values are passed as bind parameters, so undefined values are
               stored as NULL and strings are quoted by the driver.
  Returntype : none (the return value carries no meaning)
  Exceptions : none thrown directly; database errors propagate from the driver
  Caller     : general

=cut

sub update {
  my ($self, $stats) = @_;

  my $hive_capacity = $stats->hive_capacity;

  if ($stats->behaviour eq "DYNAMIC") {

      # NOTE(review): an undefined hive_capacity is treated as 0 in the arithmetic below - confirm this is intended.
      # Upper bound imposed by the input side; without input timings keep the current capacity:
    my $max_hive_capacity = $stats->avg_input_msec_per_job
        ? int($stats->input_capacity * $stats->avg_msec_per_job / $stats->avg_input_msec_per_job)
        : $hive_capacity;

      # The output side may impose a tighter bound:
    if ($stats->avg_output_msec_per_job) {
      my $max_hive_capacity2 = int($stats->output_capacity * $stats->avg_msec_per_job / $stats->avg_output_msec_per_job);
      if ($max_hive_capacity2 < $max_hive_capacity) {
        $max_hive_capacity = $max_hive_capacity2;
      }
    }

      # Move halfway (rounding .5 up) from the current capacity towards the bound:
    $stats->hive_capacity( int( ($hive_capacity+$max_hive_capacity+1)/2 ) );
  }

    # Columns written on every update; each name is also the AnalysisStats accessor holding its value.
  my @columns = qw(status batch_size hive_capacity
                   avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job);

    # Without triggers nothing else maintains the job counters, so write them (and a fresh worker count) here:
  unless( $self->db->hive_use_triggers() ) {
    $stats->num_running_workers( $self->db->get_Queen->count_running_workers( $stats->analysis_id() ) );
    push @columns, qw(total_job_count semaphored_job_count ready_job_count done_job_count failed_job_count
                      num_running_workers);
  }

  push @columns, 'num_required_workers';

  my $sql = 'UPDATE analysis_stats SET '
          . join(', ', map { "$_=?" } @columns)
          . ", last_update=CURRENT_TIMESTAMP, sync_lock='0'"
          . ' WHERE analysis_id=?';

  my $sth = $self->prepare($sql);
  $sth->execute( (map { scalar $stats->$_() } @columns), $stats->analysis_id );
  $sth->finish;

    # Keep a timestamped snapshot of the freshly updated row:
  $sth = $self->prepare('INSERT INTO analysis_stats_monitor SELECT CURRENT_TIMESTAMP, analysis_stats.* FROM analysis_stats WHERE analysis_id = ?');
  $sth->execute( $stats->analysis_id );
  $sth->finish;

  $stats->seconds_since_last_update(0); #not exact but good enough :)
}


=head2 update_status

  Arg [1]    : int $analysis_id
  Arg [2]    : string $status
  Example    : $analysisStatsAdaptor->update_status($analysis_id, 'BLOCKED');
  Description: Sets the status column of the analysis_stats row for $analysis_id.
               Both values are passed as bind parameters, so the driver quotes them.
  Returntype : none

=cut

sub update_status {
  my ($self, $analysis_id, $status) = @_;

  my $sth = $self->prepare("UPDATE analysis_stats SET status=? WHERE analysis_id=?");
  $sth->execute($status, $analysis_id);
  $sth->finish;
}


=head2 interval_update_work_done

  Arg [1]     : int $analysis_id
  Arg [2]     : int $jobs_done_in_interval
  Arg [3]     : int $interval_msec
  Arg [4]     : int $fetching_msec
  Arg [5]     : int $running_msec
  Arg [6]     : int $writing_msec
  Arg [7]     : real $weight_factor [optional; any false value means 3]
  Example     : $statsDBA->interval_update_work_done($analysis_id, $jobs_done, $interval_msec, $fetching_msec, $running_msec, $writing_msec);
  Description : Recalculates avg_msec_per_job, avg_input_msec_per_job, avg_run_msec_per_job
                and avg_output_msec_per_job in a single UPDATE, as a weighted running average:
                  new_avg = (done_job_count*old_avg/weight + interval_total) / (done_job_count/weight + jobs_done)
                so a larger weight factor lets the most recent interval count for more.
                When the database does not maintain job counters by triggers, the same UPDATE
                also moves the finished jobs from ready_job_count to done_job_count.
  Caller      : Bio::EnsEMBL::Hive::Worker

=cut

sub interval_update_work_done {
  my ($self, $analysis_id, $job_count, $interval_msec, $fetching_msec, $running_msec, $writing_msec, $weight_factor) = @_;

  $weight_factor ||= 3; # makes it more sensitive to the dynamics of the farm

    # builds "column = <weighted running average of column and the interval total>":
  my $weighted_average = sub {
      my ($column, $interval_total) = @_;
      return "$column = (((done_job_count*$column)/$weight_factor + $interval_total) / (done_job_count/$weight_factor + $job_count))";
  };

  my @assignments = (
      $weighted_average->('avg_msec_per_job',        $interval_msec),
      $weighted_average->('avg_input_msec_per_job',  $fetching_msec),
      $weighted_average->('avg_run_msec_per_job',    $running_msec),
      $weighted_average->('avg_output_msec_per_job', $writing_msec),
  );

    # Without triggers the counters are maintained here. They must come after the averages:
    # MySQL evaluates single-table UPDATE assignments left to right, and the averages need the old done_job_count.
  unless( $self->db->hive_use_triggers() ) {
      push @assignments,
          "ready_job_count = ready_job_count - $job_count",
          "done_job_count = done_job_count + $job_count";
  }

  my $sql = "UPDATE analysis_stats SET " . join(', ', @assignments) . " WHERE analysis_id= $analysis_id";

  $self->dbc->do($sql);
}


=head2 increase_running_workers

  Arg [1]    : int $analysis_id
  Description: Increments num_running_workers of the given analysis by one.
  Returntype : whatever $self->dbc->do returns

=cut

sub increase_running_workers {
  my ($self, $analysis_id) = @_;

  return $self->dbc->do(
      "UPDATE analysis_stats SET num_running_workers = num_running_workers + 1 "
    . " WHERE analysis_id='$analysis_id'"
  );
}


=head2 decrease_running_workers

  Arg [1]    : int $analysis_id
  Description: Decrements num_running_workers of the given analysis by one.
  Returntype : whatever $self->dbc->do returns

=cut

sub decrease_running_workers {
  my ($self, $analysis_id) = @_;

  my $decrement_sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 "
                    . " WHERE analysis_id='$analysis_id'";

  return $self->dbc->do($decrement_sql);
}

=head2 decrease_required_workers

  Arg [1]    : int $analysis_id
  Description: Decrements num_required_workers of the given analysis by one.
  Returntype : whatever $self->dbc->do returns

=cut

sub decrease_required_workers {
  my ($self, $analysis_id) = @_;

  my $dbc = $self->dbc;
  return $dbc->do( "UPDATE analysis_stats SET num_required_workers=num_required_workers-1 "
                 . "WHERE analysis_id='$analysis_id' " );
}


=head2 increase_required_workers

  Arg [1]    : int $analysis_id
  Description: Increments num_required_workers of the given analysis by one.
  Returntype : whatever $self->dbc->do returns

=cut

sub increase_required_workers {
  my ($self, $analysis_id) = @_;

  my $increment_sql = "UPDATE analysis_stats SET num_required_workers=num_required_workers+1 "
                    . "WHERE analysis_id='$analysis_id' ";
  my $dbc = $self->dbc;

  return $dbc->do($increment_sql);
}


###################
#
# INTERNAL METHODS
#
###################

=head2 _generic_fetch

  Arg [1]    : (optional) string $constraint
               An SQL condition (part of the WHERE clause)
  Arg [2]    : (optional) listref $join
               Each element is [ $table, $condition, $extra_columns ]:
               $table is a [ name, alias ] pair added to the FROM clause and
               $condition is ANDed into the WHERE clause (both must be given for
               either to be used); $extra_columns (optional listref) adds columns
               to the SELECT list.
  Arg [3]    : (optional) string $final_clause
               appended verbatim after the WHERE clause (e.g. 'ORDER BY ...')
  Example    : $stats_list = $self->_generic_fetch("ast.status = 'READY'", undef, 'ORDER BY last_update');
  Description: Builds and runs a SELECT over analysis_stats (plus any joined tables)
               and turns the resulting rows into AnalysisStats objects.
               Every condition is parenthesized before being ANDed, so a condition
               containing OR keeps its meaning.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : none thrown directly; database errors propagate from the driver
  Caller     : internal (fetch_* methods of this adaptor)

=cut

sub _generic_fetch {
  my ($self, $constraint, $join, $final_clause) = @_;

  my @tables     = $self->_tables;
  my @columns    = $self->_columns;
  my @conditions = $constraint ? ($constraint) : ();

  foreach my $single_join (@{ $join || [] }) {
    my ($tablename, $condition, $extra_columns) = @{$single_join};

    if ($tablename && $condition) {
      push @tables, $tablename;
      push @conditions, $condition;
    }
    push @columns, @{$extra_columns} if $extra_columns;
  }

  my $default_where = $self->_default_where_clause;
  push @conditions, $default_where if $default_where;

  # construct a table string like 'table1 t1, table2 t2'
  my $tablenames = join(', ', map { join(' ', @$_) } @tables);

  my $sql = 'SELECT ' . join(', ', @columns) . " FROM $tablenames";
  $sql .= ' WHERE ' . join(' AND ', map { "($_)" } @conditions) if @conditions;
  $sql .= " $final_clause" if $final_clause;

  my $sth = $self->prepare($sql);
  $sth->execute;

  return $self->_objs_from_sth($sth);
}

sub _tables {
  my ($self) = @_;

  # A list of [ table name, alias ] pairs; _generic_fetch renders each as "name alias".
  return ( [ 'analysis_stats' => 'ast' ] );
}

sub _columns {
    my ($self) = @_;

    # Every analysis_stats column read by _objs_from_sth, in SELECT order.
    my @columns = map { "ast.$_" } qw(
        analysis_id batch_size hive_capacity status
        total_job_count semaphored_job_count ready_job_count done_job_count failed_job_count
        num_running_workers num_required_workers
        behaviour input_capacity output_capacity
        avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job
        last_update sync_lock
    );

    # Age of the row in seconds; the date arithmetic is driver-specific.
    my $age_in_seconds;
    if ($self->dbc->driver eq 'sqlite') {
        $age_in_seconds = "strftime('%s','now')-strftime('%s',ast.last_update) seconds_since_last_update ";
    } else {
        $age_in_seconds = "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ";
    }
    push @columns, $age_in_seconds;

    return @columns;
}

sub _objs_from_sth {
    my ($self, $sth) = @_;

    # Row fields copied verbatim into the same-named AnalysisStats accessors, in this order.
    # (last_update is selected but only its derived seconds_since_last_update is kept.)
    my @copied_fields = qw(
        analysis_id batch_size hive_capacity status
        total_job_count semaphored_job_count ready_job_count done_job_count failed_job_count
        num_running_workers num_required_workers
        behaviour input_capacity output_capacity
        avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job
        seconds_since_last_update sync_lock
    );

    # Bind every result column (by lower-cased name) into %row, refreshed on each fetch.
    my %row;
    $sth->bind_columns( \( @row{ @{ $sth->{NAME_lc} } } ) );

    my @stats_list;
    while ( $sth->fetch() ) {
        my $stats = Bio::EnsEMBL::Hive::AnalysisStats->new();
        foreach my $field (@copied_fields) {
            $stats->$field( $row{$field} );
        }
        $stats->adaptor($self);

        push @stats_list, $stats;
    }
    $sth->finish;

    return \@stats_list;
}


1;