Skip to content
Snippets Groups Projects
Commit a76de6b6 authored by Leo Gordon's avatar Leo Gordon
Browse files

allow Runnables to cut corners

parent adc1a38a
No related branches found
No related tags found
No related merge requests found
......@@ -119,7 +119,9 @@ sub new {
=cut
sub fetch_input {
my $self = shift;
my $self = shift;
return 1;
}
=head2 run
......@@ -133,7 +135,9 @@ sub fetch_input {
=cut
sub run {
my $self = shift;
my $self = shift;
return 1;
}
=head2 write_output
......@@ -146,7 +150,9 @@ sub run {
=cut
sub write_output {
my $self = shift;
my $self = shift;
return 1;
}
=head2 DESTROY
......@@ -159,8 +165,9 @@ sub write_output {
=cut
sub DESTROY {
my $self = shift;
$self->SUPER::DESTROY if $self->can("SUPER::DESTROY");
my $self = shift;
$self->SUPER::DESTROY if $self->can("SUPER::DESTROY");
}
......
......@@ -613,49 +613,55 @@ sub run_module_with_job {
$runObj->db($self->db);
}
my $analysis_stats = $self->analysis->stats;
$self->enter_status('GET_INPUT');
$job->update_status('GET_INPUT');
print("\nGET_INPUT\n") if($self->debug);
$start_time = time() * 1000;
$runObj->fetch_input;
$end_time = time() * 1000;
$self->{fetch_time} += $end_time - $start_time;
$self->enter_status('RUN');
$job->update_status('RUN');
print("\nRUN\n") if($self->debug);
$start_time = time() * 1000;
$runObj->run;
$end_time = time() * 1000;
$self->{run_time} += $end_time - $start_time;
if($self->execute_writes) {
$self->enter_status('WRITE_OUTPUT');
$job->update_status('WRITE_OUTPUT');
print("\nWRITE_OUTPUT\n") if($self->debug);
$start_time = time() * 1000;
$runObj->write_output;
$end_time = time() * 1000;
$self->{write_time} += $end_time - $start_time;
if( $native_hive_process and $runObj->autoflow_inputjob ) {
printf("AUTOFLOW input->output\n") if($self->debug);
$runObj->dataflow_output_id();
CARRY_ON: {
my $carry_on = 1;
$self->enter_status('GET_INPUT');
$job->update_status('GET_INPUT');
print("\nGET_INPUT\n") if($self->debug);
$start_time = time() * 1000;
$carry_on = $runObj->fetch_input;
$end_time = time() * 1000;
$self->{fetch_time} += $end_time - $start_time;
last CARRY_ON if(defined($carry_on) and $carry_on==0); # if 0 is returned, leave early
$self->enter_status('RUN');
$job->update_status('RUN');
print("\nRUN\n") if($self->debug);
$start_time = time() * 1000;
$carry_on = $runObj->run;
$end_time = time() * 1000;
$self->{run_time} += $end_time - $start_time;
last CARRY_ON if(defined($carry_on) and $carry_on==0); # if 0 is returned, leave early
if($self->execute_writes) {
$self->enter_status('WRITE_OUTPUT');
$job->update_status('WRITE_OUTPUT');
print("\nWRITE_OUTPUT\n") if($self->debug);
$start_time = time() * 1000;
$runObj->write_output;
$end_time = time() * 1000;
$self->{write_time} += $end_time - $start_time;
if( $native_hive_process and $runObj->autoflow_inputjob ) {
printf("AUTOFLOW input->output\n") if($self->debug);
$runObj->dataflow_output_id();
}
} else {
print("\n\n!!!! NOT write_output\n\n\n") if($self->debug);
}
}
} else {
print("\n\n!!!! NOT write_output\n\n\n") if($self->debug);
}
$job->query_count($self->queen->dbc->query_count);
$job->runtime_msec(time()*1000 - $init_time);
$job->query_count($self->queen->dbc->query_count);
$job->runtime_msec(time()*1000 - $init_time);
$job->update_status('DONE');
$self->enter_status('READY');
$job->update_status('DONE');
$self->enter_status('READY');
}
sub enter_status {
......@@ -710,19 +716,6 @@ sub stop_job_output_redirection {
}
}
# Does not seem to be used anywhere?
#
sub check_system_load {
my $self = shift;
my $host = hostname;
my $numCpus = `grep -c '^process' /proc/cpuinfo`;
print("host: $host cpus:$numCpus\n");
return 1; #everything ok
}
sub _specific_job {
my $self = shift;
$self->{'_specific_job'} = shift if(@_);
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment