diff --git a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm index 1a4d0edd65ac9de3e66cf66c78d0bffc458ea854..fa9c13b0453d28bc0608c38aecb18d5c17fed5c5 100644 --- a/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm +++ b/modules/Bio/EnsEMBL/Hive/DBSQL/AnalysisJobAdaptor.pm @@ -42,6 +42,7 @@ package Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor; use strict; use Bio::EnsEMBL::Hive::Worker; use Bio::EnsEMBL::Hive::AnalysisJob; +use Bio::EnsEMBL::Hive::DBSQL::AnalysisDataAdaptor; use Bio::EnsEMBL::DBSQL::BaseAdaptor; use Sys::Hostname; use Data::UUID; @@ -62,22 +63,28 @@ sub CreateNewJob { return undef unless(scalar @args); - my ($input_id, $analysis, $input_analysis_job_id, $blocked) = + my ($input_id, $analysis, $prev_analysis_job_id, $blocked) = rearrange([qw(INPUT_ID ANALYSIS input_job_id BLOCK )], @args); - $input_analysis_job_id=0 unless($input_analysis_job_id); + $prev_analysis_job_id=0 unless($prev_analysis_job_id); throw("must define input_id") unless($input_id); throw("must define analysis") unless($analysis); throw("analysis must be [Bio::EnsEMBL::Analysis] not a [$analysis]") unless($analysis->isa('Bio::EnsEMBL::Analysis')); + throw("analysis must have adaptor connected to database") + unless($analysis->adaptor and $analysis->adaptor->db); + my $dbc = $analysis->adaptor->db->dbc; + + my $dataDBA = new Bio::EnsEMBL::Hive::DBSQL::AnalysisDataAdaptor($dbc); + my $input_analysis_data_id = $dataDBA->store($input_id); + my $sql = "INSERT ignore into analysis_job ". - " SET input_id=\"$input_id\" ". - " ,input_analysis_job_id='$input_analysis_job_id' ". + " SET input_analysis_data_id=\"$input_analysis_data_id\" ". + " ,prev_analysis_job_id='$prev_analysis_job_id' ". 
" ,analysis_id='".$analysis->dbID ."' "; $sql .= " ,status='BLOCKED', job_claim='BLOCKED'" if($blocked); - my $dbc = $analysis->adaptor->db->dbc; my $sth = $dbc->prepare($sql); $sth->execute(); my $dbID = $sth->{'mysql_insertid'}; @@ -239,9 +246,10 @@ sub _columns { my $self = shift; return qw (a.analysis_job_id - a.input_analysis_job_id + a.prev_analysis_job_id a.analysis_id - a.input_id + a.input_analysis_data_id + a.output_analysis_data_id a.job_claim a.hive_id a.status @@ -265,7 +273,6 @@ sub _objs_from_sth { $job->dbID($column{'analysis_job_id'}); $job->analysis_id($column{'analysis_id'}); - $job->input_id($column{'input_id'}); $job->job_claim($column{'job_claim'}); $job->hive_id($column{'hive_id'}); $job->status($column{'status'}); @@ -274,6 +281,10 @@ sub _objs_from_sth { $job->branch_code($column{'branch_code'}); $job->adaptor($self); + my $input_id = $self->db->get_AnalysisDataAdaptor-> + fetch_by_dbID($column{'input_analysis_data_id'}); + $job->input_id($input_id); + push @jobs, $job; } $sth->finish; diff --git a/sql/tables.sql b/sql/tables.sql index 4dfc880955c45d5120aa3bb43c44a6270168e93a..42fe815cb8ad5bc14402cbb598866a6423cd94df 100644 --- a/sql/tables.sql +++ b/sql/tables.sql @@ -135,31 +135,35 @@ CREATE TABLE analysis_ctrl_rule ( -- by workers, are updated as the work is done, with a final update on completion. -- -- semantics: --- analysis_job_id - autoincrement id --- input_analysis_job_id - previous analysis_job which created this one (and passed input_id) --- analysis_id - the analysis_id needed to accomplish this job. 
--- input_id - input data passed into Analysis:RunnableDB to control the work --- job_claim - UUID set by workers as the fight over jobs --- hive_id - link to hive table to define which worker claimed this job --- status - state the job is in --- retry_count - number times job had to be reset when worker failed to run it --- completed - timestamp when job was completed --- branch_code - switch-like branching control, default=1 (ie true) +-- analysis_job_id - autoincrement id +-- prev_analysis_job_id - previous analysis_job which created this one (and passed input_id) +-- analysis_id - the analysis_id needed to accomplish this job. +-- input_analysis_data_id - input data passed into Analysis:RunnableDB to control the work +-- foreign key join to analysis_data table +-- output_analysis_data_id - output data passed from Analysis:RunnableDB to be passed to next job +-- foreign key join to analysis_data table +-- job_claim - UUID set by workers as they fight over jobs +-- hive_id - link to hive table to define which worker claimed this job +-- status - state the job is in +-- retry_count - number times job had to be reset when worker failed to run it +-- completed - timestamp when job was completed +-- branch_code - switch-like branching control, default=1 (ie true) CREATE TABLE analysis_job ( - analysis_job_id int(10) NOT NULL auto_increment, - input_analysis_job_id int(10) NOT NULL, #analysis_job which created this from rules - analysis_id int(10) NOT NULL, - input_id varchar(100) not null, - job_claim varchar(40) NOT NULL default '', #UUID - hive_id int(10) NOT NULL, - status enum('READY','BLOCKED','CLAIMED','GET_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED') DEFAULT 'READY' NOT NULL, - retry_count int(10) default 0 not NULL, - completed datetime NOT NULL, - branch_code int(10) default 1 NOT NULL, + analysis_job_id int(10) NOT NULL auto_increment, + prev_analysis_job_id int(10) NOT NULL, #analysis_job which created this from rules + analysis_id int(10) NOT NULL, + 
input_analysis_data_id int(10) NOT NULL, + output_analysis_data_id int(10) default 0 NOT NULL, + job_claim varchar(40) NOT NULL default '', #UUID + hive_id int(10) NOT NULL, + status enum('READY','BLOCKED','CLAIMED','GET_INPUT','RUN','WRITE_OUTPUT','DONE','FAILED') DEFAULT 'READY' NOT NULL, + retry_count int(10) default 0 not NULL, + completed datetime NOT NULL, + branch_code int(10) default 1 NOT NULL, PRIMARY KEY (analysis_job_id), - UNIQUE KEY input_id_analysis (input_id, analysis_id), + UNIQUE KEY input_id_analysis (input_analysis_data_id, analysis_id), INDEX claim_analysis_status (job_claim, analysis_id, status), INDEX analysis_status (analysis_id, status), INDEX hive_id (hive_id)