Commit 15fbc94f authored by Javier Herrero's avatar Javier Herrero
Browse files

Implement Dynamic behaviour option

parent cf90f238
......@@ -41,6 +41,12 @@ sub adaptor {
return $self->{'_adaptor'};
}
sub refresh {
my $self = shift;
return unless($self->adaptor);
$self->adaptor->refresh($self);
}
sub update {
my $self = shift;
return unless($self->adaptor);
......@@ -54,6 +60,24 @@ sub update_status {
$self->status($status);
}
sub decrease_hive_capacity {
my ($self) = @_;
return unless ($self->adaptor);
$self->adaptor->decrease_hive_capacity($self->analysis_id);
}
sub increase_hive_capacity {
my ($self) = @_;
return unless ($self->adaptor);
$self->adaptor->increase_hive_capacity($self->analysis_id);
}
sub get_running_worker_count {
my $self = shift;
return unless ($self->adaptor);
return $self->adaptor->get_running_worker_count($self);
}
sub analysis_id {
my $self = shift;
$self->{'_analysis_id'} = shift if(@_);
......@@ -91,6 +115,27 @@ sub avg_msec_per_job {
return $self->{'_avg_msec_per_job'};
}
sub avg_input_msec_per_job {
my $self = shift;
$self->{'_avg_input_msec_per_job'} = shift if(@_);
$self->{'_avg_input_msec_per_job'}=0 unless($self->{'_avg_input_msec_per_job'});
return $self->{'_avg_input_msec_per_job'};
}
sub avg_run_msec_per_job {
my $self = shift;
$self->{'_avg_run_msec_per_job'} = shift if(@_);
$self->{'_avg_run_msec_per_job'}=0 unless($self->{'_avg_run_msec_per_job'});
return $self->{'_avg_run_msec_per_job'};
}
sub avg_output_msec_per_job {
my $self = shift;
$self->{'_avg_output_msec_per_job'} = shift if(@_);
$self->{'_avg_output_msec_per_job'}=0 unless($self->{'_avg_output_msec_per_job'});
return $self->{'_avg_output_msec_per_job'};
}
sub cpu_minutes_remaining {
my $self = shift;
return ($self->avg_msec_per_job * $self->unclaimed_job_count / 60000);
......@@ -102,6 +147,24 @@ sub hive_capacity {
return $self->{'_hive_capacity'};
}
sub behaviour {
my $self = shift;
$self->{'_behaviour'} = shift if(@_);
return $self->{'_behaviour'};
}
sub input_capacity {
my $self = shift;
$self->{'_input_capacity'} = shift if(@_);
return $self->{'_input_capacity'};
}
sub output_capacity {
my $self = shift;
$self->{'_output_capacity'} = shift if(@_);
return $self->{'_output_capacity'};
}
sub total_job_count {
my $self = shift;
$self->{'_total_job_count'} = shift if(@_);
......@@ -156,6 +219,12 @@ sub remaining_job_count {
- $self->failed_job_count;
}
sub num_running_workers {
my $self = shift;
$self->{'_num_running_workers'} = shift if(@_);
return $self->{'_num_running_workers'};
}
sub num_required_workers {
my $self = shift;
$self->{'_num_required_workers'} = shift if(@_);
......
......@@ -500,7 +500,7 @@ sub claim_jobs_for_worker {
so they can be rerun.
Jobs in state CLAIMED as simply reset back to READY.
If jobs was in a 'working' state (GET_INPUT, RUN, WRITE_OUTPUT))
the retry_count is incremented and the status set back to READY.
the retry_count is increased and the status set back to READY.
If the retry_count >= $max_retry_count (3 by default) the job is set
to 'FAILED' and not rerun again.
Exceptions : $worker must be defined
......
......@@ -120,6 +120,32 @@ sub fetch_by_status {
return $results;
}
sub refresh {
my ($self, $stats) = @_;
my $constraint = "ast.analysis_id = " . $stats->analysis_id;
#return first element of _generic_fetch list
$stats = @{$self->_generic_fetch($constraint)};
return $stats;
}
sub get_running_worker_count {
my ($self, $stats) = @_;
my $sql = "SELECT count(*) FROM hive WHERE cause_of_death='' and analysis_id=?";
my $sth = $self->prepare($sql);
$sth->execute($stats->analysis_id);
my ($liveCount) = $sth->fetchrow_array();
$sth->finish;
return $liveCount;
}
#
# STORE / UPDATE METHODS
#
......@@ -139,9 +165,38 @@ sub fetch_by_status {
sub update {
my ($self, $stats) = @_;
my $running_worker_count = $self->get_running_worker_count($stats);
$stats->num_running_workers($running_worker_count);
my $hive_capacity = $stats->hive_capacity;
if ($stats->behaviour eq "DYNAMIC") {
my $max_hive_capacity = $hive_capacity;
if ($stats->avg_input_msec_per_job) {
$max_hive_capacity = int($stats->input_capacity * $stats->avg_msec_per_job / $stats->avg_input_msec_per_job);
}
if ($stats->avg_output_msec_per_job) {
my $max_hive_capacity2 = int($stats->output_capacity * $stats->avg_msec_per_job / $stats->avg_output_msec_per_job);
if ($max_hive_capacity2 < $max_hive_capacity) {
$max_hive_capacity = $max_hive_capacity2;
}
}
if (($hive_capacity > $max_hive_capacity) or ($hive_capacity < $max_hive_capacity )) {
if (abs($hive_capacity - $max_hive_capacity) > 2) {
$stats->hive_capacity(($hive_capacity + $max_hive_capacity) / 2);
} elsif ($hive_capacity > $max_hive_capacity) {
$stats->hive_capacity($hive_capacity - 1);
} elsif ($hive_capacity < $max_hive_capacity) {
$stats->hive_capacity($hive_capacity + 1);
}
}
}
my $sql = "UPDATE analysis_stats SET status='".$stats->status."' ";
$sql .= ",batch_size=" . $stats->batch_size();
$sql .= ",avg_msec_per_job=" . $stats->avg_msec_per_job();
$sql .= ",avg_input_msec_per_job=" . $stats->avg_input_msec_per_job();
$sql .= ",avg_run_msec_per_job=" . $stats->avg_run_msec_per_job();
$sql .= ",avg_output_msec_per_job=" . $stats->avg_output_msec_per_job();
$sql .= ",hive_capacity=" . $stats->hive_capacity();
$sql .= ",total_job_count=" . $stats->total_job_count();
$sql .= ",unclaimed_job_count=" . $stats->unclaimed_job_count();
......@@ -149,6 +204,7 @@ sub update {
$sql .= ",max_retry_count=" . $stats->max_retry_count();
$sql .= ",failed_job_count=" . $stats->failed_job_count();
$sql .= ",failed_job_tolerance=" . $stats->failed_job_tolerance();
$sql .= ",num_running_workers=" . $stats->num_running_workers();
$sql .= ",num_required_workers=" . $stats->num_required_workers();
$sql .= ",last_update=NOW()";
$sql .= ",sync_lock=''";
......@@ -157,7 +213,11 @@ sub update {
my $sth = $self->prepare($sql);
$sth->execute();
$sth->finish;
$sth = $self->prepare("INSERT INTO analysis_stats_monitor SELECT now(), analysis_stats.* from analysis_stats WHERE analysis_id = ".$stats->analysis_id);
$sth->execute();
$sth->finish;
$stats->seconds_since_last_update(0); #not exact but good enough :)
}
......@@ -189,19 +249,78 @@ sub update_status
sub interval_update_work_done
{
my ($self, $analysis_id, $job_count, $interval) = @_;
my ($self, $analysis_id, $job_count, $interval, $worker) = @_;
my $sql = "UPDATE analysis_stats SET ".
"unclaimed_job_count = unclaimed_job_count - $job_count, ".
"avg_msec_per_job = (((done_job_count*avg_msec_per_job/3) + $interval) / (done_job_count/3 + $job_count)), ".
"avg_msec_per_job = (((done_job_count*avg_msec_per_job)/3 + $interval) / (done_job_count/3 + $job_count)), ".
"avg_input_msec_per_job = (((done_job_count*avg_input_msec_per_job)/3 + ".
($worker->{fetch_time}).") / (done_job_count/3 + $job_count)), ".
"avg_run_msec_per_job = (((done_job_count*avg_run_msec_per_job)/3 + ".
($worker->{run_time}).") / (done_job_count/3 + $job_count)), ".
"avg_output_msec_per_job = (((done_job_count*avg_output_msec_per_job)/3 + ".
($worker->{write_time}).") / (done_job_count/3 + $job_count)), ".
"done_job_count = done_job_count + $job_count ".
"WHERE analysis_id= $analysis_id";
" WHERE analysis_id= $analysis_id";
$self->dbc->do($sql);
}
sub decrement_needed_workers
sub decrease_hive_capacity
{
my $self = shift;
my $analysis_id = shift;
my $sql = "UPDATE analysis_stats ".
" SET hive_capacity = hive_capacity - 1, ".
" num_required_workers = IF(num_required_workers > 0, num_required_workers - 1, 0) ".
" WHERE analysis_id='$analysis_id' and hive_capacity > 1";
$self->dbc->do($sql);
}
sub increase_hive_capacity
{
my $self = shift;
my $analysis_id = shift;
my $sql = "UPDATE analysis_stats ".
" SET hive_capacity = hive_capacity + 1, num_required_workers = 1".
" WHERE analysis_id='$analysis_id' and hive_capacity <= 500 and num_required_workers = 0";
$self->dbc->do($sql);
}
sub increase_running_workers
{
my $self = shift;
my $analysis_id = shift;
my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers + 1 ".
" WHERE analysis_id='$analysis_id'";
$self->dbc->do($sql);
}
sub decrease_running_workers
{
my $self = shift;
my $analysis_id = shift;
my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
" WHERE analysis_id='$analysis_id'";
$self->dbc->do($sql);
}
sub decrease_needed_workers
{
my $self = shift;
my $analysis_id = shift;
......@@ -213,7 +332,7 @@ sub decrement_needed_workers
}
sub increment_needed_workers
sub increase_needed_workers
{
my $self = shift;
my $analysis_id = shift;
......@@ -311,13 +430,20 @@ sub _columns {
ast.status
ast.batch_size
ast.avg_msec_per_job
ast.avg_input_msec_per_job
ast.avg_run_msec_per_job
ast.avg_output_msec_per_job
ast.hive_capacity
ast.behaviour
ast.input_capacity
ast.output_capacity
ast.total_job_count
ast.unclaimed_job_count
ast.done_job_count
ast.max_retry_count
ast.failed_job_count
ast.failed_job_tolerance
ast.num_running_workers
ast.num_required_workers
ast.last_update
ast.sync_lock
......@@ -342,21 +468,28 @@ sub _objs_from_sth {
$analStats->sync_lock($column{'sync_lock'});
$analStats->batch_size($column{'batch_size'});
$analStats->avg_msec_per_job($column{'avg_msec_per_job'});
$analStats->avg_input_msec_per_job($column{'avg_input_msec_per_job'});
$analStats->avg_run_msec_per_job($column{'avg_run_msec_per_job'});
$analStats->avg_output_msec_per_job($column{'avg_output_msec_per_job'});
$analStats->hive_capacity($column{'hive_capacity'});
$analStats->behaviour($column{'behaviour'});
$analStats->input_capacity($column{'input_capacity'});
$analStats->output_capacity($column{'output_capacity'});
$analStats->total_job_count($column{'total_job_count'});
$analStats->unclaimed_job_count($column{'unclaimed_job_count'});
$analStats->done_job_count($column{'done_job_count'});
$analStats->max_retry_count($column{'max_retry_count'});
$analStats->failed_job_count($column{'failed_job_count'});
$analStats->failed_job_tolerance($column{'failed_job_tolerance'});
$analStats->num_running_workers($column{'num_running_workers'});
$analStats->num_required_workers($column{'num_required_workers'});
$analStats->seconds_since_last_update($column{'seconds_since_last_update'});
$analStats->adaptor($self);
push @statsArray, $analStats;
push @statsArray, $analStats;
}
$sth->finish;
return \@statsArray
}
......
......@@ -121,7 +121,8 @@ sub create_new_worker {
#go into autonomous mode
return undef if($self->get_hive_current_load() >= 1.1);
$analStatsDBA->decrement_needed_workers($analysisStats->analysis_id);
$analStatsDBA->decrease_needed_workers($analysisStats->analysis_id);
$analStatsDBA->increase_running_workers($analysisStats->analysis_id);
$analysisStats->print_stats;
if($analysisStats->status eq 'BLOCKED') {
......@@ -168,8 +169,13 @@ sub register_worker_death {
# if called without a defined cause_of_death, assume catastrophic failure
$worker->cause_of_death('FATALITY') unless(defined($worker->cause_of_death));
unless ($worker->cause_of_death() eq "HIVE_OVERLOAD") {
## HIVE_OVERLOAD occurs after a successful update of the analysis_stats teble. (c.f. Worker.pm)
$worker->analysis->stats->adaptor->decrease_running_workers($worker->analysis->stats->analysis_id);
}
my $sql = "UPDATE hive SET died=now(), last_check_in=now()";
$sql .= " ,status='DEAD'";
$sql .= " ,work_done='" . $worker->work_done . "'";
$sql .= " ,cause_of_death='". $worker->cause_of_death ."'";
$sql .= " WHERE hive_id='" . $worker->hive_id ."'";
......@@ -190,7 +196,7 @@ sub register_worker_death {
if($self->safe_synchronize_AnalysisStats($worker->analysis->stats)->status ne 'DONE') {
# since I'm dying I should make sure there is someone to take my place after I'm gone ...
# above synch still sees me as a 'living worker' so I need to compensate for that
$self->db->get_AnalysisStatsAdaptor->increment_needed_workers($worker->analysis->dbID);
$self->db->get_AnalysisStatsAdaptor->increase_needed_workers($worker->analysis->dbID);
}
}
......@@ -433,6 +439,7 @@ sub synchronize_AnalysisStats {
return $analysisStats unless($analysisStats);
return $analysisStats unless($analysisStats->analysis_id);
$analysisStats->refresh(); ## Need to get the new hive_capacity for dynamic analyses
$analysisStats->total_job_count(0);
$analysisStats->unclaimed_job_count(0);
$analysisStats->done_job_count(0);
......@@ -444,7 +451,10 @@ sub synchronize_AnalysisStats {
my $sth = $self->prepare($sql);
$sth->execute($analysisStats->analysis_id);
my $hive_capacity = $analysisStats->hive_capacity;
while (my ($status, $count)=$sth->fetchrow_array()) {
# print STDERR "$status - $count\n";
my $total = $analysisStats->total_job_count();
$analysisStats->total_job_count($total + $count);
......@@ -466,13 +476,13 @@ sub synchronize_AnalysisStats {
}
$analysisStats->num_required_workers($numWorkers);
}
if($status eq 'DONE') { $analysisStats->done_job_count($count); }
if($status eq 'FAILED') { $analysisStats->failed_job_count($count); }
if ($status eq 'DONE') { $analysisStats->done_job_count($count); }
if ($status eq 'FAILED') { $analysisStats->failed_job_count($count); }
}
$sth->finish;
$self->check_blocking_control_rules_for_AnalysisStats($analysisStats);
if($analysisStats->status ne 'BLOCKED') {
$analysisStats->determine_status();
}
......@@ -482,11 +492,7 @@ sub synchronize_AnalysisStats {
#
if($analysisStats->hive_capacity > 0) {
my $sql = "SELECT count(*) FROM hive WHERE cause_of_death='' and analysis_id=?";
$sth = $self->prepare($sql);
$sth->execute($analysisStats->analysis_id);
my($liveCount)=$sth->fetchrow_array();
$sth->finish;
my $liveCount = $analysisStats->get_running_worker_count();
my $numWorkers = $analysisStats->num_required_workers;
......@@ -496,9 +502,9 @@ sub synchronize_AnalysisStats {
$analysisStats->num_required_workers($numWorkers);
}
$analysisStats->update; #update and release sync_lock
return $analysisStats;
}
......@@ -596,6 +602,12 @@ sub get_num_running_workers {
return $runningCount;
}
sub enter_status {
my ($self, $worker, $status) = @_;
$self->dbc->do("UPDATE hive SET status = '$status' WHERE hive_id = ".$worker->hive_id);
}
=head2 get_num_needed_workers
Arg[1] : Bio::EnsEMBL::Analysis object (optional)
......@@ -908,6 +920,7 @@ sub _columns {
h.host
h.process_id
h.work_done
h.status
h.born
h.last_check_in
h.died
......@@ -932,6 +945,7 @@ sub _objs_from_sth {
$worker->host($column{'host'});
$worker->process_id($column{'process_id'});
$worker->work_done($column{'work_done'});
$worker->status($column{'status'});
$worker->born($column{'born'});
$worker->last_check_in($column{'last_check_in'});
$worker->died($column{'died'});
......
......@@ -66,9 +66,21 @@ our @ISA = qw(Bio::EnsEMBL::Hive::Process);
sub fetch_input {
my $self = shift;
# Initialise values
$self->divisor(2);
$self->value(1);
$self->time_fetching(0);
$self->time_running(0);
$self->time_writting(0);
# Read parameters and input
$self->get_params($self->parameters);
$self->get_params($self->input_id);
# Sleep as required
sleep($self->time_fetching);
return 1;
}
......@@ -82,6 +94,11 @@ sub fetch_input {
sub run
{
my $self = shift;
# Sleep as required
sleep($self->time_running);
# Fail if modulus of $value and $divisor is 0
my $divisor = $self->divisor();
my $value = $self->value();
if (!$divisor or !defined($value)) {
......@@ -89,6 +106,7 @@ sub run
} elsif ($value % $divisor == 0) {
die "$value % $divisor is 0 => die!\n";
}
return 1;
}
......@@ -101,6 +119,10 @@ sub run
sub write_output {
my $self = shift;
# Sleep as required
sleep($self->time_writting);
return 1;
}
......@@ -149,6 +171,72 @@ sub value {
}
=head2 time_fetching
Arg [1] : (optional) $time_fetching
Example : $object->time_fetching($time_fetching);
Example : $time_fetching = $object->time_fetching();
Description : Getter/setter for the time_fetching attribute
Returntype :
Exceptions : none
Caller : general
Status : Stable
=cut
sub time_fetching {
my $self = shift;
if (@_) {
$self->{_time_fetching} = shift;
}
return $self->{_time_fetching};
}
=head2 time_running
Arg [1] : (optional) $time_running
Example : $object->time_running($time_running);
Example : $time_running = $object->time_running();
Description : Getter/setter for the time_running attribute
Returntype :
Exceptions : none
Caller : general
Status : Stable
=cut
sub time_running {
my $self = shift;
if (@_) {
$self->{_time_running} = shift;
}
return $self->{_time_running};
}
=head2 time_writting
Arg [1] : (optional) $time_writting
Example : $object->time_writting($time_writting);
Example : $time_writting = $object->time_writting();
Description : Getter/setter for the time_writting attribute
Returntype :
Exceptions : none
Caller : general
Status : Stable
=cut
sub time_writting {
my $self = shift;
if (@_) {
$self->{_time_writting} = shift;
}
return $self->{_time_writting};
}
=head2 get_params
=cut
......@@ -158,7 +246,7 @@ sub get_params {
my $param_string = shift;
return unless($param_string);
print("parsing parameter string : ",$param_string,"\n");
# print("parsing parameter string : ",$param_string,"\n");
my $params = eval($param_string);
return unless($params);
......@@ -169,6 +257,15 @@ sub get_params {
if(defined($params->{'value'})) {
$self->value($params->{'value'});
}
if(defined($params->{'time_fetching'})) {
$self->time_fetching($params->{'time_fetching'});
}
if(defined($params->{'time_running'})) {
$self->time_running($params->{'time_running'});