# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor (AnalysisStatsAdaptor.pm)
#
# Date of creation: 22.03.2004
# Original Creator : Jessica Severin <jessica@ebi.ac.uk>
#
# Copyright EMBL-EBI 2004
#
# You may distribute this module under the same terms as perl itself

=pod

=head1 NAME

  Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor

=head1 SYNOPSIS

  $analysisStatsAdaptor = $db_adaptor->get_AnalysisStatsAdaptor;
  $analysisStatsAdaptor = $analysisStats->adaptor;

=head1 DESCRIPTION

  Module to encapsulate all database access for the persistent class AnalysisStats.
  There should be just one per application and database connection.

=head1 CONTACT

  Please contact the ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

  The rest of the documentation details each of the object methods.
  Internal methods are usually prefixed with an underscore (_).

=cut



package Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;

use strict;
use Bio::EnsEMBL::Hive::AnalysisStats;
43
44
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
45

46
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
47
48


=head2 create_new_for_analysis_id_resource_class_id

  Arg [1]    : int $analysis_id
  Arg [2]    : int $resource_class_id
  Example    : $adaptor->create_new_for_analysis_id_resource_class_id($analysis_id, $resource_class_id);
  Description: Inserts a new analysis_stats row for the given analysis and resource class,
               using INSERT IGNORE (MySQL) or INSERT OR IGNORE (SQLite) so that an
               already-existing row does not cause an error.
               Both ids are passed as bind parameters instead of being pasted into the SQL,
               so an undefined value becomes NULL rather than producing broken SQL.
  Returntype : none meaningful
  Caller     : general

=cut

