diff --git a/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm b/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm index 597fb3e49ce8790439ffd09550949c8682d39d3a..77093454ab8fd123b4ccff13de5b7db4ea4d2834 100755 --- a/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm +++ b/modules/Bio/EnsEMBL/Hive/AnalysisStats.pm @@ -41,6 +41,12 @@ sub adaptor { return $self->{'_adaptor'}; } +sub refresh { + my $self = shift; + return unless($self->adaptor); + $self->adaptor->refresh($self); +} + sub update { my $self = shift; return unless($self->adaptor); @@ -54,6 +60,24 @@ sub update_status { $self->status($status); } +sub decrease_hive_capacity { + my ($self) = @_; + return unless ($self->adaptor); + $self->adaptor->decrease_hive_capacity($self->analysis_id); +} + +sub increase_hive_capacity { + my ($self) = @_; + return unless ($self->adaptor); + $self->adaptor->increase_hive_capacity($self->analysis_id); +} + +sub get_running_worker_count { + my $self = shift; + return unless ($self->adaptor); + return $self->adaptor->get_running_worker_count($self); +} + sub analysis_id { my $self = shift; $self->{'_analysis_id'} = shift if(@_); @@ -91,6 +115,27 @@ sub avg_msec_per_job { return $self->{'_avg_msec_per_job'}; } +sub avg_input_msec_per_job { + my $self = shift; + $self->{'_avg_input_msec_per_job'} = shift if(@_); + $self->{'_avg_input_msec_per_job'}=0 unless($self->{'_avg_input_msec_per_job'}); + return $self->{'_avg_input_msec_per_job'}; +} + +sub avg_run_msec_per_job { + my $self = shift; + $self->{'_avg_run_msec_per_job'} = shift if(@_); + $self->{'_avg_run_msec_per_job'}=0 unless($self->{'_avg_run_msec_per_job'}); + return $self->{'_avg_run_msec_per_job'}; +} + +sub avg_output_msec_per_job { + my $self = shift; + $self->{'_avg_output_msec_per_job'} = shift if(@_); + $self->{'_avg_output_msec_per_job'}=0 unless($self->{'_avg_output_msec_per_job'}); + return $self->{'_avg_output_msec_per_job'}; +} + sub cpu_minutes_remaining { my $self = shift; return ($self->avg_msec_per_job * 
$self->unclaimed_job_count / 60000); @@ -102,6 +147,24 @@ sub hive_capacity { return $self->{'_hive_capacity'}; } +sub behaviour { + my $self = shift; + $self->{'_behaviour'} = shift if(@_); + return $self->{'_behaviour'}; +} + +sub input_capacity { + my $self = shift; + $self->{'_input_capacity'} = shift if(@_); + return $self->{'_input_capacity'}; +} + +sub output_capacity { + my $self = shift; + $self->{'_output_capacity'} = shift if(@_); + return $self->{'_output_capacity'}; +} + sub total_job_count { my $self = shift; $self->{'_total_job_count'} = shift if(@_); @@ -156,6 +219,12 @@ sub remaining_job_count { - $self->failed_job_count; } +sub num_running_workers { + my $self = shift; + $self->{'_num_running_workers'} = shift if(@_); + return $self->{'_num_running_workers'}; +} + sub num_required_workers { my $self = shift; $self->{'_num_required_workers'} = shift if(@_); diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm index c17b298553041c42bcacb0c03a7656f57cb491cb..aa8aaf81a811179776b13dde54182133e1bd80f2 100644 --- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm +++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm @@ -500,7 +500,7 @@ sub claim_jobs_for_worker { so they can be rerun. Jobs in state CLAIMED as simply reset back to READY. If jobs was in a 'working' state (GET_INPUT, RUN, WRITE_OUTPUT)) - the retry_count is incremented and the status set back to READY. + the retry_count is increased and the status set back to READY. If the retry_count >= $max_retry_count (3 by default) the job is set to 'FAILED' and not rerun again. 
Exceptions : $worker must be defined diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm index 796239fe5cc7442beb95c7d4f87fa7dff899da54..02c59cd368f4547bff2d90f5fb10c76ee7690361 100644 --- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm +++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisStatsAdaptor.pm @@ -120,6 +120,32 @@ sub fetch_by_status { return $results; } + +sub refresh { + my ($self, $stats) = @_; + + my $constraint = "ast.analysis_id = " . $stats->analysis_id; + + #return first element of _generic_fetch list (list assignment, not scalar context) + ($stats) = @{$self->_generic_fetch($constraint)}; + + return $stats; +} + + +sub get_running_worker_count { + my ($self, $stats) = @_; + + my $sql = "SELECT count(*) FROM hive WHERE cause_of_death='' and analysis_id=?"; + my $sth = $self->prepare($sql); + $sth->execute($stats->analysis_id); + my ($liveCount) = $sth->fetchrow_array(); + $sth->finish; + + return $liveCount; +} + + # # STORE / UPDATE METHODS # @@ -139,9 +165,38 @@ sub fetch_by_status  sub update { my ($self, $stats) = @_; + my $running_worker_count = $self->get_running_worker_count($stats); + $stats->num_running_workers($running_worker_count); + my $hive_capacity = $stats->hive_capacity; + + if ($stats->behaviour eq "DYNAMIC") { + my $max_hive_capacity = $hive_capacity; + if ($stats->avg_input_msec_per_job) { + $max_hive_capacity = int($stats->input_capacity * $stats->avg_msec_per_job / $stats->avg_input_msec_per_job); + } + if ($stats->avg_output_msec_per_job) { + my $max_hive_capacity2 = int($stats->output_capacity * $stats->avg_msec_per_job / $stats->avg_output_msec_per_job); + if ($max_hive_capacity2 < $max_hive_capacity) { + $max_hive_capacity = $max_hive_capacity2; + } + } + if (($hive_capacity > $max_hive_capacity) or ($hive_capacity < $max_hive_capacity )) { + if (abs($hive_capacity - $max_hive_capacity) > 2) { + $stats->hive_capacity(($hive_capacity + $max_hive_capacity) / 2); + } elsif ($hive_capacity
> $max_hive_capacity) { + $stats->hive_capacity($hive_capacity - 1); + } elsif ($hive_capacity < $max_hive_capacity) { + $stats->hive_capacity($hive_capacity + 1); + } + } + } + my $sql = "UPDATE analysis_stats SET status='".$stats->status."' "; $sql .= ",batch_size=" . $stats->batch_size(); $sql .= ",avg_msec_per_job=" . $stats->avg_msec_per_job(); + $sql .= ",avg_input_msec_per_job=" . $stats->avg_input_msec_per_job(); + $sql .= ",avg_run_msec_per_job=" . $stats->avg_run_msec_per_job(); + $sql .= ",avg_output_msec_per_job=" . $stats->avg_output_msec_per_job(); $sql .= ",hive_capacity=" . $stats->hive_capacity(); $sql .= ",total_job_count=" . $stats->total_job_count(); $sql .= ",unclaimed_job_count=" . $stats->unclaimed_job_count(); @@ -149,6 +204,7 @@ sub update { $sql .= ",max_retry_count=" . $stats->max_retry_count(); $sql .= ",failed_job_count=" . $stats->failed_job_count(); $sql .= ",failed_job_tolerance=" . $stats->failed_job_tolerance(); + $sql .= ",num_running_workers=" . $stats->num_running_workers(); $sql .= ",num_required_workers=" . $stats->num_required_workers(); $sql .= ",last_update=NOW()"; $sql .= ",sync_lock=''"; @@ -157,7 +213,11 @@ sub update { my $sth = $self->prepare($sql); $sth->execute(); $sth->finish; + $sth = $self->prepare("INSERT INTO analysis_stats_monitor SELECT now(), analysis_stats.* from analysis_stats WHERE analysis_id = ".$stats->analysis_id); + $sth->execute(); + $sth->finish; $stats->seconds_since_last_update(0); #not exact but good enough :) + } @@ -189,19 +249,78 @@ sub update_status sub interval_update_work_done { - my ($self, $analysis_id, $job_count, $interval) = @_; + my ($self, $analysis_id, $job_count, $interval, $worker) = @_; my $sql = "UPDATE analysis_stats SET ". "unclaimed_job_count = unclaimed_job_count - $job_count, ". - "avg_msec_per_job = (((done_job_count*avg_msec_per_job/3) + $interval) / (done_job_count/3 + $job_count)), ". 
+ "avg_msec_per_job = (((done_job_count*avg_msec_per_job)/3 + $interval) / (done_job_count/3 + $job_count)), ". + "avg_input_msec_per_job = (((done_job_count*avg_input_msec_per_job)/3 + ". + ($worker->{fetch_time}).") / (done_job_count/3 + $job_count)), ". + "avg_run_msec_per_job = (((done_job_count*avg_run_msec_per_job)/3 + ". + ($worker->{run_time}).") / (done_job_count/3 + $job_count)), ". + "avg_output_msec_per_job = (((done_job_count*avg_output_msec_per_job)/3 + ". + ($worker->{write_time}).") / (done_job_count/3 + $job_count)), ". "done_job_count = done_job_count + $job_count ". - "WHERE analysis_id= $analysis_id"; - + " WHERE analysis_id= $analysis_id"; + $self->dbc->do($sql); } -sub decrement_needed_workers + + +sub decrease_hive_capacity +{ + my $self = shift; + my $analysis_id = shift; + + my $sql = "UPDATE analysis_stats ". + " SET hive_capacity = hive_capacity - 1, ". + " num_required_workers = IF(num_required_workers > 0, num_required_workers - 1, 0) ". + " WHERE analysis_id='$analysis_id' and hive_capacity > 1"; + + $self->dbc->do($sql); +} + + +sub increase_hive_capacity +{ + my $self = shift; + my $analysis_id = shift; + + my $sql = "UPDATE analysis_stats ". + " SET hive_capacity = hive_capacity + 1, num_required_workers = 1". + " WHERE analysis_id='$analysis_id' and hive_capacity <= 500 and num_required_workers = 0"; + + $self->dbc->do($sql); +} + + +sub increase_running_workers +{ + my $self = shift; + my $analysis_id = shift; + + my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers + 1 ". + " WHERE analysis_id='$analysis_id'"; + + $self->dbc->do($sql); +} + + +sub decrease_running_workers +{ + my $self = shift; + my $analysis_id = shift; + + my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ". 
+ " WHERE analysis_id='$analysis_id'"; + + $self->dbc->do($sql); +} + + +sub decrease_needed_workers { my $self = shift; my $analysis_id = shift; @@ -213,7 +332,7 @@ sub decrement_needed_workers } -sub increment_needed_workers +sub increase_needed_workers { my $self = shift; my $analysis_id = shift; @@ -311,13 +430,20 @@ sub _columns { ast.status ast.batch_size ast.avg_msec_per_job + ast.avg_input_msec_per_job + ast.avg_run_msec_per_job + ast.avg_output_msec_per_job ast.hive_capacity + ast.behaviour + ast.input_capacity + ast.output_capacity ast.total_job_count ast.unclaimed_job_count ast.done_job_count ast.max_retry_count ast.failed_job_count ast.failed_job_tolerance + ast.num_running_workers ast.num_required_workers ast.last_update ast.sync_lock @@ -342,21 +468,28 @@ sub _objs_from_sth { $analStats->sync_lock($column{'sync_lock'}); $analStats->batch_size($column{'batch_size'}); $analStats->avg_msec_per_job($column{'avg_msec_per_job'}); + $analStats->avg_input_msec_per_job($column{'avg_input_msec_per_job'}); + $analStats->avg_run_msec_per_job($column{'avg_run_msec_per_job'}); + $analStats->avg_output_msec_per_job($column{'avg_output_msec_per_job'}); $analStats->hive_capacity($column{'hive_capacity'}); + $analStats->behaviour($column{'behaviour'}); + $analStats->input_capacity($column{'input_capacity'}); + $analStats->output_capacity($column{'output_capacity'}); $analStats->total_job_count($column{'total_job_count'}); $analStats->unclaimed_job_count($column{'unclaimed_job_count'}); $analStats->done_job_count($column{'done_job_count'}); $analStats->max_retry_count($column{'max_retry_count'}); $analStats->failed_job_count($column{'failed_job_count'}); $analStats->failed_job_tolerance($column{'failed_job_tolerance'}); + $analStats->num_running_workers($column{'num_running_workers'}); $analStats->num_required_workers($column{'num_required_workers'}); $analStats->seconds_since_last_update($column{'seconds_since_last_update'}); $analStats->adaptor($self); - push 
@statsArray, $analStats; + push @statsArray, $analStats; } $sth->finish; - + return \@statsArray } diff --git a/modules/Bio/EnsEMBL/Hive/Queen.pm b/modules/Bio/EnsEMBL/Hive/Queen.pm index 5948267a8604938c325f8a4a96bd3e5b3ff2b728..3503de38f8caa90ee30cc8ab2e6f7857c57ca238 100755 --- a/modules/Bio/EnsEMBL/Hive/Queen.pm +++ b/modules/Bio/EnsEMBL/Hive/Queen.pm @@ -121,7 +121,8 @@ sub create_new_worker { #go into autonomous mode return undef if($self->get_hive_current_load() >= 1.1); - $analStatsDBA->decrement_needed_workers($analysisStats->analysis_id); + $analStatsDBA->decrease_needed_workers($analysisStats->analysis_id); + $analStatsDBA->increase_running_workers($analysisStats->analysis_id); $analysisStats->print_stats; if($analysisStats->status eq 'BLOCKED') { @@ -168,8 +169,13 @@ sub register_worker_death { # if called without a defined cause_of_death, assume catastrophic failure $worker->cause_of_death('FATALITY') unless(defined($worker->cause_of_death)); - + unless ($worker->cause_of_death() eq "HIVE_OVERLOAD") { + ## HIVE_OVERLOAD occurs after a successful update of the analysis_stats table. (c.f. Worker.pm) + $worker->analysis->stats->adaptor->decrease_running_workers($worker->analysis->stats->analysis_id); + } + my $sql = "UPDATE hive SET died=now(), last_check_in=now()"; + $sql .= " ,status='DEAD'"; $sql .= " ,work_done='" . $worker->work_done . "'"; $sql .= " ,cause_of_death='". $worker->cause_of_death ."'"; $sql .= " WHERE hive_id='" . $worker->hive_id ."'"; @@ -190,7 +196,7 @@ sub register_worker_death { if($self->safe_synchronize_AnalysisStats($worker->analysis->stats)->status ne 'DONE') { # since I'm dying I should make sure there is someone to take my place after I'm gone ... 
# above synch still sees me as a 'living worker' so I need to compensate for that - $self->db->get_AnalysisStatsAdaptor->increment_needed_workers($worker->analysis->dbID); + $self->db->get_AnalysisStatsAdaptor->increase_needed_workers($worker->analysis->dbID); } } @@ -433,6 +439,7 @@ sub synchronize_AnalysisStats { return $analysisStats unless($analysisStats); return $analysisStats unless($analysisStats->analysis_id); + $analysisStats->refresh(); ## Need to get the new hive_capacity for dynamic analyses $analysisStats->total_job_count(0); $analysisStats->unclaimed_job_count(0); $analysisStats->done_job_count(0); @@ -444,7 +451,10 @@ sub synchronize_AnalysisStats { my $sth = $self->prepare($sql); $sth->execute($analysisStats->analysis_id); + my $hive_capacity = $analysisStats->hive_capacity; + while (my ($status, $count)=$sth->fetchrow_array()) { +# print STDERR "$status - $count\n"; my $total = $analysisStats->total_job_count(); $analysisStats->total_job_count($total + $count); @@ -466,13 +476,13 @@ sub synchronize_AnalysisStats { } $analysisStats->num_required_workers($numWorkers); } - if($status eq 'DONE') { $analysisStats->done_job_count($count); } - if($status eq 'FAILED') { $analysisStats->failed_job_count($count); } + if ($status eq 'DONE') { $analysisStats->done_job_count($count); } + if ($status eq 'FAILED') { $analysisStats->failed_job_count($count); } } $sth->finish; $self->check_blocking_control_rules_for_AnalysisStats($analysisStats); - + if($analysisStats->status ne 'BLOCKED') { $analysisStats->determine_status(); } @@ -482,11 +492,7 @@ sub synchronize_AnalysisStats { # if($analysisStats->hive_capacity > 0) { - my $sql = "SELECT count(*) FROM hive WHERE cause_of_death='' and analysis_id=?"; - $sth = $self->prepare($sql); - $sth->execute($analysisStats->analysis_id); - my($liveCount)=$sth->fetchrow_array(); - $sth->finish; + my $liveCount = $analysisStats->get_running_worker_count(); my $numWorkers = $analysisStats->num_required_workers; @@ -496,9 
+502,9 @@ sub synchronize_AnalysisStats { $analysisStats->num_required_workers($numWorkers); } - + $analysisStats->update; #update and release sync_lock - + return $analysisStats; } @@ -596,6 +602,12 @@ sub get_num_running_workers { return $runningCount; } +sub enter_status { + my ($self, $worker, $status) = @_; + + $self->dbc->do("UPDATE hive SET status = '$status' WHERE hive_id = ".$worker->hive_id); +} + =head2 get_num_needed_workers Arg[1] : Bio::EnsEMBL::Analysis object (optional) @@ -908,6 +920,7 @@ sub _columns { h.host h.process_id h.work_done + h.status h.born h.last_check_in h.died @@ -932,6 +945,7 @@ sub _objs_from_sth { $worker->host($column{'host'}); $worker->process_id($column{'process_id'}); $worker->work_done($column{'work_done'}); + $worker->status($column{'status'}); $worker->born($column{'born'}); $worker->last_check_in($column{'last_check_in'}); $worker->died($column{'died'}); diff --git a/modules/Bio/EnsEMBL/Hive/RunnableDB/Test.pm b/modules/Bio/EnsEMBL/Hive/RunnableDB/Test.pm index b37a5f8ab62618a8e94540497c55c3ebfff47b7b..f1004f5fa32548953826adb7db56ef00b5c0ab2a 100644 --- a/modules/Bio/EnsEMBL/Hive/RunnableDB/Test.pm +++ b/modules/Bio/EnsEMBL/Hive/RunnableDB/Test.pm @@ -66,9 +66,21 @@ our @ISA = qw(Bio::EnsEMBL::Hive::Process); sub fetch_input { my $self = shift; + + # Initialise values + $self->divisor(2); + $self->value(1); + $self->time_fetching(0); + $self->time_running(0); + $self->time_writting(0); + + # Read parameters and input $self->get_params($self->parameters); $self->get_params($self->input_id); + # Sleep as required + sleep($self->time_fetching); + return 1; } @@ -82,6 +94,11 @@ sub fetch_input { sub run { my $self = shift; + + # Sleep as required + sleep($self->time_running); + + # Fail if modulus of $value and $divisor is 0 my $divisor = $self->divisor(); my $value = $self->value(); if (!$divisor or !defined($value)) { @@ -89,6 +106,7 @@ sub run } elsif ($value % $divisor == 0) { die "$value % $divisor is 0 => die!\n"; } + 
return 1; } @@ -101,6 +119,10 @@ sub run sub write_output { my $self = shift; + + # Sleep as required + sleep($self->time_writting); + return 1; } @@ -149,6 +171,72 @@ sub value { } +=head2 time_fetching + + Arg [1] : (optional) $time_fetching + Example : $object->time_fetching($time_fetching); + Example : $time_fetching = $object->time_fetching(); + Description : Getter/setter for the time_fetching attribute + Returntype : + Exceptions : none + Caller : general + Status : Stable + +=cut + +sub time_fetching { + my $self = shift; + if (@_) { + $self->{_time_fetching} = shift; + } + return $self->{_time_fetching}; +} + + +=head2 time_running + + Arg [1] : (optional) $time_running + Example : $object->time_running($time_running); + Example : $time_running = $object->time_running(); + Description : Getter/setter for the time_running attribute + Returntype : + Exceptions : none + Caller : general + Status : Stable + +=cut + +sub time_running { + my $self = shift; + if (@_) { + $self->{_time_running} = shift; + } + return $self->{_time_running}; +} + + +=head2 time_writting + + Arg [1] : (optional) $time_writting + Example : $object->time_writting($time_writting); + Example : $time_writting = $object->time_writting(); + Description : Getter/setter for the time_writting attribute + Returntype : + Exceptions : none + Caller : general + Status : Stable + +=cut + +sub time_writting { + my $self = shift; + if (@_) { + $self->{_time_writting} = shift; + } + return $self->{_time_writting}; +} + + =head2 get_params =cut @@ -158,7 +246,7 @@ sub get_params { my $param_string = shift; return unless($param_string); - print("parsing parameter string : ",$param_string,"\n"); +# print("parsing parameter string : ",$param_string,"\n"); my $params = eval($param_string); return unless($params); @@ -169,6 +257,15 @@ sub get_params { if(defined($params->{'value'})) { $self->value($params->{'value'}); } + if(defined($params->{'time_fetching'})) { + 
$self->time_fetching($params->{'time_fetching'}); + } + if(defined($params->{'time_running'})) { + $self->time_running($params->{'time_running'}); + } + if(defined($params->{'time_writting'})) { + $self->time_writting($params->{'time_writting'}); + } } 1; diff --git a/modules/Bio/EnsEMBL/Hive/Worker.pm b/modules/Bio/EnsEMBL/Hive/Worker.pm index 56cbc4a6d910eb7a901258aa346de65dfb86a03b..cb283c5bc89803b9f207c6daec2e56e1a41c989e 100755 --- a/modules/Bio/EnsEMBL/Hive/Worker.pm +++ b/modules/Bio/EnsEMBL/Hive/Worker.pm @@ -75,6 +75,7 @@ use Bio::EnsEMBL::Utils::Argument; use Bio::EnsEMBL::Utils::Exception; use Sys::Hostname; use Time::HiRes qw(time); +use POSIX; use Bio::EnsEMBL::Analysis; use Bio::EnsEMBL::DBSQL::DBAdaptor; @@ -84,6 +85,11 @@ use Bio::EnsEMBL::Hive::DBSQL::DataflowRuleAdaptor; use Bio::EnsEMBL::Hive::Extensions; use Bio::EnsEMBL::Hive::Process; +## Minimum amount of time in msec that a worker should run before reporting +## back to the hive. This is used when setting the batch_size automatically. 
+## 120000 msec = 2 minutes +my $MIN_BATCH_TIME = 120000; + sub new { my ($class,@args) = @_; my $self = bless {}, $class; @@ -224,6 +230,12 @@ sub cause_of_death { return $self->{'_cause_of_death'}; } +sub status { + my( $self, $value ) = @_; + $self->{'_status'} = $value if($value); + return $self->{'_status'}; +} + sub born { my( $self, $value ) = @_; $self->{'_born'} = $value if($value); @@ -357,7 +369,7 @@ sub batch_size { } if(($batch_size <= 0) and ($stats->avg_msec_per_job)) { - $batch_size = int(120000 / $stats->avg_msec_per_job); # num jobs in 120 secs + $batch_size = POSIX::ceil($MIN_BATCH_TIME / $stats->avg_msec_per_job); # num jobs in $MIN_BATCH_TIME msecs } $batch_size = 1 if($batch_size < 1); # make sure we grab at least one job @@ -408,44 +420,53 @@ sub run $self->db->dbc->disconnect_when_inactive(0); - my $alive=1; - while($alive) { + my $alive = 1; + while ($alive) { my $batch_start = time() * 1000; - + my $batch_end = $batch_start; + my $job_counter = 0; my $jobs = []; - if($specific_job) { - $self->queen->worker_reclaim_job($self,$specific_job); - push @$jobs, $specific_job; - $alive=undef; - } else { - $jobs = $self->queen->worker_grab_jobs($self); - } + $self->{fetch_time} = 0; + $self->{run_time} = 0; + $self->{write_time} = 0; - $self->queen->worker_check_in($self); #will sync analysis_stats if needed + do { + if($specific_job) { + $self->queen->worker_reclaim_job($self,$specific_job); + push @$jobs, $specific_job; + $alive=undef; + } else { + $jobs = $self->queen->worker_grab_jobs($self); + } - $self->cause_of_death('NO_WORK') unless(scalar @{$jobs}); + $self->queen->worker_check_in($self); #will sync analysis_stats if needed - if($self->debug) { - $self->analysis->stats->print_stats; - print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n"); - } - - foreach my $job (@{$jobs}) { - $job->print_job if($self->debug); + $self->cause_of_death('NO_WORK') unless(scalar @{$jobs}); - $self->redirect_job_output($job); - 
$self->run_module_with_job($job); - $self->close_and_update_job_output($job); - - $self->queen->worker_register_job_done($self, $job); + if($self->debug) { + $self->analysis->stats->print_stats; + print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n"); + } + + foreach my $job (@{$jobs}) { + $job->print_job if($self->debug); + + $self->redirect_job_output($job); + $self->run_module_with_job($job); + $self->close_and_update_job_output($job); + + $self->queen->worker_register_job_done($self, $job); + + $self->{'_work_done'}++; + } + $batch_end = time() * 1000; + $job_counter += scalar(@$jobs); + } while (scalar(@$jobs) and $batch_end-$batch_start < $MIN_BATCH_TIME); ## Run for $MIN_BATCH_TIME at least - $self->{'_work_done'}++; - } - my $batch_end = time() * 1000; #printf("batch start:%f end:%f\n", $batch_start, $batch_end); - $self->db->get_AnalysisStatsAdaptor-> - interval_update_work_done($self->analysis->dbID, scalar(@$jobs), $batch_end-$batch_start); - + $self->db->get_AnalysisStatsAdaptor->interval_update_work_done($self->analysis->dbID, + $job_counter, $batch_end-$batch_start, $self); + $self->cause_of_death('JOB_LIMIT') if($specific_job); if($self->job_limit and ($self->{'_work_done'} >= $self->job_limit)) { @@ -455,11 +476,18 @@ sub run printf("life_span exhausted (alive for %d secs)\n", (time() - $self->{'start_time'})); $self->cause_of_death('LIFESPAN'); } - #unless($self->check_system_load()) { - # $self->cause_of_death('SYS_OVERLOAD'); - #} + + if (!$self->cause_of_death and $self->analysis->stats->num_running_workers > $self->analysis->stats->hive_capacity) { + my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ". + "WHERE num_running_workers > hive_capacity AND analysis_id = " . 
$self->analysis->stats->analysis_id; + my $row_count = $self->queen->dbc->do($sql); + if ($row_count == 1) { + $self->cause_of_death('HIVE_OVERLOAD'); + } + } if($self->cause_of_death) { $alive=undef; } } + $self->queen->dbc->do("UPDATE hive SET status = 'DEAD' WHERE hive_id = ".$self->hive_id); if($self->perform_global_cleanup) { #have runnable cleanup any global/process files/data it may have created @@ -489,11 +517,13 @@ sub run_module_with_job my $self = shift; my $job = shift; + my ($start_time, $end_time); + my $runObj = $self->analysis->process; return 0 unless($runObj); return 0 unless($job and ($job->hive_id eq $self->hive_id)); - my $start_time = time() * 1000; + my $init_time = time() * 1000; $self->queen->dbc->query_count(0); #pass the input_id from the job into the Process object @@ -506,26 +536,44 @@ sub run_module_with_job $runObj->input_id($job->input_id); $runObj->db($self->db); } - + + my $analysis_stats = $self->analysis->stats; + + $self->enter_status("GET_INPUT"); $job->update_status('GET_INPUT'); print("\nGET_INPUT\n") if($self->debug); + + $start_time = time() * 1000; $runObj->fetch_input; + $end_time = time() * 1000; + $self->{fetch_time} += $end_time - $start_time; + $self->enter_status("RUN"); $job->update_status('RUN'); print("\nRUN\n") if($self->debug); + + $start_time = time() * 1000; $runObj->run; + $end_time = time() * 1000; + $self->{run_time} += $end_time - $start_time; if($self->execute_writes) { + $self->enter_status("WRITE_OUTPUT"); $job->update_status('WRITE_OUTPUT'); print("\nWRITE_OUTPUT\n") if($self->debug); + + $start_time = time() * 1000; $runObj->write_output; + $end_time = time() * 1000; + $self->{write_time} += $end_time - $start_time; } else { print("\n\n!!!! 
NOT write_output\n\n\n") if($self->debug); } - + $self->enter_status("READY"); + $job->query_count($self->queen->dbc->query_count); - $job->runtime_msec(time()*1000 - $start_time); - + $job->runtime_msec(time()*1000 - $init_time); + if ($runObj->isa("Bio::EnsEMBL::Hive::Process") and $runObj->autoflow_inputjob and $self->execute_writes) { printf("AUTOFLOW input->output\n") if($self->debug); @@ -535,6 +583,10 @@ sub run_module_with_job return 1; } +sub enter_status { + my ($self, $status) = @_; + return $self->queen->enter_status($self, $status); +} sub redirect_job_output { diff --git a/sql/tables.sql b/sql/tables.sql index ded31694b21b5c03383c28c76da9ea97823d8914..834ce9121b51033a3f68c0a2b4cb0e99afc72683 100644 --- a/sql/tables.sql +++ b/sql/tables.sql @@ -18,11 +18,13 @@ CREATE TABLE hive ( host varchar(40) DEFAULT '' NOT NULL, process_id varchar(40) DEFAULT '' NOT NULL, work_done int(11) DEFAULT '0' NOT NULL, + status enum('READY','GET_INPUT','RUN','WRITE_OUTPUT','DEAD') DEFAULT 'READY' NOT NULL, born datetime NOT NULL, last_check_in datetime NOT NULL, died datetime DEFAULT NULL, - cause_of_death enum('', 'NO_WORK', 'JOB_LIMIT', 'LIFESPAN', 'FATALITY') DEFAULT '' NOT NULL, - PRIMARY KEY (hive_id) + cause_of_death enum('', 'NO_WORK', 'JOB_LIMIT', 'HIVE_OVERLOAD', 'LIFESPAN', 'FATALITY') DEFAULT '' NOT NULL, + PRIMARY KEY (hive_id), + INDEX analysis_status (analysis_id, status) ); @@ -206,14 +208,21 @@ CREATE TABLE analysis_stats ( status enum('BLOCKED', 'LOADING', 'SYNCHING', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') DEFAULT 'READY' NOT NULL, batch_size int(10) default 1 NOT NULL, - avg_msec_per_job int(10) default 0 NOT NULL, + avg_msec_per_job int(10) default 0 NOT NULL, + avg_input_msec_per_job int(10) default 0 NOT NULL, + avg_run_msec_per_job int(10) default 0 NOT NULL, + avg_output_msec_per_job int(10) default 0 NOT NULL, hive_capacity int(10) default 1 NOT NULL, + behaviour enum('STATIC', 'DYNAMIC') DEFAULT 'STATIC' NOT NULL, + input_capacity
int(10) default 4 NOT NULL, + output_capacity int(10) default 4 NOT NULL, total_job_count int(10) NOT NULL, unclaimed_job_count int(10) NOT NULL, done_job_count int(10) NOT NULL, max_retry_count int(10) default 3 NOT NULL, failed_job_count int(10) NOT NULL, failed_job_tolerance int(10) default 0 NOT NULL, + num_running_workers int(10) default 0 NOT NULL, num_required_workers int(10) NOT NULL, last_update datetime NOT NULL, sync_lock int(10) default 0 NOT NULL, @@ -221,6 +230,32 @@ CREATE TABLE analysis_stats ( UNIQUE KEY (analysis_id) ); +CREATE TABLE analysis_stats_monitor ( + time datetime NOT NULL default '0000-00-00 00:00:00', + analysis_id int(10) NOT NULL, + status enum('BLOCKED', 'LOADING', 'SYNCHING', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED') + DEFAULT 'READY' NOT NULL, + batch_size int(10) default 1 NOT NULL, + avg_msec_per_job int(10) default 0 NOT NULL, + avg_input_msec_per_job int(10) default 0 NOT NULL, + avg_run_msec_per_job int(10) default 0 NOT NULL, + avg_output_msec_per_job int(10) default 0 NOT NULL, + hive_capacity int(10) default 1 NOT NULL, + behaviour enum('STATIC', 'DYNAMIC') DEFAULT 'STATIC' NOT NULL, + input_capacity int(10) default 4 NOT NULL, + output_capacity int(10) default 4 NOT NULL, + total_job_count int(10) NOT NULL, + unclaimed_job_count int(10) NOT NULL, + done_job_count int(10) NOT NULL, + max_retry_count int(10) default 3 NOT NULL, + failed_job_count int(10) NOT NULL, + failed_job_tolerance int(10) default 0 NOT NULL, + num_running_workers int(10) default 0 NOT NULL, + num_required_workers int(10) NOT NULL, + last_update datetime NOT NULL, + sync_lock int(10) default 0 NOT NULL +); + ------------------------------------------------------------------------------------ -- -- Table structure for table 'monitor'