Skip to content
Snippets Groups Projects
Commit 4956d672 authored by Leo Gordon's avatar Leo Gordon
Browse files

use OFFSET to separate jobs being claimed into ranges

parent da59a8d4
No related branches found
No related tags found
No related merge requests found
......@@ -545,30 +545,35 @@ sub reset_or_grab_job_by_dbID {
=cut
sub grab_jobs_for_worker {
my ($self, $worker, $how_many_this_batch) = @_;
my ($self, $worker, $how_many_this_batch, $workers_rank) = @_;
my $analysis_id = $worker->analysis_id();
my $worker_id = $worker->dbID();
my $update_sql = "UPDATE job SET worker_id='$worker_id', status='CLAIMED'";
my $selection_start_sql = " WHERE analysis_id='$analysis_id' AND status='READY'";
my $virgin_selection_sql = $selection_start_sql . " AND retry_count=0 LIMIT $how_many_this_batch";
my $any_selection_sql = $selection_start_sql . " LIMIT $how_many_this_batch";
my $analysis_id = $worker->analysis_id();
my $worker_id = $worker->dbID();
my $offset = $how_many_this_batch*$workers_rank;
my $prefix_sql = qq{
UPDATE job j
JOIN (
SELECT job_id
FROM job
WHERE analysis_id='$analysis_id'
AND status='READY'
};
my $suffix_sql = qq{
LIMIT $how_many_this_batch
OFFSET $offset
) as x
USING (job_id)
SET j.worker_id='$worker_id', j.status='CLAIMED'
WHERE j.status='READY'
};
if($self->dbc->driver eq 'mysql') {
# we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
if( (my $claim_count = $self->dbc->do( $update_sql . $virgin_selection_sql )) == 0 ) {
$claim_count = $self->dbc->do( $update_sql . $any_selection_sql );
}
} else {
# we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
if( (my $claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $virgin_selection_sql) AND status='READY'" )) == 0 ) {
$claim_count = $self->dbc->do( $update_sql . " WHERE job_id IN (SELECT job_id FROM job $any_selection_sql) AND status='READY'" );
}
}
# we have to be explicitly numeric here because of '0E0' value returned by DBI if "no rows have been affected":
if( (my $claim_count = $self->dbc->do( $prefix_sql . ' AND retry_count=0 '. $suffix_sql)) == 0 ) {
$claim_count = $self->dbc->do( $prefix_sql . $suffix_sql);
}
# my $constraint = "j.analysis_id='$analysis_id' AND j.worker_id='$worker_id' AND j.status='CLAIMED'";
# my $constraint = "j.analysis_id='$analysis_id' AND j.worker_id='$worker_id' AND j.status='CLAIMED'";
my $constraint = "j.worker_id='$worker_id' AND j.status='CLAIMED'";
return $self->_generic_fetch($constraint);
}
......
......@@ -692,18 +692,14 @@ sub get_hive_current_load {
sub count_running_workers {
my ($self, $analysis_id) = @_;
my $sql = qq{
SELECT count(*)
FROM worker
WHERE status!='DEAD'
} . ($analysis_id ? " AND analysis_id='$analysis_id'" : '');
return $self->count_all( "status!='DEAD'".($analysis_id ? " AND analysis_id=$analysis_id" : '') );
}
my $sth = $self->prepare($sql);
$sth->execute();
(my $queen_running_workers)=$sth->fetchrow_array();
$sth->finish();
return $queen_running_workers || 0;
sub get_workers_rank {
my ($self, $worker) = @_;
return $self->count_all( "status!='DEAD' AND analysis_id=".$worker->analysis_id." AND worker_id<".$worker->dbID );
}
......
......@@ -523,7 +523,8 @@ sub run {
my $desired_batch_size = $self->analysis->stats->get_or_estimate_batch_size();
$desired_batch_size = $self->job_limiter->preliminary_offer( $desired_batch_size );
my $actual_batch = $job_adaptor->grab_jobs_for_worker( $self, $desired_batch_size );
my $workers_rank = $self->adaptor->get_workers_rank( $self );
my $actual_batch = $job_adaptor->grab_jobs_for_worker( $self, $desired_batch_size, $workers_rank );
if(scalar(@$actual_batch)) {
my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
$jobs_done_by_batches_loop += $jobs_done_by_this_batch;
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment