#
# You may distribute this module under the same terms as perl itself

=pod 

=head1 NAME

  Bio::EnsEMBL::Hive::AnalysisJob

=head1 DESCRIPTION

  An AnalysisJob is the link between the input_id control data, the analysis and
  the rule system.  It also tracks the state of the job as it is processed.

=head1 CONTACT

  Please contact ehive-users@ebi.ac.uk mailing list with questions/suggestions.

=head1 APPENDIX

  The rest of the documentation details each of the object methods.
  Internal methods are usually preceded with a _

=cut

26

Jessica Severin's avatar
Jessica Severin committed
27 28 29 30
package Bio::EnsEMBL::Hive::AnalysisJob;

use strict;

31 32 33 34 35
#use Bio::EnsEMBL::Analysis;
#use Bio::EnsEMBL::DBSQL::DBAdaptor;
#use Bio::EnsEMBL::Hive::Worker;

use base ('Bio::EnsEMBL::Hive::Params');
Jessica Severin's avatar
Jessica Severin committed
36

37 38 39 40 41
# Constructor: returns a new, empty hash-based object blessed into the
# invoking class. Any further arguments are accepted but ignored.
sub new {
    my $class = shift;

    return bless({}, $class);
}
Jessica Severin's avatar
Jessica Severin committed
42 43 44 45 46 47 48 49 50 51 52 53 54 55

# Getter/setter for the '_adaptor' slot.
# With an argument (even undef) the slot is overwritten first;
# the current value of the slot is always returned.
sub adaptor {
    my ($self, @value) = @_;

    if (@value) {
        $self->{'_adaptor'} = $value[0];
    }
    return $self->{'_adaptor'};
}

# Getter/setter for the '_dbID' slot.
# Stores the first argument when one is supplied, then returns the slot.
sub dbID {
    my ($self, @value) = @_;

    $self->{'_dbID'} = $value[0] if scalar(@value);

    return $self->{'_dbID'};
}

# Getter/setter for the '_input_id' slot.
# Arg[1] (optional): new value to store.
# Returns the current value of the slot.
sub input_id {
    my ($self, @value) = @_;

    if (@value) {
        $self->{'_input_id'} = $value[0];
    }
    return $self->{'_input_id'};
}

Leo Gordon's avatar
Leo Gordon committed
61
# Getter/setter for the '_worker_id' slot.
# Arg[1] (optional): new value to store.
# Returns the current value of the slot.
sub worker_id {
    my ($self, @value) = @_;

    $self->{'_worker_id'} = $value[0] if @value;

    return $self->{'_worker_id'};
}

# Getter/setter for the '_analysis_id' slot.
# Arg[1] (optional): new value to store.
# Returns the current value of the slot.
sub analysis_id {
    my ($self, @value) = @_;

    if (@value) {
        $self->{'_analysis_id'} = $value[0];
    }
    return $self->{'_analysis_id'};
}

# Getter/setter for the '_job_claim' slot.
# Arg[1] (optional): new value to store.
# Returns the current value of the slot.
sub job_claim {
    my ($self, @value) = @_;

    $self->{'_job_claim'} = $value[0] if @value;

    return $self->{'_job_claim'};
}

# Getter/setter for the '_status' slot (in-memory only; nothing is written
# anywhere else by this method).
# Arg[1] (optional): new value to store.
# Returns the current value of the slot.
sub status {
    my ($self, @value) = @_;

    if (@value) {
        $self->{'_status'} = $value[0];
    }
    return $self->{'_status'};
}

85 86 87 88 89 90 91
# Sets the job's status and asks the adaptor to record it.
# Arg[1]: the new status value.
# Does nothing (and returns nothing) when the job has no adaptor; in that
# case the in-memory status is left untouched as well.
# Otherwise returns whatever the adaptor's update_status() returns.
sub update_status {
    my ($self, $status) = @_;

    if ($self->adaptor) {
        $self->status($status);
        return $self->adaptor->update_status($self);
    }
    return;
}

Jessica Severin's avatar
Jessica Severin committed
92
# Getter/setter for the '_retry_count' slot.
# Arg[1] (optional): new value to store.
# An undefined slot (never set, or set to undef) is initialised to 0 as a
# side effect of the call, so the return value is always defined.
sub retry_count {
    my ($self, @value) = @_;

    $self->{'_retry_count'} = $value[0] if @value;

    if (!defined $self->{'_retry_count'}) {
        $self->{'_retry_count'} = 0;
    }
    return $self->{'_retry_count'};
}

# Getter/setter for the '_completed' slot.
# Arg[1] (optional): new value to store.
# Returns the current value of the slot.
sub completed {
    my ($self, @value) = @_;

    if (@value) {
        $self->{'_completed'} = $value[0];
    }
    return $self->{'_completed'};
}

105
# Getter/setter for the '_runtime_msec' slot.
# Arg[1] (optional): new value to store.
# An undefined slot is initialised to 0 as a side effect of the call,
# so the return value is always defined.
sub runtime_msec {
    my ($self, @value) = @_;

    $self->{'_runtime_msec'} = $value[0] if @value;

    if (!defined $self->{'_runtime_msec'}) {
        $self->{'_runtime_msec'} = 0;
    }
    return $self->{'_runtime_msec'};
}

# Getter/setter for the '_query_count' slot.
# Arg[1] (optional): new value to store.
# An undefined slot is initialised to 0 as a side effect of the call,
# so the return value is always defined.
sub query_count {
    my ($self, @value) = @_;

    $self->{'_query_count'} = $value[0] if @value;

    if (!defined $self->{'_query_count'}) {
        $self->{'_query_count'} = 0;
    }
    return $self->{'_query_count'};
}

119 120 121 122 123 124 125 126 127 128 129 130 131
# Getter/setter for the '_semaphore_count' slot.
# Arg[1] (optional): new value to store.
# An undefined slot is initialised to 0 as a side effect of the call,
# so the return value is always defined.
sub semaphore_count {
    my ($self, @value) = @_;

    if (@value) {
        $self->{'_semaphore_count'} = $value[0];
    }
    if (!defined $self->{'_semaphore_count'}) {
        $self->{'_semaphore_count'} = 0;
    }
    return $self->{'_semaphore_count'};
}

# Getter/setter for the '_semaphored_job_id' slot.
# Arg[1] (optional): new value to store.
# Returns the current value of the slot (undef when never set).
sub semaphored_job_id {
    my ($self, @value) = @_;

    $self->{'_semaphored_job_id'} = $value[0] if scalar(@value);

    return $self->{'_semaphored_job_id'};
}

Jessica Severin's avatar
Jessica Severin committed
132
# Getter/setter for the '_stdout_file' slot.
# Arg[1] (optional): new value to store.
# Returns the current value of the slot.
sub stdout_file {
    my ($self, @value) = @_;

    if (@value) {
        $self->{'_stdout_file'} = $value[0];
    }
    return $self->{'_stdout_file'};
}

# Getter/setter for the '_stderr_file' slot.
# Arg[1] (optional): new value to store.
# Returns the current value of the slot.
sub stderr_file {
    my ($self, @value) = @_;

    if (@value) {
        $self->{'_stderr_file'} = $value[0];
    }
    return $self->{'_stderr_file'};
}

144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165

=head2 autoflow

    Title   :  autoflow
    Arg[1]  :  (optional) new value of the flag
    Function:  Gets/sets the flag that says whether the job should be
               automatically dataflowed on branch 1 when the job completes.
               dataflow_output_id() switches the flag off itself whenever it
               is called for branch 1.
               An undefined flag is initialised to 1 as a side effect of the call.
    Returns :  boolean (1=default|0)

=cut

sub autoflow {
    my ($self, @flag) = @_;

    if (@flag) {
        $self->{'_autoflow'} = $flag[0];
    }
    if (!defined $self->{'_autoflow'}) {
        $self->{'_autoflow'} = 1;
    }

    return $self->{'_autoflow'};
}


166 167
##-----------------[indicators to the Worker]--------------------------------

168

Leo Gordon's avatar
Leo Gordon committed
169 170 171
# Getter/setter for the '_lethal_for_worker' flag.
#
# Job should set this to 1 prior to dying (or before running code that might
# cause death - such as RunnableDB's compilation) if it believes that the
# state of things will not allow the Worker to continue normally.
# The Worker will check the flag and commit suicide if it is set to true.
#
# Arg[1] (optional): new value of the flag.
# Returns the current value of the flag (undef when never set).
sub lethal_for_worker {
    my ($self, @flag) = @_;

    if (@flag) {
        $self->{'_lethal_for_worker'} = $flag[0];
    }
    return $self->{'_lethal_for_worker'};
}

177 178 179 180 181
# Getter/setter for the '_transient_error' flag.
#
# Job should set this to 1 prior to dying (or before running code that might
# cause death) if it believes that it makes sense to retry the same job
# without any changes.
# It may also set it to 0 prior to dying (or before running code that might
# cause death) if it believes that there is no point in re-trying (say, if
# the parameters are wrong).
# The Worker will check the flag and make necessary adjustments to the
# database state.
#
# Arg[1] (optional): new value of the flag.
# Returns the current value of the flag (undef when never set).
sub transient_error {
    my ($self, @flag) = @_;

    if (@flag) {
        $self->{'_transient_error'} = $flag[0];
    }
    return $self->{'_transient_error'};
}

# Getter/setter for the '_incomplete' flag.
#
# Job should set this to 0 prior to throwing if the job is done,
# but it wants the thrown message to be recorded with is_error=0.
#
# Arg[1] (optional): new value of the flag.
# Returns the current value of the flag (undef when never set).
sub incomplete {
    my ($self, @flag) = @_;

    if (@flag) {
        $self->{'_incomplete'} = $flag[0];
    }
    return $self->{'_incomplete'};
}

##-----------------[/indicators to the Worker]-------------------------------

196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270

=head2 dataflow_output_id

    Title        :  dataflow_output_id
    Arg[1](opt)  :  <string> $output_id, or <arrayref of strings> $output_ids
                    (optional, defaults to this job's own input_id)
    Arg[2](opt)  :  <int> $branch_code (optional, defaults to 1)
    Arg[3](opt)  :  <hashref> $create_job_options (optional, defaults to {}, options added to the CreateNewJob method).
                    The hash is copied before use; the caller's hash is never modified.
    Usage        :  $self->dataflow_output_id($output_id, $branch_code);
    Function:  
      If a RunnableDB(Process) needs to create jobs, this allows it to have jobs 
      created and flowed through the dataflow rules of the workflow graph.
      This 'output_id' becomes the 'input_id' of the newly created job at
      the ends of the dataflow pipes.  The optional 'branch_code' determines
      which dataflow pipe(s) to flow the job through.
      A rule that has an input_id_template produces exactly one output
      (the template after parameter substitution) instead of the supplied output_ids.
      Calling this method for branch 1 switches autoflow off.
    Returns      :  <arrayref> ids of the jobs that were actually created
                    (outputs sent to targets that implement dataflow() are not included)

=cut

sub dataflow_output_id {
    my ($self, $output_ids, $branch_code, $create_job_options) = @_;

    $output_ids  ||= [ $self->input_id() ];                                 # replicate the input_id in the branch_code's output by default
    $output_ids    = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY');   # force previously used single values into an arrayref

    $branch_code ||= 1;     # default branch_code is 1

        # Work on a private shallow copy of the options:
        # { -block => 1 } or { -semaphore_count => scalar(@fan_job_ids) } or { -semaphored_job_id => $funnel_job_id }
        # Writing the inherited -semaphored_job_id into the caller's hash would make a second call
        # with the same hashref look like an explicit request and silently stop semaphore propagation.
    my %job_options = %{ $create_job_options || {} };

        # this tricky code is responsible for correct propagation of semaphores down the dataflow pipes:
    my $propagate_semaphore = !exists($job_options{'-semaphored_job_id'});  # CONVENTION: if zero is explicitly supplied, it is a request not to propagate

        # However if nothing is supplied, semaphored_job_id will be propagated from the parent job:
    my $semaphored_job_id = $job_options{'-semaphored_job_id'} ||= $self->semaphored_job_id();

        # if branch_code is set to 1 (explicitly or implicitly), turn off automatic dataflow:
    $self->autoflow(0) if($branch_code==1);

    my @output_job_ids = ();
    my $rules = $self->adaptor->db->get_DataflowRuleAdaptor->fetch_from_analysis_id_branch_code($self->analysis_id, $branch_code);
    foreach my $rule (@{$rules}) {

        my $substituted_template;
        if(my $template = $rule->input_id_template()) {
            $substituted_template = $self->param_substitute($template);
        }

        my $target_analysis_or_table = $rule->to_analysis();

        foreach my $output_id ($substituted_template ? ($substituted_template) : @$output_ids) {

            if($target_analysis_or_table->can('dataflow')) {

                    # the target knows how to store the output itself, so no job is created for it:
                $target_analysis_or_table->dataflow( $output_id );

            } else {
                if(my $job_id = $self->adaptor->CreateNewJob(
                    -input_id       => $output_id,
                    -analysis       => $target_analysis_or_table,
                    -input_job_id   => $self->dbID,  # creator_job's id
                    %job_options
                )) {
                    if($semaphored_job_id and $propagate_semaphore) {
                        $self->adaptor->increase_semaphore_count_for_jobid( $semaphored_job_id ); # propagate the semaphore
                    }
                        # only add the ones that were indeed created:
                    push @output_job_ids, $job_id;

                } elsif($semaphored_job_id and !$propagate_semaphore) {
                    $self->adaptor->decrease_semaphore_count_for_jobid( $semaphored_job_id );     # if we didn't succeed in creating the job, fix the semaphore
                }
            }
        }
    }
    return \@output_job_ids;
}


Jessica Severin's avatar
Jessica Severin committed
271 272
# Prints a one-line summary of the job (job id, analysis logic_name,
# analysis id, retry count and input_id) to the currently selected output
# handle.
# The logic_name is looked up through the adaptor; it is printed as an empty
# string when the job has no adaptor or the analysis cannot be fetched.
sub print_job {
    my $self = shift;

    my $logic_name = '';
    if ($self->adaptor()) {
        my $analysis = $self->adaptor->db->get_AnalysisAdaptor->fetch_by_dbID($self->analysis_id);
            # a missing analysis should not make a diagnostic print fatal:
        $logic_name = $analysis->logic_name() if $analysis;
    }

    printf("job_id=%d %35s(%5d) retry=%d input_id='%s'\n",
        $self->dbID,
        $logic_name,
        $self->analysis_id,
        $self->retry_count,
        $self->input_id);
}

1;