Commit 7675c31c authored by Jessica Severin's avatar Jessica Severin
Browse files

YAHRF (Yet Another Hive ReFactor).....chapter 1

needed to better manage the hive system's load on the database housing all
the hive related tables (in case the database is overloaded by multiple users).
Added analysis_stats.sync_lock column (and correspondingly in Object and Adaptor)
Added Queen::safe_synchronize_AnalysisStats method which wraps over the
  synchronize_AnalysisStats method and does various checks and locks to ensure
  that only one worker is trying to do a 'synchronize' on a given analysis at
  any given moment.
Cleaned up API between Queen/Worker so that worker only talks directly to the
  Queen, rather than getting the underlying database adaptor.
Added analysis_job columns runtime_msec, query_count to provide more data on
  how the jobs hammer a database (queries/sec).
parent af273c18
......@@ -78,14 +78,17 @@ sub job_claim {
# Getter/setter for this job's status string.
# NOTE(review): this span appears to interleave the pre- and post-refactor
# bodies of the same diff hunk -- the status is assigned twice, and the
# adaptor write-back was moved out into update_status() by this commit.
# Confirm against the applied revision before relying on the exact body here.
sub status {
my( $self, $value ) = @_;
if($value) {
$self->{'_status'} = $value;
# Immediately persist the new status when an adaptor is attached.
$self->adaptor->update_status($self) if($self->adaptor);
}
$self->{'_status'} = $value if($value);
return $self->{'_status'};
}
# Set the job's status locally and immediately persist it to the database
# through the attached adaptor.  A detached job object (no adaptor) is a
# silent no-op, so in-memory-only jobs can share the same code path.
sub update_status {
    my ($self, $status) = @_;

    my $adaptor = $self->adaptor;
    return unless $adaptor;

    $self->status($status);
    $adaptor->update_status($self);
}
sub retry_count {
my( $self, $value ) = @_;
$self->{'_retry_count'} = $value if($value);
......@@ -98,6 +101,20 @@ sub completed {
return $self->{'_completed'};
}
# Getter/setter for the job's wall-clock runtime in milliseconds.
# Returns 0 when the value has never been set.
# BUG FIX: the original guard `if($value)` treated an explicit 0 as "no
# argument", so a previously stored runtime could never be reset to 0.
# Using defined() keeps all existing call sites working while accepting 0.
sub runtime_msec {
    my( $self, $value ) = @_;
    # defined() so that an explicit 0 overwrites any previous value.
    $self->{'_runtime_msec'} = $value if(defined($value));
    # Default to 0 rather than undef for arithmetic/SQL-building callers.
    $self->{'_runtime_msec'} = 0 unless(defined($self->{'_runtime_msec'}));
    return $self->{'_runtime_msec'};
}
# Getter/setter for the number of database queries this job issued.
# Returns 0 when the value has never been set.
# BUG FIX: the original guard `if($value)` treated an explicit 0 as "no
# argument", so a stored count could never be reset to 0.  Using defined()
# keeps all existing call sites working while accepting 0.
sub query_count {
    my( $self, $value ) = @_;
    # defined() so that an explicit 0 overwrites any previous value.
    $self->{'_query_count'} = $value if(defined($value));
    # Default to 0 rather than undef for arithmetic/SQL-building callers.
    $self->{'_query_count'} = 0 unless(defined($self->{'_query_count'}));
    return $self->{'_query_count'};
}
sub branch_code {
my( $self, $value ) = @_;
$self->{'_branch_code'} = $value if(defined($value));
......
......@@ -147,6 +147,12 @@ sub seconds_since_last_update {
return time() - $self->{'_last_update'};
}
# Accessor for the in-memory mirror of analysis_stats.sync_lock, the flag
# the distributed sync system uses so only one worker synchronizes a given
# analysis at a time.  Any explicit argument (including 0 or undef)
# overwrites the stored value.
sub sync_lock {
    my ($self, @args) = @_;
    if (scalar(@args)) {
        $self->{'_sync_lock'} = $args[0];
    }
    return $self->{'_sync_lock'};
}
sub determine_status {
my $self = shift;
......
......@@ -300,6 +300,8 @@ sub _columns {
a.retry_count
a.completed
a.branch_code
a.runtime_msec
a.query_count
);
}
......@@ -335,6 +337,8 @@ sub _objs_from_sth {
$job->retry_count($column{'retry_count'});
$job->completed($column{'completed'});
$job->branch_code($column{'branch_code'});
$job->runtime_msec($column{'runtime_msec'});
$job->query_count($column{'query_count'});
$job->adaptor($self);
if($column{'input_id'} =~ /_ext_input_analysis_data_id (\d+)/) {
......@@ -370,7 +374,11 @@ sub update_status {
my ($self,$job) = @_;
my $sql = "UPDATE analysis_job SET status='".$job->status."' ";
$sql .= " ,completed=now(),branch_code=".$job->branch_code if($job->status eq 'DONE');
if($job->status eq 'DONE') {
$sql .= ",completed=now(),branch_code=".$job->branch_code;
$sql .= ",runtime_msec=".$job->runtime_msec;
$sql .= ",query_count=".$job->query_count;
}
$sql .= " WHERE analysis_job_id='".$job->dbID."' ";
my $sth = $self->prepare($sql);
......
......@@ -149,6 +149,7 @@ sub update {
$sql .= ",failed_job_count=" . $stats->failed_job_count();
$sql .= ",num_required_workers=" . $stats->num_required_workers();
$sql .= ",last_update=NOW()";
$sql .= ",sync_lock=''";
$sql .= " WHERE analysis_id='".$stats->analysis_id."' ";
my $sth = $self->prepare($sql);
......@@ -315,6 +316,7 @@ sub _columns {
ast.failed_job_count
ast.num_required_workers
ast.last_update
ast.sync_lock
);
push @columns , "UNIX_TIMESTAMP()-UNIX_TIMESTAMP(ast.last_update) seconds_since_last_update ";
return @columns;
......@@ -333,6 +335,7 @@ sub _objs_from_sth {
$analStats->analysis_id($column{'analysis_id'});
$analStats->status($column{'status'});
$analStats->sync_lock($column{'sync_lock'});
$analStats->batch_size($column{'batch_size'});
$analStats->avg_msec_per_job($column{'avg_msec_per_job'});
$analStats->hive_capacity($column{'hive_capacity'});
......
......@@ -74,10 +74,11 @@ use Bio::EnsEMBL::Hive::DBSQL::AnalysisCtrlRuleAdaptor;
our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
############################
#
# PUBLIC METHODS
# PUBLIC API for Workers
#
################
############################
=head2 create_new_worker
......@@ -175,7 +176,7 @@ sub register_worker_death {
}
# re-sync the analysis_stats when a worker dies as part of dynamic sync system
if($self->synchronize_AnalysisStats($worker->analysis->stats)->status ne 'DONE') {
if($self->safe_synchronize_AnalysisStats($worker->analysis->stats)->status ne 'DONE') {
# since I'm dying I should make sure there is someone to take my place after I'm gone ...
# above synch still sees me as a 'living worker' so I need to compensate for that
$self->db->get_AnalysisStatsAdaptor->increment_needed_workers($worker->analysis->dbID);
......@@ -196,10 +197,102 @@ sub worker_check_in {
$sth->execute();
$sth->finish;
$self->synchronize_AnalysisStats($worker->analysis->stats);
$self->safe_synchronize_AnalysisStats($worker->analysis->stats);
}
=head2 worker_grab_jobs
Arg [1] : Bio::EnsEMBL::Hive::Worker object $worker
Example:
my $jobs = $queen->worker_grab_jobs($worker);
Description:
For the specified worker, it will search available jobs,
and using the workers requested batch_size, claim/fetch that
number of jobs, and then return them.
Returntype :
reference to array of Bio::EnsEMBL::Hive::AnalysisJob objects
Exceptions :
Caller :
=cut
# Claim a batch of unclaimed jobs for $worker (sized by the worker's
# batch size inside claim_jobs_for_worker) and return them.
# Returns an arrayref of Bio::EnsEMBL::Hive::AnalysisJob objects.
sub worker_grab_jobs {
    my ($self, $worker) = @_;

    my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
    my $claim_token = $job_adaptor->claim_jobs_for_worker($worker);
    return $job_adaptor->fetch_by_claim_analysis($claim_token, $worker->analysis->dbID);
}
=head2 grab_job_by_dbID
Arg [1]: int $analysis_job_id
Example:
my $job = $queen->grab_job_by_dbID($analysis_job_id);
Description:
For the specified analysis_job_id it will fetch just that job,
reclaim it and return it. Specifying a specific job bypasses
the safety checks, thus multiple workers could be running the
same job simultaneously (use only for debugging).
Returntype :
Bio::EnsEMBL::Hive::AnalysisJob objects
Exceptions :
Caller :
=cut
# Fetch one specific job by its analysis_job_id, re-claim it for this
# process, and return it.  Bypasses the normal claiming safety checks, so
# several workers could run the same job concurrently -- debugging only.
# Returns the Bio::EnsEMBL::Hive::AnalysisJob, or undef when the id is
# false or no such job exists.
sub grab_job_by_dbID {
    my $self = shift;
    my $analysis_job_id = shift;

    return undef unless($analysis_job_id);

    my $jobDBA = $self->db->get_AnalysisJobAdaptor;
    # BUG FIX: the original printf had no conversion in its format string
    # ("fetching job for id ", $id, "\n"), so the id and newline were
    # silently discarded.  Use a real format so the id is actually printed.
    printf("fetching job for id %s\n", $analysis_job_id);
    my $job = $jobDBA->fetch_by_dbID($analysis_job_id);
    return undef unless($job);

    # Detach the job from any previous worker before re-claiming it.
    $job->hive_id(0);
    $jobDBA->reclaim_job($job);
    $self->db->get_AnalysisStatsAdaptor->update_status($job->analysis_id, 'LOADING');
    return $job;
}
# Record the successful completion of $job on behalf of $worker: create
# the downstream ("dataflow") jobs and mark this job DONE in the database.
# Silently returns when the job or worker lacks the identifiers needed to
# do either safely.
sub worker_register_job_done {
    my ($self, $worker, $job) = @_;

    # Guard clauses: need a persisted, claimed job and a fully formed worker.
    return unless $job;
    return unless ($job->dbID and $job->adaptor and $job->hive_id);
    return unless ($worker and $worker->analysis and $worker->analysis->dbID);

    # Flow this job's result into each downstream analysis.
    my $dataflow_rules = $self->db->get_DataflowRuleAdaptor->fetch_from_analysis_job($job);
    for my $rule (@{$dataflow_rules}) {
        Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
            -input_id     => $job->input_id,
            -analysis     => $rule->to_analysis,
            -input_job_id => $job->dbID,
        );
    }

    $job->update_status('DONE');
}
######################################
#
# Public API interface for beekeeper
#
######################################
sub fetch_overdue_workers {
my ($self,$overdue_secs) = @_;
......@@ -241,6 +334,45 @@ sub synchronize_hive {
}
=head2 safe_synchronize_AnalysisStats
Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
Example : $self->safe_synchronize_AnalysisStats($analysisStats);
Description: Prewrapper around synchronize_AnalysisStats that does
checks and grabs sync_lock before proceeding with sync.
Used by distributed worker sync system to avoid contention.
Exceptions : none
Caller : general
=cut
# Contention-safe wrapper around synchronize_AnalysisStats: skips the sync
# whenever it is unnecessary or already underway, and otherwise claims the
# row-level sync_lock before doing the real work, so only one worker
# synchronizes a given analysis at any moment.  Always returns $stats.
sub safe_synchronize_AnalysisStats {
    my ($self, $stats) = @_;

    # Cheap early exits: nothing to sync, someone else is syncing, or the
    # stats are recent enough that a re-sync would be wasted work.
    return $stats unless ($stats);
    return $stats unless ($stats->analysis_id);
    return $stats if ($stats->status eq 'SYNCHING');
    return $stats if ($stats->status eq 'DONE');
    return $stats if ($stats->sync_lock);
    if (($stats->status eq 'WORKING')
        and ($stats->seconds_since_last_update < 5*60)) {
        return $stats;
    }

    # Try to claim the sync_lock atomically: the "WHERE sync_lock=0"
    # predicate guarantees at most one concurrent UPDATE can succeed.
    my $sql = "UPDATE analysis_stats SET status='SYNCHING', sync_lock=1 ".
              "WHERE sync_lock=0 and analysis_id=" . $stats->analysis_id;
    print("$sql\n");
    my $row_count = $self->dbc->do($sql);
    # Affected-row count != 1 means another worker won the race; back off.
    return $stats unless($row_count == 1);
    printf("got sync_lock on analysis_stats(%d)\n", $stats->analysis_id);

    # Lock held: do the real synchronization.  The stats update it performs
    # resets sync_lock (see AnalysisStatsAdaptor::update, which appends
    # ",sync_lock=''" to its UPDATE).
    $self->synchronize_AnalysisStats($stats);

    return $stats;
}
=head2 synchronize_AnalysisStats
Arg [1] : Bio::EnsEMBL::Hive::AnalysisStats object
......@@ -260,13 +392,6 @@ sub synchronize_AnalysisStats {
return $analysisStats unless($analysisStats);
return $analysisStats unless($analysisStats->analysis_id);
return $analysisStats if(($analysisStats->status eq 'WORKING') and
($analysisStats->seconds_since_last_update < 3*60));
return $analysisStats if(($analysisStats->status eq 'SYNCHING') and
($analysisStats->seconds_since_last_update < 10*60));
$analysisStats->update_status('SYNCHING');
$analysisStats->total_job_count(0);
$analysisStats->unclaimed_job_count(0);
......@@ -298,9 +423,8 @@ sub synchronize_AnalysisStats {
if($status eq 'FAILED') { $analysisStats->failed_job_count($count); }
}
$sth->finish;
if($analysisStats->status ne 'BLOCKED') {
$analysisStats->determine_status();
}
$analysisStats->determine_status();
#
# adjust_stats_for_living_workers
......@@ -324,6 +448,8 @@ sub synchronize_AnalysisStats {
$analysisStats->update;
$self->check_blocking_control_rules_for_AnalysisStats($analysisStats);
return $analysisStats;
}
......@@ -398,10 +524,8 @@ sub get_num_running_workers {
Description: Runs through the analyses in the system which are waiting
for workers to be created for them. Calculates the maximum
number of workers needed to fill the current needs of the system
Exceptions : none
Caller : general
Caller : beekeepers and other external processes
=cut
......@@ -534,7 +658,7 @@ sub _pick_best_analysis_for_new_worker {
my $stats_list = $statsDBA->fetch_by_status('LOADING', 'BLOCKED');
foreach $stats (@$stats_list) {
#$stats->print_stats();
$self->synchronize_AnalysisStats($stats);
$self->safe_synchronize_AnalysisStats($stats);
$self->check_blocking_control_rules_for_AnalysisStats($stats);
#$stats->print_stats();
......@@ -561,7 +685,7 @@ sub _pick_best_analysis_for_new_worker {
=cut
sub _fetch_by_hive_id{
sub _fetch_by_hive_id {
my ($self,$id) = @_;
unless(defined $id) {
......
......@@ -366,43 +366,42 @@ sub run
$self->db->dbc->disconnect_when_inactive(0);
my $jobDBA = $self->db->get_AnalysisJobAdaptor;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
my $alive=1;
while($alive) {
my $batch_start = time() * 1000;
my $jobs = [];
if($specific_job) {
$specific_job->hive_id($self->hive_id);
$jobDBA->reclaim_job($specific_job);
push @$jobs, $specific_job;
} else {
my $claim = $jobDBA->claim_jobs_for_worker($self);
$jobs = $jobDBA->fetch_by_claim_analysis($claim, $self->analysis->dbID);
$jobs = $self->queen->worker_grab_jobs($self);
}
$self->queen->worker_check_in($self); #will sync analysis_stats if >60sec overdue
$self->queen->worker_check_in($self); #will sync analysis_stats if needed
$self->cause_of_death('NO_WORK') unless(scalar @{$jobs});
$self->analysis->stats->print_stats if($self->debug);
print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n") if($self->debug);
if($self->debug) {
$self->analysis->stats->print_stats;
print(STDOUT "claimed ",scalar(@{$jobs}), " jobs to process\n");
}
my $batch_start = time() * 1000;
foreach my $job (@{$jobs}) {
$self->redirect_job_output($job);
$self->run_module_with_job($job);
$self->create_next_jobs($job);
$job->status('DONE');
$self->close_and_update_job_output($job);
$self->close_and_update_job_output($job);
$self->queen->worker_register_job_done($self, $job);
$self->{'_work_done'}++;
}
my $batch_end = time() * 1000;
#printf("batch start:%f end:%f\n", $batch_start, $batch_end);
$statsDBA->interval_update_work_done($self->analysis->dbID,
scalar(@$jobs),
($batch_end - $batch_start));
$self->db->get_AnalysisStatsAdaptor->
interval_update_work_done($self->analysis->dbID, scalar(@$jobs), $batch_end-$batch_start);
$self->cause_of_death('JOB_LIMIT') if($specific_job);
if($self->job_limit and ($self->{'_work_done'} >= $self->job_limit)) {
......@@ -450,18 +449,24 @@ sub run_module_with_job
return 0 unless($runObj);
return 0 unless($job and ($job->hive_id eq $self->hive_id));
my $start_time = time() * 1000;
$self->queen->dbc->query_count(0);
#pass the input_id from the job into the runnableDB object
$runObj->input_id($job->input_id);
$runObj->analysis_job_id($job->dbID);
$runObj->debug($self->debug);
$job->status('GET_INPUT');
$job->update_status('GET_INPUT');
print("GET_INPUT\n") if($self->debug);
$runObj->fetch_input;
$job->status('RUN');
$job->update_status('RUN');
print("RUN\n") if($self->debug);
$runObj->run;
$job->status('WRITE_OUTPUT');
$job->update_status('WRITE_OUTPUT');
print("WRITE_OUTPUT\n") if($self->debug);
$runObj->write_output;
#runnableDB is allowed to alter its input_id on output
......@@ -469,26 +474,10 @@ sub run_module_with_job
$job->input_id($runObj->input_id);
$job->branch_code($runObj->branch_code);
return 1;
}
sub create_next_jobs
{
my $self = shift;
my $job = shift;
return unless($self->db);
my $rules = $self->db->get_DataflowRuleAdaptor->fetch_from_analysis_job($job);
$job->query_count($self->queen->dbc->query_count);
$job->runtime_msec(time()*1000 - $start_time);
foreach my $rule (@{$rules}) {
Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob (
-input_id => $job->input_id,
-analysis => $rule->to_analysis,
-input_job_id => $job->dbID,
);
}
return 1;
}
......
......@@ -109,10 +109,8 @@ if($self->{'logic_name'}) {
if($self->{'job_id'}) {
printf("fetching job for id ", $self->{'job_id'}, "\n");
$self->{'analysis_job'} = $queen->db->get_AnalysisJobAdaptor->fetch_by_dbID($self->{'job_id'});
$self->{'analysis_job'} = $queen->grab_job_by_dbID($self->{'job_id'});
$self->{'analysis_id'} = $self->{'analysis_job'}->analysis_id if($self->{'analysis_job'});
$queen->db->get_AnalysisJobAdaptor->reset_job_by_dbID($self->{'analysis_job'}->dbID);
}
my $worker = $queen->create_new_worker(
......@@ -159,7 +157,7 @@ if($self->{'input_id'}) {
eval { $worker->run($job); };
}
elsif($self->{'analysis_job'}) {
my $job = $self->{'analysis_job'};
my $job = $self->{'analysis_job'};
print("running job_id=", $job->dbID," input_id:", $job->input_id,"\n");
eval { $worker->run($job); };
}
......
......@@ -26,39 +26,6 @@ CREATE TABLE hive (
);
------------------------------------------------------------------------------------
--
-- Table structure for table 'simple_rule'
--
-- overview:
-- redesign of pipeline rule system. Basic design is simplifed so that a
-- 'rule' is simply a link from one analysis object to another
-- (ie an edge in a flowchart or object interaction diagram where
-- condition_analysis_id => goal_analysis_id)
-- Each analysis object (analysis_id) is a unique node in the
-- graph that describes the pipeline system.
-- (ie each analysis_id is an 'Instance' of the module it points to)
-- Main reason for redesign that by making a single table we can implement
-- a UNIQUE constraint so that the pipeline can modify itself as it runs
-- and avoid race conditions where the same link is created multiple times
--
-- semantics:
-- simple_rule_id - internal ID
-- condition_analysis_id - foreign key to analysis table analysis_id
-- goal_analysis_id - foreign key to analysis table analysis_id
-- branch_code - joined to analysis_job.branch_code to allow branching
CREATE TABLE simple_rule (
simple_rule_id int(10) unsigned not null auto_increment,
condition_analysis_id int(10) unsigned NOT NULL,
goal_analysis_id int(10) unsigned NOT NULL,
branch_code int(10) default 1 NOT NULL,
PRIMARY KEY (simple_rule_id),
UNIQUE (condition_analysis_id, goal_analysis_id)
);
------------------------------------------------------------------------------------
--
-- Table structure for table 'dataflow_rule'
......@@ -157,6 +124,8 @@ CREATE TABLE analysis_job (
retry_count int(10) default 0 not NULL,
completed datetime NOT NULL,
branch_code int(10) default 1 NOT NULL,
runtime_msec int(10) default 0 NOT NULL,
query_count int(10) default 0 NOT NULL,
PRIMARY KEY (analysis_job_id),
UNIQUE KEY input_id_analysis (input_id, analysis_id),
......@@ -239,6 +208,7 @@ CREATE TABLE analysis_stats (
failed_job_count int(10) NOT NULL,
num_required_workers int(10) NOT NULL,
last_update datetime NOT NULL,
sync_lock int(10) default 0 NOT NULL,
UNIQUE KEY (analysis_id)
);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment