Commit 04a22618 authored by Leo Gordon's avatar Leo Gordon
Browse files

used input_column_mapping mechanism to inherit AnalysisStatsAdaptor from ObjectAdaptor

parent 5b379206
......@@ -39,9 +39,11 @@ use strict;
use Scalar::Util ('weaken');
use Bio::EnsEMBL::Utils::Argument ('rearrange');
use Bio::EnsEMBL::Utils::Exception ('throw');
use Bio::EnsEMBL::Hive::Analysis;
use base ( 'Bio::EnsEMBL::Storable' ); # inherit dbID(), adaptor() and new() methods
## Minimum amount of time in msec that a worker should run before reporting
## back to the hive. This is used when setting the batch_size automatically.
......@@ -52,15 +54,37 @@ sub min_batch_time {
sub new {
my $class = shift;
my $self = bless {}, $class;
my ($analysis_id, $batch_size, $hive_capacity, $status) =
rearrange([qw(analysis_id batch_size hive_capacity status) ], @_);
$self->analysis_id($analysis_id) if(defined($analysis_id));
$self->batch_size($batch_size) if(defined($batch_size));
$self->hive_capacity($hive_capacity) if(defined($hive_capacity));
$self->status($status) if(defined($status));
my $self = $class->SUPER::new( @_ ); # deal with Storable stuff
my ( $analysis_id, $batch_size, $hive_capacity, $status,
$total_job_count, $semaphored_job_count, $ready_job_count, $done_job_count, $failed_job_count, $num_running_workers, $num_required_workers,
$behaviour, $input_capacity, $output_capacity, $avg_msec_per_job, $avg_input_msec_per_job, $avg_run_msec_per_job, $avg_output_msec_per_job,
$seconds_since_last_update, $sync_lock) =
rearrange([qw(analysis_id batch_size hive_capacity status
total_job_count semaphored_job_count ready_job_count done_job_count failed_job_count num_running_workers num_required_workers
behaviour input_capacity output_capacity avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job
seconds_since_last_update sync_lock ) ], @_);
$self->analysis_id($analysis_id) if(defined($analysis_id));
$self->batch_size($batch_size) if(defined($batch_size));
$self->hive_capacity($hive_capacity) if(defined($hive_capacity));
$self->status($status) if(defined($status));
$self->total_job_count($total_job_count) if(defined($total_job_count));
$self->semaphored_job_count($semaphored_job_count) if(defined($semaphored_job_count));
$self->ready_job_count($ready_job_count) if(defined($ready_job_count));
$self->done_job_count($done_job_count) if(defined($done_job_count));
$self->failed_job_count($failed_job_count) if(defined($failed_job_count));
$self->num_running_workers($num_running_workers) if(defined($num_running_workers));
$self->num_required_workers($num_required_workers) if(defined($num_required_workers));
$self->behaviour($behaviour) if(defined($behaviour));
$self->input_capacity($input_capacity) if(defined($input_capacity));
$self->output_capacity($output_capacity) if(defined($output_capacity));
$self->avg_msec_per_job($avg_msec_per_job) if(defined($avg_msec_per_job));
$self->avg_input_msec_per_job($avg_input_msec_per_job) if(defined($avg_input_msec_per_job));
$self->avg_run_msec_per_job($avg_run_msec_per_job) if(defined($avg_run_msec_per_job));
$self->avg_output_msec_per_job($avg_output_msec_per_job) if(defined($avg_output_msec_per_job));
$self->seconds_since_last_update($seconds_since_last_update)if(defined($seconds_since_last_update)); # NB: this is an input_column_mapped field
$self->sync_lock($sync_lock) if(defined($sync_lock));
return $self;
}
......@@ -68,21 +92,9 @@ sub new {
## pre-settable storable object's getters/setters:
sub adaptor {
my $self = shift;
if(@_) {
$self->{'_adaptor'} = shift;
weaken $self->{'_adaptor'};
}
return $self->{'_adaptor'};
}
sub analysis_id {
sub analysis_id { # an alias
my $self = shift;
$self->{'_analysis_id'} = shift if(@_);
return $self->{'_analysis_id'};
return $self->dbID(@_);
}
sub batch_size {
......@@ -206,10 +218,15 @@ sub avg_output_msec_per_job {
}
## other storable ttributes:
## other storable attributes:
sub last_update { # this method is called by the initial store() [at which point it returns undef]
my $self = shift;
$self->{'_last_update'} = shift if(@_);
return $self->{'_last_update'};
}
sub seconds_since_last_update {
sub seconds_since_last_update { # this method is mostly used to convert between server time and local time
my( $self, $value ) = @_;
$self->{'_last_update'} = time() - $value if(defined($value));
return time() - $self->{'_last_update'};
......@@ -247,6 +264,9 @@ sub update_status {
sub get_analysis {
my $self = shift;
unless($self->{'_analysis'}) {
unless($self->analysis_id) {
throw("self->analysis_id undefined, please investigate");
}
$self->{'_analysis'} = $self->adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($self->analysis_id);
}
return $self->{'_analysis'};
......
......@@ -47,64 +47,49 @@ use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception ('throw');
use Bio::EnsEMBL::Hive::AnalysisStats;
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor');
=head2 fetch_by_analysis_id
Arg [1] : int $id
the unique database identifier for the feature to be obtained
Example : $feat = $adaptor->fetch_by_analysis_id(1234);
Description: Returns the feature created from the database defined by the
the id $id.
Returntype : Bio::EnsEMBL::Hive::AnalysisStats
Exceptions : thrown if $id is not defined
Caller : general
=cut
sub fetch_by_analysis_id {
my ($self,$id) = @_;
sub default_table_name {
return 'analysis_stats';
}
unless(defined $id) {
throw("fetch_by_analysis_id must have an id");
}
my $constraint = "ast.analysis_id = $id";
sub default_input_column_mapping {
my $self = shift @_;
my $driver = $self->dbc->driver();
return {
'last_update' => {
'mysql' => "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(last_update) seconds_since_last_update ",
'sqlite' => "strftime('%s','now')-strftime('%s',last_update) seconds_since_last_update ",
'pgsql' => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - last_update) seconds_since_last_update ",
}->{$driver},
};
}
#return first element of _generic_fetch list
my ($obj) = @{$self->_generic_fetch($constraint)};
if(!defined($obj)) {
throw("unable to fetch analysis_stats for analysis_id = $id\n");
}
return $obj;
sub object_class {
return 'Bio::EnsEMBL::Hive::AnalysisStats';
}
sub fetch_all_by_suitability_rc_id_meadow_type {
my ($self, $resource_class_id, $meadow_type) = @_;
my $join = [[ ['analysis_base', 'a'], " ast.analysis_id=a.analysis_id "
.( $resource_class_id ? "AND a.resource_class_id=$resource_class_id " : '')
.( $meadow_type ? "AND (a.meadow_type IS NULL OR a.meadow_type='$meadow_type') " : '')
]];
my $join_and_filter_sql = "JOIN analysis_base USING (analysis_id) WHERE "
.( $resource_class_id ? "resource_class_id=$resource_class_id AND " : '')
.( $meadow_type ? "(meadow_type IS NULL OR meadow_type='$meadow_type') AND " : '');
# the ones that clearly have work to do:
#
my $primary_results = $self->_generic_fetch(
"ast.num_required_workers>0 AND ast.status in ('READY', 'WORKING')" ,
$join ,
'ORDER BY a.priority DESC, ' . ( ($self->dbc->driver eq 'mysql') ? 'RAND()' : 'RANDOM()' ),
);
my $primary_sql = "num_required_workers>0 AND status in ('READY', 'WORKING') "
."ORDER BY priority DESC, ".( ($self->dbc->driver eq 'mysql') ? 'RAND()' : 'RANDOM()' );
# the ones that may have work to do after a sync:
#
my $secondary_results = $self->_generic_fetch(
"ast.status in ('LOADING', 'BLOCKED', 'ALL_CLAIMED', 'SYNCHING')" ,
$join ,
'ORDER BY last_update', # FIXME: could mix in a.priority if sync is not too expensive?
);
my $secondary_sql = "status in ('LOADING', 'BLOCKED', 'ALL_CLAIMED', 'SYNCHING') "
."ORDER BY last_update"; # FIXME: could mix in a.priority if sync is not too expensive?
my $primary_results = $self->fetch_all( $join_and_filter_sql . $primary_sql );
my $secondary_results = $self->fetch_all( $join_and_filter_sql . $secondary_sql );
return [ @$primary_results, @$secondary_results ];
}
......@@ -118,7 +103,6 @@ sub fetch_all_by_suitability_rc_id_meadow_type {
=cut
sub refresh {
my ($self, $stats) = @_;
......@@ -132,26 +116,11 @@ sub refresh {
################
#
# STORE / UPDATE METHODS
# UPDATE METHODS
#
################
sub store {
my ($self, $stats) = @_;
my $sql = "INSERT INTO analysis_stats (analysis_id, batch_size, hive_capacity, status) VALUES (?, ?, ?, ?)";
my $sth = $self->prepare($sql);
$sth->execute($stats->analysis_id, $stats->batch_size, $stats->hive_capacity, $stats->status);
$sth->finish;
$stats->adaptor( $self );
return $stats;
}
=head2 update
Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
......@@ -321,167 +290,5 @@ sub increase_required_workers {
}
#
# INTERNAL METHODS
#
###################
=head2 _generic_fetch
Arg [1] : (optional) string $constraint
An SQL query constraint (i.e. part of the WHERE clause)
Arg [2] : (optional) string $logic_name
the logic_name of the analysis of the features to obtain
Example : $fts = $a->_generic_fetch('contig_id in (1234, 1235)', 'Swall');
Description: Performs a database fetch and returns feature objects in
contig coordinates.
Returntype : listref of Bio::EnsEMBL::SeqFeature in contig coordinates
Exceptions : none
Caller : BaseFeatureAdaptor, ProxyDnaAlignFeatureAdaptor::_generic_fetch
=cut
sub _generic_fetch {
my ($self, $constraint, $join, $final_clause) = @_;
my @tables = $self->_tables;
my $columns = join(', ', $self->_columns());
if ($join) {
foreach my $single_join (@{$join}) {
my ($tablename, $condition, $extra_columns) = @{$single_join};
if ($tablename && $condition) {
push @tables, $tablename;
if($constraint) {
$constraint .= " AND $condition";
} else {
$constraint = " $condition";
}
}
if ($extra_columns) {
$columns .= ", " . join(', ', @{$extra_columns});
}
}
}
#construct a nice table string like 'table1 t1, table2 t2'
my $tablenames = join(', ', map({ join(' ', @$_) } @tables));
my $sql = "SELECT $columns FROM $tablenames";
my $default_where = $self->_default_where_clause;
#append a where clause if it was defined
if($constraint) {
$sql .= " WHERE $constraint ";
if($default_where) {
$sql .= " AND $default_where ";
}
} elsif($default_where) {
$sql .= " WHERE $default_where ";
}
#append additional clauses which may have been defined
$sql .= " $final_clause" if($final_clause);
#rint STDOUT $sql,"\n";
my $sth = $self->prepare($sql);
$sth->execute;
return $self->_objs_from_sth($sth);
}
sub _tables {
my $self = shift;
return (['analysis_stats', 'ast']);
}
sub _columns {
my $self = shift;
my @columns = qw (ast.analysis_id
ast.batch_size
ast.hive_capacity
ast.status
ast.total_job_count
ast.semaphored_job_count
ast.ready_job_count
ast.done_job_count
ast.failed_job_count
ast.num_running_workers
ast.num_required_workers
ast.behaviour
ast.input_capacity
ast.output_capacity
ast.avg_msec_per_job
ast.avg_input_msec_per_job
ast.avg_run_msec_per_job
ast.avg_output_msec_per_job
ast.last_update
ast.sync_lock
);
push @columns, {
'mysql' => "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ",
'sqlite' => "strftime('%s','now')-strftime('%s',ast.last_update) seconds_since_last_update ",
'pgsql' => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - ast.last_update) seconds_since_last_update ",
}->{ $self->dbc->driver };
return @columns;
}
sub _objs_from_sth {
my ($self, $sth) = @_;
my %column;
$sth->bind_columns( \( @column{ @{$sth->{NAME_lc} } } ));
my @statsArray = ();
while ($sth->fetch()) {
my $analStats = Bio::EnsEMBL::Hive::AnalysisStats->new();
$analStats->analysis_id($column{'analysis_id'});
$analStats->batch_size($column{'batch_size'});
$analStats->hive_capacity($column{'hive_capacity'});
$analStats->status($column{'status'});
$analStats->total_job_count($column{'total_job_count'});
$analStats->semaphored_job_count($column{'semaphored_job_count'});
$analStats->ready_job_count($column{'ready_job_count'});
$analStats->done_job_count($column{'done_job_count'});
$analStats->failed_job_count($column{'failed_job_count'});
$analStats->num_running_workers($column{'num_running_workers'});
$analStats->num_required_workers($column{'num_required_workers'});
$analStats->behaviour($column{'behaviour'});
$analStats->input_capacity($column{'input_capacity'});
$analStats->output_capacity($column{'output_capacity'});
$analStats->avg_msec_per_job($column{'avg_msec_per_job'});
$analStats->avg_input_msec_per_job($column{'avg_input_msec_per_job'});
$analStats->avg_run_msec_per_job($column{'avg_run_msec_per_job'});
$analStats->avg_output_msec_per_job($column{'avg_output_msec_per_job'});
$analStats->seconds_since_last_update($column{'seconds_since_last_update'});
$analStats->sync_lock($column{'sync_lock'});
$analStats->adaptor($self);
push @statsArray, $analStats;
}
$sth->finish;
return \@statsArray
}
1;
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment