AnalysisStatsAdaptor.pm 15.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
#
# Date of creation: 22.03.2004
# Original Creator : Jessica Severin <jessica@ebi.ac.uk>
#
# Copyright EMBL-EBI 2004
#
# You may distribute this module under the same terms as perl itself

=pod

=head1 NAME

  Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor

=head1 SYNOPSIS

  $analysisStatsAdaptor = $db_adaptor->get_AnalysisStatsAdaptor;
  $analysisStatsAdaptor = $analysisStats->adaptor;

=head1 DESCRIPTION

  Module to encapsulate all database access for the persistent class AnalysisStats.
  There should be just one instance per application and database connection.

=head1 CONTACT

  Please contact the ehive-users@ebi.ac.uk mailing list with questions or suggestions.

=head1 APPENDIX

  The rest of the documentation details each of the object methods.
  Internal methods are usually prefixed with an underscore (_).

=cut



package Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;

use strict;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::DBSQL::BaseAdaptor;
44
45
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
46
use Bio::EnsEMBL::Hive::Utils::Stopwatch;
47

48
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');

=head2 fetch_by_analysis_id

  Arg [1]    : int $id
               the analysis_id whose analysis_stats row is wanted
  Example    : $stats = $adaptor->fetch_by_analysis_id(1234);
  Description: Returns the AnalysisStats object stored for analysis $id.
               If no analysis_stats row exists yet, one is first inserted
               (via _create_new_for_analysis_id) and then fetched.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : thrown if $id is not defined, or if no row can be fetched
               even after attempting to create one
  Caller     : general

=cut

sub fetch_by_analysis_id {
  my ($self, $id) = @_;

  throw("fetch_by_analysis_id must have an id") if !defined($id);

  my $where = "ast.analysis_id = $id";
  my $first_match = sub { return $self->_generic_fetch($where)->[0] };

  my $stats = $first_match->();
  if (!defined $stats) {
    # No row yet: create a default one, then look again.
    $self->_create_new_for_analysis_id($id);
    $stats = $first_match->();
  }

  throw("unable to fetch analysis_stats for analysis_id = $id\n") unless defined $stats;

  return $stats;
}


=head2 fetch_all

  Example    : $all_stats = $adaptor->fetch_all();
  Description: Fetches every analysis_stats row; no constraint is applied.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats

=cut

sub fetch_all {
  my ($self) = @_;

  my $every_row = $self->_generic_fetch();
  return $every_row;
}


=head2 fetch_by_needed_workers

  Arg [1]    : int $limit (optional) - maximum number of rows to return
  Arg [2]    : boolean $maximise_concurrency (optional) - when true, analyses
               with the fewest running workers come first
  Arg [3]    : int $rc_id (optional) - only analyses with this rc_id
  Description: Fetches AnalysisStats with num_required_workers > 0 and status
               READY or WORKING, ordered by num_running_workers (ascending if
               $maximise_concurrency, otherwise descending), then
               hive_capacity descending, then analysis_id.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : any error from the fetch is rethrown after the adaptor's
               final clause has been reset

=cut

sub fetch_by_needed_workers {
    my ($self, $limit, $maximise_concurrency, $rc_id) = @_;

    my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')"
                    .(defined($rc_id) ? " AND ast.rc_id = $rc_id" : '');

    my $final_clause = 'ORDER BY num_running_workers'
                        .($maximise_concurrency ? '' : ' DESC')
                        .', hive_capacity DESC, analysis_id'
                        .($limit ? " LIMIT $limit" : '');

    # The final clause is stored on the adaptor and shared by all fetches, so
    # it must be reset even when the fetch dies; otherwise every later fetch
    # would silently inherit this ORDER BY / LIMIT.
    $self->_final_clause($final_clause);
    my $results;
    my $fetched_ok = eval { $results = $self->_generic_fetch($constraint); 1 };
    my $error = $@;
    $self->_final_clause(''); # reset final clause for other fetches
    die $error unless $fetched_ok;

    return $results;
}

=head2 fetch_by_statuses

  Arg [1]    : listref of status strings, e.g. ['READY', 'WORKING']
  Arg [2]    : int $rc_id (optional) - only analyses with this rc_id
  Description: Fetches AnalysisStats whose status is one of the given
               statuses, ordered by last_update. An empty or undefined
               status list matches nothing and returns an empty listref
               without querying the database.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : any error from the fetch is rethrown after the adaptor's
               final clause has been reset

=cut

sub fetch_by_statuses {
  my ($self, $statuses, $rc_id) = @_;

  # "IN ()" is a SQL syntax error, so short-circuit on an empty list.
  return [] unless $statuses && @$statuses;

  my $constraint = 'ast.status in ('.join(', ', map { "'$_'" } @$statuses).')'
                   .(defined($rc_id) ? " AND ast.rc_id = $rc_id" : '');

  # The final clause is adaptor-wide state: reset it even if the fetch dies.
  $self->_final_clause('ORDER BY last_update');
  my $results;
  my $fetched_ok = eval { $results = $self->_generic_fetch($constraint); 1 };
  my $error = $@;
  $self->_final_clause(''); # reset final clause for other fetches
  die $error unless $fetched_ok;

  return $results;
}

=head2 refresh

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats $stats
  Example    : $stats = $adaptor->refresh($stats);
  Description: Re-reads the analysis_stats row for $stats->analysis_id and
               returns it as a freshly built AnalysisStats object. The object
               passed in is not modified.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats, or undef if the row no
               longer exists

=cut

sub refresh {
  my ($self, $stats) = @_;

  my $constraint = "ast.analysis_id = " . $stats->analysis_id;

  # List assignment takes the FIRST element of the fetched listref; a scalar
  # assignment of the dereferenced array would yield its element count.
  my ($fresh_stats) = @{$self->_generic_fetch($constraint)};

  return $fresh_stats;
}


=head2 get_running_worker_count

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats $stats
  Example    : $n = $adaptor->get_running_worker_count($stats);
  Description: Counts rows in the hive table for $stats->analysis_id whose
               cause_of_death is the empty string, i.e. workers that have
               not been recorded as dead.
  Returntype : int

=cut

sub get_running_worker_count {
  my ($self, $stats) = @_;

  my $count_sth = $self->prepare(
    "SELECT count(*) FROM hive WHERE cause_of_death='' and analysis_id=?"
  );
  $count_sth->execute($stats->analysis_id);
  my $live_count = ($count_sth->fetchrow_array())[0];
  $count_sth->finish;

  return $live_count;
}


#
# STORE / UPDATE METHODS
#
################

=head2 update

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats $stats
  Example    : $adaptor->update($stats);
  Description: Writes the in-memory state of $stats back to its
               analysis_stats row, also setting last_update=NOW() and
               sync_lock='0'. It then appends a snapshot of that row,
               prefixed with now(), to analysis_stats_monitor.
               Before writing:
                 - num_running_workers is refreshed via
                   get_running_worker_count();
                 - for behaviour 'DYNAMIC', hive_capacity is moved towards a
                   bound derived from input_capacity / output_capacity and
                   the average per-job timings. The gap is halved when it is
                   more than 2; otherwise hive_capacity steps by 1.
               Values are passed as bind parameters, so undefined fields are
               written as NULL instead of producing malformed SQL.
               Afterwards seconds_since_last_update is set to 0 on $stats.
  Returntype : nothing meaningful (the value of the final
               seconds_since_last_update(0) call)
  Exceptions : none raised here; database errors propagate from the
               statement handles
  Caller     : general

=cut

sub update {
  my ($self, $stats) = @_;

  $stats->num_running_workers($self->get_running_worker_count($stats));

  my $hive_capacity = $stats->hive_capacity;

  if ($stats->behaviour eq "DYNAMIC") {
    # Bound from the input side: input_capacity * avg_msec / avg_input_msec ...
    my $max_hive_capacity = $hive_capacity;
    if ($stats->avg_input_msec_per_job) {
      $max_hive_capacity = int($stats->input_capacity * $stats->avg_msec_per_job / $stats->avg_input_msec_per_job);
    }
    # ... and likewise from the output side; the tighter bound wins.
    if ($stats->avg_output_msec_per_job) {
      my $output_bound = int($stats->output_capacity * $stats->avg_msec_per_job / $stats->avg_output_msec_per_job);
      $max_hive_capacity = $output_bound if $output_bound < $max_hive_capacity;
    }

    # Converge gradually: halve a large gap, otherwise step by one.
    # NOTE(review): the midpoint can be fractional; confirm how the
    # hive_capacity column stores it.
    if ($hive_capacity != $max_hive_capacity) {
      if (abs($hive_capacity - $max_hive_capacity) > 2) {
        $stats->hive_capacity(($hive_capacity + $max_hive_capacity) / 2);
      } elsif ($hive_capacity > $max_hive_capacity) {
        $stats->hive_capacity($hive_capacity - 1);
      } else {
        $stats->hive_capacity($hive_capacity + 1);
      }
    }
  }

  # Columns written from the $stats accessor of the same name, in this order.
  my @bound_columns = qw(
    status batch_size
    avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job
    hive_capacity
    total_job_count unclaimed_job_count done_job_count
    max_retry_count failed_job_count failed_job_tolerance
    num_running_workers num_required_workers
    rc_id
  );

  my $sql = 'UPDATE analysis_stats SET '
          . join(', ', map { $_ . '=?' } @bound_columns)
          . ", last_update=NOW(), sync_lock='0'"
          . ' WHERE analysis_id=?';

  my $sth = $self->prepare($sql);
  $sth->execute((map { $stats->$_() } @bound_columns), $stats->analysis_id);
  $sth->finish;

  $sth = $self->prepare("INSERT INTO analysis_stats_monitor SELECT now(), analysis_stats.* from analysis_stats WHERE analysis_id = ?");
  $sth->execute($stats->analysis_id);
  $sth->finish;

  $stats->seconds_since_last_update(0); #not exact but good enough :)
}


=head2 update_status

  Arg [1]    : int $analysis_id
  Arg [2]    : string $status
  Example    : $adaptor->update_status($analysis_id, 'READY');
  Description: Sets analysis_stats.status for one analysis and leaves every
               other column untouched. Both values are passed as bind
               parameters, so the driver quotes them.
  Returntype : nothing meaningful

=cut

sub update_status
{
  my ($self, $analysis_id, $status) = @_;

  my $sth = $self->prepare("UPDATE analysis_stats SET status=? WHERE analysis_id=?");
  $sth->execute($status, $analysis_id);
  $sth->finish;
}


=head2 interval_update_work_done

  Arg [1]     : int $analysis_id
  Arg [2]     : int $jobs_done_in_interval
  Arg [3]     : int $interval_msec
  Arg [4]     : int $fetching_msec
  Arg [5]     : int $running_msec
  Arg [6]     : int $writing_msec
  Arg [7]     : real $weight_factor [optional; a false value (undef/0) means 3]
  Example     : $statsDBA->interval_update_work_done($analysis_id, $jobs_done, $interval_msec, $fetching_msec, $running_msec, $writing_msec);
  Description : Folds one interval of work into the analysis_stats row.
                Each average is recomputed as
                  ((done_job_count * avg) / w + interval_time) / (done_job_count / w + jobs_done)
                where w is $weight_factor. Earlier history therefore counts
                1/w as much as the new interval. done_job_count is increased,
                and unclaimed_job_count decreased, by jobs_done.
                When both done_job_count and jobs_done are 0 the denominator
                would be zero, so the row is left unchanged instead.
  Caller      : Bio::EnsEMBL::Hive::Worker

=cut

sub interval_update_work_done {
  my ($self, $analysis_id, $job_count, $interval_msec, $fetching_msec, $running_msec, $writing_msec, $weight_factor) = @_;

  $weight_factor ||= 3; # makes it more sensitive to the dynamics of the farm

  # MySQL evaluates single-table SET assignments left to right, so
  # done_job_count must stay LAST: the averages need its previous value.
  # The extra WHERE condition skips the update when the averaging
  # denominator (done_job_count/w + job_count) would be zero; in that case
  # there is no work to fold in, and the averages would otherwise be
  # computed from a division by zero.
  my $sql = qq{
    UPDATE analysis_stats SET
        unclaimed_job_count = unclaimed_job_count - $job_count,
        avg_msec_per_job = (((done_job_count*avg_msec_per_job)/$weight_factor + $interval_msec) / (done_job_count/$weight_factor + $job_count)),
        avg_input_msec_per_job = (((done_job_count*avg_input_msec_per_job)/$weight_factor + $fetching_msec) / (done_job_count/$weight_factor + $job_count)),
        avg_run_msec_per_job = (((done_job_count*avg_run_msec_per_job)/$weight_factor + $running_msec) / (done_job_count/$weight_factor + $job_count)),
        avg_output_msec_per_job = (((done_job_count*avg_output_msec_per_job)/$weight_factor + $writing_msec) / (done_job_count/$weight_factor + $job_count)),
        done_job_count = done_job_count + $job_count
    WHERE analysis_id = $analysis_id
      AND (done_job_count > 0 OR $job_count > 0)
  };

  $self->dbc->do($sql);
}


=head2 decrease_hive_capacity

  Arg [1]    : int $analysis_id
  Description: Lowers hive_capacity by 1 and num_required_workers by 1
               (the latter never below 0). Applies only while
               hive_capacity > 1.
  Returntype : the result of dbc->do

=cut

sub decrease_hive_capacity
{
  my ($self, $analysis_id) = @_;

  my $sql = join(' ',
    'UPDATE analysis_stats ',
    'SET hive_capacity = hive_capacity - 1, ',
    'num_required_workers = IF(num_required_workers > 0, num_required_workers - 1, 0) ',
    "WHERE analysis_id='$analysis_id' and hive_capacity > 1",
  );

  return $self->dbc->do($sql);
}


=head2 increase_hive_capacity

  Arg [1]    : int $analysis_id
  Description: Raises hive_capacity by 1 and sets num_required_workers to 1.
               Applies only while hive_capacity <= 500 and
               num_required_workers is 0; otherwise the row is unchanged.
  Returntype : the result of dbc->do

=cut

sub increase_hive_capacity
{
  my ($self, $analysis_id) = @_;

  my $sql = sprintf(
    "UPDATE analysis_stats  SET hive_capacity = hive_capacity + 1, num_required_workers = 1"
    . " WHERE analysis_id='%s' and hive_capacity <= 500 and num_required_workers = 0",
    $analysis_id
  );

  return $self->dbc->do($sql);
}


# Adds one to num_running_workers for the given analysis_id and returns the
# result of dbc->do.
sub increase_running_workers
{
  my ($self, $analysis_id) = @_;

  my $where = " WHERE analysis_id='$analysis_id'";
  return $self->dbc->do("UPDATE analysis_stats SET num_running_workers = num_running_workers + 1 " . $where);
}


# Subtracts one from num_running_workers for the given analysis_id (no lower
# bound is applied here) and returns the result of dbc->do.
sub decrease_running_workers
{
  my ($self, $analysis_id) = @_;

  my @sql_parts = (
    'UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ',
    " WHERE analysis_id='$analysis_id'",
  );

  return $self->dbc->do(join('', @sql_parts));
}

# Subtracts one from num_running_workers, but only while the analysis has
# more running workers than its hive_capacity. Returns the result of
# dbc->do, presumably the number of affected rows, so a zero-ish value means
# the analysis was not over capacity.
sub decrease_running_workers_on_hive_overload {
    my ($self, $analysis_id) = @_;

    my $affected_rows = $self->dbc->do(
        "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 "
      . "WHERE num_running_workers > hive_capacity AND analysis_id = $analysis_id "
    );

    return $affected_rows;
}

# Lowers num_required_workers by one for the given analysis_id, never taking
# it below zero (the same IF(...) guard decrease_hive_capacity uses for this
# column). Returns the result of dbc->do.
sub decrease_needed_workers
{
  my $self = shift;
  my $analysis_id = shift;

  my $sql = "UPDATE analysis_stats SET num_required_workers=IF(num_required_workers > 0, num_required_workers-1, 0) ".
            "WHERE analysis_id='$analysis_id' ";

  $self->dbc->do($sql);
}


# Adds one to num_required_workers for the given analysis_id and returns the
# result of dbc->do.
sub increase_needed_workers
{
  my ($self, $analysis_id) = @_;

  my $update = "UPDATE analysis_stats SET num_required_workers=num_required_workers+1 WHERE analysis_id='$analysis_id' ";
  return $self->dbc->do($update);
}


#
# INTERNAL METHODS
#
###################

=head2 _generic_fetch

  Arg [1]    : (optional) string $constraint
               SQL condition placed in the WHERE clause
  Arg [2]    : (optional) listref $join
               each element is [ $table, $condition, $extra_columns ]:
               $table is a [ name, alias ] listref added to the FROM list,
               $condition is ANDed into the WHERE clause (both are used only
               when both are true), and $extra_columns (optional listref)
               is appended to the selected columns
  Example    : $stats_list = $self->_generic_fetch('ast.analysis_id = 1234');
  Description: Builds SELECT <_columns> FROM <_tables> [WHERE ...] <_final_clause>,
               also ANDing in _default_where_clause when it is non-empty.
               Executes the statement and converts the rows with
               _objs_from_sth.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : none raised here; database errors propagate from
               prepare/execute
  Caller     : internal

=cut

sub _generic_fetch {
  my ($self, $constraint, $join) = @_;

  my @tables      = $self->_tables;
  my $column_list = join(', ', $self->_columns());

  foreach my $single_join (@{ $join || [] }) {
    my ($tablename, $condition, $extra_columns) = @{$single_join};

    if ($tablename && $condition) {
      push @tables, $tablename;
      $constraint = $constraint ? "$constraint AND $condition" : " $condition";
    }
    $column_list .= ", " . join(', ', @{$extra_columns}) if $extra_columns;
  }

  # Render [name, alias] pairs as e.g. 'analysis_stats ast, other_table o'.
  my $tablenames = join(', ', map { join(' ', @$_) } @tables);

  my $default_where = $self->_default_where_clause;
  my $final_clause  = $self->_final_clause;

  my $sql = "SELECT $column_list FROM $tablenames";
  if ($constraint) {
    $sql .= " WHERE $constraint ";
    $sql .= " AND $default_where " if $default_where;
  }
  elsif ($default_where) {
    $sql .= " WHERE $default_where ";
  }
  $sql .= " $final_clause";

  my $sth = $self->prepare($sql);
  $sth->execute;

  return $self->_objs_from_sth($sth);
}

# Tables read by _generic_fetch, each given as a [ table_name, alias ] pair.
sub _tables {
  my $stats_table = [ 'analysis_stats', 'ast' ];
  return ($stats_table);
}

# Columns selected by _generic_fetch: every listed analysis_stats field
# (aliased ast), plus the row's age in seconds computed by the database and
# exposed as seconds_since_last_update.
sub _columns {
  my ($self) = @_;

  my @plain_fields = qw(
    analysis_id status batch_size
    avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job
    hive_capacity behaviour input_capacity output_capacity
    total_job_count unclaimed_job_count done_job_count
    max_retry_count failed_job_count failed_job_tolerance
    num_running_workers num_required_workers
    last_update sync_lock rc_id
  );

  my @columns = map { "ast.$_" } @plain_fields;
  push @columns, "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ";

  return @columns;
}

# Turns every row of an executed statement handle into a
# Bio::EnsEMBL::Hive::AnalysisStats. Each row value is passed to the
# accessor of the same name (in the order listed below), the adaptor is
# attached, and the handle is finished. The last_update column is fetched
# but not copied. Returns a listref of the new objects.
sub _objs_from_sth {
  my ($self, $sth) = @_;

  # Bind every result column into %row, keyed by lower-cased column name.
  my %row;
  $sth->bind_columns( \( @row{ @{ $sth->{NAME_lc} } } ) );

  my @fields = qw(
    analysis_id status sync_lock rc_id batch_size
    avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job
    hive_capacity behaviour input_capacity output_capacity
    total_job_count unclaimed_job_count done_job_count
    max_retry_count failed_job_count failed_job_tolerance
    num_running_workers num_required_workers
    seconds_since_last_update
  );

  my @stats_list;
  while ($sth->fetch()) {
    my $stats = Bio::EnsEMBL::Hive::AnalysisStats->new;
    $stats->$_( $row{$_} ) for @fields;
    $stats->adaptor($self);
    push @stats_list, $stats;
  }
  $sth->finish;

  return \@stats_list;
}

# Extra condition ANDed into every _generic_fetch query. None is needed for
# analysis_stats, so this is always the empty string.
sub _default_where_clause {
  return '';
}

# Getter/setter for the SQL that _generic_fetch appends after its WHERE
# clause (e.g. ORDER BY / LIMIT). With an argument it stores that value.
# Any false value is normalised to the empty string. Returns the current
# clause.
sub _final_clause {
  my ($self, @new_clause) = @_;

  $self->{'_final_clause'} = $new_clause[0] if @new_clause;
  $self->{'_final_clause'} ||= "";

  return $self->{'_final_clause'};
}


# Inserts an analysis_stats row for $analysis_id, leaving every other column
# at its database default. INSERT IGNORE means a duplicate-key conflict is
# not an error (presumably analysis_id is the key - confirm against the
# schema). The id is bound as a placeholder rather than interpolated into
# the SQL text.
sub _create_new_for_analysis_id {
  my ($self, $analysis_id) = @_;

  my $sth = $self->prepare("INSERT ignore INTO analysis_stats (analysis_id) VALUES (?)");
  $sth->execute($analysis_id);
  $sth->finish;
}

1;