Commit ac05ff22 authored by Leo Gordon's avatar Leo Gordon
Browse files

introduced generalized AS::increment_a_counter() method and started using it

parent a59e8d58
......@@ -440,10 +440,29 @@ sub grab_jobs_for_role {
}
}
$self->db->get_AnalysisStatsAdaptor->increment_a_counter( 'ready_job_count', -$claim_count, $analysis_id );
return $claim_count ? $self->fetch_all_by_role_id_AND_status($role_id, 'CLAIMED') : [];
}
sub release_claimed_jobs_from_role {
my ($self, $role) = @_;
# previous value of role_id is not important, because that Role never had a chance to run the jobs
my $num_released_jobs = $self->dbc->protected_prepare_execute( [ "UPDATE job SET status='READY', role_id=NULL WHERE role_id=? AND status='CLAIMED'", $role->dbID ],
sub { my ($after) = @_; $self->db->get_LogMessageAdaptor->store_worker_message( $role->worker, "releasing claimed jobs from role".$after, 0 ); }
);
my $analysis_stats_adaptor = $self->db->get_AnalysisStatsAdaptor;
my $analysis_id = $role->analysis_id;
$analysis_stats_adaptor->increment_a_counter( 'ready_job_count', $num_released_jobs, $analysis_id );
# $analysis_stats_adaptor->update_status( $analysis_id, 'LOADING' );
}
=head2 release_undone_jobs_from_role
Arg [1] : Bio::EnsEMBL::Hive::Role object
......@@ -470,13 +489,7 @@ sub release_undone_jobs_from_role {
my $worker = $role->worker;
#first just reset the claimed jobs, these don't need a retry_count index increment:
# (previous role_id does not matter, because that Role has never had a chance to run the job)
$self->dbc->do( qq{
UPDATE job
SET status='READY', role_id=NULL
WHERE role_id='$role_id'
AND status='CLAIMED'
} );
$self->release_claimed_jobs_from_role( $role );
my $sth = $self->prepare( qq{
SELECT job_id
......@@ -563,6 +576,9 @@ sub gc_dataflow {
$job->set_and_update_status('PASSED_ON');
# PASSED_ON jobs are included in done_job_count
$self->db->get_AnalysisStatsAdaptor->increment_a_counter( 'done_job_count', 1, $analysis->dbID );
if(my $semaphored_job_id = $job->semaphored_job_id) {
$self->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # step-unblock the semaphore
}
......
......@@ -167,8 +167,7 @@ sub update {
sub update_status {
my ($self, $analysis_id, $status) = @_;
my $sql = "UPDATE analysis_stats SET status='$status' ";
$sql .= " WHERE analysis_id='$analysis_id' ";
my $sql = "UPDATE analysis_stats SET status='$status' WHERE analysis_id='$analysis_id' ";
my $sth = $self->prepare($sql);
$sth->execute();
......@@ -176,17 +175,6 @@ sub update_status {
}
sub interval_update_claim {
my ($self, $analysis_id, $job_count) = @_;
unless( $self->db->hive_use_triggers() ) {
my $sql = "UPDATE analysis_stats SET ready_job_count = ready_job_count - $job_count WHERE analysis_id= $analysis_id";
$self->dbc->do( $sql );
}
}
=head2 interval_update_work_done
Arg [1] : int $analysis_id
......@@ -232,27 +220,15 @@ sub interval_update_work_done {
}
sub increase_running_workers {
my $self = shift;
my $analysis_id = shift;
my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers + 1 ".
" WHERE analysis_id='$analysis_id'";
$self->dbc->do($sql);
}
sub decrease_running_workers {
my $self = shift;
my $analysis_id = shift;
sub increment_a_counter {
my ($self, $counter, $increment, $analysis_id) = @_;
my $sql = "UPDATE analysis_stats SET num_running_workers = num_running_workers - 1 ".
" WHERE analysis_id='$analysis_id'";
$self->dbc->do($sql);
unless( $self->db->hive_use_triggers() ) {
if($increment) { # can either be positive or negative
$self->dbc->do( "UPDATE analysis_stats SET $counter = $counter + ($increment) WHERE analysis_id='$analysis_id'" );
}
}
}
1;
......@@ -65,13 +65,9 @@ sub finalize_role {
my $role_id = $role->dbID;
my $when_finished = $role->when_finished ? "'".$role->when_finished."'" : 'CURRENT_TIMESTAMP';
my $sql = "UPDATE role SET when_finished=$when_finished WHERE role_id=$role_id";
$self->dbc->do( "UPDATE role SET when_finished=$when_finished WHERE role_id=$role_id" );
$self->dbc->do( $sql );
unless( $self->db->hive_use_triggers() ) {
$self->db->get_AnalysisStatsAdaptor->decrease_running_workers( $role->analysis_id );
}
$self->db->get_AnalysisStatsAdaptor->increment_a_counter( 'num_running_workers', -1, $role->analysis_id );
if( $release_undone_jobs ) {
$self->db->get_AnalysisJobAdaptor->release_undone_jobs_from_role( $role );
......
......@@ -279,9 +279,7 @@ sub specialize_worker {
# However this may be tricky to emulate by triggers that know nothing about "special tasks",
# so I am (temporarily?) simplifying the accounting algorithm.
#
unless( $self->db->hive_use_triggers() ) {
$analysis_stats_adaptor->increase_running_workers( $analysis->dbID );
}
$analysis_stats_adaptor->increment_a_counter( 'num_running_workers', 1, $analysis->dbID );
}
......
......@@ -509,7 +509,6 @@ sub run {
my $actual_batch = $job_adaptor->grab_jobs_for_role( $current_role, $desired_batch_size );
if(scalar(@$actual_batch)) {
$self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_claim($self->current_role->analysis->dbID, scalar(@$actual_batch));
my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
$jobs_done_by_batches_loop += $jobs_done_by_this_batch;
$self->job_limiter->final_decision( $jobs_done_by_this_batch );
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment