Commit 26a2ced3 authored by Leo Gordon
Browse files

Removed maximise_concurrency and added analysis_stats.priority to guide the scheduler; improved scheduler and LSF meadow
parent 4ce3539f
......@@ -268,6 +268,12 @@ sub can_be_empty {
return $self->{'_can_be_empty'};
}
# Get/set accessor for the analysis scheduling priority.
# With an argument: stores it in the object and returns it.
# Without: returns the currently stored value (undef if never set).
sub priority {
    my $self = shift @_;

    if(@_) {
        $self->{'_priority'} = shift @_;
    }

    return $self->{'_priority'};
}
sub print_stats {
my $self = shift;
......@@ -315,41 +321,6 @@ sub print_stats {
}
# Derive and store the analysis status (FAILED/DONE/READY/WORKING) from the
# job counters; a BLOCKED analysis is left untouched.
# NOTE(review): this appears to be the pre-change version of determine_status
# shown on the removed side of the diff; code kept byte-identical, comments only.
sub determine_status {
    my $self = shift;

    # Only recompute the status for analyses that are not BLOCKED.
    if($self->status ne 'BLOCKED') {

        # Every job has been claimed and every job has finished (done or failed):
        # decide between FAILED and DONE based on the failure percentage.
        if ($self->unclaimed_job_count == 0 and
            $self->total_job_count == $self->done_job_count + $self->failed_job_count) {

            my $failure_percentage = 0;
            if ($self->total_job_count) {   # guard against division by zero
                $failure_percentage = $self->failed_job_count * 100 / $self->total_job_count;
            }
            if ($failure_percentage > $self->failed_job_tolerance) {
                $self->status('FAILED');

                # NOTE(review): there is no ';' after the banner strings, so the
                # first printf below is evaluated as part of print's argument
                # list — its ERROR line is emitted *before* the opening banner
                # and its return value (1) leaks into the printed output.
                # Presumably unintended; verify against the original file.
                print
                    "\n",
                    "##################################################\n",
                printf
                    "## ERROR: %-35s ##\n", $self->get_analysis->logic_name." failed!";
                printf
                    "## %4.1f%% jobs failed (tolerance: %3d%%) ##\n", $failure_percentage, $self->failed_job_tolerance;
                print
                    "##################################################\n\n";
            } else {
                $self->status('DONE');
            }
        }

        # No job has been claimed yet (or an empty analysis): READY.
        if($self->total_job_count == $self->unclaimed_job_count) {
            $self->status('READY');
        }

        # Some, but not all, jobs are still unclaimed: workers are WORKING on it.
        if( 0 < $self->unclaimed_job_count and $self->unclaimed_job_count < $self->total_job_count ) {
            $self->status('WORKING');
        }
    }
}
sub check_blocking_control_rules {
my $self = shift;
......@@ -389,4 +360,35 @@ sub check_blocking_control_rules {
}
# Derive and store the analysis status from the job counters:
#   READY       - nothing claimed yet (or an empty analysis)
#   FAILED/DONE - all jobs tried; FAILED when failures exceed the tolerance
#   ALL_CLAIMED - every job claimed but not all finished
#   WORKING     - some, but not all, jobs still unclaimed
# A BLOCKED analysis is left untouched.
sub determine_status {
    my $self = shift;

    if($self->status ne 'BLOCKED') {

        # cache the counters used by the decision chain below
        my $total_jobs     = $self->total_job_count;
        my $unclaimed_jobs = $self->unclaimed_job_count;
        my $finished_jobs  = $self->done_job_count + $self->failed_job_count;

        if($unclaimed_jobs == $total_jobs) {            # nothing has been claimed yet (or an empty analysis)

            $self->status('READY');

        } elsif($finished_jobs == $total_jobs) {        # all jobs of the analysis have been tried

            # tolerance is configured as a percentage; convert to an absolute job count
            my $absolute_tolerance = $self->failed_job_tolerance * $total_jobs / 100.0;

            if($self->failed_job_count > $absolute_tolerance) {
                $self->status('FAILED');
                print "\n##################################################\n";
                printf("## ERROR: %-35s ##\n", $self->get_analysis->logic_name." failed!");
                printf("## %d jobs failed (tolerance: %d (%3d%%)) ##\n", $self->failed_job_count, $absolute_tolerance, $self->failed_job_tolerance);
                print "##################################################\n\n";
            } else {
                $self->status('DONE');
            }

        } elsif($unclaimed_jobs == 0) {                 # everything has been claimed

            $self->status('ALL_CLAIMED');

        } elsif($unclaimed_jobs > 0 and $unclaimed_jobs < $total_jobs) {

            $self->status('WORKING');
        }
    }
}
1;
......@@ -92,14 +92,13 @@ sub fetch_all {
sub fetch_by_needed_workers {
my ($self, $limit, $maximise_concurrency, $rc_id) = @_;
my ($self, $limit, $rc_id) = @_;
my $constraint = "ast.num_required_workers>0 AND ast.status in ('READY','WORKING')"
.(defined($rc_id) ? " AND ast.rc_id = $rc_id" : '');
my $final_clause = 'ORDER BY num_running_workers'
.($maximise_concurrency ? '' : ' DESC')
.', hive_capacity DESC, analysis_id'
my $final_clause = 'ORDER BY priority DESC, '
.( ($self->dbc->driver eq 'sqlite') ? 'RANDOM()' : 'RAND()' )
.($limit ? " LIMIT $limit" : '');
$self->_final_clause($final_clause);
......@@ -213,6 +212,7 @@ sub update {
$sql .= ",sync_lock='0'";
$sql .= ",rc_id=". $stats->rc_id();
$sql .= ",can_be_empty=". $stats->can_be_empty();
$sql .= ",priority=". $stats->priority();
$sql .= " WHERE analysis_id='".$stats->analysis_id."' ";
my $sth = $self->prepare($sql);
......@@ -437,6 +437,7 @@ sub _columns {
ast.sync_lock
ast.rc_id
ast.can_be_empty
ast.priority
);
push @columns , ($self->dbc->driver eq 'sqlite')
......@@ -461,6 +462,7 @@ sub _objs_from_sth {
$analStats->sync_lock($column{'sync_lock'});
$analStats->rc_id($column{'rc_id'});
$analStats->can_be_empty($column{'can_be_empty'});
$analStats->priority($column{'priority'});
$analStats->batch_size($column{'batch_size'});
$analStats->avg_msec_per_job($column{'avg_msec_per_job'});
$analStats->avg_input_msec_per_job($column{'avg_input_msec_per_job'});
......
......@@ -105,7 +105,7 @@ sub total_running_workers_limit { # if set and ->can('count_running_workers'),
return $self->{'_total_running_workers_limit'};
}
sub pending_adjust { # if set and ->can('count_pending_workers'),
sub pending_adjust { # if set and ->can('count_pending_workers_by_rc_id'),
# provides a cut-off on the number of workers being submitted
my $self = shift @_;
......@@ -124,33 +124,4 @@ sub submitted_workers_limit { # if set, provides a cut-off on the number of work
return $self->{'_submitted_workers_limit'};
}
# Cap a proposed number of workers against the meadow's constraints, applied
# in order: (1) workers already pending (when the meadow can count them and
# pending-adjustment is enabled), (2) the per-iteration submission limit,
# (3) the total-running-workers limit. Never returns a negative count.
sub limit_workers {
    my ($self, $worker_count) = @_;

    # discount workers that are already pending in the meadow
    if($self->can('count_pending_workers') and $self->pending_adjust()) {
        $worker_count -= $self->count_pending_workers();
    }

    # cap by the per-submission limit, when one is configured
    my $submit_limit = $self->submitted_workers_limit;
    if(defined($submit_limit) and $worker_count > $submit_limit) {
        $worker_count = $submit_limit;
    }

    # cap by the remaining free slots under the total-running limit
    if($self->can('count_running_workers') and defined(my $total_limit = $self->total_running_workers_limit)) {
        my $free_slots = $total_limit - $self->count_running_workers();
        $worker_count = $free_slots if($free_slots < $worker_count);
    }

    return ($worker_count < 0) ? 0 : $worker_count;
}
1;
......@@ -23,16 +23,21 @@ sub get_current_worker_process_id {
}
}
sub count_pending_workers {
sub count_pending_workers_by_rc_id {
my ($self) = @_;
my $jnp = $self->job_name_prefix();
my $cmd = qq{bjobs -w -J '${jnp}*' -u all 2>/dev/null | grep -c PEND};
my $cmd = qq{bjobs -w -J '${jnp}*' -u all 2>/dev/null | grep PEND};
my $pend_count = qx/$cmd/;
chomp($pend_count);
my %pending_by_rc_id = ();
return $pend_count;
foreach my $line (qx/$cmd/) {
if($line=~/Hive(\d+)/) { # FIXME: should be safer to match against $jnp instead of 'Hive'
$pending_by_rc_id{$1}++;
}
}
return \%pending_by_rc_id;
}
sub status_of_all_our_workers { # returns a hashref
......
......@@ -74,6 +74,7 @@ sub default_options {
'host' => 'localhost',
'pipeline_name' => 'hive_generic',
'hive_use_triggers' => 1, # experimental, default is "off"
'pipeline_db' => {
-host => $self->o('host'),
......@@ -97,13 +98,11 @@ sub pipeline_create_commands {
my $self = shift @_;
my $db_conn = shift @_ || 'pipeline_db';
my $hive_use_triggers = $self->{'_extra_options'}{'hive_use_triggers'};
return ($self->o($db_conn, '-driver') eq 'sqlite')
? [
# standard eHive tables, triggers and procedures:
$self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/tables.sqlite',
$hive_use_triggers ? ( $self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/triggers.sqlite' ) : (),
$self->o('hive_use_triggers') ? ( $self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/triggers.sqlite' ) : (),
$self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/procedures.sqlite',
]
: [
......@@ -111,7 +110,7 @@ sub pipeline_create_commands {
# standard eHive tables, triggers, foreign_keys and procedures:
$self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/tables.sql',
$hive_use_triggers ? ( $self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/triggers.mysql' ) : (),
$self->o('hive_use_triggers') ? ( $self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/triggers.mysql' ) : (),
$self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/foreign_keys.mysql',
$self->db_connect_command($db_conn).' <'.$self->o('ensembl_cvs_root_dir').'/ensembl-hive/sql/procedures.mysql',
];
......@@ -188,7 +187,7 @@ sub pre_options {
'job_topup!' => '',
'analysis_topup!' => '',
'hive_driver' => '',
'hive_use_triggers' => '',
# 'hive_use_triggers' => '',
};
}
......@@ -348,8 +347,8 @@ sub run {
my %seen_logic_name = ();
foreach my $aha (@{$self->pipeline_analyses}) {
my ($logic_name, $module, $parameters_hash, $input_ids, $program_file, $blocked, $batch_size, $hive_capacity, $failed_job_tolerance, $max_retry_count, $can_be_empty, $rc_id) =
rearrange([qw(logic_name module parameters input_ids program_file blocked batch_size hive_capacity failed_job_tolerance max_retry_count can_be_empty rc_id)], %$aha);
my ($logic_name, $module, $parameters_hash, $program_file, $input_ids, $blocked, $batch_size, $hive_capacity, $failed_job_tolerance, $max_retry_count, $can_be_empty, $rc_id, $priority) =
rearrange([qw(logic_name module parameters program_file input_ids blocked batch_size hive_capacity failed_job_tolerance max_retry_count can_be_empty rc_id priority)], %$aha);
unless($logic_name) {
die "logic_name' must be defined in every analysis";
......@@ -376,9 +375,9 @@ sub run {
warn "Creating analysis '$logic_name'.\n";
$analysis = Bio::EnsEMBL::Analysis->new(
-db => '',
-db_file => '',
-db_version => '1',
# -db => '',
# -db_file => '',
# -db_version => '1',
-logic_name => $logic_name,
-module => $module,
-parameters => stringify($parameters_hash || {}), # have to stringify it here, because Analysis code is external wrt Hive code
......@@ -394,6 +393,7 @@ sub run {
$stats->max_retry_count( $max_retry_count ) if(defined($max_retry_count));
$stats->rc_id( $rc_id ) if(defined($rc_id));
$stats->can_be_empty( $can_be_empty ) if(defined($can_be_empty));
$stats->priority( $priority ) if(defined($priority));
$stats->status($blocked ? 'BLOCKED' : 'READY'); # be careful, as this "soft" way of blocking may be accidentally unblocked by deep sync
$stats->update();
}
......
......@@ -128,10 +128,10 @@ sub pipeline_analyses {
{ 'a_multiplier' => $self->o('second_mult'), 'b_multiplier' => $self->o('first_mult') },
],
-flow_into => {
2 => [ 'part_multiply' ], # will create a fan of jobs
1 => [ 'add_together' ], # will create a funnel job to wait for the fan to complete and add the results
# '2->A' => [ 'part_multiply' ], # will create a semaphored fan of jobs (comment out the -wait_for rule from 'add_together')
# 'A->1' => [ 'add_together' ], # will create a semaphored funnel job to wait for the fan to complete and add the results
# 2 => [ 'part_multiply' ], # will create a fan of jobs
# 1 => [ 'add_together' ], # will create a funnel job to wait for the fan to complete and add the results
'2->A' => [ 'part_multiply' ], # will create a semaphored fan of jobs (comment out the -wait_for rule from 'add_together')
'A->1' => [ 'add_together' ], # will create a semaphored funnel job to wait for the fan to complete and add the results
},
},
......@@ -153,7 +153,8 @@ sub pipeline_analyses {
-input_ids => [
# (jobs for this analysis will be flown_into via branch-1 from 'start' jobs above)
],
-wait_for => [ 'part_multiply' ], # we can only start adding when all partial products have been computed
# -wait_for => [ 'part_multiply' ], # we can only start adding when all partial products have been computed
-priority => 10,
-flow_into => {
1 => [ ':////final_result' ],
},
......
......@@ -158,7 +158,11 @@ sub create_new_worker {
$self->safe_synchronize_AnalysisStats($analysisStats);
#return undef unless(($analysisStats->status ne 'BLOCKED') and ($analysisStats->num_required_workers > 0));
} else {
$analysisStats = $self->_pick_best_analysis_for_new_worker($rc_id);
if( $analysisStats = $self->_pick_best_analysis_for_new_worker($rc_id) ) {
print "Scheduler picked analysis_id=".$analysisStats->analysis_id()." for the worker\n";
} else {
print "Scheduler failed to pick analysis_id for the worker\n";
}
}
return undef unless($analysisStats);
......@@ -441,16 +445,16 @@ sub synchronize_hive {
print STDERR "\nSynchronizing the hive (".scalar(@$list_of_analyses)." analyses this time):\n";
foreach my $analysis (@$list_of_analyses) {
$self->synchronize_AnalysisStats($analysis->stats);
print STDERR '.';
print STDERR ( ($analysis->stats()->status eq 'BLOCKED') ? 'x' : 'o');
}
print STDERR "\n";
print STDERR "Checking blocking control rules:\n";
foreach my $analysis (@$list_of_analyses) {
my $open = $analysis->stats->check_blocking_control_rules();
print STDERR ($open ? 'o' : 'x');
}
print STDERR "\n";
# print STDERR "Checking blocking control rules:\n";
# foreach my $analysis (@$list_of_analyses) {
# my $open = $analysis->stats->check_blocking_control_rules();
# print STDERR ($open ? 'o' : 'x');
# }
# print STDERR "\n";
print STDERR ''.((time() - $start_time))." seconds to synchronize_hive\n\n";
}
......@@ -485,10 +489,10 @@ sub safe_synchronize_AnalysisStats {
"WHERE sync_lock=0 and analysis_id=" . $stats->analysis_id;
#print("$sql\n");
my $row_count = $self->dbc->do($sql);
return $stats unless($row_count == 1);
return $stats unless($row_count == 1); # return the un-updated status if locked
#printf("got sync_lock on analysis_stats(%d)\n", $stats->analysis_id);
#OK have the lock, go and do the sync
# since we managed to obtain the lock, let's go and perform the sync:
$self->synchronize_AnalysisStats($stats);
return $stats;
......@@ -589,6 +593,7 @@ sub synchronize_AnalysisStats {
$analysisStats->determine_status();
}
# $analysisStats->sync_lock(0); ## do we perhaps need it here?
$analysisStats->update; #update and release sync_lock
return $analysisStats;
......@@ -652,10 +657,10 @@ sub count_running_workers {
}
=head2 get_num_needed_workers
=head2 schedule_workers
Arg[1] : Bio::EnsEMBL::Analysis object (optional)
Example : $count = $queen->get_num_needed_workers();
Example : $count = $queen->schedule_workers();
Description: Runs through the analyses in the system which are waiting
for workers to be created for them. Calculates the maximum
number of workers needed to fill the current needs of the system
......@@ -665,12 +670,12 @@ sub count_running_workers {
=cut
sub get_num_needed_workers {
my ($self, $filter_analysis) = @_;
sub schedule_workers {
my ($self, $filter_analysis, $orig_pending_by_rc_id, $available_submit_limit) = @_;
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
my $clearly_needed_analyses = $statsDBA->fetch_by_needed_workers(undef,$self->{maximise_concurrency});
my $potentially_needed_analyses = $statsDBA->fetch_by_statuses(['LOADING', 'BLOCKED']);
my $clearly_needed_analyses = $statsDBA->fetch_by_needed_workers(undef);
my $potentially_needed_analyses = $statsDBA->fetch_by_statuses(['LOADING', 'BLOCKED', 'ALL_CLAIMED']);
my @all_analyses = (@$clearly_needed_analyses, @$potentially_needed_analyses);
return 0 unless(@all_analyses);
......@@ -679,14 +684,17 @@ sub get_num_needed_workers {
return 0 if($available_load <=0.0);
my $total_workers = 0;
my %rc2workers = ();
my %pending_by_rc_id = %{ $orig_pending_by_rc_id || {} };
my $total_workers_to_run = 0;
my %workers_to_run_by_rc_id = ();
foreach my $analysis_stats (@all_analyses) {
last if ($available_load <= 0.0);
last if (defined($available_submit_limit) and !$available_submit_limit);
next if (defined $filter_analysis && $filter_analysis->dbID != $analysis_stats->analysis_id);
#digging deeper under the surface so need to sync
if(($analysis_stats->status eq 'LOADING') or ($analysis_stats->status eq 'BLOCKED')) {
if(($analysis_stats->status eq 'LOADING') or ($analysis_stats->status eq 'BLOCKED') or ($analysis_stats->status eq 'ALL_CLAIMED')) {
$self->synchronize_AnalysisStats($analysis_stats);
}
......@@ -696,43 +704,65 @@ sub get_num_needed_workers {
# FIXME: the following call *sometimes* returns a stale number greater than the number of workers actually needed for an analysis; -sync fixes it
my $workers_this_analysis = $analysis_stats->num_required_workers;
if((my $hive_capacity = $analysis_stats->hive_capacity) > 0) { # if there is a limit, use it for cut-off
my $limit_workers_this_analysis = int($available_load * $hive_capacity);
if(defined($available_submit_limit)) { # submit_limit total capping, if available
if($workers_this_analysis > $available_submit_limit) {
$workers_this_analysis = $available_submit_limit;
}
$available_submit_limit -= $workers_this_analysis;
}
if($workers_this_analysis > $limit_workers_this_analysis) {
$workers_this_analysis = $limit_workers_this_analysis;
if((my $hive_capacity = $analysis_stats->hive_capacity) > 0) { # per-analysis hive_capacity capping, if available
my $remaining_capacity_for_this_analysis = int($available_load * $hive_capacity);
if($workers_this_analysis > $remaining_capacity_for_this_analysis) {
$workers_this_analysis = $remaining_capacity_for_this_analysis;
}
$available_load -= 1.0*$workers_this_analysis/$hive_capacity;
}
$total_workers += $workers_this_analysis;
$rc2workers{$analysis_stats->rc_id} += $workers_this_analysis;
$analysis_stats->print_stats();
printf(" (%1.3f remaining-hive-load) use %3d workers of analysis_id=%d\n", $available_load, $workers_this_analysis, $analysis_stats->analysis_id);
last if($available_load <= 0.0);
my $curr_rc_id = $analysis_stats->rc_id;
if($pending_by_rc_id{ $curr_rc_id }) { # per-rc_id capping by pending processes, if available
my $pending_this_analysis = ($pending_by_rc_id{ $curr_rc_id } < $workers_this_analysis) ? $pending_by_rc_id{ $curr_rc_id } : $workers_this_analysis;
$workers_this_analysis -= $pending_this_analysis;
$pending_by_rc_id{ $curr_rc_id } -= $pending_this_analysis;
}
$total_workers_to_run += $workers_this_analysis;
$workers_to_run_by_rc_id{ $curr_rc_id } += $workers_this_analysis;
$analysis_stats->print_stats();
printf("Scheduler suggests adding %d more workers of rc_id=%d for analysis_id=%d [%1.3f hive_load remaining]\n", $workers_this_analysis, $curr_rc_id, $analysis_stats->analysis_id, $available_load);
}
printf("need a total of $total_workers workers (availLoad=%1.5f)\n", $available_load);
return ($total_workers, \%rc2workers);
printf("Scheduler suggests adding a total of %d workers [%1.5f hive_load remaining]\n", $total_workers_to_run, $available_load);
return ($total_workers_to_run, \%workers_to_run_by_rc_id);
}
sub get_needed_workers_resync_if_necessary {
sub schedule_workers_resync_if_necessary {
my ($self, $meadow, $analysis) = @_;
my ($needed_count, $rc_hash) = $self->get_num_needed_workers($analysis);
my $pending_by_rc_id = ($meadow->can('count_pending_workers_by_rc_id') and $meadow->pending_adjust()) ? $meadow->count_pending_workers_by_rc_id() : {};
my $submit_limit = $meadow->submitted_workers_limit();
my $meadow_limit = ($meadow->can('count_running_workers') and defined($meadow->total_running_workers_limit)) ? $meadow->total_running_workers_limit - $meadow->count_running_workers : undef;
unless( $needed_count or $self->get_hive_current_load() or $self->count_running_workers() ) {
my $available_submit_limit = ($submit_limit and $meadow_limit)
? (($submit_limit<$meadow_limit) ? $submit_limit : $meadow_limit)
: (defined($submit_limit) ? $submit_limit : $meadow_limit);
my ($total_workers_to_run, $workers_to_run_by_rc_id) = $self->schedule_workers($analysis, $pending_by_rc_id, $available_submit_limit);
unless( $total_workers_to_run or $self->get_hive_current_load() or $self->count_running_workers() ) {
print "*** nothing is running and nothing to do (according to analysis_stats) => perform a hard resync\n" ;
$self->synchronize_hive($analysis);
$self->check_for_dead_workers($meadow, 1);
$self->synchronize_hive($analysis);
($needed_count, $rc_hash) = $self->get_num_needed_workers($analysis);
($total_workers_to_run, $workers_to_run_by_rc_id) = $self->schedule_workers($analysis, $pending_by_rc_id, $available_submit_limit);
}
return ($needed_count, $rc_hash);
return ($total_workers_to_run, $workers_to_run_by_rc_id);
}
......@@ -854,7 +884,7 @@ sub _pick_best_analysis_for_new_worker {
my $statsDBA = $self->db->get_AnalysisStatsAdaptor;
return undef unless($statsDBA);
my ($stats) = @{$statsDBA->fetch_by_needed_workers(1,$self->{maximise_concurrency}, $rc_id)};
my ($stats) = @{$statsDBA->fetch_by_needed_workers(1, $rc_id)};
if($stats) {
#synchronize and double check that it can be run
$self->safe_synchronize_AnalysisStats($stats);
......@@ -869,7 +899,7 @@ sub _pick_best_analysis_for_new_worker {
# hidden jobs that haven't made it into the summary stats
print("QUEEN: no obvious needed workers, need to dig deeper\n");
my $stats_list = $statsDBA->fetch_by_statuses(['LOADING', 'BLOCKED']);
my $stats_list = $statsDBA->fetch_by_statuses(['LOADING', 'BLOCKED', 'ALL_CLAIMED']);
foreach $stats (@$stats_list) {
$self->safe_synchronize_AnalysisStats($stats);
......@@ -877,7 +907,7 @@ sub _pick_best_analysis_for_new_worker {
}
# does the following really ever help?
($stats) = @{$statsDBA->fetch_by_needed_workers(1,$self->{maximise_concurrency}, $rc_id)};
($stats) = @{$statsDBA->fetch_by_needed_workers(1, $rc_id)};
return $stats;
}
......
......@@ -61,7 +61,7 @@ sub main {
$self->{'sleep_minutes'} = 1;
$self->{'verbose_stats'} = 1;
$self->{'maximise_concurrency'} = 0;
$self->{'maximise_concurrency'} = undef;
$self->{'retry_throwing_jobs'} = undef;
$self->{'hive_output_dir'} = undef;
......@@ -97,10 +97,10 @@ sub main {
'life_span|lifespan=i' => \$self->{'life_span'},
'logic_name=s' => \$self->{'logic_name'},
'hive_output_dir=s' => \$self->{'hive_output_dir'},
'maximise_concurrency=i' => \$self->{'maximise_concurrency'},
'retry_throwing_jobs=i' => \$self->{'retry_throwing_jobs'},
'batch_size=i' => \$self->{'batch_size'}, # OBSOLETE!
'batch_size=i' => \$self->{'batch_size'}, # OBSOLETE!
'maximise_concurrency=i' => \$self->{'maximise_concurrency'}, # OBSOLETE!
# other commands/options
'h|help' => \$help,
......@@ -124,10 +124,14 @@ sub main {
if ($help) { script_usage(0); }
if( $self->{'batch_size'} ) {
if( defined($self->{'batch_size'}) ) {
print "\nERROR : -batch_size flag is obsolete, please modify batch_size of the analysis instead\n";
script_usage(1);
}
if( defined($self->{'maximise_concurrency'}) ) {
print "\nERROR : -maximise_concurrency flag is obsolete, please set the -priority of the analysis instead\n";
script_usage(1);
}
parse_conf($self, $conf_file);
......@@ -158,7 +162,7 @@ sub main {
}
my $queen = $self->{'dba'}->get_Queen;
$queen->{'maximise_concurrency'} = 1 if ($self->{'maximise_concurrency'});
# $queen->{'maximise_concurrency'} = 1 if ($self->{'maximise_concurrency'});
$queen->{'verbose_stats'} = $self->{'verbose_stats'};
my $pipeline_name = destringify(
......@@ -180,20 +184,21 @@ sub main {
print STDERR "+---------------------------------------------------------------------+\n";
}
my $meadow;
if($local) {
$self->{'meadow'} = Bio::EnsEMBL::Hive::Meadow::LOCAL->new();
$self->{'meadow'} -> total_running_workers_limit($local_cpus);
$meadow = Bio::EnsEMBL::Hive::Meadow::LOCAL->new();
$meadow->total_running_workers_limit($local_cpus);
} else {
$self->{'meadow'} = Bio::EnsEMBL::Hive::Meadow::LSF->new();
$self->{'meadow'} -> meadow_options($meadow_options);
$meadow = Bio::EnsEMBL::Hive::Meadow::LSF->new();
$meadow->meadow_options($meadow_options);
}
$self->{'meadow'} -> pending_adjust(not $no_pend_adjust);
$meadow->pending_adjust(not $no_pend_adjust);
if($self->{'run_job_id'}) {
$worker_limit = 1;
}
$self->{'meadow'} -> submitted_workers_limit($worker_limit);
$self->{'meadow'} -> pipeline_name($pipeline_name);
$meadow->submitted_workers_limit($worker_limit);
$meadow->pipeline_name($pipeline_name);
if($reset_job_id) { $queen->reset_and_fetch_job_by_dbID($reset_job_id); }
......@@ -209,18 +214,18 @@ sub main {
if($remove_analysis_id) { remove_analysis_id($self, $remove_analysis_id); }