Commit 048df0fb authored by Leo Gordon's avatar Leo Gordon

"dataflow made friendly", untested

parent 41864ab2
......@@ -16,6 +16,45 @@ Summary:
Bio::EnsEMBL::Analysis::RunnableDB perl wrapper objects as nodes/blocks in
the graphs but could be adapted more generally.
26 March, 2010 : Leo Gordon
* branch_code column in analysis_job table is unnecessary and was removed
Branching using branch_codes is a very important and powerful mechanism,
but it is completely defined in the dataflow_rule table.
branch_code() WAS at some point a getter/setter method on AnalysisJob,
but it was only used to pass parameters around in the code (now obsolete),
and this information was never reflected in the database,
so analysis_job.branch_code was always 1 no matter what.
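For illustration, a fan/funnel pair of rules could be wired directly with the rule adaptor's create_rule() (a sketch; the analysis variables are hypothetical, the branch numbers follow the LongMult example below):

    my $dfr_adaptor = $self->db->get_DataflowRuleAdaptor;
    $dfr_adaptor->create_rule( $start_analysis, $part_multiply_analysis, 2 );  # branch 2: the fan
    $dfr_adaptor->create_rule( $start_analysis, $add_together_analysis,  1 );  # branch 1: the funnel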
* stringification of parameters using Data::Dumper was moved out of init_pipeline.pl and JobFactory.pm
and now lives in a separate Hive::Utils.pm module (Hive::Utils::stringify can be imported, inherited or just called).
It is transparently called by AnalysisJobAdaptor when creating jobs, which makes it possible
to pass input_ids as hashrefs rather than strings. The magic happens at the adaptor level.
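A sketch of the two equivalent ways of supplying an input_id (values borrowed from the LongMult example):

    use Bio::EnsEMBL::Hive::Utils 'stringify';

    my $hashref_id = { 'a_multiplier' => '9650516169', 'b_multiplier' => '327358788' };
    my $string_id  = stringify( $hashref_id );   # what the adaptor now does for you behind the scenes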
* the Queen->flow_output_job() method has been made obsolete and removed from Queen.pm
Dataflow is now completely handled by the Process->dataflow_output_id() method,
which now handles arrays/fans of jobs and semaphores (more on this below).
Please always use dataflow_output_id() if you need to create a new job or a fan of jobs,
as this is the top-level method for doing exactly that.
Only call the naked adaptor's method if you know what you're doing.
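A minimal sketch of the preferred call as it might appear in a RunnableDB's write_output() (the output values are made up):

    sub write_output {
        my $self = shift @_;

        # flow two jobs down branch 2; returns an arrayref of the job_ids actually created:
        my $job_ids = $self->dataflow_output_id( [ { 'value' => 1 }, { 'value' => 2 } ], 2 );

        return 1;
    }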
* the JobFactory module has been upgraded (simplified) to work through the dataflow mechanism.
It can no longer create analyses, but that is not needed, as creating analyses should be init_pipeline's job.
The Family pipeline has been patched to work with the new JobFactory module.
* branched dataflow was going to meet semaphores at some point; the time is near.
dataflow_output_id() is now semaphore-aware and can propagate semaphores through the control graph.
A new fan is hooked onto its own semaphore; when no semaphored_job is specified, semaphore propagation kicks in.
Failure to create a job in the fan is tracked and the corresponding semaphore_count decreased accordingly
(so users do not have to worry about it), as sketched below.
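A sketch of explicit (non-propagated) semaphore wiring, using the option names introduced in Process.pm (the variables are hypothetical):

    # create the funnel job first, blocked by a semaphore sized to the fan:
    my ($funnel_job_id) = @{ $self->dataflow_output_id( $funnel_input_id, 1,
                                 { -semaphore_count => scalar(@fan_input_ids) } ) };

    # then create the fan of jobs that control the semaphored funnel
    # (an explicitly supplied -semaphored_job_id also switches off propagation):
    $self->dataflow_output_id( \@fan_input_ids, 2,
                                 { -semaphored_job_id => $funnel_job_id } );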
* LongMult examples have been patched to work with the new dataflow_output_id() method.
22 March, 2010 : Leo Gordon
* Bio::EnsEMBL::Hive::ProcessWithParams is the preferred way of parsing/passing around the parameters.
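For example, a runnable based on it can read its parameters uniformly via param() (a sketch; 'divisor' is just an illustrative parameter name):

    package MyDivisionChecker;

    use strict;
    use base ('Bio::EnsEMBL::Hive::ProcessWithParams');

    sub run {
        my $self = shift @_;

        my $divisor = $self->param('divisor') || 1;   # resolved from input_id and/or analysis parameters

        return 1;
    }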
......
......@@ -14,8 +14,8 @@ my $pipeline_db = {
-host => 'compara2',
-port => 3306,
-user => 'ensadmin',
-pass => '#######',
-dbname => 'long_mult_pipeline',
-pass => 'ensembl',
-dbname => $ENV{USER}.'_long_mult_pipeline',
};
sub dbconn_2_mysql {
......@@ -54,14 +54,17 @@ return {
{ 'a_multiplier' => '9650516169', 'b_multiplier' => '327358788' },
{ 'a_multiplier' => '327358788', 'b_multiplier' => '9650516169' },
],
-flow_into => [ 'add_together' ],
-flow_into => {
2 => [ 'part_multiply' ], # will create a fan of jobs
1 => [ 'add_together' ], # will create a funnel job to wait for the fan to complete and add the results
},
},
{ -logic_name => 'part_multiply',
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply',
-parameters => {},
-input_ids => [
# (jobs for this analysis will be created by the 'start' jobs above)
# (jobs for this analysis will be flown_into via branch-2 from 'start' jobs above)
],
},
......@@ -69,9 +72,9 @@ return {
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::AddTogether',
-parameters => {},
-input_ids => [
# (jobs for this analysis will be "flown into" from the 'start' jobs above)
# (jobs for this analysis will be flown_into via branch-1 from 'start' jobs above)
],
-wait_for => [ 'start', 'part_multiply' ], # but we have to wait for both to complete
-wait_for => [ 'part_multiply' ], # we can only start adding when all partial products have been computed
},
],
};
......
## Configuration file for the long multiplication semaphored pipeline example
#
## Run it like this:
#
# init_pipeline.pl -conf long_mult_sema_pipeline.conf
#
# code directories:
my $ensembl_cvs_root_dir = $ENV{'HOME'}.'/work';
#my $ensembl_cvs_root_dir = $ENV{'HOME'}.'/ensembl_main'; ## for some Compara developers
# long multiplication pipeline database connection parameters:
my $pipeline_db = {
-host => 'compara2',
-port => 3306,
-user => 'ensadmin',
-pass => 'ensembl',
-dbname => $ENV{USER}.'_long_mult_sema_pipeline',
};
sub dbconn_2_mysql {
my ($db_conn, $with_db) = @_;
return "--host=$db_conn->{-host} --port=$db_conn->{-port} "
."--user=$db_conn->{-user} --pass=$db_conn->{-pass} "
.($with_db ? $db_conn->{-dbname} : '');
}
return {
# pass connection parameters into the pipeline initialization script to create adaptors:
-pipeline_db => $pipeline_db,
# shell commands that create and possibly pre-fill the pipeline database:
-pipeline_create_commands => [
'mysql '.dbconn_2_mysql($pipeline_db, 0)." -e 'CREATE DATABASE $pipeline_db->{-dbname}'",
# standard eHive tables and procedures:
'mysql '.dbconn_2_mysql($pipeline_db, 1)." <$ensembl_cvs_root_dir/ensembl-hive/sql/tables.sql",
'mysql '.dbconn_2_mysql($pipeline_db, 1)." <$ensembl_cvs_root_dir/ensembl-hive/sql/procedures.sql",
# additional tables needed for long multiplication pipeline's operation:
'mysql '.dbconn_2_mysql($pipeline_db, 1)." -e 'CREATE TABLE intermediate_result (a_multiplier char(40) NOT NULL, digit tinyint NOT NULL, result char(41) NOT NULL, PRIMARY KEY (a_multiplier, digit))'",
'mysql '.dbconn_2_mysql($pipeline_db, 1)." -e 'CREATE TABLE final_result (a_multiplier char(40) NOT NULL, b_multiplier char(40) NOT NULL, result char(80) NOT NULL, PRIMARY KEY (a_multiplier, b_multiplier))'",
# name the pipeline to differentiate the submitted processes:
'mysql '.dbconn_2_mysql($pipeline_db, 1)." -e 'INSERT INTO meta (meta_key, meta_value) VALUES (\"name\", \"slmult\")'",
],
-pipeline_analyses => [
{ -logic_name => 'sema_start',
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::SemaStart',
-parameters => {},
-input_ids => [
{ 'a_multiplier' => '9650516169', 'b_multiplier' => '327358788' },
{ 'a_multiplier' => '327358788', 'b_multiplier' => '9650516169' },
],
-flow_into => {
1 => [ 'add_together' ], # will create a semaphored funnel job to wait for the fan to complete and add the results
2 => [ 'part_multiply' ], # will create a fan of jobs that control the semaphored funnel
},
},
{ -logic_name => 'part_multiply',
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::PartMultiply',
-parameters => {},
-input_ids => [
# (jobs for this analysis will be flown_into via branch-2 from 'start' jobs above)
],
},
{ -logic_name => 'add_together',
-module => 'Bio::EnsEMBL::Hive::RunnableDB::LongMult::AddTogether',
-parameters => {},
-input_ids => [
# (jobs for this analysis will be flown_into via branch-1 from 'start' jobs above)
],
# jobs in this analysis are semaphored, so there is no need to '-wait_for'
},
],
};
......@@ -114,13 +114,6 @@ sub query_count {
return $self->{'_query_count'};
}
sub branch_code {
my $self = shift;
$self->{'_branch_code'} = shift if(@_);
$self->{'_branch_code'} = 1 unless(defined($self->{'_branch_code'}));
return $self->{'_branch_code'};
}
sub semaphore_count {
my $self = shift;
$self->{'_semaphore_count'} = shift if(@_);
......
......@@ -39,16 +39,16 @@
package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor;
use strict;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::DBSQL::BaseAdaptor;
use Sys::Hostname;
use Data::UUID;
use Sys::Hostname;
use Bio::EnsEMBL::DBSQL::BaseAdaptor;
use Bio::EnsEMBL::Utils::Argument;
use Bio::EnsEMBL::Utils::Exception;
use Bio::EnsEMBL::Hive::Worker;
use Bio::EnsEMBL::Hive::AnalysisJob;
use Bio::EnsEMBL::Hive::Utils 'stringify'; # import 'stringify()'
our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
use base ('Bio::EnsEMBL::DBSQL::BaseAdaptor');
# our $max_retry_count = 7;
......@@ -60,7 +60,7 @@ our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
=head2 CreateNewJob
Args : -input_id => string of input_id which will be passed to run the job
Args : -input_id => string of input_id which will be passed to run the job (or a Perl hash that will be automagically stringified)
-analysis => Bio::EnsEMBL::Analysis object from a database
-block => int(0,1) set blocking state of job (default = 0)
-input_job_id => (optional) analysis_job_id of job that is creating this
......@@ -87,7 +87,8 @@ sub CreateNewJob {
my ($input_id, $analysis, $prev_analysis_job_id, $blocked, $semaphore_count, $semaphored_job_id) =
rearrange([qw(INPUT_ID ANALYSIS INPUT_JOB_ID BLOCK SEMAPHORE_COUNT SEMAPHORED_JOB_ID)], @args);
$prev_analysis_job_id=0 unless($prev_analysis_job_id);
$prev_analysis_job_id ||=0;
throw("must define input_id") unless($input_id);
throw("must define analysis") unless($analysis);
throw("analysis must be [Bio::EnsEMBL::Analysis] not a [$analysis]")
......@@ -95,6 +96,10 @@ sub CreateNewJob {
throw("analysis must have adaptor connected to database")
unless($analysis->adaptor and $analysis->adaptor->db);
if(ref($input_id)) { # let's do the Perl hash stringification centrally rather than in many places:
$input_id = stringify($input_id);
}
if(length($input_id) >= 255) {
my $input_data_id = $analysis->adaptor->db->get_AnalysisDataAdaptor->store_if_needed($input_id);
$input_id = "_ext_input_analysis_data_id $input_data_id";
......@@ -326,7 +331,6 @@ sub _columns {
a.status
a.retry_count
a.completed
a.branch_code
a.runtime_msec
a.query_count
a.semaphore_count
......@@ -365,7 +369,6 @@ sub _objs_from_sth {
$job->status($column{'status'});
$job->retry_count($column{'retry_count'});
$job->completed($column{'completed'});
$job->branch_code($column{'branch_code'});
$job->runtime_msec($column{'runtime_msec'});
$job->query_count($column{'query_count'});
$job->semaphore_count($column{'semaphore_count'});
......@@ -390,7 +393,7 @@ sub _objs_from_sth {
#
################
sub decrease_semaphore_count_for_jobid {
sub decrease_semaphore_count_for_jobid { # used in semaphore annihilation or unsuccessful creation
my $self = shift @_;
my $jobid = shift @_;
my $dec = shift @_ || 1;
......@@ -402,6 +405,18 @@ sub decrease_semaphore_count_for_jobid {
$sth->finish;
}
sub increase_semaphore_count_for_jobid { # used in semaphore propagation
my $self = shift @_;
my $jobid = shift @_;
my $inc = shift @_ || 1;
my $sql = "UPDATE analysis_job SET semaphore_count=semaphore_count+? WHERE analysis_job_id=?";
my $sth = $self->prepare($sql);
$sth->execute($inc, $jobid);
$sth->finish;
}
=head2 update_status
......@@ -419,7 +434,7 @@ sub update_status {
my $sql = "UPDATE analysis_job SET status='".$job->status."' ";
if($job->status eq 'DONE') {
$sql .= ",completed=now(),branch_code=".$job->branch_code;
$sql .= ",completed=now()";
$sql .= ",runtime_msec=".$job->runtime_msec;
$sql .= ",query_count=".$job->query_count;
}
......
......@@ -47,32 +47,27 @@ use Bio::EnsEMBL::Utils::Exception;
our @ISA = qw(Bio::EnsEMBL::DBSQL::BaseAdaptor);
=head2 fetch_from_analysis_job
=head2 fetch_from_analysis_id_branch_code
Args : Bio::EnsEMBL::Hive::AnalysisJob
Example : my @rules = @{$ruleAdaptor->fetch_from_analysis_job($job)};
Description: searches database for rules with given 'from' analysis
returns all such rules in a list (by reference)
Args : unsigned int $analysis_id, unsigned int $branch_code
Example : my @rules = @{$ruleAdaptor->fetch_from_analysis_id_branch_code($analysis_id, $branch_code)};
Description: searches database for rules with given from_analysis_id and branch_code
and returns all such rules in a list (by reference)
Returntype : reference to list of Bio::EnsEMBL::Hive::DataflowRule objects
Exceptions : none
Caller : ?
Caller : Bio::EnsEMBL::Hive::Process::dataflow_output_id
=cut
sub fetch_from_analysis_job
{
my $self = shift;
my $fromAnalysisJob = shift;
throw("arg is required\n") unless($fromAnalysisJob);
throw("arg must be a [Bio::EnsEMBL::Hive::AnalysisJob] not a $fromAnalysisJob")
unless ($fromAnalysisJob->isa('Bio::EnsEMBL::Hive::AnalysisJob'));
return [] unless($fromAnalysisJob->analysis_id);
my $constraint = "r.from_analysis_id = '".$fromAnalysisJob->analysis_id."'"
." AND r.branch_code=". $fromAnalysisJob->branch_code;
return $self->_generic_fetch($constraint);
sub fetch_from_analysis_id_branch_code {
my ($self, $analysis_id, $branch_code) = @_;
return [] unless($analysis_id);
$branch_code ||= 1;
my $constraint = "r.from_analysis_id=${analysis_id} AND r.branch_code=${branch_code}";
return $self->_generic_fetch($constraint);
}
......@@ -156,14 +151,14 @@ sub remove {
=cut
sub create_rule {
my ($self, $fromAnalysis, $toAnalysis, $branchCode) = @_;
my ($self, $from_analysis, $to_analysis, $branch_code) = @_;
return unless($fromAnalysis and $toAnalysis);
return unless($from_analysis and $to_analysis);
my $rule = Bio::EnsEMBL::Hive::DataflowRule->new();
$rule->from_analysis($fromAnalysis);
$rule->to_analysis($toAnalysis);
$rule->branch_code($branchCode) if(defined($branchCode));
$rule->from_analysis($from_analysis);
$rule->to_analysis($to_analysis);
$rule->branch_code($branch_code) if(defined($branch_code));
return $self->store($rule);
}
......
......@@ -82,9 +82,6 @@
=cut
my $g_hive_process_workdir; # a global directory location for the process using this module
package Bio::EnsEMBL::Hive::Process;
use strict;
......@@ -292,6 +289,7 @@ sub autoflow_inputjob {
Title : dataflow_output_id
Arg[1](opt) : <string|arrayref> $output_id(s) (optional, defaults to the job's own input_id)
Arg[2](opt) : <int> $branch_code (optional, defaults to 1)
Arg[3](opt) : <hashref> $create_job_options (optional, defaults to {}, options passed through to the CreateNewJob method)
Usage : $self->dataflow_output_id($output_id, $branch_code);
Function:
If Process needs to create jobs, this allows it to have jobs
......@@ -303,31 +301,52 @@ sub autoflow_inputjob {
=cut
sub dataflow_output_id {
my ($self, $output_id, $branch_code, $blocked) = @_;
return unless($output_id);
return unless($self->analysis);
$branch_code=1 unless(defined($branch_code));
# Dataflow works by doing a transform from this process to the next.
# The job starts out 'attached' to this process hence the analysis_id, branch_code, and dbID
# are all relative to the starting point. The dataflow process transforms the job to a
# different analysis_id, and moves the dbID to the previous_analysis_job_id
my $job = new Bio::EnsEMBL::Hive::AnalysisJob;
$job->input_id($output_id);
$job->analysis_id($self->analysis->dbID);
$job->branch_code($branch_code);
$job->dbID($self->input_job->dbID);
$job->status( $blocked ? 'BLOCKED' : 'READY' );
#if process uses branch_code 1 explicitly, turn off automatic dataflow
$self->autoflow_inputjob(0) if($branch_code==1);
return $self->queen->flow_output_job($job);
my ($self, $output_ids, $branch_code, $create_job_options) = @_;
return unless($self->analysis);
return unless($self->input_job);
$output_ids ||= [ $self->input_id() ]; # replicate the input_id in the branch_code's output by default
$output_ids = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY'); # force previously used single values into an arrayref
$branch_code ||= 1; # default branch_code is 1
$create_job_options ||= {}; # { -block => 1 } or { -semaphore_count => scalar(@fan_job_ids) } or { -semaphored_job_id => $funnel_job_id }
# this tricky code is responsible for correct propagation of semaphores down the dataflow pipes:
my $propagate_semaphore = not exists ($create_job_options->{'-semaphored_job_id'}); # CONVENTION: if zero is explicitly supplied, it is a request not to propagate
# However, if nothing is supplied, the semaphored_job_id will be propagated from the parent job:
my $semaphored_job_id = $create_job_options->{'-semaphored_job_id'} ||= $self->input_job->semaphored_job_id();
# if branch_code is set to 1 (explicitly or implicitly), turn off automatic dataflow:
$self->autoflow_inputjob(0) if($branch_code==1);
my @output_job_ids = ();
my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
my $rules = $self->db->get_DataflowRuleAdaptor->fetch_from_analysis_id_branch_code($self->analysis->dbID, $branch_code);
foreach my $rule (@{$rules}) {
foreach my $output_id (@$output_ids) {
if(my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob(
-input_id => $output_id,
-analysis => $rule->to_analysis,
-input_job_id => $self->input_job->dbID, # creator_job's id
%$create_job_options
)) {
if($semaphored_job_id and $propagate_semaphore) {
$job_adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id ); # propagate the semaphore
}
# only add the ones that were indeed created:
push @output_job_ids, $job_id;
} elsif($semaphored_job_id and !$propagate_semaphore) {
$job_adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id ); # if we didn't succeed in creating the job, fix the semaphore
}
}
}
return \@output_job_ids;
}
=head2 debug
Title : debug
......
......@@ -337,31 +337,6 @@ sub worker_register_job_done {
}
sub flow_output_job {
my $self = shift;
my $job = shift;
return unless($job);
my $create_blocked_job = 0;
$create_blocked_job = 1 if($job->status and ($job->status eq 'BLOCKED'));
my @output_jobs;
my $rules = $self->db->get_DataflowRuleAdaptor->fetch_from_analysis_job($job);
foreach my $rule (@{$rules}) {
my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob (
-input_id => $job->input_id,
-analysis => $rule->to_analysis,
-input_job_id => $job->dbID,
-block => $create_blocked_job
);
my $job_url = $rule->to_analysis->adaptor->db->dbc->url;
$job_url .= "/analysis_job?dbID=" . $job_id;
push @output_jobs, $job_url;
}
return \@output_jobs;
}
######################################
#
# Public API interface for beekeeper
......
......@@ -6,57 +6,25 @@ Bio::EnsEMBL::Hive::RunnableDB::JobFactory
=head1 DESCRIPTION
A generic module for creating batches of similar jobs.

=head1 USAGE EXAMPLES

cat <<EOF >/tmp/jf_test.txt
5
8
9
13
15
26
EOF

mysql --defaults-group-suffix=_compara1 -e 'DROP DATABASE job_factory_test'
mysql --defaults-group-suffix=_compara1 -e 'CREATE DATABASE job_factory_test'
mysql --defaults-group-suffix=_compara1 job_factory_test <~lg4/work/ensembl-hive/sql/tables.sql
mysql --defaults-group-suffix=_compara1 job_factory_test

INSERT INTO analysis (created, logic_name, module, parameters)
VALUES (NOW(), 'analysis_factory', 'Bio::EnsEMBL::Hive::RunnableDB::JobFactory',
"{ 'module' => 'Bio::EnsEMBL::Hive::RunnableDB::Test', 'numeric' => 1, 'parameters' => { 'divisor' => 4 }, 'input_id' => { 'value' => '$RangeStart', 'time_running' => '$RangeCount*2'} }");

INSERT INTO analysis (created, logic_name, module, parameters)
VALUES (NOW(), 'factory_from_file', 'Bio::EnsEMBL::Hive::RunnableDB::JobFactory',
"{ 'module' => 'Bio::EnsEMBL::Hive::RunnableDB::Test', 'numeric' => 1, 'parameters' => { 'divisor' => 13 }, 'input_id' => { 'value' => '$RangeStart', 'time_running' => 2} }");

INSERT INTO analysis_job (analysis_id, input_id) VALUES (1, "{ 'inputlist' => [10..47], 'step' => 5, 'logic_name' => 'alpha_analysis', 'hive_capacity' => 3 }");
INSERT INTO analysis_job (analysis_id, input_id) VALUES (1, "{ 'inputlist' => [2..7], 'logic_name' => 'beta_analysis', 'batch_size' => 2 }");
INSERT INTO analysis_job (analysis_id, input_id) VALUES (2, "{ 'inputfile' => '/tmp/jf_test.txt', 'logic_name' => 'gamma_file', 'randomize' => 1 }");

SELECT * FROM analysis; SELECT * FROM analysis_stats; SELECT * FROM analysis_job;
QUIT

beekeeper.pl -url mysql://ensadmin:ensembl@compara1/job_factory_test -sync
#runWorker.pl -url mysql://ensadmin:ensembl@compara1/job_factory_test
beekeeper.pl -url mysql://ensadmin:ensembl@compara1/job_factory_test -loop

mysql --defaults-group-suffix=_compara1 job_factory_test -e 'SELECT * FROM analysis'
mysql --defaults-group-suffix=_compara1 job_factory_test -e 'SELECT * FROM analysis_job'

NB: NB: NB: The documentation needs refreshing. There are now three modes of operation (inputlist, inputfile, inputquery).

A generic module for creating batches of similar jobs using the dataflow mechanism
(a fan of jobs is created in one branch and the funnel in another).
Make sure you wire this building block properly from the outside.

There are 3 ways the batches are generated, depending on the source of ids:

* inputlist. The list is explicitly given in the parameters and can be abbreviated: 'inputlist' => ['a'..'z']

* inputfile. The list is contained in a file whose name is supplied as a parameter: 'inputfile' => 'myfile.txt'

* inputquery. The list is generated by an SQL query against the production database: 'inputquery' => 'SELECT object_id FROM object WHERE x=y'

If 'sema_funnel_branch_code' is defined, it becomes the destination branch for a semaphored funnel job,
whose count is automatically set to the number of fan jobs it will be waiting for.

=head1 USAGE EXAMPLES

(to be added)
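In the meantime, a sketch (illustrative values only) of an input_id hash that would exercise the 'inputlist' mode, combining the parameters described above:

    {
        'inputlist' => [ 10..47 ],    # source of ids: an explicit list
        'step'      => 5,             # group the ids into ranges of up to 5
        'numeric'   => 1,             # ids are numbers, so ranges can be computed
        'randomize' => 0,             # keep the original order
        'input_id'  => { 'value' => '$RangeStart', 'count' => '$RangeCount' },  # template for each created job
    }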
=cut
......@@ -64,8 +32,7 @@ package Bio::EnsEMBL::Hive::RunnableDB::JobFactory;
use strict;
use Data::Dumper; # NB: not for testing, but for actual data structure stringification
use Bio::EnsEMBL::Hive::Utils 'stringify'; # import 'stringify()'
use base ('Bio::EnsEMBL::Hive::ProcessWithParams');
sub fetch_input { # we have nothing to fetch, really
......@@ -77,24 +44,14 @@ sub fetch_input { # we have nothing to fetch, really
sub run {
my $self = shift @_;
my $logic_name = $self->param('logic_name') || die "'logic_name' is an obligatory parameter";
my $module = $self->param('module') || ''; # will only become obligatory if $logic_name does not exist
my $parameters = $self->param('parameters') || {};
my $batch_size = $self->param('batch_size') || undef;
my $hive_capacity = $self->param('hive_capacity') || undef;
my $analysis = $self->db->get_AnalysisAdaptor()->fetch_by_logic_name($logic_name)
|| $self->create_analysis_object($logic_name, $module, $parameters, $batch_size, $hive_capacity);
my $input_hash = $self->param('input_id') || die "'input_id' is an obligatory parameter";
my $numeric = $self->param('numeric') || 0;
my $step = $self->param('step') || 1;
my $randomize = $self->param('randomize') || 0;
my $template_hash = $self->param('input_id') || die "'input_id' is an obligatory parameter";
my $numeric = $self->param('numeric') || 0;
my $step = $self->param('step') || 1;
my $randomize = $self->param('randomize') || 0;
my $inputlist = $self->param('inputlist');
my $inputfile = $self->param('inputfile');
my $inputquery = $self->param('inputquery');
my $inputlist = $self->param('inputlist');
my $inputfile = $self->param('inputfile');
my $inputquery = $self->param('inputquery');
my $list = $inputlist
|| ($inputfile && $self->make_list_from_file($inputfile))
......@@ -105,52 +62,36 @@ sub run {
fisher_yates_shuffle_in_place($list);
}
$self->split_list_into_ranges($analysis, $input_hash, $numeric, $list, $step);
my $output_ids = $self->split_list_into_ranges($template_hash, $numeric, $list, $step);
$self->param('output_ids', $output_ids);
}
sub write_output { # and we have nothing to write out
sub write_output { # nothing to write out, but some dataflow to perform:
my $self = shift @_;
return 1;