AnalysisStatsAdaptor.pm 16 KB
Newer Older
1
2
3
4
5
6
7
8
9
# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
#
# Date of creation: 22.03.2004
# Original Creator : Jessica Severin <jessica@ebi.ac.uk>
#
# Copyright EMBL-EBI 2004
#
# You may distribute this module under the same terms as perl itself

10
=pod
11
12

=head1 NAME
13

14
  Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
15
16

=head1 SYNOPSIS
17

18
19
20
21
  $analysisStatsAdaptor = $db_adaptor->get_AnalysisStatsAdaptor;
  $analysisStatsAdaptor = $analysisStats->adaptor;

=head1 DESCRIPTION
22

23
24
25
26
  Module to encapsulate all db access for persistent class AnalysisStats.
  There should be just one per application and database connection.

=head1 CONTACT
27
28

  Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.
29
30

=head1 APPENDIX
31

32
33
  The rest of the documentation details each of the object methods.
  Internal methods are usually preceded with a _
34
35
36
37
38
39
40
41
42
43

=cut



package Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;

use strict;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::DBSQL::BaseAdaptor;
44
45
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
46
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
47

48
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
49
50


51
=head2 fetch_by_analysis_id
52

53
54
  Arg [1]    : int $id
               the unique database identifier for the feature to be obtained
55
  Example    : $feat = $adaptor->fetch_by_analysis_id(1234);
56
57
58
59
60
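  Description: Returns the AnalysisStats object stored in the database for the
               analysis with the id $id. If no analysis_stats row exists yet for
               that id, one is created first and then fetched.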
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : thrown if $id is not defined
  Caller     : general
61

62
63
=cut

64
sub fetch_by_analysis_id {
  my ($self, $id) = @_;

  # An undefined id cannot identify a row, so refuse it before touching the database.
  throw("fetch_by_analysis_id must have an id") unless defined $id;

  my $constraint = "ast.analysis_id = $id";

  # _generic_fetch hands back a listref; only its first element is of interest here.
  my ($stats) = @{ $self->_generic_fetch($constraint) };

  if (!defined $stats) {
    # Nothing stored yet for this analysis: ask for a row to be created, then read it back.
    $self->_create_new_for_analysis_id($id);
    ($stats) = @{ $self->_generic_fetch($constraint) };

    # Still nothing after the insert attempt -> give up loudly.
    defined($stats)
      or throw("unable to fetch analysis_stats for analysis_id = $id\n");
  }

  return $stats;
}


# Fetches every analysis_stats row by running _generic_fetch without any
# constraint, and returns whatever _generic_fetch returns.
sub fetch_all {
  my ($self) = @_;

  return $self->_generic_fetch();
}


94
# Fetches the AnalysisStats entries that still require workers
# (num_required_workers > 0 and status READY or WORKING).
#
#   Arg [1] : (optional) int $limit - maximum number of entries; no LIMIT when false
#   Arg [2] : (optional) int $rc_id - restrict to this resource class id
#   Returns : whatever _generic_fetch returns for the constraint (a listref of stats objects),
#             ordered by descending priority and randomly within the same priority
sub fetch_by_needed_workers {
    my ($self, $limit, $rc_id) = @_;

    my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')"
                    .(defined($rc_id) ? " AND ast.rc_id = $rc_id" : '');

    # the random-ordering function is spelled differently in SQLite and in the other driver(s)
    my $final_clause = 'ORDER BY priority DESC, '
                        .( ($self->dbc->driver eq 'sqlite') ? 'RANDOM()' : 'RAND()' )
                        .($limit ? " LIMIT $limit" : '');

    # The final clause lives on the adaptor and is read by _generic_fetch.
    # 'local' restores the previous value when this sub is left - also when
    # _generic_fetch dies - so the ORDER BY/LIMIT can never leak into other fetches.
    local $self->{'_final_clause'} = $final_clause;

    my $results = $self->_generic_fetch($constraint);

    return $results;
}

111

Leo Gordon's avatar
Leo Gordon committed
112
113
# Fetches the AnalysisStats entries whose status is one of the given values,
# ordered by last_update.
#
#   Arg [1] : listref of status strings
#   Arg [2] : (optional) int $rc_id - restrict to this resource class id
#   Returns : listref of stats objects as returned by _generic_fetch;
#             an empty listref when no statuses are given
sub fetch_by_statuses {
  my ($self, $statuses, $rc_id) = @_;

  # An empty list would yield "ast.status in ()", which is not valid SQL;
  # nothing can match an empty set of statuses anyway.
  return [] unless $statuses && @$statuses;

  my $constraint = 'ast.status in ('.join(', ', map { "'$_'" } @$statuses).')'
                   .(defined($rc_id) ? " AND ast.rc_id = $rc_id" : '');

  # The final clause lives on the adaptor and is read by _generic_fetch.
  # 'local' restores the previous value when this sub is left - also when
  # _generic_fetch dies - so the ORDER BY can never leak into other fetches.
  local $self->{'_final_clause'} = 'ORDER BY last_update';

  my $results = $self->_generic_fetch($constraint);

  return $results;
}

125

126
127
128
129
130
131
132
133
134
=head2 refresh

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Description: reload the AnalysisStats object from the database
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats object - same one with reloaded data

=cut


135
sub refresh {
    my ($self, $stats) = @_;

    # Load the current state into a separate object first.
    my $reloaded = $self->fetch_by_analysis_id( $stats->analysis_id );

    # Replace the contents of the caller's object in place, so that every
    # existing reference to $stats sees the reloaded data.
    %{$stats} = %{$reloaded};

    return $stats;
}


146
################
147
148
149
150
151
152
#
# STORE / UPDATE METHODS
#
################

=head2 update
153

154
155
156
157
158
159
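  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Example    : $analysisStatsAdaptor->update($stats);
  Description: Writes the fields of the AnalysisStats object back to its row in the
               analysis_stats table (re-tuning hive_capacity first when the behaviour
               is 'DYNAMIC'), copies the updated row into analysis_stats_monitor and
               resets seconds_since_last_update on the object to 0.
  Returntype : none (the return value is not meaningful)
  Exceptions :
  Caller     :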
160

161
162
163
164
165
=cut

sub update {
  my ($self, $stats) = @_;

  my $current_capacity = $stats->hive_capacity;

  if ($stats->behaviour eq "DYNAMIC") {
    # Work out the capacity suggested by the input/output timings; without
    # timings the target is simply the current capacity.
    my $target_capacity = $current_capacity;
    if ($stats->avg_input_msec_per_job) {
      $target_capacity = int($stats->input_capacity * $stats->avg_msec_per_job / $stats->avg_input_msec_per_job);
    }
    if ($stats->avg_output_msec_per_job) {
      my $output_bound = int($stats->output_capacity * $stats->avg_msec_per_job / $stats->avg_output_msec_per_job);
      $target_capacity = $output_bound if $output_bound < $target_capacity;
    }

    # Move towards the target: halve a large gap, otherwise step by one.
    my $gap = $current_capacity - $target_capacity;
    if (abs($gap) > 2) {
      $stats->hive_capacity(($current_capacity + $target_capacity) / 2);
    } elsif ($gap > 0) {
      $stats->hive_capacity($current_capacity - 1);
    } elsif ($gap < 0) {
      $stats->hive_capacity($current_capacity + 1);
    }
  }

  my $sql = "UPDATE analysis_stats SET status='" . $stats->status . "' ";

  # each of these columns is filled from the accessor of the same name
  foreach my $field (qw(batch_size avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job
                        avg_output_msec_per_job hive_capacity max_retry_count failed_job_tolerance)) {
    $sql .= ",$field=" . $stats->$field();
  }

  # the counters are only written from here when hive_use_triggers() is off
  if (!$self->db->hive_use_triggers()) {
    foreach my $field (qw(total_job_count unclaimed_job_count done_job_count failed_job_count)) {
      $sql .= ",$field=" . $stats->$field();
    }

    # re-count the running workers via the Queen before storing the number
    $stats->num_running_workers( $self->db->get_Queen->count_running_workers( $stats->analysis_id() ) );
    $sql .= ",num_running_workers=" . $stats->num_running_workers();
  }

  $sql .= ",num_required_workers=" . $stats->num_required_workers();
  $sql .= ",last_update=CURRENT_TIMESTAMP";
  $sql .= ",sync_lock='0'";
  foreach my $field (qw(rc_id can_be_empty priority)) {
    $sql .= ",$field=" . $stats->$field();
  }
  $sql .= " WHERE analysis_id='" . $stats->analysis_id . "' ";

  my $update_sth = $self->prepare($sql);
  $update_sth->execute();
  $update_sth->finish;

  # keep a timestamped copy of the freshly updated row in the monitor table
  my $monitor_sth = $self->prepare("INSERT INTO analysis_stats_monitor SELECT CURRENT_TIMESTAMP, analysis_stats.* from analysis_stats WHERE analysis_id = ".$stats->analysis_id);
  $monitor_sth->execute();
  $monitor_sth->finish;

  $stats->seconds_since_last_update(0); #not exact but good enough :)
}


# Sets the status column of one analysis_stats row.
#
#   Arg [1] : int $analysis_id - row to update
#   Arg [2] : string $status   - new status value
#
# Both values are passed as bind parameters rather than being pasted into the
# SQL text, so a value containing a quote cannot break or alter the statement.
sub update_status
{
  my ($self, $analysis_id, $status) = @_;

  my $sth = $self->prepare("UPDATE analysis_stats SET status = ? WHERE analysis_id = ?");
  $sth->execute($status, $analysis_id);
  $sth->finish;
}


242
243
244
245
246
=head2 interval_update_work_done

  Arg [1]     : int $analysis_id
  Arg [2]     : int $jobs_done_in_interval
  Arg [3]     : int $interval_msec
247
248
249
250
251
  Arg [4]     : int $fetching_msec
  Arg [5]     : int $running_msec
  Arg [6]     : int $writing_msec
  Arg [7]     : real $weight_factor [optional]
  Example     : $statsDBA->interval_update_work_done($analysis_id, $jobs_done, $interval_msec, $fetching_msec, $running_msec, $writing_msec);
252
253
254
255
256
257
258
  Description : does a database update to recalculate the avg_msec_per_job and done_job_count
                does an interval equation by multiplying out the previous done_job_count with the
                previous avg_msec_per_job and then expanding by new interval values to give a better average.
  Caller      : Bio::EnsEMBL::Hive::Worker

=cut

259
260
sub interval_update_work_done {
  my ($self, $analysis_id, $job_count, $interval_msec, $fetching_msec, $running_msec, $writing_msec, $weight_factor) = @_;

  $weight_factor ||= 3; # makes it more sensitive to the dynamics of the farm

  # Each average column is blended with the time spent in this interval.
  # The same formula applies to all four, so it is generated from this list
  # instead of being spelled out twice (once per trigger mode).
  my @timings = (
    [ 'avg_msec_per_job',        $interval_msec ],
    [ 'avg_input_msec_per_job',  $fetching_msec ],
    [ 'avg_run_msec_per_job',    $running_msec  ],
    [ 'avg_output_msec_per_job', $writing_msec  ],
  );

  my @assignments = map {
    my ($column, $msec) = @{$_};
    "$column = (((done_job_count*$column)/$weight_factor + $msec) / (done_job_count/$weight_factor + $job_count))"
  } @timings;

  # Without triggers the job counters are maintained here as well. They are
  # appended AFTER the averages on purpose: the averages above must be computed
  # from the done_job_count value as it was before this interval is added.
  unless ($self->db->hive_use_triggers()) {
    push @assignments,
      "unclaimed_job_count = unclaimed_job_count - $job_count",
      "done_job_count = done_job_count + $job_count";
  }

  my $sql = "UPDATE analysis_stats SET " . join(', ', @assignments) . " WHERE analysis_id = $analysis_id";

  $self->dbc->do($sql);
}


288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# Adds one to num_running_workers of the given analysis with a single UPDATE
# statement; returns the result of the do() call.
sub increase_running_workers
{
  my ($self, $analysis_id) = @_;

  return $self->dbc->do(
    "UPDATE analysis_stats SET num_running_workers = num_running_workers + 1 "
    . " WHERE analysis_id='$analysis_id'"
  );
}


# Subtracts one from num_running_workers of the given analysis with a single
# UPDATE statement; returns the result of the do() call.
sub decrease_running_workers
{
  my ($self, $analysis_id) = @_;

  return $self->dbc->do(
    "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 "
    . " WHERE analysis_id='$analysis_id'"
  );
}

311
# Subtracts one from num_required_workers of the given analysis with a single
# UPDATE statement; returns the result of the do() call.
sub decrease_required_workers
{
  my ($self, $analysis_id) = @_;

  return $self->dbc->do(
    "UPDATE analysis_stats SET num_required_workers=num_required_workers-1 "
    . "WHERE analysis_id='$analysis_id' "
  );
}


323
# Adds one to num_required_workers of the given analysis with a single UPDATE
# statement; returns the result of the do() call.
sub increase_required_workers
{
  my ($self, $analysis_id) = @_;

  return $self->dbc->do(
    "UPDATE analysis_stats SET num_required_workers=num_required_workers+1 "
    . "WHERE analysis_id='$analysis_id' "
  );
}


335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
#
# INTERNAL METHODS
#
###################

=head2 _generic_fetch

  Arg [1]    : (optional) string $constraint
               An SQL query constraint (i.e. part of the WHERE clause)
  Arg [2]    : (optional) listref $join
               a list of [ [tablename, alias], condition, [extra columns] ] entries;
               each adds a table, ANDs its condition onto the constraint and
               appends the extra columns to the SELECT list
  Example    : $stats_list = $self->_generic_fetch('ast.analysis_id = 1234');
  Description: Builds a SELECT statement from the adaptor's tables and columns, the
               constraint, the default where clause and the final clause, executes it
               and converts the rows into objects.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : none
  Caller     : the fetch_* methods of this adaptor

=cut
  
sub _generic_fetch {
  my ($self, $constraint, $join) = @_;

  my @tables  = $self->_tables;
  my $columns = join(', ', $self->_columns());

  # Each join entry is [ table-spec, condition, extra-columns ]; a missing
  # $join simply means there is nothing to add.
  foreach my $single_join (@{ $join || [] }) {
    my ($tablename, $condition, $extra_columns) = @{$single_join};

    if ($tablename && $condition) {
      push @tables, $tablename;
      $constraint = $constraint ? "$constraint AND $condition" : " $condition";
    }

    $columns .= ", " . join(', ', @{$extra_columns}) if $extra_columns;
  }

  # every table spec is a [name, alias] pair -> 'table1 t1, table2 t2'
  my $tablenames = join(', ', map { join(' ', @{$_}) } @tables);

  my $sql = "SELECT $columns FROM $tablenames";

  my $default_where = $self->_default_where_clause;
  my $final_clause  = $self->_final_clause;

  # WHERE part: the explicit constraint comes first, the default clause is ANDed on
  if ($constraint) {
    $sql .= " WHERE $constraint ";
    $sql .= " AND $default_where " if $default_where;
  }
  elsif ($default_where) {
    $sql .= " WHERE $default_where ";
  }

  # trailing ORDER BY / LIMIT etc., if any was set on the adaptor
  $sql .= " $final_clause";

  my $sth = $self->prepare($sql);
  $sth->execute;

  return $self->_objs_from_sth($sth);
}

# Tables used by _generic_fetch, each given as a [table name, alias] pair.
sub _tables {
  my ($self) = @_;

  return ( [ qw(analysis_stats ast) ] );
}

# Column expressions selected by _generic_fetch: the stored analysis_stats
# columns (all qualified with the 'ast' alias) plus a computed
# seconds_since_last_update whose SQL depends on the database driver.
sub _columns {
  my ($self) = @_;

  my @columns = map { "ast.$_" } qw(
      analysis_id
      status
      batch_size
      avg_msec_per_job
      avg_input_msec_per_job
      avg_run_msec_per_job
      avg_output_msec_per_job
      hive_capacity
      behaviour
      input_capacity
      output_capacity
      total_job_count
      unclaimed_job_count
      done_job_count
      max_retry_count
      failed_job_count
      failed_job_tolerance
      num_running_workers
      num_required_workers
      last_update
      sync_lock
      rc_id
      can_be_empty
      priority
  );

  # age of the row in seconds, computed by the database itself
  my $age_expression;
  if ($self->dbc->driver eq 'sqlite') {
    $age_expression = "strftime('%s','now')-strftime('%s',ast.last_update) seconds_since_last_update ";
  } else {
    $age_expression = "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ";
  }
  push @columns, $age_expression;

  return @columns;
}

# Turns the rows of an executed statement handle into AnalysisStats objects.
#
#   Arg [1] : executed statement handle whose lower-cased column names
#             ($sth->{NAME_lc}) match the accessor names listed below
#   Returns : listref of Bio::EnsEMBL::Hive::AnalysisStats, one per row, each
#             with its adaptor set to $self
#
# The statement handle is finished before returning.
sub _objs_from_sth {
  my ($self, $sth) = @_;

  # bind every result column to the hash slot named after it
  my %column;
  $sth->bind_columns( \( @column{ @{$sth->{NAME_lc} } } ));

  # Columns copied onto the object; for each of them the accessor has the same
  # name as the column. Any other fetched column is bound but not copied.
  my @fields = qw(
      analysis_id
      status
      sync_lock
      rc_id
      can_be_empty
      priority
      batch_size
      avg_msec_per_job
      avg_input_msec_per_job
      avg_run_msec_per_job
      avg_output_msec_per_job
      hive_capacity
      behaviour
      input_capacity
      output_capacity
      total_job_count
      unclaimed_job_count
      done_job_count
      max_retry_count
      failed_job_count
      failed_job_tolerance
      num_running_workers
      num_required_workers
      seconds_since_last_update
  );

  my @statsArray = ();

  while ($sth->fetch()) {
    # direct method call instead of the ambiguous indirect-object form "new Class"
    my $analStats = Bio::EnsEMBL::Hive::AnalysisStats->new;

    foreach my $field (@fields) {
      $analStats->$field( $column{$field} );
    }
    $analStats->adaptor($self);

    push @statsArray, $analStats;
  }
  $sth->finish;

  return \@statsArray
}

# Constraint that _generic_fetch adds to every query; this adaptor has none,
# so the empty string is returned.
sub _default_where_clause {
  my ($self) = @_;

  return q{};
}

# Getter/setter for the SQL text that _generic_fetch appends after the WHERE
# part. With an argument the stored value is replaced first; a false stored
# value is always normalised to the empty string.
sub _final_clause {
  my ($self, @new_value) = @_;

  if (@new_value) {
    $self->{'_final_clause'} = $new_value[0];
  }
  $self->{'_final_clause'} ||= "";

  return $self->{'_final_clause'};
}


506
507
508
509
510
# Inserts an analysis_stats row for the given analysis id, supplying only the
# analysis_id column. The driver-specific "ignore" form of INSERT is used, so
# the statement is a no-op when a row for that id already exists.
#
#   Arg [1] : int $analysis_id
#
# The id is passed as a bind parameter rather than being pasted into the SQL text.
sub _create_new_for_analysis_id {
  my ($self, $analysis_id) = @_;

  # SQLite and the other driver(s) spell "insert unless already there" differently
  my $insertion_method = ($self->dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';

  my $sth = $self->prepare("$insertion_method INTO analysis_stats (analysis_id) VALUES (?)");
  $sth->execute($analysis_id);
  $sth->finish;
}

1;