Commit 5958eba7 authored by Leo Gordon


added two states, PRE_CLEANUP (conditional) and POST_CLEANUP (unconditional) to the life cycle of the Job
parent caf40dcf
@@ -17,23 +17,24 @@
$self->input_id, $self->analysis and several other variables.
From this input and configuration data, each Process can then proceed to
do something. The flow of execution within a Process is:
-    fetch_input();
-    run();
-    write_output();
-    DESTROY
-The developer can implement their own versions of fetch_input, run,
-write_output, and DESTROY to do what they need.
+    pre_cleanup() if($retry_count>0); # clean up databases/filesystem before subsequent attempts
+    fetch_input();                    # fetch the data from databases/filesystems
+    run();                            # perform the main computation
+    write_output();                   # record the results in databases/filesystems
+    post_cleanup();                   # destroy all non-trivial data structures after the job is done
+The developer can implement their own versions of
+pre_cleanup, fetch_input, run, write_output, and post_cleanup to do what they need.
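A minimal sketch of a Runnable that implements all five hooks (the package name and parameters are illustrative, not part of this commit):

```perl
package Hypothetical::SumRunnable;    # illustrative name, not part of eHive

use strict;
use warnings;
use base ('Bio::EnsEMBL::Hive::Process');

sub pre_cleanup {           # only reached when retry_count > 0
    my $self = shift;
    # e.g. remove partial output left behind by the previous failed attempt
}

sub fetch_input {
    my $self = shift;
    # read/validate parameters; here we just default an input list
    $self->param('numbers', $self->param('numbers') || [1, 2, 3]);
}

sub run {
    my $self = shift;
    my $sum = 0;
    $sum += $_ for @{ $self->param('numbers') };
    $self->param('sum', $sum);      # stash the result for write_output
}

sub write_output {
    my $self = shift;
    # send the result down dataflow branch #1
    $self->dataflow_output_id( { 'sum' => $self->param('sum') }, 1 );
}

sub post_cleanup {          # always reached, whether or not the job succeeded
    my $self = shift;
    # free any large in-memory structures here
}

1;
```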
The entire system is based around the concept of a workflow graph which
can split and loop back on itself. This is accomplished by dataflow
-rules (or pipes) that connect one Process (or analysis) to others.
-Where a unix commandline program can send output on STDOUT STDERR pipes,
+rules (similar to Unix pipes) that connect one Process (or analysis) to others.
+Where a Unix command line program can send output on STDOUT STDERR pipes,
a hive Process has access to unlimited pipes referenced by numerical
branch_codes. This is accomplished within the Process via
$self->dataflow_output_id(...);
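For example, a Process can fan out one child job per item on a chosen branch; a sketch, where the 'filenames'/'filename' parameter names are hypothetical:

```perl
# inside a Runnable's write_output(), fan out one job per file on branch #2
foreach my $filename (@{ $self->param('filenames') }) {
    $self->dataflow_output_id( { 'filename' => $filename }, 2 );
}
```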
The design philosophy is that each Process does its work and creates output,
-but it doesn't worry about where the input came from, or where it's output
+but it doesn't worry about where the input came from, or where its output
goes. If the system has dataflow pipes connected, then the output jobs
have purpose, if not - the output work is thrown away. The workflow graph
'controls' the behaviour of the system, not the processes. The processes just
@@ -136,6 +137,20 @@ sub param_defaults {
}
=head2 pre_cleanup
Title : pre_cleanup
Function: a subclass can implement functions related to cleaning up the database/filesystem after the previous unsuccessful run.
=cut
# sub pre_cleanup {
# my $self = shift;
#
# return 1;
# }
=head2 fetch_input
Title : fetch_input
@@ -153,6 +168,7 @@ sub fetch_input {
return 1;
}
=head2 run
Title : run
@@ -169,6 +185,7 @@ sub run {
return 1;
}
=head2 write_output
Title : write_output
@@ -184,20 +201,20 @@ sub write_output {
return 1;
}
-=head2 DESTROY
-Title : DESTROY
-Function: sublcass can implement functions related to cleanup and release.
-Typical activities includes freeing datastructures or
-closing files.
+=head2 post_cleanup
+Title : post_cleanup
+Function: a subclass can implement functions related to cleaning up after running one job
+(destroying non-trivial data structures in memory).
=cut
-sub DESTROY {
-    my $self = shift;
-    $self->SUPER::DESTROY if $self->can("SUPER::DESTROY");
-}
+#sub post_cleanup {
+#    my $self = shift;
+#
+#    return 1;
+#}
######################################################
......
@@ -76,6 +76,19 @@ sub param_defaults {
};
}
=head2 pre_cleanup
Title : pre_cleanup
Function: a subclass can implement functions related to cleaning up the database/filesystem after the previous unsuccessful run.
Here we just define it to see when the job gets into this state.
=cut
sub pre_cleanup {
}
=head2 fetch_input
Description : Implements fetch_input() interface method of Bio::EnsEMBL::Hive::Process that is used to read in parameters and load data.
@@ -89,6 +102,7 @@ sub fetch_input {
$self->dangerous_math('FETCH_INPUT');
}
=head2 run
Description : Implements run() interface method of Bio::EnsEMBL::Hive::Process that is used to perform the main bulk of the job (minus input and output).
@@ -102,6 +116,7 @@ sub run {
$self->dangerous_math('RUN');
}
=head2 write_output
Description : Implements write_output() interface method of Bio::EnsEMBL::Hive::Process that is used to deal with job's output after the execution.
@@ -115,6 +130,20 @@ sub write_output {
$self->dangerous_math('WRITE_OUTPUT');
}
=head2 post_cleanup
Title : post_cleanup
Function: a subclass can implement functions related to cleaning up after running one job
(destroying non-trivial data structures in memory).
Here we just define it to see when the job gets into this state.
=cut
sub post_cleanup {
}
=head2 dangerous_math
Description: an internal subroutine that will first sleep for some predefined time,
......
@@ -707,21 +707,23 @@ sub run_module_with_job {
$job->param_init( $runnable_object->strict_hash_format(), $runnable_object->param_defaults(), $self->db->get_MetaContainer->get_param_hash(), $self->analysis->parameters(), $job->input_id() );
$job->autoflow(1);
-    $self->enter_status('FETCH_INPUT', $job);
+    if( $runnable_object->can('pre_cleanup') and $job->retry_count()>0 ) {
+        $self->enter_status('PRE_CLEANUP', $job);
+        $runnable_object->pre_cleanup;
+    }
+    $self->enter_status('FETCH_INPUT', $job);
$self->{'fetching_stopwatch'}->continue();
$runnable_object->fetch_input;
$self->{'fetching_stopwatch'}->pause();
$self->enter_status('RUN', $job);
$self->{'running_stopwatch'}->continue();
$runnable_object->run;
$self->{'running_stopwatch'}->pause();
if($self->execute_writes) {
$self->enter_status('WRITE_OUTPUT', $job);
$self->{'writing_stopwatch'}->continue();
$runnable_object->write_output;
$self->{'writing_stopwatch'}->pause();
@@ -734,6 +736,11 @@ sub run_module_with_job {
print STDERR "\n!!! *no* WRITE_OUTPUT requested, so there will be no AUTOFLOW\n" if($self->debug);
}
if( $runnable_object->can('post_cleanup') ) { # Todo: may need to run it after the eval, to clean up the memory even after partially failed attempts?
$self->enter_status('POST_CLEANUP', $job);
$runnable_object->post_cleanup;
}
my @zombie_funnel_dataflow_rule_ids = keys %{$job->fan_cache};
if( scalar(@zombie_funnel_dataflow_rule_ids) ) {
$job->transient_error(0);
......
## Introduce two more states, 'PRE_CLEANUP' (conditional, to clean up after previous failed run) and 'POST_CLEANUP' (unconditional, to clean up after the current run)
ALTER TABLE worker MODIFY COLUMN status enum('READY','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DEAD') DEFAULT 'READY' NOT NULL;
ALTER TABLE job MODIFY COLUMN status enum('READY','BLOCKED','CLAIMED','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL;
ALTER TABLE job_message MODIFY COLUMN status enum('UNKNOWN','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','PASSED_ON') DEFAULT 'UNKNOWN';
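One quick way to watch jobs pass through the two new states is to poll the job table; a sketch, where the DSN and credentials are placeholders rather than anything from this commit:

```perl
#!/usr/bin/env perl
# Poll the hive's job table and print how many jobs are in each status,
# including the new PRE_CLEANUP and POST_CLEANUP states.
use strict;
use warnings;
use DBI;

# connection details are hypothetical
my $dbh = DBI->connect('dbi:mysql:database=my_hive_db;host=localhost',
                       'hive_user', 'hive_password', { RaiseError => 1 });

my $sth = $dbh->prepare('SELECT status, COUNT(*) FROM job GROUP BY status');

for (1..5) {                 # a few polling iterations
    $sth->execute();
    while (my ($status, $count) = $sth->fetchrow_array()) {
        printf "%-13s %d\n", $status, $count;
    }
    print "----\n";
    sleep 10;
}
$dbh->disconnect();
```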
@@ -111,7 +111,7 @@ CREATE TABLE worker (
host varchar(40) DEFAULT NULL,
process_id varchar(40) DEFAULT NULL,
work_done int(11) DEFAULT '0' NOT NULL,
-  status enum('READY','COMPILATION','FETCH_INPUT','RUN','WRITE_OUTPUT','DEAD') DEFAULT 'READY' NOT NULL,
+  status enum('READY','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DEAD') DEFAULT 'READY' NOT NULL,
born timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_check_in datetime NOT NULL,
died datetime DEFAULT NULL,
@@ -224,7 +224,7 @@ CREATE TABLE job (
analysis_id int(10) unsigned NOT NULL,
input_id char(255) NOT NULL,
worker_id int(10) unsigned DEFAULT NULL,
-  status enum('READY','BLOCKED','CLAIMED','COMPILATION','FETCH_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL,
+  status enum('READY','BLOCKED','CLAIMED','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','DONE','FAILED','PASSED_ON') DEFAULT 'READY' NOT NULL,
retry_count int(10) default 0 NOT NULL,
completed datetime DEFAULT NULL,
runtime_msec int(10) default NULL,
@@ -265,7 +265,7 @@ CREATE TABLE job_message (
worker_id int(10) unsigned NOT NULL,
time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
retry int(10) DEFAULT 0 NOT NULL,
-  status enum('UNKNOWN', 'COMPILATION', 'FETCH_INPUT', 'RUN', 'WRITE_OUTPUT', 'PASSED_ON') DEFAULT 'UNKNOWN',
+  status enum('UNKNOWN','COMPILATION','PRE_CLEANUP','FETCH_INPUT','RUN','WRITE_OUTPUT','POST_CLEANUP','PASSED_ON') DEFAULT 'UNKNOWN',
msg text,
is_error TINYINT,
......