Commit faead1e0 authored by Jessica Severin's avatar Jessica Severin
Browse files

added system for job-level blocking/unblocking. This is a very fine grain

control structure where a process/program has been made aware of the job(s)
they are responsible for controlling.  This is facilited via a job url:
   mysql://ia64e:3306/jessica_compara32b_tree/analysis_job?dbID=6065355
AnalysisJobAdptor::CreateNewJob now returns this url on job creation.
When a job is datflowed, an array of these urls is returned (one for each rule).
Jobs can now be dataflowed from a Process subclass with blocking enabled.
A job can be fetched directly with one of these URLs.
A commandline utility ehive_unblock.pl has been added to unblock a url job.
To unblock a job do:
   Bio::EnsEMBL::Hive::URLFactory->fetch($url)->update_status('READY');
This is primarily useful in asynchronous split process/parsing situations.
parent b2cae78f
......@@ -221,6 +221,25 @@ sub fetch_all_failed_jobs {
}
sub fetch_by_url_query
{
my $self = shift;
my $query = shift;
return undef unless($query);
#print("Bio::EnsEMBL::DBSQL::AnalysisAdaptor::fetch_by_url_query : $query\n");
if((my $p=index($query, "=")) != -1) {
my $type = substr($query,0, $p);
my $value = substr($query,$p+1,length($query));
if($type eq 'dbID') {
return $self->fetch_by_dbID($value);
}
}
return undef;
}
#
# INTERNAL METHODS
#
......
......@@ -160,7 +160,7 @@ sub autoflow_inputjob {
=cut
sub dataflow_output_id {
my ($self, $output_id, $branch_code) = @_;
my ($self, $output_id, $branch_code, $blocked) = @_;
return unless($output_id);
return unless($self->analysis);
......@@ -176,12 +176,13 @@ sub dataflow_output_id {
$job->input_id($output_id);
$job->analysis_id($self->analysis->dbID);
$job->branch_code($branch_code);
$job->dbID($self->input_job->dbID);
$job->dbID($self->input_job->dbID);
$job->status('BLOCKED') if(defined($blocked) and ($blocked eq 'BLOCKED'));
#if process uses branch_code 1 explicitly, turn off automatic dataflow
$self->autoflow_inputjob(0) if($branch_code==1);
$self->queen->flow_output_job($job);
return $self->queen->flow_output_job($job);
}
sub debug {
......
......@@ -293,15 +293,23 @@ sub flow_output_job {
my $job = shift;
return unless($job);
my $create_blocked_job = 0;
$create_blocked_job = 1 if($job->status eq 'BLOCKED');
my @output_jobs;
my $rules = $self->db->get_DataflowRuleAdaptor->fetch_from_analysis_job($job);
foreach my $rule (@{$rules}) {
Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob (
my $job_id = Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor->CreateNewJob (
-input_id => $job->input_id,
-analysis => $rule->to_analysis,
-input_job_id => $job->dbID,
-block => $create_blocked_job
);
my $job_url = $rule->to_analysis->adaptor->db->dbc->url;
$job_url .= "/analysis_job?dbID=" . $job_id;
push @output_jobs, $job_url;
}
return \@output_jobs;
}
......
......@@ -102,6 +102,11 @@ sub fetch
my $adaptor = $dba->get_AnalysisAdaptor;
return $adaptor->fetch_by_url_query($query);
}
if($table eq 'analysis_job') {
#my $adaptor = new Bio::EnsEMBL::DBSQL::AnalysisAdaptor($dba);
my $adaptor = $dba->get_AnalysisJobAdaptor;
return $adaptor->fetch_by_url_query($query);
}
}
return undef;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment