Commit 2a41ca6f authored by Leo Gordon's avatar Leo Gordon
Browse files

experimental implementation of counting semaphores (see docs)

parent 8773ee3f
...@@ -37,8 +37,8 @@ mysql $MYCONN long_mult_test <$ENS_CODE_ROOT/ensembl-hive/sql/load_long_mult.sql ...@@ -37,8 +37,8 @@ mysql $MYCONN long_mult_test <$ENS_CODE_ROOT/ensembl-hive/sql/load_long_mult.sql
# #
# or you can add your own task(s). Several tasks can be added at once: # or you can add your own task(s). Several tasks can be added at once:
mysql $MYCONN long_mult_test <<EoF mysql $MYCONN long_mult_test <<EoF
INSERT INTO analysis_job (analysis_id, input_id) VALUES ( 1, "{ 'a_multiplier' => '9750516269', 'b_multiplier' => '327358788' }"); INSERT INTO analysis_job (analysis_id, input_id) VALUES ( 1, "{ 'a_multiplier' => '9650516169', 'b_multiplier' => '327358788' }");
INSERT INTO analysis_job (analysis_id, input_id) VALUES ( 1, "{ 'a_multiplier' => '327358788', 'b_multiplier' => '9750516269' }"); INSERT INTO analysis_job (analysis_id, input_id) VALUES ( 1, "{ 'a_multiplier' => '327358788', 'b_multiplier' => '9650516169' }");
EoF EoF
# 5. Initialize the newly created eHive for the first time: # 5. Initialize the newly created eHive for the first time:
......
############################################################################################################################
#
# Please see the long_mult_example_pipeline.txt first.
#
# This is an example ( a follow-up on 'long_mult_example_pipeline.txt', so make sure you have read it first)
# of how to set up a pipeline with counting semaphores.
#
############################################################################################################################
# 0. Cache MySQL connection parameters in a variable (they will work as eHive connection parameters as well) :
export MYCONN="--host=hostname --port=port_number --user=username --password=secret"
#
# also, set the ENS_CODE_ROOT to the directory where ensembl packages are installed:
export ENS_CODE_ROOT="$HOME/ensembl_main"
# 1. Create an empty database:
mysql $MYCONN -e 'DROP DATABASE IF EXISTS long_mult_test'
mysql $MYCONN -e 'CREATE DATABASE long_mult_test'
# 2. Create eHive infrastructure:
mysql $MYCONN long_mult_test <$ENS_CODE_ROOT/ensembl-hive/sql/tables.sql
# 3. Create analyses/control_rules/dataflow_rules of the LongMult pipeline:
mysql $MYCONN long_mult_test <$ENS_CODE_ROOT/ensembl-hive/sql/create_sema_long_mult.sql
# 4. "Load" the pipeline with a multiplication task:
mysql $MYCONN long_mult_test <<EoF
INSERT INTO analysis_job (analysis_id, input_id) VALUES ( 1, "{ 'a_multiplier' => '9650516169', 'b_multiplier' => '327358788' }");
INSERT INTO analysis_job (analysis_id, input_id) VALUES ( 1, "{ 'a_multiplier' => '327358788', 'b_multiplier' => '9650516169' }");
EoF
# 5. Initialize the newly created eHive for the first time:
beekeeper.pl $MYCONN --database=long_mult_test -sync
# 6. You can either execute three individual workers (each picking one analysis of the pipeline):
runWorker.pl $MYCONN --database=long_mult_test
#
# ... or run an automatic loop that will run workers for you:
beekeeper.pl $MYCONN --database=long_mult_test -loop
#
# KNOWN BUG: if you keep suggesting your own analysis_id/logic_name, the system may sometimes think there is no work,
# when in fact there are previously semaphored jobs that have become available but remain invisible to some workers.
# KNOWN FIX: just run "beekeeper.pl $MYCONN --database=long_mult_test -sync" once, and the problem should rectify itself.
# 7. The results of the computations are to be found in 'final_result' table:
mysql $MYCONN long_mult_test -e 'SELECT * FROM final_result'
# 8. You can add more multiplication tasks by repeating from step 4.
...@@ -121,6 +121,19 @@ sub branch_code { ...@@ -121,6 +121,19 @@ sub branch_code {
return $self->{'_branch_code'}; return $self->{'_branch_code'};
} }
sub semaphore_count {
    # Getter/setter for the number of outstanding blockers of this job.
    # An undefined count is normalized to 0, meaning "not blocked".
    my $self = shift @_;

    if(@_) {
        $self->{'_semaphore_count'} = shift @_;
    }
    unless(defined($self->{'_semaphore_count'})) {
        $self->{'_semaphore_count'} = 0;
    }
    return $self->{'_semaphore_count'};
}
sub semaphored_job_id {
    # Getter/setter for the analysis_job_id of the job whose semaphore_count
    # this job decrements on successful completion (undef = blocks nothing).
    my $self = shift @_;

    if(@_) {
        $self->{'_semaphored_job_id'} = shift @_;
    }
    return $self->{'_semaphored_job_id'};
}
sub stdout_file { sub stdout_file {
my $self = shift; my $self = shift;
$self->{'_stdout_file'} = shift if(@_); $self->{'_stdout_file'} = shift if(@_);
......
...@@ -84,8 +84,8 @@ sub CreateNewJob { ...@@ -84,8 +84,8 @@ sub CreateNewJob {
return undef unless(scalar @args); return undef unless(scalar @args);
my ($input_id, $analysis, $prev_analysis_job_id, $blocked) = my ($input_id, $analysis, $prev_analysis_job_id, $blocked, $semaphore_count, $semaphored_job_id) =
rearrange([qw(INPUT_ID ANALYSIS input_job_id BLOCK )], @args); rearrange([qw(INPUT_ID ANALYSIS INPUT_JOB_ID BLOCK SEMAPHORE_COUNT SEMAPHORED_JOB_ID)], @args);
$prev_analysis_job_id=0 unless($prev_analysis_job_id); $prev_analysis_job_id=0 unless($prev_analysis_job_id);
throw("must define input_id") unless($input_id); throw("must define input_id") unless($input_id);
...@@ -101,16 +101,15 @@ sub CreateNewJob { ...@@ -101,16 +101,15 @@ sub CreateNewJob {
} }
my $sql = q{INSERT ignore into analysis_job my $sql = q{INSERT ignore into analysis_job
(input_id, prev_analysis_job_id,analysis_id,status) (input_id, prev_analysis_job_id,analysis_id,status,semaphore_count,semaphored_job_id)
VALUES (?,?,?,?)}; VALUES (?,?,?,?,?,?)};
my $status ='READY'; my $status = $blocked ? 'BLOCKED' : 'READY';
$status = 'BLOCKED' if($blocked);
my $dbc = $analysis->adaptor->db->dbc; my $dbc = $analysis->adaptor->db->dbc;
my $sth = $dbc->prepare($sql); my $sth = $dbc->prepare($sql);
$sth->execute($input_id, $prev_analysis_job_id, $analysis->dbID, $status); $sth->execute($input_id, $prev_analysis_job_id, $analysis->dbID, $status, $semaphore_count, $semaphored_job_id);
my $dbID = $sth->{'mysql_insertid'}; my $job_id = $sth->{'mysql_insertid'};
$sth->finish; $sth->finish;
$dbc->do("UPDATE analysis_stats SET ". $dbc->do("UPDATE analysis_stats SET ".
...@@ -119,7 +118,7 @@ sub CreateNewJob { ...@@ -119,7 +118,7 @@ sub CreateNewJob {
",status='LOADING' ". ",status='LOADING' ".
"WHERE status!='BLOCKED' and analysis_id='".$analysis->dbID ."'"); "WHERE status!='BLOCKED' and analysis_id='".$analysis->dbID ."'");
return $dbID; return $job_id;
} }
############################################################################### ###############################################################################
...@@ -330,6 +329,8 @@ sub _columns { ...@@ -330,6 +329,8 @@ sub _columns {
a.branch_code a.branch_code
a.runtime_msec a.runtime_msec
a.query_count a.query_count
a.semaphore_count
a.semaphored_job_id
); );
} }
...@@ -367,6 +368,8 @@ sub _objs_from_sth { ...@@ -367,6 +368,8 @@ sub _objs_from_sth {
$job->branch_code($column{'branch_code'}); $job->branch_code($column{'branch_code'});
$job->runtime_msec($column{'runtime_msec'}); $job->runtime_msec($column{'runtime_msec'});
$job->query_count($column{'query_count'}); $job->query_count($column{'query_count'});
$job->semaphore_count($column{'semaphore_count'});
$job->semaphored_job_id($column{'semaphored_job_id'});
$job->adaptor($self); $job->adaptor($self);
if($column{'input_id'} =~ /_ext_input_analysis_data_id (\d+)/) { if($column{'input_id'} =~ /_ext_input_analysis_data_id (\d+)/) {
...@@ -387,6 +390,19 @@ sub _objs_from_sth { ...@@ -387,6 +390,19 @@ sub _objs_from_sth {
# #
################ ################
sub decrease_semaphore_count_for_jobid {
    # Decrement the semaphore_count of the given job by $dec (default: 1),
    # potentially releasing it for claiming once the count reaches zero or below.
    my ($self, $jobid, $dec) = @_;
    $dec ||= 1;

    my $sth = $self->prepare(
        "UPDATE analysis_job SET semaphore_count=semaphore_count-? WHERE analysis_job_id=?"
    );
    $sth->execute($dec, $jobid);
    $sth->finish;
}
=head2 update_status =head2 update_status
Arg [1] : $analysis_id Arg [1] : $analysis_id
...@@ -484,7 +500,7 @@ sub claim_jobs_for_worker { ...@@ -484,7 +500,7 @@ sub claim_jobs_for_worker {
my $sql_base = "UPDATE analysis_job SET job_claim='$claim'". my $sql_base = "UPDATE analysis_job SET job_claim='$claim'".
" , worker_id='". $worker->worker_id ."'". " , worker_id='". $worker->worker_id ."'".
" , status='CLAIMED'". " , status='CLAIMED'".
" WHERE job_claim='' and status='READY'". " WHERE job_claim='' AND status='READY' AND semaphore_count<=0 ".
" AND analysis_id='" .$worker->analysis->dbID. "'"; " AND analysis_id='" .$worker->analysis->dbID. "'";
my $sql_virgin = $sql_base . my $sql_virgin = $sql_base .
...@@ -698,7 +714,5 @@ sub remove_analysis_id { ...@@ -698,7 +714,5 @@ sub remove_analysis_id {
} }
1; 1;
...@@ -320,8 +320,7 @@ sub dataflow_output_id { ...@@ -320,8 +320,7 @@ sub dataflow_output_id {
$job->analysis_id($self->analysis->dbID); $job->analysis_id($self->analysis->dbID);
$job->branch_code($branch_code); $job->branch_code($branch_code);
$job->dbID($self->input_job->dbID); $job->dbID($self->input_job->dbID);
$job->status('READY'); $job->status( $blocked ? 'BLOCKED' : 'READY' );
$job->status('BLOCKED') if(defined($blocked) and ($blocked eq 'BLOCKED'));
#if process uses branch_code 1 explicitly, turn off automatic dataflow #if process uses branch_code 1 explicitly, turn off automatic dataflow
$self->autoflow_inputjob(0) if($branch_code==1); $self->autoflow_inputjob(0) if($branch_code==1);
......
...@@ -466,20 +466,20 @@ sub synchronize_AnalysisStats { ...@@ -466,20 +466,20 @@ sub synchronize_AnalysisStats {
$analysisStats->failed_job_count(0); $analysisStats->failed_job_count(0);
$analysisStats->num_required_workers(0); $analysisStats->num_required_workers(0);
my $sql = "SELECT status, count(*) FROM analysis_job ". my $sql = "SELECT status, count(*), semaphore_count>0 semaphored FROM analysis_job ".
"WHERE analysis_id=? GROUP BY status"; "WHERE analysis_id=? GROUP BY status, semaphored";
my $sth = $self->prepare($sql); my $sth = $self->prepare($sql);
$sth->execute($analysisStats->analysis_id); $sth->execute($analysisStats->analysis_id);
my $hive_capacity = $analysisStats->hive_capacity; my $hive_capacity = $analysisStats->hive_capacity;
while (my ($status, $count)=$sth->fetchrow_array()) { while (my ($status, $count, $semaphored)=$sth->fetchrow_array()) {
# print STDERR "$status - $count\n"; # print STDERR "$status - $count\n";
my $total = $analysisStats->total_job_count(); my $total = $analysisStats->total_job_count();
$analysisStats->total_job_count($total + $count); $analysisStats->total_job_count($total + $count);
if($status eq 'READY') { if(($status eq 'READY') and !$semaphored) {
$analysisStats->unclaimed_job_count($count); $analysisStats->unclaimed_job_count($count);
my $numWorkers; my $numWorkers;
if($analysisStats->batch_size > 0) { if($analysisStats->batch_size > 0) {
......
=pod
=head1 NAME
Bio::EnsEMBL::Hive::RunnableDB::LongMult::SemaStart
=head1 DESCRIPTION
'LongMult::SemaStart' is an alternative first step of the LongMult example pipeline that multiplies two long numbers.
In the same manner as 'LongMult::Start', it takes apart the second multiplier and creates several 'LongMult::PartMultiply' jobs
that correspond to the different digits of the second multiplier.
However, instead of using by-analysis control mechanisms (control-flow and data-flow rules)
it uses counting semaphores as a less coarse by-job control mechanism,
which allows several different multiplications to run independently of each other.
=cut
package Bio::EnsEMBL::Hive::RunnableDB::LongMult::SemaStart;

use strict;
use warnings;   # added alongside 'use strict' for better compile- and run-time diagnostics

use base ('Bio::EnsEMBL::Hive::ProcessWithParams');

sub fetch_input {   # this time we have nothing to fetch
    my $self = shift @_;

    return 1;
}

sub run {   # following the 'divide and conquer' principle, our job is to create jobs:
    my $self = shift @_;

        # check with defined() rather than truth, so that a legitimate '0' multiplier
        # is not mistaken for a missing parameter:
    my $a_multiplier = $self->param('a_multiplier');
    die "'a_multiplier' is an obligatory parameter" unless(defined($a_multiplier));
    my $b_multiplier = $self->param('b_multiplier');
    die "'b_multiplier' is an obligatory parameter" unless(defined($b_multiplier));

        # collect the distinct digits of the second multiplier that need a real
        # multiplication job ('0' and '1' are trivial and are skipped):
    my %digit_hash = ();
    foreach my $digit (split(//,$b_multiplier)) {
        next if (($digit eq '0') or ($digit eq '1'));
        $digit_hash{$digit}++;
    }

    my $analysis_adaptor = $self->db->get_AnalysisAdaptor();
    my $job_adaptor      = $self->db->get_AnalysisJobAdaptor();
    my $pm_analysis      = $analysis_adaptor->fetch_by_logic_name('part_multiply');
    my $at_analysis      = $analysis_adaptor->fetch_by_logic_name('add_together');

    my $current_job_id = $self->input_job->dbID();

        # First, create the "sink" job and pre-block it with a counting semaphore:
    my $at_job_id = $job_adaptor->CreateNewJob (
        -input_id        => "{ 'a_multiplier' => '$a_multiplier', 'b_multiplier' => '$b_multiplier' }",
        -analysis        => $at_analysis,
        -input_job_id    => $current_job_id,
        -semaphore_count => scalar (keys %digit_hash),  # AT MOST that many individual blocks
    );

        # Then, create the array of intermediate jobs that will be gradually
        # unblocking the "sink" job upon successful completion:
    foreach my $digit (keys %digit_hash) {
        my $pm_job_id = $job_adaptor->CreateNewJob (
            -input_id          => "{ 'a_multiplier' => '$a_multiplier', 'digit' => '$digit' }",
            -analysis          => $pm_analysis,
            -input_job_id      => $current_job_id,
            -semaphored_job_id => $at_job_id,
        );

            # If this job has already been created in the past
            # (and presumably the result has been already computed),
            # CreateNewJob returns false and we adjust the semaphore_count manually:
        unless($pm_job_id) {
            $job_adaptor->decrease_semaphore_count_for_jobid($at_job_id);
        }
    }
}

sub write_output {  # and we have nothing to write out
    my $self = shift @_;

    return 1;
}

1;
...@@ -487,6 +487,10 @@ sub run ...@@ -487,6 +487,10 @@ sub run
$self->queen->worker_register_job_done($self, $job); $self->queen->worker_register_job_done($self, $job);
if(my $semaphored_job_id = $job->semaphored_job_id) {
$job->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );
}
$self->more_work_done; $self->more_work_done;
} }
$batches_end = time() * 1000; $batches_end = time() * 1000;
...@@ -546,10 +550,8 @@ sub run ...@@ -546,10 +550,8 @@ sub run
} }
sub run_module_with_job sub run_module_with_job {
{ my ($self, $job) = @_;
my $self = shift;
my $job = shift;
my ($start_time, $end_time); my ($start_time, $end_time);
......
# Schema/analysis setup for the semaphored version of the LongMult example pipeline
# (see sema_long_mult_example_pipeline.txt for the full walkthrough).

# create the 3 analyses we are going to use:
INSERT INTO analysis (created, logic_name, module) VALUES (NOW(), 'start', 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::SemaStart');
INSERT INTO analysis (created, logic_name, module) VALUES (NOW(), 'part_multiply', 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply');
INSERT INTO analysis (created, logic_name, module) VALUES (NOW(), 'add_together', 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::AddTogether');

# (no control- or dataflow rules anymore, pipeline is controlled via semaphores)

# create a table for holding intermediate results (written by 'part_multiply' and read by 'add_together'):
CREATE TABLE intermediate_result (
    a_multiplier char(40) NOT NULL,
    digit tinyint NOT NULL,
    result char(41) NOT NULL,
    PRIMARY KEY (a_multiplier, digit)
);

# create a table for holding final results (written by 'add_together'):
CREATE TABLE final_result (
    a_multiplier char(40) NOT NULL,
    b_multiplier char(40) NOT NULL,
    result char(80) NOT NULL,
    PRIMARY KEY (a_multiplier, b_multiplier)
);

# adding simple counting semaphores
# (needed only when upgrading an analysis_job table created before these columns existed):
ALTER TABLE analysis_job ADD COLUMN semaphore_count int(10) NOT NULL default 0;
ALTER TABLE analysis_job ADD COLUMN semaphored_job_id int(10) DEFAULT NULL;
...@@ -115,13 +115,18 @@ CREATE TABLE analysis_ctrl_rule ( ...@@ -115,13 +115,18 @@ CREATE TABLE analysis_ctrl_rule (
-- retry_count - number times job had to be reset when worker failed to run it -- retry_count - number times job had to be reset when worker failed to run it
-- completed - timestamp when job was completed -- completed - timestamp when job was completed
-- branch_code - switch-like branching control, default=1 (ie true) -- branch_code - switch-like branching control, default=1 (ie true)
--
-- semaphore_count - if this count is >0, the job is conditionally blocked (until this count drops to 0 or below).
-- Default=0 means "nothing is blocking me by default".
-- semaphored_job_id - the analysis_job_id of job S that is waiting for this job to decrease S's semaphore_count.
-- Default=NULL means "I'm not blocking anything by default".
CREATE TABLE analysis_job ( CREATE TABLE analysis_job (
analysis_job_id int(10) NOT NULL auto_increment, analysis_job_id int(10) NOT NULL auto_increment,
prev_analysis_job_id int(10) NOT NULL, #analysis_job which created this from rules prev_analysis_job_id int(10) NOT NULL, #analysis_job which created this from rules
analysis_id int(10) NOT NULL, analysis_id int(10) NOT NULL,
input_id char(255) not null, input_id char(255) not null,
job_claim char(40) NOT NULL default '', #UUID job_claim char(40) NOT NULL DEFAULT '', #UUID
worker_id int(10) NOT NULL, worker_id int(10) NOT NULL,
status enum('READY','BLOCKED','CLAIMED','GET_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED') DEFAULT 'READY' NOT NULL, status enum('READY','BLOCKED','CLAIMED','GET_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED') DEFAULT 'READY' NOT NULL,
retry_count int(10) default 0 not NULL, retry_count int(10) default 0 not NULL,
...@@ -130,6 +135,9 @@ CREATE TABLE analysis_job ( ...@@ -130,6 +135,9 @@ CREATE TABLE analysis_job (
runtime_msec int(10) default 0 NOT NULL, runtime_msec int(10) default 0 NOT NULL,
query_count int(10) default 0 NOT NULL, query_count int(10) default 0 NOT NULL,
semaphore_count int(10) NOT NULL default 0,
semaphored_job_id int(10) DEFAULT NULL,
PRIMARY KEY (analysis_job_id), PRIMARY KEY (analysis_job_id),
UNIQUE KEY input_id_analysis (input_id, analysis_id), UNIQUE KEY input_id_analysis (input_id, analysis_id),
INDEX claim_analysis_status (job_claim, analysis_id, status), INDEX claim_analysis_status (job_claim, analysis_id, status),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment