=pod

=head1 NAME

    Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor

=head1 SYNOPSIS

    $analysisStatsAdaptor = $db_adaptor->get_AnalysisStatsAdaptor;
    $analysisStatsAdaptor = $analysisStats->adaptor;

=head1 DESCRIPTION

    Module to encapsulate all db access for persistent class AnalysisStats.
    There should be just one per application and database connection.

=head1 LICENSE

    Copyright [1999-2013] Wellcome Trust Sanger Institute and the EMBL-European Bioinformatics Institute

    Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

         http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software distributed under the License
    is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and limitations under the License.

=head1 CONTACT

    Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

    The rest of the documentation details each of the object methods.
    Internal methods are usually prefixed with an underscore (_).

=cut


package Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;

use strict;
45

46
use Bio::EnsEMBL::Utils::Argument;
47 48
use Bio::EnsEMBL::Utils::Exception ('throw');
use Bio::EnsEMBL::Hive::AnalysisStats;
49

50
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');

=head2 fetch_by_analysis_id

  Arg [1]    : int $id
               the analysis_id whose analysis_stats row is to be fetched
  Example    : $stats = $adaptor->fetch_by_analysis_id(1234);
  Description: Returns the AnalysisStats object built from the analysis_stats row
               of the analysis with the given id.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : thrown if $id is not defined, is not a non-negative integer,
               or if no analysis_stats row exists for it
  Caller     : general

=cut

sub fetch_by_analysis_id {
  my ($self,$id) = @_;

  unless(defined $id) {
    throw("fetch_by_analysis_id must have an id");
  }

    # $id is interpolated into the SQL constraint below, so accept nothing but digits
  unless($id =~ /\A\d+\z/) {
    throw("fetch_by_analysis_id expects an integer analysis_id, got '$id'");
  }

  my $constraint = "ast.analysis_id = $id";

  # return the first element of the _generic_fetch list
  my ($obj) = @{$self->_generic_fetch($constraint)};

  if(!defined($obj)) {
    throw("unable to fetch analysis_stats for analysis_id = $id\n");
  }

  return $obj;
}


=head2 fetch_all_by_suitability_rc_id_meadow_type

  Arg [1]    : (optional) int $resource_class_id
  Arg [2]    : (optional) string $meadow_type
  Example    : $stats_list = $adaptor->fetch_all_by_suitability_rc_id_meadow_type($rc_id, 'LSF');
  Description: Returns the AnalysisStats of analyses joined to analysis_base, restricted
               (when the arguments are true) to the given resource class and to analyses whose
               meadow_type is NULL or equal to $meadow_type. The result comes in two groups:
               first those with num_required_workers>0 in status READY or WORKING, ordered by
               analysis priority (descending) and then randomly; then those in status LOADING,
               BLOCKED, ALL_CLAIMED or SYNCHING, ordered by last_update (oldest first).
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : thrown if $resource_class_id is given but is not an integer
  Caller     : general

=cut

sub fetch_all_by_suitability_rc_id_meadow_type {
    my ($self, $resource_class_id, $meadow_type) = @_;

    my $join_condition = " ast.analysis_id=a.analysis_id ";

    if ($resource_class_id) {
            # interpolated into SQL, so only digits are acceptable
        throw("resource_class_id must be an integer, got '$resource_class_id'")
            unless $resource_class_id =~ /\A\d+\z/;
        $join_condition .= "AND a.resource_class_id=$resource_class_id ";
    }

    if ($meadow_type) {
            # let the driver quote the string instead of pasting it between quotes verbatim
        my $quoted_meadow_type = $self->dbc->db_handle->quote( $meadow_type );
        $join_condition .= "AND (a.meadow_type IS NULL OR a.meadow_type=$quoted_meadow_type) ";
    }

    my $join = [[ ['analysis_base', 'a'], $join_condition ]];

        # the ones that clearly have work to do:
        #
    my $primary_results = $self->_generic_fetch(
        "ast.num_required_workers>0 AND ast.status in ('READY', 'WORKING')" ,
        $join ,
        'ORDER BY a.priority DESC, ' . ( ($self->dbc->driver eq 'mysql') ? 'RAND()' : 'RANDOM()' ),
    );

        # the ones that may have work to do after a sync:
        #
    my $secondary_results = $self->_generic_fetch(
        "ast.status in ('LOADING', 'BLOCKED', 'ALL_CLAIMED', 'SYNCHING')" ,
        $join ,
        'ORDER BY ast.last_update',     # FIXME: could mix in a.priority if sync is not too expensive?
    );

    return [ @$primary_results, @$secondary_results ];
}

=head2 refresh

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Description: Re-reads the analysis_stats row of the given object via fetch_by_analysis_id
               and overwrites the object's hash contents in place, so every existing
               reference to the object sees the reloaded values.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats object - the same one, now holding reloaded data
  Exceptions : whatever fetch_by_analysis_id throws (e.g. when the row cannot be fetched)

=cut

sub refresh {
    my $self  = shift;
    my $stats = shift;

    my $analysis_id = $stats->analysis_id;
    my $reloaded    = $self->fetch_by_analysis_id( $analysis_id );

    %{ $stats } = %{ $reloaded };   # transplant the fresh contents into the caller's object

    return $stats;
}


133
################
134 135 136 137 138
#
# STORE / UPDATE METHODS
#
################

139 140 141 142

sub store {
    my ($self, $stats) = @_;

143
    my $sql = "INSERT INTO analysis_stats (analysis_id, batch_size, hive_capacity, status) VALUES (?, ?, ?, ?)";
144 145

    my $sth = $self->prepare($sql);
146
    $sth->execute($stats->analysis_id, $stats->batch_size, $stats->hive_capacity, $stats->status);
147 148 149 150 151 152 153 154
    $sth->finish;

    $stats->adaptor( $self );

    return $stats;
}


=head2 update

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Example    : $analysisStatsAdaptor->update( $stats );
  Description: Writes the object's status, batch_size, hive_capacity, timing averages and
               num_required_workers back into its analysis_stats row, sets last_update to
               the current time and resets sync_lock to '0', then appends a snapshot of the row
               to analysis_stats_monitor.
               For behaviour 'DYNAMIC' the hive_capacity is first moved halfway towards the
               capacity suggested by the input (and, if known, output) timing averages.
               When the database does not use triggers, the job counters and
               num_running_workers (recounted through the Queen first) are written as well.
               All values are passed as bind parameters; undef values are stored as NULL.
  Returntype : the return value of $stats->seconds_since_last_update(0) - not meaningful
  Exceptions : none raised explicitly
  Caller     : general

=cut

sub update {
  my ($self, $stats) = @_;

  my $hive_capacity = $stats->hive_capacity;

  if ($stats->behaviour eq "DYNAMIC") {

      # capacity the input side can sustain, given the time jobs spend fetching input
    my $max_hive_capacity = $stats->avg_input_msec_per_job
        ? int($stats->input_capacity * $stats->avg_msec_per_job / $stats->avg_input_msec_per_job)
        : $hive_capacity;

      # ... further limited by the output side when its timing is known
    if ($stats->avg_output_msec_per_job) {
      my $output_limited = int($stats->output_capacity * $stats->avg_msec_per_job / $stats->avg_output_msec_per_job);
      $max_hive_capacity = $output_limited if $output_limited < $max_hive_capacity;
    }

      # move halfway towards the limit instead of jumping straight to it
    $stats->hive_capacity( int( ($hive_capacity+$max_hive_capacity+1)/2 ) );
  }

  my @columns = qw(status batch_size hive_capacity
                   avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job);

  unless( $self->db->hive_use_triggers() ) {
        # without triggers the counters are written from the object as well
      $stats->num_running_workers( $self->db->get_Queen->count_running_workers( $stats->analysis_id() ) );
      push @columns, qw(total_job_count semaphored_job_count ready_job_count done_job_count failed_job_count
                        num_running_workers);
  }
  push @columns, 'num_required_workers';

    # every value goes through a placeholder (undef is bound as NULL instead of producing broken SQL)
  my @values = map { $stats->$_() } @columns;

  my $sql = 'UPDATE analysis_stats SET '
          . join(', ', map { "$_=?" } @columns)
          . ", last_update=CURRENT_TIMESTAMP, sync_lock='0'"
          . ' WHERE analysis_id=?';

  my $sth = $self->prepare($sql);
  $sth->execute( @values, $stats->analysis_id );
  $sth->finish;

    # append a snapshot of the freshly written row to the monitoring table
  $sth = $self->prepare("INSERT INTO analysis_stats_monitor SELECT CURRENT_TIMESTAMP, analysis_stats.* from analysis_stats WHERE analysis_id = ?");
  $sth->execute( $stats->analysis_id );
  $sth->finish;

  $stats->seconds_since_last_update(0); #not exact but good enough :)
}


=head2 update_status

  Arg [1]    : int $analysis_id
  Arg [2]    : string $status
  Example    : $analysisStatsAdaptor->update_status( $analysis_id, 'READY' );
  Description: Sets only the status column of the analysis_stats row of the given analysis.
               Both values are passed as bind parameters rather than pasted into the SQL.
  Returntype : none meaningful

=cut

sub update_status {
  my ($self, $analysis_id, $status) = @_;

  my $sth = $self->prepare( "UPDATE analysis_stats SET status=? WHERE analysis_id=?" );
  $sth->execute( $status, $analysis_id );
  $sth->finish;
}


=head2 interval_update_work_done

  Arg [1]     : int $analysis_id
  Arg [2]     : int $jobs_done_in_interval
  Arg [3]     : int $interval_msec
  Arg [4]     : int $fetching_msec
  Arg [5]     : int $running_msec
  Arg [6]     : int $writing_msec
  Arg [7]     : real $weight_factor [optional, defaults to 3]
  Example     : $statsDBA->interval_update_work_done($analysis_id, $jobs_done, $interval_msec, $fetching_msec, $running_msec, $writing_msec);
  Description : Updates avg_msec_per_job, avg_input_msec_per_job, avg_run_msec_per_job and
                avg_output_msec_per_job by folding in the totals of the latest interval:
                the previous average is weighted by done_job_count/$weight_factor and the
                interval by its job count, so a larger weight factor makes the averages
                follow recent intervals more closely.
                When the database does not use triggers, ready_job_count is decreased and
                done_job_count increased by $jobs_done_in_interval in the same statement.
                NOTE(review): if done_job_count and $jobs_done_in_interval are both 0 the
                denominator is 0 - confirm that callers never report an empty interval then.
  Caller      : Bio::EnsEMBL::Hive::Worker

=cut

sub interval_update_work_done {
  my ($self, $analysis_id, $job_count, $interval_msec, $fetching_msec, $running_msec, $writing_msec, $weight_factor) = @_;

  $weight_factor ||= 3; # makes it more sensitive to the dynamics of the farm

      # Render the weight as a decimal literal: done_job_count is an integer column, and with an
      # integer literal SQLite and PostgreSQL would perform truncating integer division.
  my $wf = sprintf('%.6f', $weight_factor);

  my @assignments = map {
      my ($column, $new_msec) = @$_;
      "$column = (((done_job_count*$column)/$wf + $new_msec) / (done_job_count/$wf + $job_count))"
  } (
      [ 'avg_msec_per_job',        $interval_msec ],
      [ 'avg_input_msec_per_job',  $fetching_msec ],
      [ 'avg_run_msec_per_job',    $running_msec  ],
      [ 'avg_output_msec_per_job', $writing_msec  ],
  );

  unless( $self->db->hive_use_triggers() ) {
        # done_job_count must stay LAST: MySQL evaluates single-table UPDATE assignments left to
        # right, and the averages above need the old value of done_job_count.
    push @assignments,
        "ready_job_count = ready_job_count - $job_count",
        "done_job_count = done_job_count + $job_count";
  }

  my $sql = 'UPDATE analysis_stats SET ' . join(', ', @assignments) . " WHERE analysis_id= $analysis_id";

  $self->dbc->do($sql);
}


=head2 increase_running_workers / decrease_running_workers / increase_required_workers / decrease_required_workers

  Arg [1]    : int $analysis_id
  Example    : $analysisStatsAdaptor->increase_running_workers( $analysis_id );
  Description: Adds one to (increase_*) or subtracts one from (decrease_*) the
               num_running_workers (resp. num_required_workers) column of the
               analysis_stats row of the given analysis. The id is passed as a bind parameter.
  Returntype : the value returned by the statement handle's execute()

=cut

sub increase_running_workers {
  my ($self, $analysis_id) = @_;

  return $self->_step_stats_counter( $analysis_id, 'num_running_workers', '+' );
}


sub decrease_running_workers {
  my ($self, $analysis_id) = @_;

  return $self->_step_stats_counter( $analysis_id, 'num_running_workers', '-' );
}


sub decrease_required_workers {
  my ($self, $analysis_id) = @_;

  return $self->_step_stats_counter( $analysis_id, 'num_required_workers', '-' );
}


sub increase_required_workers {
  my ($self, $analysis_id) = @_;

  return $self->_step_stats_counter( $analysis_id, 'num_required_workers', '+' );
}


# Private: runs "UPDATE analysis_stats SET $column = $column $op 1" for one analysis.
# $column and $op only ever come from the literal arguments of the four wrappers above,
# so interpolating them is safe; the caller-supplied id is bound as a placeholder.
sub _step_stats_counter {
  my ($self, $analysis_id, $column, $op) = @_;

  my $sth = $self->prepare( "UPDATE analysis_stats SET $column = $column $op 1 WHERE analysis_id=?" );
  my $result = $sth->execute( $analysis_id );
  $sth->finish;

  return $result;
}


###################
#
# INTERNAL METHODS
#
###################

=head2 _generic_fetch

  Arg [1]    : (optional) string $constraint
               an SQL condition (part of the WHERE clause)
  Arg [2]    : (optional) listref $join
               each element is [ [$table_name, $alias], $join_condition, (optional) listref of extra columns ];
               the table is added to the FROM list and the condition to the WHERE clause,
               and any extra columns are appended to the SELECT list
  Arg [3]    : (optional) string $final_clause
               appended verbatim after the WHERE clause (e.g. 'ORDER BY ...')
  Example    : $stats_list = $self->_generic_fetch("ast.status='READY'", undef, 'ORDER BY ast.last_update');
  Description: Builds and runs a SELECT over analysis_stats (plus any joined tables) and turns
               the rows into objects. All conditions - the constraint, the join conditions and
               the adaptor's default where clause - are combined with AND, each wrapped in
               parentheses so that a condition containing OR cannot change the meaning of the others.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : none raised here
  Caller     : internal

=cut

sub _generic_fetch {
  my ($self, $constraint, $join, $final_clause) = @_;

  my @tables     = $self->_tables;
  my @columns    = $self->_columns();
  my @conditions = $constraint ? ( $constraint ) : ();

  foreach my $single_join (@{ $join || [] }) {
    my ($tablename, $condition, $extra_columns) = @{$single_join};
    if ($tablename && $condition) {
      push @tables, $tablename;
      push @conditions, $condition;
    }
    push @columns, @{$extra_columns} if $extra_columns;
  }

  my $default_where = $self->_default_where_clause;
  push @conditions, $default_where if $default_where;

    # tables are [name, alias] pairs, rendered as 'table1 t1, table2 t2'
  my $sql = 'SELECT ' . join(', ', @columns)
          . ' FROM '  . join(', ', map { join(' ', @$_) } @tables);

  $sql .= ' WHERE ' . join(' AND ', map { "( $_ )" } @conditions) if @conditions;

  #append additional clauses which may have been defined
  $sql .= " $final_clause" if $final_clause;

  my $sth = $self->prepare($sql);
  $sth->execute;

  return $self->_objs_from_sth($sth);
}

# The tables every fetch starts from, as [table_name, alias] pairs;
# all column expressions in this adaptor refer to the 'ast' alias.
sub _tables {
  my ($self) = @_;

  return ( [ 'analysis_stats' => 'ast' ] );
}

# Returns the SELECT list for analysis_stats: the stored columns (all under the 'ast' alias)
# followed by a driver-specific expression named seconds_since_last_update.
# Throws if the connection's driver is not one of mysql/sqlite/pgsql.
sub _columns {
  my $self = shift;

  my @columns = qw (ast.analysis_id
                    ast.batch_size
                    ast.hive_capacity
                    ast.status

                    ast.total_job_count
                    ast.semaphored_job_count
                    ast.ready_job_count
                    ast.done_job_count
                    ast.failed_job_count
                    ast.num_running_workers
                    ast.num_required_workers

                    ast.behaviour
                    ast.input_capacity
                    ast.output_capacity

                    ast.avg_msec_per_job
                    ast.avg_input_msec_per_job
                    ast.avg_run_msec_per_job
                    ast.avg_output_msec_per_job

                    ast.last_update
                    ast.sync_lock
                   );

    # the age of the row in seconds needs driver-specific SQL
  my %row_age_sql = (
      'mysql'     => "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ",
      'sqlite'    => "strftime('%s','now')-strftime('%s',ast.last_update) seconds_since_last_update ",
      'pgsql'     => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - ast.last_update) seconds_since_last_update ",
  );
  my $driver  = $self->dbc->driver;
  my $row_age = $row_age_sql{ $driver };

    # an unknown driver used to push undef here and produce a malformed SELECT list
  throw("_columns: don't know how to compute seconds_since_last_update for driver '$driver'")
      unless defined $row_age;

  push @columns, $row_age;

  return @columns;
}

# Turns every row of an executed statement handle into a Bio::EnsEMBL::Hive::AnalysisStats.
# Each field listed below is set from the result column of the same (lower-cased) name, and the
# adaptor is attached to each object; the handle is finished afterwards.
# Returns a listref of the objects (empty when there were no rows).
sub _objs_from_sth {
  my ($self, $sth) = @_;

  my %row;
  $sth->bind_columns( \( @row{ @{ $sth->{NAME_lc} } } ) );

    # setters called in this exact order on every new object
  my @copied_fields = qw(
      analysis_id batch_size hive_capacity status
      total_job_count semaphored_job_count ready_job_count done_job_count failed_job_count
      num_running_workers num_required_workers
      behaviour input_capacity output_capacity
      avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job
      seconds_since_last_update sync_lock
  );

  my @stats_list;

  while ( $sth->fetch() ) {
    my $stats = Bio::EnsEMBL::Hive::AnalysisStats->new();

    foreach my $field (@copied_fields) {
      $stats->$field( $row{$field} );
    }
    $stats->adaptor($self);

    push @stats_list, $stats;
  }
  $sth->finish;

  return \@stats_list;
}


1;