Commit dbaad5bf authored by Leo Gordon's avatar Leo Gordon
Browse files

Meadow branch merged into trunk

parent 004f42af
......@@ -16,6 +16,23 @@ Summary:
Bio::EnsEMBL::Analysis::RunnableDB perl wrapper objects as nodes/blocks in
the graphs but could be adapted more generally.
13 July, 2009 : Leo Gordon
Merging the "Meadow" code from this March's development branch.
Because it separates the LSF-specific code from the higher-level code, it will be easier to update.
-------------------------------------------------------------------------------------------------------
Albert, sorry - in the process of merging into the development branch I had to remove your HIGHMEM code.
I hope it is a temporary measure and we will be having hive-wide queue control soon.
If not - you can restore the pre-merger state by updating with the following command:
cvs update -r lg4_pre_merger_20090713
('maximise_concurrency' option was carried over)
-------------------------------------------------------------------------------------------------------
3 April, 2009 : Albert Vilella
Added a new maximise_concurrency 1/0 option. When set to 1, it will
......
......@@ -482,12 +482,11 @@ sub claim_jobs_for_worker {
my $uuid = $ug->create();
my $claim = $ug->to_string( $uuid );
#print("claiming jobs for hive_id=", $worker->hive_id, " with uuid $claim\n");
my $status = 'READY';
$status = 'HIGHMEM' if (defined($worker->{HIGHMEM}));
my $sql_base = "UPDATE analysis_job SET job_claim='$claim'".
" , hive_id='". $worker->hive_id ."'".
" , status='CLAIMED'".
" WHERE job_claim='' and status='" . $status . "'".
" WHERE job_claim='' and status='READY'".
" AND analysis_id='" .$worker->analysis->dbID. "'";
my $sql_virgin = $sql_base .
......@@ -608,20 +607,6 @@ sub reset_dead_job_by_dbID {
#print(" done update BROKEN jobs\n");
}
# Releases the claim on one job and sets its status to 'HIGHMEM'.
#   Arg [1] : $job_id - matched against analysis_job.analysis_job_id; it is
#             interpolated into the statement unquoted.
# Only the job_claim and status columns are written by this statement.
sub reset_highmem_job_by_dbID {
    my ($self, $job_id) = @_;

    $self->dbc->do(
        "UPDATE analysis_job SET job_claim='', status='HIGHMEM'"
      . " WHERE analysis_job_id=$job_id"
    );
}
=head2 reset_job_by_dbID
......
......@@ -93,20 +93,21 @@ sub fetch_by_needed_workers {
my $maximise_concurrency = shift;
my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')";
my $first_order_by;
my $order_by;
if ($maximise_concurrency) {
$first_order_by = 'ORDER BY num_running_workers';
# print STDERR "###> Maximising concurrency\n";
$order_by = 'ORDER BY num_running_workers';
} else {
$first_order_by = 'ORDER BY num_required_workers DESC';
$order_by = 'ORDER BY num_required_workers DESC';
}
$order_by .= ', hive_capacity DESC, analysis_id';
if($limit) {
$self->_final_clause("$first_order_by, hive_capacity DESC, analysis_id LIMIT $limit");
} else {
$self->_final_clause("$first_order_by, hive_capacity DESC, analysis_id");
$order_by .= " LIMIT $limit";
}
$self->_final_clause($order_by);
my $results = $self->_generic_fetch($constraint);
$self->_final_clause(""); #reset final clause for other fetches
$self->_final_clause(''); #reset final clause for other fetches
return $results;
}
......@@ -218,7 +219,7 @@ sub update {
$sql .= ",num_running_workers=" . $stats->num_running_workers();
$sql .= ",num_required_workers=" . $stats->num_required_workers();
$sql .= ",last_update=NOW()";
$sql .= ",sync_lock=''";
$sql .= ",sync_lock='0'";
$sql .= " WHERE analysis_id='".$stats->analysis_id."' ";
my $sth = $self->prepare($sql);
......
......@@ -108,7 +108,6 @@ sub create_new_worker {
$analysis_id = $job->analysis_id if(defined($job));
my $analysisStats;
if($analysis_id) {
$analysisStats = $analStatsDBA->fetch_by_analysis_id($analysis_id);
$self->safe_synchronize_AnalysisStats($analysisStats);
......@@ -167,6 +166,7 @@ sub register_worker_death {
my ($self, $worker) = @_;
return unless($worker);
# if called without a defined cause_of_death, assume catastrophic failure
$worker->cause_of_death('FATALITY') unless(defined($worker->cause_of_death));
unless ($worker->cause_of_death() eq "HIVE_OVERLOAD") {
......@@ -352,6 +352,7 @@ sub fetch_failed_workers {
=head2 synchronize_hive
Arg [1] : $this_analysis (optional)
Example : $queen->synchronize_hive();
Description: Runs through all analyses in the system and synchronizes
the analysis_stats summary with the states in the analysis_job
......@@ -363,19 +364,22 @@ sub fetch_failed_workers {
=cut
sub synchronize_hive {
my $self = shift;
my $self = shift;
my $this_analysis = shift; # optional parameter
my $start_time = time();
my $allAnalysis = $self->db->get_AnalysisAdaptor->fetch_all;
print("analyze ", scalar(@$allAnalysis), "\n");
foreach my $analysis (@$allAnalysis) {
my $stats = $analysis->stats;
$self->synchronize_AnalysisStats($stats);
my $list_of_analyses = $this_analysis ? [$this_analysis] : $self->db->get_AnalysisAdaptor->fetch_all;
print "Synchronizing the hive (".scalar(@$list_of_analyses)." analyses this time) \n";
foreach my $analysis (@$list_of_analyses) {
$self->synchronize_AnalysisStats($analysis->stats);
}
foreach my $analysis (@$allAnalysis) {
foreach my $analysis (@$list_of_analyses) {
$self->check_blocking_control_rules_for_AnalysisStats($analysis->stats);
}
print((time() - $start_time), " secs to synchronize_hive\n");
}
......@@ -436,13 +440,6 @@ sub synchronize_AnalysisStats {
my $self = shift;
my $analysisStats = shift;
# Trying to make hive not synchronize if there is a high load in the
# server, e.g. during blasts (max 450 workers). The best thing I
# could find is the combination of these two numbers
if (($self->get_hive_current_load("silent") > 0.9) && $self->get_num_running_workers("silent") > 400) {
return $analysisStats;
}
return $analysisStats unless($analysisStats);
return $analysisStats unless($analysisStats->analysis_id);
......@@ -453,12 +450,8 @@ sub synchronize_AnalysisStats {
$analysisStats->failed_job_count(0);
$analysisStats->num_required_workers(0);
# my $sql = "SELECT status, count(*) FROM analysis_job ".
# "WHERE analysis_id=? GROUP BY status";
# This should be better in terms of performance
# http://www.mysqlperformanceblog.com/2007/08/16/how-much-overhead-is-caused-by-on-disk-temporary-tables/
my $sql = "SELECT status, count(status) FROM analysis_job ".
"WHERE analysis_id=? GROUP BY status ORDER BY NULL LIMIT 10";
my $sql = "SELECT status, count(*) FROM analysis_job ".
"WHERE analysis_id=? GROUP BY status";
my $sth = $self->prepare($sql);
$sth->execute($analysisStats->analysis_id);
......@@ -588,8 +581,7 @@ sub get_num_failed_analyses
sub get_hive_current_load {
my $self = shift;
my $silent = shift;
my $sql = "SELECT /*! SQL_BUFFER_RESULT */ sum(1/analysis_stats.hive_capacity) FROM hive, analysis_stats ".
my $sql = "SELECT sum(1/analysis_stats.hive_capacity) FROM hive, analysis_stats ".
"WHERE hive.analysis_id=analysis_stats.analysis_id and cause_of_death ='' ".
"AND analysis_stats.hive_capacity>0";
my $sth = $self->prepare($sql);
......@@ -597,22 +589,20 @@ sub get_hive_current_load {
(my $load)=$sth->fetchrow_array();
$sth->finish;
$load=0 unless($load);
print("current hive load = $load\n") unless (defined($silent));
print("*") if ($silent eq 'silent');
print("current hive load = $load\n");
return $load;
}
sub get_num_running_workers {
my $self = shift;
my $silent = shift;
my $sql = "SELECT count(*) FROM hive WHERE cause_of_death =''";
my $sth = $self->prepare($sql);
$sth->execute();
(my $runningCount)=$sth->fetchrow_array();
$sth->finish;
$runningCount=0 unless($runningCount);
print("current hive num_running_workers = $runningCount\n") unless (defined($silent));
print("current hive num_running_workers = $runningCount\n");
return $runningCount;
}
......@@ -688,11 +678,36 @@ sub get_num_needed_workers {
return $numWorkers;
}
# Returns the list ($worker_count, $failed_analyses) obtained from
# get_num_needed_workers() and get_num_failed_analyses(), both called with the
# optional $this_analysis argument.
# When the hive load, the needed-worker count and the running-worker count are
# all zero, synchronize_hive($this_analysis) is run first and both numbers are
# queried again before being returned.
sub get_needed_workers_failed_analyses_resync_if_necessary {
    my ($self, $this_analysis) = @_;

    # The four queries are issued in a fixed order.
    my $running_workers = $self->get_num_running_workers();
    my $hive_load       = $self->get_hive_current_load();
    my $worker_count    = $self->get_num_needed_workers($this_analysis);
    my $failed_analyses = $self->get_num_failed_analyses($this_analysis);

    my $hive_is_idle = ($hive_load == 0 and $worker_count == 0 and $running_workers == 0);

    # Common case: something is running or waiting, so no resync is needed.
    return ($worker_count, $failed_analyses) unless $hive_is_idle;

    print "*** nothing is running and nothing to do => perform a hard resync\n" ;
    $self->synchronize_hive($this_analysis);

    # The counters may have changed after the resync, so read them again.
    $worker_count    = $self->get_num_needed_workers($this_analysis);
    $failed_analyses = $self->get_num_failed_analyses($this_analysis);

    if ($worker_count == 0 and $failed_analyses == 0) {
        my $scope = $this_analysis ? ' for analysis '.$this_analysis->logic_name : '';
        print "Nothing left to do".$scope.". DONE!!\n\n";
    }

    return ($worker_count, $failed_analyses);
}
sub get_hive_progress
{
my $self = shift;
my $sql = "SELECT /*! SQL_BUFFER_RESULT */ sum(done_job_count), sum(failed_job_count), sum(total_job_count), ".
my $sql = "SELECT sum(done_job_count), sum(failed_job_count), sum(total_job_count), ".
"sum(unclaimed_job_count * analysis_stats.avg_msec_per_job)/1000/60/60 ".
"FROM analysis_stats";
my $sth = $self->prepare($sql);
......@@ -707,26 +722,24 @@ sub get_hive_progress
my $remaining = $total - $done - $failed;
printf("hive %1.3f%% complete (< %1.3f CPU_hrs) (%d todo + %d done + %d failed = %d total)\n",
$completed, $cpuhrs, $remaining, $done, $failed, $total);
return $done, $total, $cpuhrs;
return $remaining;
}
sub print_hive_status
{
my $self = shift;
$self->print_analysis_status;
$self->print_running_worker_status;
sub print_hive_status {
my ($self, $this_analysis) = @_;
$self->print_analysis_status($this_analysis);
$self->print_running_worker_status;
}
sub print_analysis_status
{
my $self = shift;
sub print_analysis_status {
my ($self, $this_analysis) = @_;
my $allStats = $self->db->get_AnalysisStatsAdaptor->fetch_all();
foreach my $analysis_stats (@{$allStats}) {
$analysis_stats->print_stats($self->{'verbose_stats'});
}
my $list_of_analyses = $this_analysis ? [$this_analysis] : $self->db->get_AnalysisAdaptor->fetch_all;
foreach my $analysis (sort {$a->dbID <=> $b->dbID} @$list_of_analyses) {
$analysis->stats->print_stats($self->{'verbose_stats'});
}
}
......@@ -779,6 +792,24 @@ sub monitor
$sth->execute();
}
=head2 register_all_workers_dead

  Example    : $queen->register_all_workers_dead();
  Description: Passes every worker returned by fetch_overdue_workers(0) to
               register_worker_death().
               NOTE(review): presumably a threshold of 0 selects every worker
               that is not yet registered as dead - confirm against
               fetch_overdue_workers.
  Exceptions : none
  Caller     : beekeepers and other external processes

=cut

sub register_all_workers_dead {
    my $self = shift;

    for my $worker (@{ $self->fetch_overdue_workers(0) }) {
        $self->register_worker_death($worker);
    }
}
#
# INTERNAL METHODS
......
This diff is collapsed.
......@@ -30,6 +30,7 @@ $self->{'job_id'} = undef;
$self->{'debug'} = undef;
$self->{'analysis_job'} = undef;
$self->{'no_write'} = undef;
$self->{'maximise_concurrency'} = undef;
my $conf_file;
my ($help, $host, $user, $pass, $dbname, $port, $adaptor, $url);
......@@ -47,11 +48,9 @@ GetOptions('help' => \$help,
'job_id=i' => \$self->{'job_id'},
'analysis_id=i' => \$self->{'analysis_id'},
'logic_name=s' => \$self->{'logic_name'},
'batchsize=i' => \$self->{'batch_size'},
'batch_size=i' => \$self->{'batch_size'},
'limit=i' => \$self->{'job_limit'},
'lifespan=i' => \$self->{'lifespan'},
'maximise_concurrency=i' => \$self->{'maximise_concurrency'},
'highmem=i' => \$self->{'highmem'},
'outdir=s' => \$self->{'outdir'},
'bk=s' => \$self->{'beekeeper'},
'pid=s' => \$self->{'process_id'},
......@@ -63,6 +62,7 @@ GetOptions('help' => \$help,
'nowrite' => \$self->{'no_write'},
'regfile=s' => \$reg_conf,
'regname=s' => \$reg_alias,
'maximise_concurrency' => \$self->{'maximise_concurrency'},
);
$self->{'analysis_id'} = shift if(@_);
......@@ -105,7 +105,6 @@ unless($DBA and $DBA->isa("Bio::EnsEMBL::Hive::DBSQL::DBAdaptor")) {
my $queen = $DBA->get_Queen();
$queen->{maximise_concurrency} = 1 if ($self->{maximise_concurrency});
$queen->{highmem} = $self->{highmem} if ($self->{highmem});
################################
# LSF submit system dependency
......@@ -134,6 +133,7 @@ if($self->{'logic_name'}) {
}
$self->{'analysis_id'} = $analysis->dbID;
}
if($self->{'analysis_id'} and $self->{'input_id'}) {
$self->{'analysis_job'} = new Bio::EnsEMBL::Hive::AnalysisJob;
$self->{'analysis_job'}->input_id($self->{'input_id'});
......@@ -164,7 +164,6 @@ unless($worker) {
exit(0);
}
$worker->{HIGHMEM} = $self->{highmem} if($self->{highmem});
$worker->debug($self->{'debug'}) if($self->{'debug'});
if(defined($self->{'outdir'})) { $worker->output_dir($self->{'outdir'}); }
......@@ -235,7 +234,7 @@ sub usage {
print " -dbpass <pass> : mysql connection password\n";
print " -analysis_id <id> : analysis_id in db\n";
print " -logic_name <string> : logic_name of analysis to make this worker\n";
print " -batchsize <num> : #jobs to claim at a time\n";
print " -batch_size <num> : #jobs to claim at a time\n";
print " -limit <num> : #jobs to run before worker can die naturally\n";
print " -lifespan <num> : number of minutes this worker is allowed to run\n";
print " -outdir <path> : directory where stdout/stderr is redirected\n";
......
......@@ -19,7 +19,7 @@ CREATE TABLE hive (
host varchar(40) DEFAULT '' NOT NULL,
process_id varchar(40) DEFAULT '' NOT NULL,
work_done int(11) DEFAULT '0' NOT NULL,
status enum('READY','GET_INPUT','RUN','WRITE_OUTPUT','DEAD','HIGHMEM') DEFAULT 'READY' NOT NULL,
status enum('READY','GET_INPUT','RUN','WRITE_OUTPUT','DEAD') DEFAULT 'READY' NOT NULL,
born datetime NOT NULL,
last_check_in datetime NOT NULL,
died datetime DEFAULT NULL,
......@@ -123,7 +123,7 @@ CREATE TABLE analysis_job (
input_id char(255) not null,
job_claim char(40) NOT NULL default '', #UUID
hive_id int(10) NOT NULL,
status enum('READY','BLOCKED','CLAIMED','GET_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED','HIGHMEM') DEFAULT 'READY' NOT NULL,
status enum('READY','BLOCKED','CLAIMED','GET_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED') DEFAULT 'READY' NOT NULL,
retry_count int(10) default 0 not NULL,
completed datetime NOT NULL,
branch_code int(10) default 1 NOT NULL,
......@@ -206,7 +206,7 @@ CREATE TABLE analysis_data (
CREATE TABLE analysis_stats (
analysis_id int(10) NOT NULL,
status enum('BLOCKED', 'LOADING', 'SYNCHING', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED','HIGHMEM')
status enum('BLOCKED', 'LOADING', 'SYNCHING', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED')
DEFAULT 'READY' NOT NULL,
batch_size int(10) default 1 NOT NULL,
avg_msec_per_job int(10) default 0 NOT NULL,
......@@ -234,7 +234,7 @@ CREATE TABLE analysis_stats (
CREATE TABLE analysis_stats_monitor (
time datetime NOT NULL default '0000-00-00 00:00:00',
analysis_id int(10) NOT NULL,
status enum('BLOCKED', 'LOADING', 'SYNCHING', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED','HIGHMEM')
status enum('BLOCKED', 'LOADING', 'SYNCHING', 'READY', 'WORKING', 'ALL_CLAIMED', 'DONE', 'FAILED')
DEFAULT 'READY' NOT NULL,
batch_size int(10) default 1 NOT NULL,
avg_msec_per_job int(10) default 0 NOT NULL,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment