Commit a735dcb4 authored by Leo Gordon
Browse files

added -compile_modules_once flag to test the new (slightly faster and more logical) approach

parent 57938b20
......@@ -99,11 +99,11 @@ sub create_new_worker {
my ( $meadow_type, $meadow_name, $process_id, $exec_host,
$rc_id, $logic_name, $analysis_id, $input_id, $job_id,
$no_write, $debug, $worker_output_dir, $hive_output_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs) =
$no_write, $debug, $worker_output_dir, $hive_output_dir, $job_limit, $life_span, $no_cleanup, $retry_throwing_jobs, $compile_module_once) =
rearrange([qw(meadow_type meadow_name process_id exec_host
rc_id logic_name analysis_id input_id job_id
no_write debug worker_output_dir hive_output_dir job_limit life_span no_cleanup retry_throwing_jobs) ], @args);
no_write debug worker_output_dir hive_output_dir job_limit life_span no_cleanup retry_throwing_jobs compile_module_once) ], @args);
if($logic_name) {
if($analysis_id) {
......@@ -243,10 +243,14 @@ sub create_new_worker {
if(defined $retry_throwing_jobs) {
$worker->retry_throwing_jobs($retry_throwing_jobs);
}
if(defined $compile_module_once) {
$worker->compile_module_once($compile_module_once);
}
return $worker;
}
sub register_worker_death {
my ($self, $worker) = @_;
......
......@@ -327,6 +327,15 @@ sub last_check_in {
return $self->{'_last_check_in'};
}
# Combined setter/getter for the Runnable instance held by this worker.
#   Arg [1] : (optional) the object to store; passing any argument, even undef, overwrites the stored value
#   Returns : whatever is currently stored under '_runnable_object' (undef if never set)
sub runnable_object {
    my ($self, @new_value) = @_;

    if (@new_value) {
        $self->{'_runnable_object'} = $new_value[0];
    }

    return $self->{'_runnable_object'};
}
# this is a setter/getter that defines default behaviour when a job throws: should it be retried or not?
sub retry_throwing_jobs {
......@@ -336,6 +345,13 @@ sub retry_throwing_jobs {
return defined($self->{'_retry_throwing_jobs'}) ? $self->{'_retry_throwing_jobs'} : 1;
}
# Combined setter/getter for the flag that tells the worker whether to compile
# the analysis module a single time up front rather than before every job.
#   Arg [1] : (optional) new flag value; passing any argument, even undef, overwrites the stored value
#   Returns : the stored flag as-is (undef if never set; no default is applied here)
sub compile_module_once {
    my ($self, @flag) = @_;

    $self->{'_compile_module_once'} = $flag[0] if scalar @flag;
    return $self->{'_compile_module_once'};
}
=head2 hive_output_dir
Arg [1] : (optional) string directory path
......@@ -494,6 +510,13 @@ sub run {
$self->print_worker();
}
if( $self->compile_module_once() ) {
$self->enter_status('COMPILATION');
my $runnable_object = $self->analysis->process or die "Unknown compilation error";
$self->runnable_object( $runnable_object );
$self->enter_status('READY');
}
$self->db->dbc->disconnect_when_inactive(0);
my $min_batch_time = $self->analysis->stats->min_batch_time();
......@@ -602,13 +625,19 @@ sub run_one_batch {
eval { # capture any throw/die
$job->incomplete(1);
$self->enter_status('COMPILATION', $job); # ToDo: when Runnables are ready, switch to compiling once per batch (saves time)
my $runnable_db = $self->analysis->process or die "Unknown compilation error";
my $runnable_object;
if( $self->compile_module_once() ) {
$runnable_object = $self->runnable_object();
} else {
$self->enter_status('COMPILATION', $job);
$runnable_object = $self->analysis->process or die "Unknown compilation error";
}
$self->db->dbc->query_count(0);
$job_stopwatch->restart();
$self->run_module_with_job($runnable_db, $job);
$self->run_module_with_job($runnable_object, $job);
$job->incomplete(0);
};
......@@ -668,33 +697,33 @@ sub run_one_batch {
sub run_module_with_job {
my ($self, $runnable_db, $job) = @_;
my ($self, $runnable_object, $job) = @_;
$runnable_db->input_job( $job );
$runnable_db->db( $self->db );
$runnable_db->worker( $self );
$runnable_db->debug( $self->debug );
$runnable_object->input_job( $job );
$runnable_object->db( $self->db );
$runnable_object->worker( $self );
$runnable_object->debug( $self->debug );
$job->param_init( $runnable_db->strict_hash_format(), $runnable_db->param_defaults(), $self->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), $job->input_id() );
$job->param_init( $runnable_object->strict_hash_format(), $runnable_object->param_defaults(), $self->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), $job->input_id() );
$job->autoflow(1);
$self->enter_status('GET_INPUT', $job);
$self->{'fetching_stopwatch'}->continue();
$runnable_db->fetch_input;
$runnable_object->fetch_input;
$self->{'fetching_stopwatch'}->pause();
$self->enter_status('RUN', $job);
$self->{'running_stopwatch'}->continue();
$runnable_db->run;
$runnable_object->run;
$self->{'running_stopwatch'}->pause();
if($self->execute_writes) {
$self->enter_status('WRITE_OUTPUT', $job);
$self->{'writing_stopwatch'}->continue();
$runnable_db->write_output;
$runnable_object->write_output;
$self->{'writing_stopwatch'}->pause();
if( $job->autoflow ) {
......
......@@ -61,6 +61,7 @@ sub main {
$self->{'sleep_minutes'} = 1;
$self->{'verbose_stats'} = 1;
$self->{'retry_throwing_jobs'} = undef;
$self->{'compile_module_once'} = undef;
$self->{'hive_output_dir'} = undef;
GetOptions(
......@@ -96,6 +97,7 @@ sub main {
'logic_name=s' => \$self->{'logic_name'},
'hive_output_dir=s' => \$self->{'hive_output_dir'},
'retry_throwing_jobs=i' => \$self->{'retry_throwing_jobs'},
'compile_module_once=i' => \$self->{'compile_module_once'},
'debug=i' => \$self->{'debug'},
# other commands/options
......@@ -298,7 +300,7 @@ sub generate_worker_cmd {
if ($run_job_id) {
$worker_cmd .= " -job_id $run_job_id";
} else {
foreach my $worker_option ('job_limit', 'life_span', 'logic_name', 'retry_throwing_jobs', 'hive_output_dir', 'debug') {
foreach my $worker_option ('job_limit', 'life_span', 'logic_name', 'retry_throwing_jobs', 'compile_module_once', 'hive_output_dir', 'debug') {
if(defined(my $value = $self->{$worker_option})) {
$worker_cmd .= " -${worker_option} $value";
}
......@@ -449,6 +451,7 @@ __DATA__
-life_span <num> : life_span limit for each worker
-logic_name <string> : restrict the pipeline stat/runs to this analysis logic_name
-retry_throwing_jobs 0|1 : if a job dies *knowingly*, should we retry it by default?
-compile_module_once 0|1 : should we compile the module only once (desired future behaviour), or pretend to do it before every job (current behaviour)?
-hive_output_dir <path> : directory where stdout/stderr of the hive is redirected
-debug <debug_level> : set debug level of the workers
......
......@@ -21,7 +21,7 @@ my $db_conf = {
my ($reg_conf, $reg_alias, $url); # Connection parameters
my ($rc_id, $logic_name, $analysis_id, $input_id, $job_id); # Task specification parameters
my ($job_limit, $life_span, $no_cleanup, $no_write, $hive_output_dir, $worker_output_dir, $retry_throwing_jobs); # Worker control parameters
my ($job_limit, $life_span, $no_cleanup, $no_write, $hive_output_dir, $worker_output_dir, $retry_throwing_jobs, $compile_module_once); # Worker control parameters
my ($help, $debug, $show_analysis_stats);
GetOptions(
......@@ -51,6 +51,7 @@ GetOptions(
'hive_output_dir|outdir=s' => \$hive_output_dir, # keep compatibility with the old name
'worker_output_dir=s' => \$worker_output_dir, # will take precedence over hive_output_dir if set
'retry_throwing_jobs=i' => \$retry_throwing_jobs,
'compile_module_once=i' => \$compile_module_once,
# Other commands
'h|help' => \$help,
......@@ -101,7 +102,7 @@ eval {
-exec_host => $exec_host,
# Task specification:
-rc_id => $rc_id,
-rc_id => $rc_id, # Idea: could/should we use rc_name instead?
-logic_name => $logic_name,
-analysis_id => $analysis_id,
-input_id => $input_id,
......@@ -115,6 +116,7 @@ eval {
-worker_output_dir => $worker_output_dir,
-hive_output_dir => $hive_output_dir,
-retry_throwing_jobs => $retry_throwing_jobs,
-compile_module_once => $compile_module_once,
# Other parameters:
-debug => $debug,
......@@ -208,6 +210,7 @@ __DATA__
-hive_output_dir <path> : directory where stdout/stderr of the whole hive of workers is redirected
-worker_output_dir <path> : directory where stdout/stderr of this particular worker is redirected
-retry_throwing_jobs <0|1> : if a job dies *knowingly*, should we retry it by default?
-compile_module_once <0|1> : should we compile the module only once (desired future behaviour), or pretend to do it before every job (current behaviour)?
=head2 Other options:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment