Commit adfd7254 authored by Leo Gordon's avatar Leo Gordon
Browse files

make AnalysisStats a rearrangeable EnsEMBL-style constructor, a proper store...

make AnalysisStats a rearrangeable EnsEMBL-style constructor, a proper store method and other preparations
parent 582cca3a
......@@ -28,8 +28,10 @@ package Bio::EnsEMBL::Hive::AnalysisStats;
use strict;
use Scalar::Util ('weaken');
use Bio::EnsEMBL::Utils::Argument; # import 'rearrange()'
use Bio::EnsEMBL::Hive::Analysis;
## Minimum amount of time in msec that a worker should run before reporting
## back to the hive. This is used when setting the batch_size automatically.
sub min_batch_time {
......@@ -38,242 +40,260 @@ sub min_batch_time {
sub new {
my ($class,@args) = @_;
my $self = bless {}, $class;
return $self;
my $class = shift;
my $self = bless {}, $class;
my ($analysis_id, $batch_size, $hive_capacity, $failed_job_tolerance, $max_retry_count, $can_be_empty, $priority, $status) =
rearrange([qw(analysis_id batch_size hive_capacity failed_job_tolerance max_retry_count can_be_empty priority status) ], @_);
$self->analysis_id($analysis_id) if(defined($analysis_id));
$self->batch_size($batch_size) if(defined($batch_size));
$self->hive_capacity($hive_capacity) if(defined($hive_capacity));
$self->failed_job_tolerance($failed_job_tolerance) if(defined($failed_job_tolerance));
$self->max_retry_count($max_retry_count) if(defined($max_retry_count));
$self->can_be_empty($can_be_empty) if(defined($can_be_empty));
$self->priority($priority) if(defined($priority));
$self->status($status) if(defined($status));
return $self;
}
## pre-settable storable object's getters/setters:
sub adaptor {
my $self = shift @_;
my $self = shift;
if(@_) {
$self->{'_adaptor'} = shift @_;
$self->{'_adaptor'} = shift;
weaken $self->{'_adaptor'};
}
return $self->{'_adaptor'};
}
sub refresh {
sub analysis_id {
my $self = shift;
return $self->adaptor && $self->adaptor->refresh($self);
$self->{'_analysis_id'} = shift if(@_);
return $self->{'_analysis_id'};
}
sub update {
my $self = shift;
return unless($self->adaptor);
$self->adaptor->update($self);
sub batch_size {
my $self = shift;
$self->{'_batch_size'} = shift if(@_);
$self->{'_batch_size'} = 1 unless(defined($self->{'_batch_size'})); # only initialize when undefined, so if defined as 0 will stay 0
return $self->{'_batch_size'};
}
sub update_status {
my ($self, $status ) = @_;
return unless($self->adaptor);
$self->adaptor->update_status($self->analysis_id, $status);
$self->status($status);
sub hive_capacity {
my $self = shift;
$self->{'_hive_capacity'} = shift if(@_);
$self->{'_hive_capacity'} = 1 unless(defined($self->{'_hive_capacity'}));
return $self->{'_hive_capacity'};
}
sub analysis_id {
my $self = shift;
$self->{'_analysis_id'} = shift if(@_);
return $self->{'_analysis_id'};
sub failed_job_tolerance {
my $self = shift;
$self->{'_failed_job_tolerance'} = shift if(@_);
$self->{'_failed_job_tolerance'} = 0 unless(defined($self->{'_failed_job_tolerance'}));
return $self->{'_failed_job_tolerance'};
}
sub get_analysis {
my $self = shift;
unless($self->{'_analysis'}) {
$self->{'_analysis'} = $self->adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($self->analysis_id);
}
return $self->{'_analysis'};
sub max_retry_count {
my $self = shift;
$self->{'_max_retry_count'} = shift if(@_);
$self->{'_max_retry_count'} = 3 unless(defined($self->{'_max_retry_count'}));
return $self->{'_max_retry_count'};
}
sub status {
my ($self, $value ) = @_;
if(defined $value) {
$self->{'_status'} = $value;
}
return $self->{'_status'};
sub can_be_empty {
my $self = shift;
$self->{'_can_be_empty'} = shift if(@_);
$self->{'_can_be_empty'} = 0 unless(defined($self->{'_can_be_empty'}));
return $self->{'_can_be_empty'};
}
sub batch_size {
sub priority {
my $self = shift;
$self->{'_batch_size'} = shift if(@_);
$self->{'_batch_size'} = 1 unless(defined($self->{'_batch_size'})); # only initialize when undefined, so if defined as 0 will stay 0
return $self->{'_batch_size'};
$self->{'_priority'} = shift if(@_);
$self->{'_priority'} = 0 unless(defined($self->{'_priority'}));
return $self->{'_priority'};
}
sub get_or_estimate_batch_size {
sub status {
my $self = shift;
$self->{'_status'} = shift if(@_);
return $self->{'_status'};
}
if( (my $batch_size = $self->batch_size())>0 ) { # set to positive or not set (and auto-initialized within $self->batch_size)
return $batch_size;
# otherwise it is a request for dynamic estimation:
} elsif( my $avg_msec_per_job = $self->avg_msec_per_job() ) { # further estimations from collected stats
$avg_msec_per_job = 100 if($avg_msec_per_job<100);
return POSIX::ceil( $self->min_batch_time() / $avg_msec_per_job );
## non-presettable storable object's getters/setters:
} else { # first estimation when no stats are available (take -$batch_size as first guess, if not zero)
return -$batch_size || 1;
}
}
sub avg_msec_per_job {
my $self = shift;
$self->{'_avg_msec_per_job'} = shift if(@_);
$self->{'_avg_msec_per_job'}=0 unless($self->{'_avg_msec_per_job'});
return $self->{'_avg_msec_per_job'};
my $self = shift;
$self->{'_avg_msec_per_job'} = shift if(@_);
$self->{'_avg_msec_per_job'}=0 unless($self->{'_avg_msec_per_job'});
return $self->{'_avg_msec_per_job'};
}
sub avg_input_msec_per_job {
my $self = shift;
$self->{'_avg_input_msec_per_job'} = shift if(@_);
$self->{'_avg_input_msec_per_job'}=0 unless($self->{'_avg_input_msec_per_job'});
return $self->{'_avg_input_msec_per_job'};
my $self = shift;
$self->{'_avg_input_msec_per_job'} = shift if(@_);
$self->{'_avg_input_msec_per_job'}=0 unless($self->{'_avg_input_msec_per_job'});
return $self->{'_avg_input_msec_per_job'};
}
sub avg_run_msec_per_job {
my $self = shift;
$self->{'_avg_run_msec_per_job'} = shift if(@_);
$self->{'_avg_run_msec_per_job'}=0 unless($self->{'_avg_run_msec_per_job'});
return $self->{'_avg_run_msec_per_job'};
my $self = shift;
$self->{'_avg_run_msec_per_job'} = shift if(@_);
$self->{'_avg_run_msec_per_job'}=0 unless($self->{'_avg_run_msec_per_job'});
return $self->{'_avg_run_msec_per_job'};
}
sub avg_output_msec_per_job {
my $self = shift;
$self->{'_avg_output_msec_per_job'} = shift if(@_);
$self->{'_avg_output_msec_per_job'}=0 unless($self->{'_avg_output_msec_per_job'});
return $self->{'_avg_output_msec_per_job'};
}
sub cpu_minutes_remaining {
my $self = shift;
return ($self->avg_msec_per_job * $self->unclaimed_job_count / 60000);
}
sub hive_capacity {
my $self = shift;
$self->{'_hive_capacity'} = shift if(@_);
return $self->{'_hive_capacity'};
my $self = shift;
$self->{'_avg_output_msec_per_job'} = shift if(@_);
$self->{'_avg_output_msec_per_job'}=0 unless($self->{'_avg_output_msec_per_job'});
return $self->{'_avg_output_msec_per_job'};
}
sub behaviour {
my $self = shift;
$self->{'_behaviour'} = shift if(@_);
return $self->{'_behaviour'};
my $self = shift;
$self->{'_behaviour'} = shift if(@_);
return $self->{'_behaviour'};
}
sub input_capacity {
my $self = shift;
$self->{'_input_capacity'} = shift if(@_);
return $self->{'_input_capacity'};
my $self = shift;
$self->{'_input_capacity'} = shift if(@_);
return $self->{'_input_capacity'};
}
sub output_capacity {
my $self = shift;
$self->{'_output_capacity'} = shift if(@_);
return $self->{'_output_capacity'};
my $self = shift;
$self->{'_output_capacity'} = shift if(@_);
return $self->{'_output_capacity'};
}
sub total_job_count {
my $self = shift;
$self->{'_total_job_count'} = shift if(@_);
return $self->{'_total_job_count'};
my $self = shift;
$self->{'_total_job_count'} = shift if(@_);
return $self->{'_total_job_count'};
}
sub unclaimed_job_count {
my $self = shift;
$self->{'_unclaimed_job_count'} = shift if(@_);
return $self->{'_unclaimed_job_count'};
my $self = shift;
$self->{'_unclaimed_job_count'} = shift if(@_);
return $self->{'_unclaimed_job_count'};
}
sub done_job_count {
my $self = shift;
$self->{'_done_job_count'} = shift if(@_);
return $self->{'_done_job_count'};
my $self = shift;
$self->{'_done_job_count'} = shift if(@_);
return $self->{'_done_job_count'};
}
sub failed_job_count {
my $self = shift;
$self->{'_failed_job_count'} = shift if(@_);
$self->{'_failed_job_count'} = 0 unless(defined($self->{'_failed_job_count'}));
return $self->{'_failed_job_count'};
my $self = shift;
$self->{'_failed_job_count'} = shift if(@_);
$self->{'_failed_job_count'} = 0 unless(defined($self->{'_failed_job_count'}));
return $self->{'_failed_job_count'};
}
sub max_retry_count {
my $self = shift;
$self->{'_max_retry_count'} = shift if(@_);
$self->{'_max_retry_count'} = 3 unless(defined($self->{'_max_retry_count'}));
return $self->{'_max_retry_count'};
sub num_running_workers {
my $self = shift;
$self->{'_num_running_workers'} = shift if(@_);
return $self->{'_num_running_workers'};
}
sub failed_job_tolerance {
my $self = shift;
$self->{'_failed_job_tolerance'} = shift if(@_);
$self->{'_failed_job_tolerance'} = 0 unless(defined($self->{'_failed_job_tolerance'}));
return $self->{'_failed_job_tolerance'};
sub num_required_workers {
my $self = shift;
$self->{'_num_required_workers'} = shift if(@_);
return $self->{'_num_required_workers'};
}
sub running_job_count {
my $self = shift;
return $self->total_job_count
- $self->done_job_count
- $self->unclaimed_job_count
- $self->failed_job_count;
sub seconds_since_last_update {
my( $self, $value ) = @_;
$self->{'_last_update'} = time() - $value if(defined($value));
return time() - $self->{'_last_update'};
}
sub remaining_job_count {
my $self = shift;
return $self->total_job_count
- $self->done_job_count
- $self->failed_job_count;
sub sync_lock {
my $self = shift;
$self->{'_sync_lock'} = shift if(@_);
return $self->{'_sync_lock'};
}
sub num_running_workers {
my $self = shift;
$self->{'_num_running_workers'} = shift if(@_);
return $self->{'_num_running_workers'};
# non-storable attributes and other helper-methods:
sub refresh {
my $self = shift;
return $self->adaptor && $self->adaptor->refresh($self);
}
sub num_required_workers {
my $self = shift;
$self->{'_num_required_workers'} = shift if(@_);
return $self->{'_num_required_workers'};
sub update {
my $self = shift;
return unless($self->adaptor);
$self->adaptor->update($self);
}
sub seconds_since_last_update {
my( $self, $value ) = @_;
$self->{'_last_update'} = time() - $value if(defined($value));
return time() - $self->{'_last_update'};
sub update_status {
my ($self, $status ) = @_;
return unless($self->adaptor);
$self->adaptor->update_status($self->analysis_id, $status);
$self->status($status);
}
sub sync_lock {
my $self = shift;
$self->{'_sync_lock'} = shift if(@_);
return $self->{'_sync_lock'};
sub get_analysis {
my $self = shift;
unless($self->{'_analysis'}) {
$self->{'_analysis'} = $self->adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($self->analysis_id);
}
return $self->{'_analysis'};
}
sub can_be_empty {
sub get_or_estimate_batch_size {
my $self = shift;
$self->{'_can_be_empty'} = shift if(@_);
return $self->{'_can_be_empty'};
if( (my $batch_size = $self->batch_size())>0 ) { # set to positive or not set (and auto-initialized within $self->batch_size)
return $batch_size;
# otherwise it is a request for dynamic estimation:
} elsif( my $avg_msec_per_job = $self->avg_msec_per_job() ) { # further estimations from collected stats
$avg_msec_per_job = 100 if($avg_msec_per_job<100);
return POSIX::ceil( $self->min_batch_time() / $avg_msec_per_job );
} else { # first estimation when no stats are available (take -$batch_size as first guess, if not zero)
return -$batch_size || 1;
}
}
sub priority {
my $self = shift;
sub cpu_minutes_remaining {
my $self = shift;
return ($self->avg_msec_per_job * $self->unclaimed_job_count / 60000);
}
$self->{'_priority'} = shift if(@_);
return $self->{'_priority'};
sub running_job_count {
my $self = shift;
return $self->total_job_count
- $self->done_job_count
- $self->unclaimed_job_count
- $self->failed_job_count;
}
sub remaining_job_count {
my $self = shift;
return $self->total_job_count
- $self->done_job_count
- $self->failed_job_count;
}
sub print_stats {
my $self = shift;
my $mode = shift;
......
......@@ -46,17 +46,6 @@ use Bio::EnsEMBL::Utils::Exception;
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
sub create_new_for_analysis_id{
my ($self, $analysis_id) = @_;
my $insertion_method = ($self->dbc->driver eq 'sqlite') ? 'INSERT OR IGNORE' : 'INSERT IGNORE';
my $sql = "$insertion_method INTO analysis_stats (analysis_id) VALUES ($analysis_id)";
my $sth = $self->prepare($sql);
$sth->execute();
$sth->finish;
}
=head2 fetch_by_analysis_id
Arg [1] : int $id
......@@ -149,6 +138,23 @@ sub refresh {
#
################
sub store {
my ($self, $stats) = @_;
my $sql = "INSERT INTO analysis_stats (analysis_id, batch_size, hive_capacity, failed_job_tolerance, max_retry_count, can_be_empty, priority, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
my $sth = $self->prepare($sql);
$sth->execute($stats->analysis_id, $stats->batch_size, $stats->hive_capacity, $stats->failed_job_tolerance,
$stats->max_retry_count, $stats->can_be_empty, $stats->priority, $stats->status);
$sth->finish;
$stats->adaptor( $self );
return $stats;
}
=head2 update
Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
......@@ -182,14 +188,14 @@ sub update {
}
my $sql = "UPDATE analysis_stats SET status='".$stats->status."' ";
$sql .= ",batch_size=" . $stats->batch_size();
# $sql .= ",batch_size=" . $stats->batch_size();
$sql .= ",avg_msec_per_job=" . $stats->avg_msec_per_job();
$sql .= ",avg_input_msec_per_job=" . $stats->avg_input_msec_per_job();
$sql .= ",avg_run_msec_per_job=" . $stats->avg_run_msec_per_job();
$sql .= ",avg_output_msec_per_job=" . $stats->avg_output_msec_per_job();
$sql .= ",hive_capacity=" . $stats->hive_capacity();
$sql .= ",max_retry_count=" . $stats->max_retry_count();
$sql .= ",failed_job_tolerance=" . $stats->failed_job_tolerance();
# $sql .= ",max_retry_count=" . $stats->max_retry_count();
# $sql .= ",failed_job_tolerance=" . $stats->failed_job_tolerance();
unless( $self->db->hive_use_triggers() ) {
$sql .= ",total_job_count=" . $stats->total_job_count();
......@@ -204,8 +210,8 @@ sub update {
$sql .= ",num_required_workers=" . $stats->num_required_workers();
$sql .= ",last_update=CURRENT_TIMESTAMP";
$sql .= ",sync_lock='0'";
$sql .= ",can_be_empty=". $stats->can_be_empty();
$sql .= ",priority=". $stats->priority();
# $sql .= ",can_be_empty=". $stats->can_be_empty();
# $sql .= ",priority=". $stats->priority();
$sql .= " WHERE analysis_id='".$stats->analysis_id."' ";
my $sth = $self->prepare($sql);
......@@ -215,12 +221,10 @@ sub update {
$sth->execute();
$sth->finish;
$stats->seconds_since_last_update(0); #not exact but good enough :)
}
sub update_status
{
sub update_status {
my ($self, $analysis_id, $status) = @_;
my $sql = "UPDATE analysis_stats SET status='$status' ";
......@@ -278,8 +282,7 @@ sub interval_update_work_done {
}
sub increase_running_workers
{
sub increase_running_workers {
my $self = shift;
my $analysis_id = shift;
......@@ -290,8 +293,7 @@ sub increase_running_workers
}
sub decrease_running_workers
{
sub decrease_running_workers {
my $self = shift;
my $analysis_id = shift;
......@@ -301,8 +303,8 @@ sub decrease_running_workers
$self->dbc->do($sql);
}
sub decrease_required_workers
{
sub decrease_required_workers {
my $self = shift;
my $analysis_id = shift;
......@@ -313,8 +315,7 @@ sub decrease_required_workers
}
sub increase_required_workers
{
sub increase_required_workers {
my $self = shift;
my $analysis_id = shift;
......@@ -472,6 +473,7 @@ sub _objs_from_sth {
$analStats->num_running_workers($column{'num_running_workers'});
$analStats->num_required_workers($column{'num_required_workers'});
$analStats->seconds_since_last_update($column{'seconds_since_last_update'});
$analStats->adaptor($self);
push @statsArray, $analStats;
......
......@@ -53,6 +53,7 @@ use Bio::EnsEMBL::Hive::Utils 'stringify'; # import 'stringify()'
use Bio::EnsEMBL::Hive::DBSQL::DBAdaptor;
use Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use Bio::EnsEMBL::Hive::Analysis;
use Bio::EnsEMBL::Hive::AnalysisStats;
use Bio::EnsEMBL::Hive::Extensions;
use base ('Bio::EnsEMBL::Hive::DependentOptions');
......@@ -422,17 +423,17 @@ sub run {
);
$analysis_adaptor->store($analysis);
$analysis_stats_adaptor->create_new_for_analysis_id($analysis->dbID);
my $stats = $analysis->stats();
$stats->batch_size( $batch_size ) if(defined($batch_size));
$stats->hive_capacity( $hive_capacity ) if(defined($hive_capacity));
$stats->failed_job_tolerance( $failed_job_tolerance ) if(defined($failed_job_tolerance));
$stats->max_retry_count( $max_retry_count ) if(defined($max_retry_count));
$stats->can_be_empty( $can_be_empty ) if(defined($can_be_empty));
$stats->priority( $priority ) if(defined($priority));
$stats->status($blocked ? 'BLOCKED' : 'READY'); # be careful, as this "soft" way of blocking may be accidentally unblocked by deep sync
$stats->update();
my $stats = Bio::EnsEMBL::Hive::AnalysisStats->new(
-analysis_id => $analysis->dbID,
-batch_size => $batch_size,
-hive_capacity => $hive_capacity,
-failed_job_tolerance => $failed_job_tolerance,
-max_retry_count => $max_retry_count,
-can_be_empty => $can_be_empty,
-priority => $priority,
-status => $blocked ? 'BLOCKED' : 'READY', # be careful, as this "soft" way of blocking may be accidentally unblocked by deep sync
);
$analysis_stats_adaptor->store($stats);
}
# now create the corresponding jobs (if there are any):
......