From 04a22618f1bde80ba134af29cd8b18e6987e2d62 Mon Sep 17 00:00:00 2001 From: Leo Gordon <lg4@ebi.ac.uk> Date: Fri, 17 Jan 2014 10:25:23 +0000 Subject: [PATCH] used input_column_mapping mechanism to inherit AnalysisStatsAdaptor from ObjectAdaptor --- modules/Bio/EnsEMBL/Hive/AnalysisStats.pm | 70 +++-- .../Hive/DBSQL/AnalysisStatsAdaptor.pm | 249 ++---------------- 2 files changed, 73 insertions(+), 246 deletions(-) diff --git a/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm b/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm index e9c6fbe7d..84586943c 100644 --- a/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm +++ b/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm @@ -39,9 +39,11 @@ use strict; use Scalar::Util ('weaken'); use Bio::EnsEMBL::Utils::Argument ('rearrange'); - +use Bio::EnsEMBL::Utils::Exception ('throw'); use Bio::EnsEMBL::Hive::Analysis; +use base ( 'Bio::EnsEMBL::Storable' ); # inherit dbID(), adaptor() and new() methods + ## Minimum amount of time in msec that a worker should run before reporting ## back to the hive. This is used when setting the batch_size automatically. @@ -52,15 +54,37 @@ sub min_batch_time { sub new { my $class = shift; - my $self = bless {}, $class; - - my ($analysis_id, $batch_size, $hive_capacity, $status) = - rearrange([qw(analysis_id batch_size hive_capacity status) ], @_); - $self->analysis_id($analysis_id) if(defined($analysis_id)); - $self->batch_size($batch_size) if(defined($batch_size)); - $self->hive_capacity($hive_capacity) if(defined($hive_capacity)); - $self->status($status) if(defined($status)); + my $self = $class->SUPER::new( @_ ); # deal with Storable stuff + + my ( $analysis_id, $batch_size, $hive_capacity, $status, + $total_job_count, $semaphored_job_count, $ready_job_count, $done_job_count, $failed_job_count, $num_running_workers, $num_required_workers, + $behaviour, $input_capacity, $output_capacity, $avg_msec_per_job, $avg_input_msec_per_job, $avg_run_msec_per_job, $avg_output_msec_per_job, + $seconds_since_last_update, $sync_lock) = + rearrange([qw(analysis_id batch_size hive_capacity status + total_job_count semaphored_job_count ready_job_count done_job_count failed_job_count num_running_workers num_required_workers + behaviour input_capacity output_capacity avg_msec_per_job avg_input_msec_per_job avg_run_msec_per_job avg_output_msec_per_job + seconds_since_last_update sync_lock ) ], @_); + $self->analysis_id($analysis_id) if(defined($analysis_id)); + $self->batch_size($batch_size) if(defined($batch_size)); + $self->hive_capacity($hive_capacity) if(defined($hive_capacity)); + $self->status($status) if(defined($status)); + $self->total_job_count($total_job_count) if(defined($total_job_count)); + $self->semaphored_job_count($semaphored_job_count) if(defined($semaphored_job_count)); + $self->ready_job_count($ready_job_count) if(defined($ready_job_count)); + $self->done_job_count($done_job_count) if(defined($done_job_count)); + $self->failed_job_count($failed_job_count) if(defined($failed_job_count)); + $self->num_running_workers($num_running_workers) if(defined($num_running_workers)); + $self->num_required_workers($num_required_workers) if(defined($num_required_workers)); + $self->behaviour($behaviour) if(defined($behaviour)); + $self->input_capacity($input_capacity) if(defined($input_capacity)); + $self->output_capacity($output_capacity) if(defined($output_capacity)); + $self->avg_msec_per_job($avg_msec_per_job) if(defined($avg_msec_per_job)); + $self->avg_input_msec_per_job($avg_input_msec_per_job) if(defined($avg_input_msec_per_job)); + $self->avg_run_msec_per_job($avg_run_msec_per_job) if(defined($avg_run_msec_per_job)); + $self->avg_output_msec_per_job($avg_output_msec_per_job) if(defined($avg_output_msec_per_job)); + $self->seconds_since_last_update($seconds_since_last_update)if(defined($seconds_since_last_update)); # NB: this is an input_column_mapped field + $self->sync_lock($sync_lock) if(defined($sync_lock)); return $self; } @@ -68,21 +92,9 @@ sub new { ## pre-settable storable object's getters/setters: - -sub adaptor { - my $self = shift; - - if(@_) { - $self->{'_adaptor'} = shift; - weaken $self->{'_adaptor'}; - } - return $self->{'_adaptor'}; -} - -sub analysis_id { +sub analysis_id { # an alias my $self = shift; - $self->{'_analysis_id'} = shift if(@_); - return $self->{'_analysis_id'}; + return $self->dbID(@_); } sub batch_size { @@ -206,10 +218,15 @@ sub avg_output_msec_per_job { } -## other storable ttributes: +## other storable attributes: +sub last_update { # this method is called by the initial store() [at which point it returns undef] + my $self = shift; + $self->{'_last_update'} = shift if(@_); + return $self->{'_last_update'}; +} -sub seconds_since_last_update { +sub seconds_since_last_update { # this method is mostly used to convert between server time and local time my( $self, $value ) = @_; $self->{'_last_update'} = time() - $value if(defined($value)); return time() - $self->{'_last_update'}; @@ -247,6 +264,9 @@ sub update_status { sub get_analysis { my $self = shift; unless($self->{'_analysis'}) { + unless($self->analysis_id) { + throw("self->analysis_id undefined, please investigate"); + } $self->{'_analysis'} = $self->adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($self->analysis_id); } return $self->{'_analysis'}; diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm index c11113731..307076aa7 100644 --- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm +++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm @@ -47,64 +47,49 @@ use Bio::EnsEMBL::Utils::Argument; use Bio::EnsEMBL::Utils::Exception ('throw'); use Bio::EnsEMBL::Hive::AnalysisStats; -use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor'); +use base ('Bio::EnsEMBL::Hive::DBSQL::ObjectAdaptor'); -=head2 fetch_by_analysis_id - - Arg [1] : int $id - the unique database identifier for the feature to be obtained - Example : $feat = $adaptor->fetch_by_analysis_id(1234); - Description: Returns the feature created from the database defined by the - the id $id. - Returntype : Bio::EnsEMBL::Hive::AnalysisStats - Exceptions : thrown if $id is not defined - Caller : general - -=cut - -sub fetch_by_analysis_id { - my ($self,$id) = @_; +sub default_table_name { + return 'analysis_stats'; +} - unless(defined $id) { - throw("fetch_by_analysis_id must have an id"); - } - my $constraint = "ast.analysis_id = $id"; +sub default_input_column_mapping { + my $self = shift @_; + my $driver = $self->dbc->driver(); + return { + 'last_update' => { + 'mysql' => "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(last_update) seconds_since_last_update ", + 'sqlite' => "strftime('%s','now')-strftime('%s',last_update) seconds_since_last_update ", + 'pgsql' => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - last_update) seconds_since_last_update ", + }->{$driver}, + }; +} - #return first element of _generic_fetch list - my ($obj) = @{$self->_generic_fetch($constraint)}; - if(!defined($obj)) { - throw("unable to fetch analysis_stats for analysis_id = $id\n"); - } - return $obj; +sub object_class { + return 'Bio::EnsEMBL::Hive::AnalysisStats'; } sub fetch_all_by_suitability_rc_id_meadow_type { my ($self, $resource_class_id, $meadow_type) = @_; - my $join = [[ ['analysis_base', 'a'], " ast.analysis_id=a.analysis_id " - .( $resource_class_id ? "AND a.resource_class_id=$resource_class_id " : '') - .( $meadow_type ? "AND (a.meadow_type IS NULL OR a.meadow_type='$meadow_type') " : '') - ]]; + my $join_and_filter_sql = "JOIN analysis_base USING (analysis_id) WHERE " + .( $resource_class_id ? "resource_class_id=$resource_class_id AND " : '') + .( $meadow_type ? "(meadow_type IS NULL OR meadow_type='$meadow_type') AND " : ''); # the ones that clearly have work to do: - # - my $primary_results = $self->_generic_fetch( - "ast.num_required_workers>0 AND ast.status in ('READY', 'WORKING')" , - $join , - 'ORDER BY a.priority DESC, ' . ( ($self->dbc->driver eq 'mysql') ? 'RAND()' : 'RANDOM()' ), - ); + my $primary_sql = "num_required_workers>0 AND status in ('READY', 'WORKING') " + ."ORDER BY priority DESC, ".( ($self->dbc->driver eq 'mysql') ? 'RAND()' : 'RANDOM()' ); # the ones that may have work to do after a sync: - # - my $secondary_results = $self->_generic_fetch( - "ast.status in ('LOADING', 'BLOCKED', 'ALL_CLAIMED', 'SYNCHING')" , - $join , - 'ORDER BY last_update', # FIXME: could mix in a.priority if sync is not too expensive? - ); + my $secondary_sql = "status in ('LOADING', 'BLOCKED', 'ALL_CLAIMED', 'SYNCHING') " + ."ORDER BY last_update"; # FIXME: could mix in a.priority if sync is not too expensive? + + my $primary_results = $self->fetch_all( $join_and_filter_sql . $primary_sql ); + my $secondary_results = $self->fetch_all( $join_and_filter_sql . $secondary_sql ); return [ @$primary_results, @$secondary_results ]; } @@ -118,7 +103,6 @@ sub fetch_all_by_suitability_rc_id_meadow_type { =cut - sub refresh { my ($self, $stats) = @_; @@ -132,26 +116,11 @@ sub refresh { ################ # -# STORE / UPDATE METHODS +# UPDATE METHODS # ################ -sub store { - my ($self, $stats) = @_; - - my $sql = "INSERT INTO analysis_stats (analysis_id, batch_size, hive_capacity, status) VALUES (?, ?, ?, ?)"; - - my $sth = $self->prepare($sql); - $sth->execute($stats->analysis_id, $stats->batch_size, $stats->hive_capacity, $stats->status); - $sth->finish; - - $stats->adaptor( $self ); - - return $stats; -} - - =head2 update Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object @@ -321,167 +290,5 @@ sub increase_required_workers { } -# -# INTERNAL METHODS -# -################### - -=head2 _generic_fetch - - Arg [1] : (optional) string $constraint - An SQL query constraint (i.e. part of the WHERE clause) - Arg [2] : (optional) string $logic_name - the logic_name of the analysis of the features to obtain - Example : $fts = $a->_generic_fetch('contig_id in (1234, 1235)', 'Swall'); - Description: Performs a database fetch and returns feature objects in - contig coordinates. - Returntype : listref of Bio::EnsEMBL::SeqFeature in contig coordinates - Exceptions : none - Caller : BaseFeatureAdaptor, ProxyDnaAlignFeatureAdaptor::_generic_fetch - -=cut - -sub _generic_fetch { - my ($self, $constraint, $join, $final_clause) = @_; - - my @tables = $self->_tables; - my $columns = join(', ', $self->_columns()); - - if ($join) { - foreach my $single_join (@{$join}) { - my ($tablename, $condition, $extra_columns) = @{$single_join}; - if ($tablename && $condition) { - push @tables, $tablename; - - if($constraint) { - $constraint .= " AND $condition"; - } else { - $constraint = " $condition"; - } - } - if ($extra_columns) { - $columns .= ", " . join(', ', @{$extra_columns}); - } - } - } - - #construct a nice table string like 'table1 t1, table2 t2' - my $tablenames = join(', ', map({ join(' ', @$_) } @tables)); - - my $sql = "SELECT $columns FROM $tablenames"; - - my $default_where = $self->_default_where_clause; - - #append a where clause if it was defined - if($constraint) { - $sql .= " WHERE $constraint "; - if($default_where) { - $sql .= " AND $default_where "; - } - } elsif($default_where) { - $sql .= " WHERE $default_where "; - } - - #append additional clauses which may have been defined - $sql .= " $final_clause" if($final_clause); - #rint STDOUT $sql,"\n"; - - my $sth = $self->prepare($sql); - $sth->execute; - - - return $self->_objs_from_sth($sth); -} - -sub _tables { - my $self = shift; - - return (['analysis_stats', 'ast']); -} - -sub _columns { - my $self = shift; - - my @columns = qw (ast.analysis_id - ast.batch_size - ast.hive_capacity - ast.status - - ast.total_job_count - ast.semaphored_job_count - ast.ready_job_count - ast.done_job_count - ast.failed_job_count - ast.num_running_workers - ast.num_required_workers - - ast.behaviour - ast.input_capacity - ast.output_capacity - - ast.avg_msec_per_job - ast.avg_input_msec_per_job - ast.avg_run_msec_per_job - ast.avg_output_msec_per_job - - ast.last_update - ast.sync_lock - ); - - push @columns, { - 'mysql' => "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ", - 'sqlite' => "strftime('%s','now')-strftime('%s',ast.last_update) seconds_since_last_update ", - 'pgsql' => "EXTRACT(EPOCH FROM CURRENT_TIMESTAMP - ast.last_update) seconds_since_last_update ", - }->{ $self->dbc->driver }; - - return @columns; -} - -sub _objs_from_sth { - my ($self, $sth) = @_; - - my %column; - $sth->bind_columns( \( @column{ @{$sth->{NAME_lc} } } )); - - my @statsArray = (); - - while ($sth->fetch()) { - my $analStats = Bio::EnsEMBL::Hive::AnalysisStats->new(); - - $analStats->analysis_id($column{'analysis_id'}); - $analStats->batch_size($column{'batch_size'}); - $analStats->hive_capacity($column{'hive_capacity'}); - $analStats->status($column{'status'}); - - $analStats->total_job_count($column{'total_job_count'}); - $analStats->semaphored_job_count($column{'semaphored_job_count'}); - $analStats->ready_job_count($column{'ready_job_count'}); - $analStats->done_job_count($column{'done_job_count'}); - $analStats->failed_job_count($column{'failed_job_count'}); - $analStats->num_running_workers($column{'num_running_workers'}); - $analStats->num_required_workers($column{'num_required_workers'}); - - $analStats->behaviour($column{'behaviour'}); - $analStats->input_capacity($column{'input_capacity'}); - $analStats->output_capacity($column{'output_capacity'}); - - $analStats->avg_msec_per_job($column{'avg_msec_per_job'}); - $analStats->avg_input_msec_per_job($column{'avg_input_msec_per_job'}); - $analStats->avg_run_msec_per_job($column{'avg_run_msec_per_job'}); - $analStats->avg_output_msec_per_job($column{'avg_output_msec_per_job'}); - - $analStats->seconds_since_last_update($column{'seconds_since_last_update'}); - $analStats->sync_lock($column{'sync_lock'}); - - $analStats->adaptor($self); - - push @statsArray, $analStats; - } - $sth->finish; - - return \@statsArray -} - - 1; -- GitLab