sub create_new_for_analysis_id_resource_class_id {
    my ($self, $analysis_id, $resource_class_id) = @_;

    # SQLite and MySQL spell "skip rows that would violate a key" differently
    my $insertion_method = ($self->dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
    my $sql = "$insertion_method INTO analysis_stats (analysis_id, resource_class_id) VALUES (?, ?)";

    my $sth = $self->prepare($sql);
    $sth->execute($analysis_id, $resource_class_id);
    $sth->finish;
}


=head2 fetch_by_analysis_id

  Arg [1]    : int $id
               the analysis_id of the analysis whose statistics are to be obtained
  Example    : $stats = $adaptor->fetch_by_analysis_id(1234);
  Description: Returns the AnalysisStats object stored for the analysis with id $id.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : thrown if $id is not defined;
               thrown if no analysis_stats row exists for $id
  Caller     : general

=cut

sub fetch_by_analysis_id {
  my ($self, $id) = @_;

  throw("fetch_by_analysis_id must have an id") unless defined $id;

  # Only the first returned object is used (analysis_id is presumably unique
  # in analysis_stats - TODO confirm against the schema).
  my ($obj) = @{ $self->_generic_fetch("ast.analysis_id = $id") };

  throw("unable to fetch analysis_stats for analysis_id = $id\n") unless defined $obj;

  return $obj;
}


# Returns a listref of AnalysisStats for every row of analysis_stats
# (no WHERE constraint is passed to _generic_fetch).
sub fetch_all {
  my ($self) = @_;

  return $self->_generic_fetch();
}


=head2 fetch_by_needed_workers

  Arg [1]    : (optional) int $limit - maximum number of rows to return (no limit if false)
  Arg [2]    : (optional) int $resource_class_id - only consider this resource class
  Description: Returns the AnalysisStats of analyses whose status is 'READY' or 'WORKING'
               and whose num_required_workers is greater than 0, ordered by priority
               (highest first) with ties broken randomly.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : any error raised by the fetch is re-thrown, after the final clause is reset

=cut

sub fetch_by_needed_workers {
    my ($self, $limit, $resource_class_id) = @_;

    my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')"
                    .(defined($resource_class_id) ? " AND ast.resource_class_id = $resource_class_id" : '');

    # SQLite and MySQL name their random-number function differently
    my $final_clause = 'ORDER BY priority DESC, '
                        .( ($self->dbc->driver eq 'sqlite') ? 'RANDOM()' : 'RAND()' )
                        .($limit ? " LIMIT $limit" : '');

    # The final clause is stored on the adaptor and used by every later fetch,
    # so it must be reset even when the fetch dies - then the error is re-thrown.
    $self->_final_clause($final_clause);
    my $results;
    my $fetched_ok = eval { $results = $self->_generic_fetch($constraint); 1 };
    my $error = $@;
    $self->_final_clause('');
    die $error unless $fetched_ok;

    return $results;
}

=head2 fetch_by_statuses

  Arg [1]    : listref of status strings, e.g. ['READY', 'WORKING']
  Arg [2]    : (optional) int $resource_class_id - only consider this resource class
  Description: Returns the AnalysisStats whose status is one of the given statuses,
               ordered by last_update. An empty (or undefined) status list returns an
               empty listref without querying the database.
  Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : any error raised by the fetch is re-thrown, after the final clause is reset

=cut

sub fetch_by_statuses {
  my ($self, $statuses, $resource_class_id) = @_;

  # "IN ()" is not valid SQL, and nothing can match an empty set of statuses anyway
  return [] unless $statuses && @$statuses;

  my $constraint = 'ast.status in ('.join(', ', map { "'$_'" } @$statuses).')'
                   .(defined($resource_class_id) ? " AND ast.resource_class_id = $resource_class_id" : '');

  # The final clause is adaptor-wide state: reset it even if the fetch dies.
  $self->_final_clause('ORDER BY last_update');
  my $results;
  my $fetched_ok = eval { $results = $self->_generic_fetch($constraint); 1 };
  my $error = $@;
  $self->_final_clause('');
  die $error unless $fetched_ok;

  return $results;
}

=head2 refresh

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Description: Reloads the given AnalysisStats object in place: a fresh copy is fetched
               for the same analysis_id and all of its fields replace the object's own,
               so every existing reference to the object sees the reloaded data.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats object - the same one, with reloaded data
  Exceptions : whatever fetch_by_analysis_id throws (e.g. when no row exists)

=cut

sub refresh {
    my ($self, $stats) = @_;

    my $fresh_copy = $self->fetch_by_analysis_id( $stats->analysis_id );

    # replace the contents of the caller's hash rather than the reference itself
    %{ $stats } = %{ $fresh_copy };

    return $stats;
}


################
#
# STORE / UPDATE METHODS
#
################

=head2 update

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Example    : $adaptor->update($stats);
  Description: Writes the in-memory state of $stats back to its analysis_stats row,
               setting last_update to CURRENT_TIMESTAMP and sync_lock to '0', and then
               appends a timestamped copy of the row to analysis_stats_monitor.
               If the behaviour is 'DYNAMIC', hive_capacity is first moved towards the
               capacity implied by the input/output timings (see _adjust_dynamic_hive_capacity).
               If $self->db->hive_use_triggers() is false, the job counters are written too,
               and num_running_workers is recounted through the Queen before being stored.
               status and analysis_id are passed as bind parameters.
  Returntype : none meaningful
  Exceptions : none raised here; database errors propagate from the statement handles
  Caller     : general

=cut

sub update {
  my ($self, $stats) = @_;

  _adjust_dynamic_hive_capacity($stats) if $stats->behaviour eq 'DYNAMIC';

  my @assignments = (
    'status=?',
    'batch_size='              . $stats->batch_size(),
    'avg_msec_per_job='        . $stats->avg_msec_per_job(),
    'avg_input_msec_per_job='  . $stats->avg_input_msec_per_job(),
    'avg_run_msec_per_job='    . $stats->avg_run_msec_per_job(),
    'avg_output_msec_per_job=' . $stats->avg_output_msec_per_job(),
    'hive_capacity='           . $stats->hive_capacity(),
    'max_retry_count='         . $stats->max_retry_count(),
    'failed_job_tolerance='    . $stats->failed_job_tolerance(),
  );

  # Counters are only written from here when hive_use_triggers() is false
  # (presumably triggers maintain them otherwise - TODO confirm).
  unless( $self->db->hive_use_triggers() ) {
    push @assignments,
      'total_job_count='     . $stats->total_job_count(),
      'unclaimed_job_count=' . $stats->unclaimed_job_count(),
      'done_job_count='      . $stats->done_job_count(),
      'failed_job_count='    . $stats->failed_job_count();

    $stats->num_running_workers( $self->db->get_Queen->count_running_workers( $stats->analysis_id() ) );
    push @assignments, 'num_running_workers=' . $stats->num_running_workers();
  }

  push @assignments,
    'num_required_workers=' . $stats->num_required_workers(),
    'last_update=CURRENT_TIMESTAMP',
    "sync_lock='0'",
    'resource_class_id='    . $stats->resource_class_id(),
    'can_be_empty='         . $stats->can_be_empty(),
    'priority='             . $stats->priority();

  my $sth = $self->prepare('UPDATE analysis_stats SET ' . join(',', @assignments) . ' WHERE analysis_id=?');
  $sth->execute($stats->status, $stats->analysis_id);
  $sth->finish;

  # keep a timestamped history of every stats update
  $sth = $self->prepare('INSERT INTO analysis_stats_monitor SELECT CURRENT_TIMESTAMP, analysis_stats.* from analysis_stats WHERE analysis_id=?');
  $sth->execute($stats->analysis_id);
  $sth->finish;

  $stats->seconds_since_last_update(0); # not exact but good enough :)
}

# Moves $stats->hive_capacity towards the largest capacity allowed by the input and
# output timings: halfway there if the gap is more than 2, otherwise one step.
# NOTE(review): the halfway value may be fractional; it is written to SQL as-is.
sub _adjust_dynamic_hive_capacity {
  my ($stats) = @_;

  my $hive_capacity     = $stats->hive_capacity;
  my $max_hive_capacity = $hive_capacity;

  if ($stats->avg_input_msec_per_job) {
    $max_hive_capacity = int($stats->input_capacity * $stats->avg_msec_per_job / $stats->avg_input_msec_per_job);
  }
  if ($stats->avg_output_msec_per_job) {
    my $output_limited = int($stats->output_capacity * $stats->avg_msec_per_job / $stats->avg_output_msec_per_job);
    $max_hive_capacity = $output_limited if $output_limited < $max_hive_capacity;
  }

  return if $hive_capacity == $max_hive_capacity;

  if (abs($hive_capacity - $max_hive_capacity) > 2) {
    $stats->hive_capacity(($hive_capacity + $max_hive_capacity) / 2);
  } elsif ($hive_capacity > $max_hive_capacity) {
    $stats->hive_capacity($hive_capacity - 1);
  } else {
    $stats->hive_capacity($hive_capacity + 1);
  }
  return;
}


=head2 update_status

  Arg [1]    : int $analysis_id
  Arg [2]    : string $status - the new status, e.g. 'READY'
  Example    : $adaptor->update_status($analysis_id, 'DONE');
  Description: Sets the status column of the analysis_stats row for $analysis_id.
               Both values are passed as bind parameters, so a quote inside $status
               cannot break (or inject into) the SQL.
  Returntype : none meaningful

=cut

sub update_status
{
  my ($self, $analysis_id, $status) = @_;

  my $sth = $self->prepare('UPDATE analysis_stats SET status=? WHERE analysis_id=?');
  $sth->execute($status, $analysis_id);
  $sth->finish;
}


=head2 interval_update_work_done

  Arg [1]     : int $analysis_id
  Arg [2]     : int $jobs_done_in_interval
  Arg [3]     : int $interval_msec
  Arg [4]     : int $fetching_msec
  Arg [5]     : int $running_msec
  Arg [6]     : int $writing_msec
  Arg [7]     : real $weight_factor [optional; any false value, including 0, means 3]
  Example     : $statsDBA->interval_update_work_done($analysis_id, $jobs_done, $interval_msec, $fetching_msec, $running_msec, $writing_msec);
  Description : Does a single database UPDATE that folds the timings of the last interval
                into the running averages avg_msec_per_job, avg_input_msec_per_job,
                avg_run_msec_per_job and avg_output_msec_per_job. Each column becomes

                  ((done_job_count * old_avg) / weight_factor + interval_msec_for_column)
                  / (done_job_count / weight_factor + jobs_done_in_interval)

                so a weight_factor above 1 gives the latest interval more influence.
                When $self->db->hive_use_triggers() is false, the same UPDATE also
                subtracts $jobs_done_in_interval from unclaimed_job_count and adds it
                to done_job_count.
  Caller      : Bio::EnsEMBL::Hive::Worker

=cut

sub interval_update_work_done {
  my ($self, $analysis_id, $job_count, $interval_msec, $fetching_msec, $running_msec, $writing_msec, $weight_factor) = @_;

  $weight_factor ||= 3; # makes it more sensitive to the dynamics of the farm

  # [ average column, msec spent on that part of the work during this interval ]
  my @timed_columns = (
    [ 'avg_msec_per_job',        $interval_msec ],
    [ 'avg_input_msec_per_job',  $fetching_msec ],
    [ 'avg_run_msec_per_job',    $running_msec  ],
    [ 'avg_output_msec_per_job', $writing_msec  ],
  );

  # NOTE(review): if done_job_count and $job_count are both 0 the denominator is 0;
  # depending on the database and SQL mode this stores NULL or rejects the UPDATE.
  my @assignments = map {
    my ($column, $msec) = @$_;
    "$column = (((done_job_count*$column)/$weight_factor + $msec) / (done_job_count/$weight_factor + $job_count))"
  } @timed_columns;

  unless ( $self->db->hive_use_triggers() ) {
    # Keep these AFTER the averages: MySQL evaluates single-table UPDATE assignments
    # left to right, so the averages above must still see the old done_job_count.
    push @assignments,
      "unclaimed_job_count = unclaimed_job_count - $job_count",
      "done_job_count = done_job_count + $job_count";
  }

  my $sql = 'UPDATE analysis_stats SET ' . join(', ', @assignments) . " WHERE analysis_id= $analysis_id";

  $self->dbc->do($sql);
}


=head2 increase_running_workers

  Arg [1]    : int $analysis_id
  Description: Issues one UPDATE that adds 1 to num_running_workers for the analysis.
  Returntype : whatever $self->dbc->do returns

=cut

sub increase_running_workers
{
  my ($self, $analysis_id) = @_;

  return $self->dbc->do(
      "UPDATE analysis_stats SET num_running_workers = num_running_workers + 1 "
    . " WHERE analysis_id='$analysis_id'"
  );
}


# Issues one UPDATE that subtracts 1 from num_running_workers for the given
# analysis_id; returns whatever $self->dbc->do returns.
sub decrease_running_workers
{
  my ($self, $analysis_id) = @_;

  my $statement = sprintf(
    q{UPDATE analysis_stats SET num_running_workers = num_running_workers - 1  WHERE analysis_id='%s'},
    $analysis_id,
  );

  return $self->dbc->do($statement);
}

# Issues one UPDATE that subtracts 1 from num_required_workers for the given
# analysis_id; returns whatever $self->dbc->do returns.
sub decrease_required_workers
{
  my ($self, $analysis_id) = @_;

  my $sql = join '',
    'UPDATE analysis_stats SET num_required_workers=num_required_workers-1 ',
    "WHERE analysis_id='$analysis_id' ";

  return $self->dbc->do($sql);
}


# Issues one UPDATE that adds 1 to num_required_workers for the given
# analysis_id; returns whatever $self->dbc->do returns.
sub increase_required_workers
{
  my ($self, $analysis_id) = @_;

  my $sql = "UPDATE analysis_stats SET num_required_workers=num_required_workers+1 WHERE analysis_id='$analysis_id' ";
  return $self->dbc->do($sql);
}


###################
#
# INTERNAL METHODS
#
###################

=head2 _generic_fetch

  Arg [1]    : (optional) string $constraint
               An SQL query constraint (i.e. part of the WHERE clause)
  Arg [2]    : (optional) listref $join
               Each element is a listref [ $table, $condition, $extra_columns ]:
               $table is an arrayref whose elements are joined with a space
               (e.g. [ 'table_name', 'alias' ]) and added to the FROM list;
               $condition is ANDed onto the WHERE clause (table and condition are
               only used when both are given); $extra_columns is an optional
               listref of additional column expressions to select.
  Example    : $stats_list = $adaptor->_generic_fetch('ast.analysis_id = 1234');
  Description: Builds and runs a SELECT over this adaptor's tables and columns,
               combining $constraint, any join conditions and the default WHERE
               clause, and appending the current final clause (ORDER BY, LIMIT, ...).
  Returntype : listref of objects built by _objs_from_sth
               (Bio::EnsEMBL::Hive::AnalysisStats for this adaptor)
  Exceptions : none raised here; database errors propagate from prepare/execute
  Caller     : the fetch_* methods of this adaptor

=cut

sub _generic_fetch {
  my ($self, $constraint, $join) = @_;

  my @tables  = $self->_tables;
  my $columns = join(', ', $self->_columns());

  foreach my $single_join (@{ $join || [] }) {
    my ($tablename, $condition, $extra_columns) = @{$single_join};

    if ($tablename && $condition) {
      push @tables, $tablename;
      $constraint = $constraint ? "$constraint AND $condition" : " $condition";
    }
    if ($extra_columns) {
      $columns .= ', ' . join(', ', @{$extra_columns});
    }
  }

  # each table entry is an arrayref such as ['analysis_stats', 'ast'] -> "analysis_stats ast"
  my $tablenames = join(', ', map({ join(' ', @$_) } @tables));

  my $sql = "SELECT $columns FROM $tablenames";

  my $default_where = $self->_default_where_clause;
  my $final_clause  = $self->_final_clause;

  if ($constraint) {
    $sql .= " WHERE $constraint ";
    $sql .= " AND $default_where " if $default_where;
  } elsif ($default_where) {
    $sql .= " WHERE $default_where ";
  }

  # e.g. ORDER BY / LIMIT set by a caller through _final_clause
  $sql .= " $final_clause";

  my $sth = $self->prepare($sql);
  $sth->execute;

  return $self->_objs_from_sth($sth);
}

# Tables selected from by _generic_fetch, as [ table_name, alias ] pairs:
# only analysis_stats, aliased "ast".
sub _tables {
  my ($self) = @_;

  return ( [ qw(analysis_stats ast) ] );
}

# Column expressions selected by _generic_fetch: every analysis_stats column
# (prefixed with the "ast" alias), followed by a computed
# seconds_since_last_update whose SQL depends on the database driver.
sub _columns {
  my ($self) = @_;

  my @columns = map { "ast.$_" } qw(
    analysis_id
    status
    batch_size
    avg_msec_per_job
    avg_input_msec_per_job
    avg_run_msec_per_job
    avg_output_msec_per_job
    hive_capacity
    behaviour
    input_capacity
    output_capacity
    total_job_count
    unclaimed_job_count
    done_job_count
    max_retry_count
    failed_job_count
    failed_job_tolerance
    num_running_workers
    num_required_workers
    last_update
    sync_lock
    resource_class_id
    can_be_empty
    priority
  );

  # SQLite has no UNIX_TIMESTAMP(); strftime('%s', ...) gives the same epoch seconds
  my $age_expression;
  if ($self->dbc->driver eq 'sqlite') {
    $age_expression = "strftime('%s','now')-strftime('%s',ast.last_update) seconds_since_last_update ";
  } else {
    $age_expression = "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ";
  }
  push @columns, $age_expression;

  return @columns;
}

# Turns every remaining row of an executed statement handle into a
# Bio::EnsEMBL::Hive::AnalysisStats object attached to this adaptor, finishes
# the handle, and returns the objects as a listref.
sub _objs_from_sth {
  my ($self, $sth) = @_;

  # bind each result column (by lower-cased name) directly into a slot of %row
  my %row;
  $sth->bind_columns( \( @row{ @{ $sth->{NAME_lc} } } ) );

  # setter methods, applied in this order to every row's object
  my @fields = qw(
    analysis_id status sync_lock resource_class_id can_be_empty priority
    batch_size
    avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job
    hive_capacity behaviour input_capacity output_capacity
    total_job_count unclaimed_job_count done_job_count
    max_retry_count failed_job_count failed_job_tolerance
    num_running_workers num_required_workers
    seconds_since_last_update
  );

  my @stats_list;
  while ( $sth->fetch() ) {
    my $stats = Bio::EnsEMBL::Hive::AnalysisStats->new();
    for my $field (@fields) {
      $stats->$field( $row{$field} );
    }
    $stats->adaptor($self);
    push @stats_list, $stats;
  }
  $sth->finish;

  return \@stats_list;
}

# No extra WHERE condition is added to this adaptor's queries: always the empty string.
sub _default_where_clause {
  my ($self) = @_;

  return q{};
}

# Getter/setter for the text appended after the WHERE clause by _generic_fetch
# (e.g. ORDER BY / LIMIT). With an argument it stores that value; any false
# stored value (undef, '', '0') is normalised to the empty string.
sub _final_clause {
  my $self = shift;

  if (@_) {
    $self->{'_final_clause'} = shift;
  }
  $self->{'_final_clause'} ||= "";

  return $self->{'_final_clause'};
}


510
511
1;