# Perl module for Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor
#
# Date of creation: 22.03.2004
# Original Creator : Jessica Severin <jessica@ebi.ac.uk>
#
# Copyright EMBL-EBI 2004
#
# You may distribute this module under the same terms as perl itself

=pod

=head1 NAME

  Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor

=head1 SYNOPSIS

  $analysisStatsAdaptor = $db_adaptor->get_AnalysisStatsAdaptor;
  $analysisStatsAdaptor = $analysisStats->adaptor;

=head1 DESCRIPTION

  Module to encapsulate all db access for persistent class AnalysisStats.
  There should be just one per application and database connection.

=head1 CONTACT

  Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

  The rest of the documentation details each of the object methods.
  Internal methods are usually preceded with a _

=cut



package Bio::EnsEMBL::Hive::DBSQL::AnalysisStatsAdaptor;

use strict;
use warnings;

use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception ('throw');
use Bio::EnsEMBL::Hive::AnalysisStats;

use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');


=head2 fetch_by_analysis_id

  Arg [1]    : int $id
               the analysis_id whose analysis_stats row is to be fetched
  Example    : $stats = $adaptor->fetch_by_analysis_id(1234);
  Description: Returns the AnalysisStats object built from the analysis_stats
               row with the given analysis_id.
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats
  Exceptions : thrown if $id is not defined
               thrown if no row is found for $id
  Caller     : general

=cut

sub fetch_by_analysis_id {
  my ($self, $id) = @_;

  unless(defined $id) {
    throw("fetch_by_analysis_id must have an id");
  }

  my $constraint = "ast.analysis_id = $id";

  # _generic_fetch returns a listref; only its first element is of interest here
  my ($obj) = @{$self->_generic_fetch($constraint)};

  if(!defined($obj)) {
    throw("unable to fetch analysis_stats for analysis_id = $id\n");
  }

  return $obj;
}


# Returns a listref of AnalysisStats objects in the order in which they should be
# considered for new workers:
#   1. those with num_required_workers>0 and status READY or WORKING,
#      by descending analysis priority, ties broken randomly;
#   2. those in status LOADING, BLOCKED, ALL_CLAIMED or SYNCHING, by last_update.
#
# Arg [1] : (optional) $resource_class_id - when true, only analyses of this resource class
# Arg [2] : (optional) $meadow_type       - when true, only analyses whose meadow_type
#                                           is NULL or equal to this value
sub fetch_all_by_suitability_rc_id_meadow_type {
    my ($self, $resource_class_id, $meadow_type) = @_;

        # analysis_base is joined in for the filters above and for a.priority:
        #
    my $join = [[ ['analysis_base', 'a'], " ast.analysis_id=a.analysis_id "
                                                .( $resource_class_id ? "AND a.resource_class_id=$resource_class_id " : '')
                                                .( $meadow_type       ? "AND (a.meadow_type IS NULL OR a.meadow_type='$meadow_type') " : '')
               ]];

        # the ones that clearly have work to do:
        #
    my $primary_results = $self->_generic_fetch(
        "ast.num_required_workers>0 AND ast.status in ('READY', 'WORKING')" ,
        $join ,
        'ORDER BY a.priority DESC, ' . ( ($self->dbc->driver eq 'mysql') ? 'RAND()' : 'RANDOM()' ),
    );

        # the ones that may have work to do after a sync:
        #
    my $secondary_results = $self->_generic_fetch(
        "ast.status in ('LOADING', 'BLOCKED', 'ALL_CLAIMED', 'SYNCHING')" ,
        $join ,
        'ORDER BY last_update',     # FIXME: could mix in a.priority if sync is not too expensive?
    );

    return [ @$primary_results, @$secondary_results ];
}


=head2 refresh

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Description: reload the AnalysisStats object from the database
  Returntype : Bio::EnsEMBL::Hive::AnalysisStats object - same one with reloaded data
  Exceptions : whatever fetch_by_analysis_id throws

=cut

sub refresh {
    my ($self, $stats) = @_;

    my $new_stats = $self->fetch_by_analysis_id( $stats->analysis_id );     # fetch into a separate object

    %$stats = %$new_stats;                                                  # overwrite in place, so the caller's reference stays valid

    return $stats;
}


################
#
# STORE / UPDATE METHODS
#
################

# Inserts a new analysis_stats row holding the analysis_id, batch_size,
# hive_capacity and status of the given AnalysisStats object, then attaches
# this adaptor to the object.
#
# Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
# Returntype : the same AnalysisStats object
sub store {
    my ($self, $stats) = @_;

    my $sql = "INSERT INTO analysis_stats (analysis_id, batch_size, hive_capacity, status) VALUES (?, ?, ?, ?)";

    my $sth = $self->prepare($sql);
    $sth->execute($stats->analysis_id, $stats->batch_size, $stats->hive_capacity, $stats->status);
    $sth->finish;

    $stats->adaptor( $self );

    return $stats;
}


=head2 update

  Arg [1]    : Bio::EnsEMBL::Hive::AnalysisStats object
  Example    : $adaptor->update($stats);
  Description: Writes the state of the AnalysisStats object back to its
               analysis_stats row, sets last_update to the current time,
               resets sync_lock to '0' and copies the updated row into
               analysis_stats_monitor.
               For objects whose behaviour is 'DYNAMIC' the hive_capacity is
               first moved halfway towards a limit computed from
               input_capacity/output_capacity and the average job timings.
               Unless the database uses triggers, the job counters are written
               too and num_running_workers is recounted through the Queen.
  Returntype : not specified (the value of the final setter call)
  Exceptions : none thrown here
  Caller     : general

=cut

sub update {
  my ($self, $stats) = @_;

  my $hive_capacity = $stats->hive_capacity;

  if ($stats->behaviour eq "DYNAMIC") {

    # capacity at which fetching input would saturate input_capacity
    # (falls back to the current capacity when no input timing is known)
    my $max_hive_capacity = $stats->avg_input_msec_per_job
        ? int($stats->input_capacity * $stats->avg_msec_per_job / $stats->avg_input_msec_per_job)
        : $hive_capacity;

    # the same for writing output; the stricter of the two limits wins
    if ($stats->avg_output_msec_per_job) {
      my $max_hive_capacity2 = int($stats->output_capacity * $stats->avg_msec_per_job / $stats->avg_output_msec_per_job);
      if ($max_hive_capacity2 < $max_hive_capacity) {
        $max_hive_capacity = $max_hive_capacity2;
      }
    }

    # rounded-up midpoint between the current capacity and the limit
    $stats->hive_capacity( int( ($hive_capacity+$max_hive_capacity+1)/2 ) );
  }

  # every column written here has an accessor of the same name on $stats
  my @columns = qw(
      status
      batch_size
      hive_capacity
      avg_msec_per_job
      avg_input_msec_per_job
      avg_run_msec_per_job
      avg_output_msec_per_job
  );

  unless( $self->db->hive_use_triggers() ) {
      # recount before the value is read for the UPDATE below
      $stats->num_running_workers( $self->db->get_Queen->count_running_workers( $stats->analysis_id() ) );

      push @columns, qw(
          total_job_count
          semaphored_job_count
          ready_job_count
          done_job_count
          failed_job_count
          num_running_workers
      );
  }

  push @columns, 'num_required_workers';

  # scalar() keeps the values aligned with @columns even if an accessor
  # returns an empty list; an undef value (e.g. hive_capacity) is bound as NULL
  my @bind_values = map { scalar $stats->$_() } @columns;
  my $analysis_id = $stats->analysis_id;

  my $sql = "UPDATE analysis_stats SET "
          . join(', ', map { "$_=?" } @columns)
          . ", last_update=CURRENT_TIMESTAMP"
          . ", sync_lock='0'"
          . " WHERE analysis_id=?";

  my $sth = $self->prepare($sql);
  $sth->execute(@bind_values, $analysis_id);
  $sth->finish;

  # keep a timestamped snapshot of the row that has just been written
  $sth = $self->prepare("INSERT INTO analysis_stats_monitor SELECT CURRENT_TIMESTAMP, analysis_stats.* from analysis_stats WHERE analysis_id = ?");
  $sth->execute($analysis_id);
  $sth->finish;

  $stats->seconds_since_last_update(0); #not exact but good enough :)
}


# Sets the status column of the analysis_stats row with the given analysis_id.
# Both values are passed as bind parameters, like in store(), so they need no
# quoting by the caller.
#
# Arg [1] : int $analysis_id
# Arg [2] : string $status
sub update_status {
  my ($self, $analysis_id, $status) = @_;

  my $sth = $self->prepare("UPDATE analysis_stats SET status=? WHERE analysis_id=?");
  $sth->execute($status, $analysis_id);
  $sth->finish;
}


=head2 interval_update_work_done

  Arg [1]     : int $analysis_id
  Arg [2]     : int $jobs_done_in_interval
  Arg [3]     : int $interval_msec
  Arg [4]     : int $fetching_msec
  Arg [5]     : int $running_msec
  Arg [6]     : int $writing_msec
  Arg [7]     : real $weight_factor [optional, any false value means 3]
  Example     : $statsDBA->interval_update_work_done($analysis_id, $jobs_done, $interval_msec, $fetching_msec, $running_msec, $writing_msec);
  Description : does a database update to recalculate the avg_msec_per_job and done_job_count
                does an interval equation by multiplying out the previous done_job_count with the
                previous avg_msec_per_job and then expanding by new interval values to give a better average.
                The input, run and output averages are recalculated the same way.
                ready_job_count and done_job_count are only adjusted here when the
                database does not use triggers.
  Caller      : Bio::EnsEMBL::Hive::Worker

=cut

sub interval_update_work_done {
  my ($self, $analysis_id, $job_count, $interval_msec, $fetching_msec, $running_msec, $writing_msec, $weight_factor) = @_;

  $weight_factor ||= 3; # makes it more sensitive to the dynamics of the farm

  # each average is re-weighted by the same formula; only the column and the new interval value differ
  my @averaged = (
    [ 'avg_msec_per_job',        $interval_msec ],
    [ 'avg_input_msec_per_job',  $fetching_msec ],
    [ 'avg_run_msec_per_job',    $running_msec  ],
    [ 'avg_output_msec_per_job', $writing_msec  ],
  );

  my @assignments = map {
    my ($column, $msec) = @$_;
    "$column = (((done_job_count*$column)/$weight_factor + $msec) / (done_job_count/$weight_factor + $job_count))"
  } @averaged;

  # The counters stay AFTER the averages: MySQL evaluates single-table UPDATE
  # assignments left to right, and the averages need the old done_job_count.
  unless( $self->db->hive_use_triggers() ) {
    push @assignments,
        "ready_job_count = ready_job_count - $job_count",
        "done_job_count = done_job_count + $job_count";
  }

  my $sql = "UPDATE analysis_stats SET " . join(', ', @assignments) . " WHERE analysis_id= $analysis_id";

  $self->dbc->do($sql);
}


# Adds one to num_running_workers in the analysis_stats row of the given analysis_id.
#
# Arg [1] : int $analysis_id
sub increase_running_workers {
  my ($self, $analysis_id) = @_;

  my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers + 1 ".
      " WHERE analysis_id='$analysis_id'";

  $self->dbc->do($sql);
}


# Subtracts one from num_running_workers in the analysis_stats row of the given analysis_id.
#
# Arg [1] : int $analysis_id
sub decrease_running_workers {
  my ($self, $analysis_id) = @_;

  my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
      " WHERE analysis_id='$analysis_id'";

  $self->dbc->do($sql);
}


# Subtracts one from num_required_workers in the analysis_stats row of the given analysis_id.
#
# Arg [1] : int $analysis_id
sub decrease_required_workers {
  my ($self, $analysis_id) = @_;

  my $sql = "UPDATE analysis_stats SET num_required_workers=num_required_workers-1 ".
            "WHERE analysis_id='$analysis_id' ";

  $self->dbc->do($sql);
}


# Adds one to num_required_workers in the analysis_stats row of the given analysis_id.
#
# Arg [1] : int $analysis_id
sub increase_required_workers {
  my ($self, $analysis_id) = @_;

  my $sql = "UPDATE analysis_stats SET num_required_workers=num_required_workers+1 ".
            "WHERE analysis_id='$analysis_id' ";

  $self->dbc->do($sql);
}


#
# INTERNAL METHODS
#
###################

=head2 _generic_fetch

  Arg [1]    : (optional) string $constraint
               An SQL query constraint (i.e. part of the WHERE clause)
  Arg [2]    : (optional) listref $join
               each element is [ [$table, $alias], $condition, \@extra_columns ];
               the table is added to the FROM list and the condition is ANDed
               to the constraint; extra columns (optional) are added to the SELECT list
  Arg [3]    : (optional) string $final_clause
               appended after the WHERE clause, e.g. 'ORDER BY ...'
  Example    : $list = $self->_generic_fetch("ast.status='READY'");
  Description: Builds a SELECT from _tables, _columns, _default_where_clause and
               the arguments, runs it and hands the statement handle to
               _objs_from_sth.
  Returntype : whatever _objs_from_sth returns
  Exceptions : none
  Caller     : the fetch_* methods of this adaptor

=cut

sub _generic_fetch {
  my ($self, $constraint, $join, $final_clause) = @_;

  my @tables = $self->_tables;
  my $columns = join(', ', $self->_columns());

  if ($join) {
    foreach my $single_join (@{$join}) {
      my ($tablename, $condition, $extra_columns) = @{$single_join};
      if ($tablename && $condition) {
        push @tables, $tablename;

        # the parentheses stop an OR inside the constraint from capturing the join condition
        $constraint = $constraint ? "($constraint) AND $condition" : " $condition";
      }
      if ($extra_columns) {
        $columns .= ", " . join(', ', @{$extra_columns});
      }
    }
  }

  #construct a nice table string like 'table1 t1, table2 t2'
  my $tablenames = join(', ', map({ join(' ', @$_) } @tables));

  my $sql = "SELECT $columns FROM $tablenames";

  my $default_where = $self->_default_where_clause;

  #append a where clause if it was defined
  if($constraint && $default_where) {
    $sql .= " WHERE ($constraint) AND $default_where ";
  } elsif($constraint) {
    $sql .= " WHERE $constraint ";
  } elsif($default_where) {
    $sql .= " WHERE $default_where ";
  }

  #append additional clauses which may have been defined
  $sql .= " $final_clause" if($final_clause);

  my $sth = $self->prepare($sql);
  $sth->execute;

  return $self->_objs_from_sth($sth);
}

# The single [table, alias] pair that every query of this adaptor selects from.
sub _tables {
  return ( [ 'analysis_stats', 'ast' ] );
}

# Returns the list of SELECT expressions for analysis_stats (alias ast): the
# stored columns plus a driver-specific expression, aliased
# seconds_since_last_update, for the time elapsed since ast.last_update.
#
# Exceptions : thrown if the driver of the connection is not mysql, sqlite or pgsql
sub _columns {
  my $self = shift;

  my @columns = qw (ast.analysis_id
                    ast.batch_size
                    ast.hive_capacity
                    ast.status

                    ast.total_job_count
                    ast.semaphored_job_count
                    ast.ready_job_count
                    ast.done_job_count
                    ast.failed_job_count
                    ast.num_running_workers
                    ast.num_required_workers

                    ast.behaviour
                    ast.input_capacity
                    ast.output_capacity

                    ast.avg_msec_per_job
                    ast.avg_input_msec_per_job
                    ast.avg_run_msec_per_job
                    ast.avg_output_msec_per_job

                    ast.last_update
                    ast.sync_lock
                   );

  # each driver spells "seconds elapsed since a timestamp" differently
  my %seconds_since_last_update_for = (
      'mysql'     => "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ",
      'sqlite'    => "strftime('%s','now')-strftime('%s',ast.last_update) seconds_since_last_update ",
      'pgsql'     => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - ast.last_update) seconds_since_last_update ",
  );

  my $driver     = $self->dbc->driver;
  my $expression = defined($driver) ? $seconds_since_last_update_for{$driver} : undef;

  # an undefined entry would end up as an empty column in the SELECT list
  unless(defined $expression) {
    throw("no seconds_since_last_update expression for driver '" . (defined($driver) ? $driver : 'undef') . "'");
  }

  push @columns, $expression;

  return @columns;
}

# Turns every row of an executed statement handle into a
# Bio::EnsEMBL::Hive::AnalysisStats object that has this adaptor attached.
#
# Arg [1]    : executed statement handle
# Returntype : listref of Bio::EnsEMBL::Hive::AnalysisStats
sub _objs_from_sth {
  my ($self, $sth) = @_;

  # bind each result column to the hash entry named after its lower-cased name
  my %column;
  $sth->bind_columns( \( @column{ @{$sth->{NAME_lc} } } ));

  # result columns copied onto the object through the setter of the same name;
  # last_update is not in this list, only seconds_since_last_update is copied
  my @copied_fields = qw(
      analysis_id
      batch_size
      hive_capacity
      status

      total_job_count
      semaphored_job_count
      ready_job_count
      done_job_count
      failed_job_count
      num_running_workers
      num_required_workers

      behaviour
      input_capacity
      output_capacity

      avg_msec_per_job
      avg_input_msec_per_job
      avg_run_msec_per_job
      avg_output_msec_per_job

      seconds_since_last_update
      sync_lock
  );

  my @statsArray = ();

  while ($sth->fetch()) {
    my $analStats = Bio::EnsEMBL::Hive::AnalysisStats->new();

    foreach my $field (@copied_fields) {
      $analStats->$field( $column{$field} );
    }

    $analStats->adaptor($self);

    push @statsArray, $analStats;
  }
  $sth->finish;

  return \@statsArray;
}


1;