Commit cd24c5cc authored by Leo Gordon's avatar Leo Gordon
Browse files

Improved output to distinguish multiple Workers' output in the same stream

parent f61411a4
......@@ -359,6 +359,13 @@ sub dataflow_output_id {
}
sub toString {
my $self = shift @_;
return 'Job '.$self->dbID." input_id='".$self->input_id."', retry=".$self->retry_count;
}
sub print_job {
my $self = shift;
my $logic_name = $self->adaptor()
......
......@@ -290,13 +290,19 @@ sub job_count_breakout {
sub print_stats {
my $self = shift;
my $self = shift @_;
printf("%-27s(%2d) ", $self->get_analysis->logic_name, $self->analysis_id );
print $self->toString."\n";
}
sub toString {
my $self = shift @_;
my $analysis = $self->get_analysis;
printf("%-27s(%2d) %11s jobs(Sem:%d, Rdy:%d, InProg:%d, Done+Pass:%d, Fail:%d)=%d Ave_msec:%d, workers(Running:%d, Reqired:%d) ",
$analysis->logic_name,
$self->analysis_id,
my $output = sprintf("%11s jobs(Sem:%d, Rdy:%d, InProg:%d, Done+Pass:%d, Fail:%d)=%d Ave_msec:%d, workers(Running:%d, Reqired:%d) ",
$self->status,
$self->semaphored_job_count,
......@@ -311,9 +317,11 @@ sub print_stats {
$self->num_running_workers,
$self->num_required_workers,
);
print ' h.cap:' .( defined($self->hive_capacity) ? $self->hive_capacity : '-' )
.' a.cap:' .( defined($analysis->analysis_capacity) ? $analysis->analysis_capacity : '-')
." (sync'd " .$self->seconds_since_last_update." sec ago)\n",
$output .= ' h.cap:' .( defined($self->hive_capacity) ? $self->hive_capacity : '-' )
.' a.cap:' .( defined($analysis->analysis_capacity) ? $analysis->analysis_capacity : '-')
." (sync'd " .$self->seconds_since_last_update." sec ago)",
return $output;
}
......
......@@ -166,18 +166,16 @@ sub life_cycle {
sub enter_status {
my ($self, $status) = @_;
my $job = $self->input_job();
my $worker = $self->worker();
my $job = $self->input_job();
if($self->debug) {
print STDERR "\nworker_id=".($worker ? $worker->dbID : '(standalone)').($job ? ', job_id='.$job->dbID : ''). " : $status\n";
}
if($job) {
$job->update_status( $status );
}
if($worker) {
$worker->status( $status );
$worker->adaptor->check_in_worker( $worker );
$job->update_status( $status );
my $status_msg = 'Job '.$job->dbID.' : '.$status;
if(my $worker = $self->worker) {
$worker->enter_status( $status, $status_msg );
} elsif($self->debug) {
print STDERR "Standalone$status_msg\n";
}
}
......
......@@ -259,8 +259,6 @@ sub specialize_new_worker {
# probably scheduled by beekeeper.pl:
} elsif( $stats = $self->suggest_analysis_to_specialize_by_rc_id_meadow_type($worker->resource_class_id, $worker->meadow_type) ) {
print "Queen picked analysis with dbID=".$stats->analysis_id." for the worker\n";
$worker->analysis( undef ); # make sure we reset anything that was there before
$analysis_id = $stats->analysis_id;
} else {
......
......@@ -423,31 +423,28 @@ sub get_stderr_redirector {
}
sub print_worker {
my $self = shift;
sub worker_say {
my ($self, $msg) = @_;
print $self->toString()."\n";
print("\tbatch_size = ", $self->analysis->stats->get_or_estimate_batch_size(),"\n");
my $job_limit = $self->job_limiter->available_capacity();
print("\tjob_limit = $job_limit\n") if(defined($job_limit));
print("\tlife_span = ", $self->life_span,"\n") if(defined($self->life_span));
if(my $worker_log_dir = $self->log_dir) {
print("\tworker_log_dir = $worker_log_dir\n");
} else {
print("\tworker_log_dir = STDOUT/STDERR\n");
}
my $worker_id = $self->dbID();
my $analysis_name = $self->analysis_id ? $self->analysis->logic_name.'('.$self->analysis_id.')' : '';
print "Worker $worker_id [ $analysis_name ] $msg\n";
}
sub toString {
my $self = shift @_;
return "Worker:\t".join(', ',
return join(', ',
'analysis='.($self->analysis_id ? $self->analysis->logic_name.'('.$self->analysis_id.')' : 'UNSPECIALIZED'),
'resource_class_id='.($self->resource_class_id || 'NULL'),
'meadow='.$self->meadow_type.'/'.$self->meadow_name,
'process='.$self->process_id.'@'.$self->host,
'resource_class_id='.($self->resource_class_id || 'NULL'),
'last_check_in='.$self->last_check_in,
'analysis='.$self->analysis->logic_name.'('.$self->analysis->dbID.')',
'batch_size='.($self->analysis_id ? $self->analysis->stats->get_or_estimate_batch_size() : 'UNSPECIALIZED'),
'job_limit='.($self->job_limiter->available_capacity() || 'NONE'),
'life_span='.($self->life_span || 'UNLIM'),
'worker_log_dir='.($self->log_dir || 'STDOUT/STDERR'),
);
}
......@@ -491,6 +488,8 @@ sub run {
my $min_batch_time = Bio::EnsEMBL::Hive::AnalysisStats::min_batch_time();
my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;
print "\n"; # to clear beekeeper's prompt in case output is not logged
$self->worker_say( $self->toString() );
$self->specialize_and_compile_wrapper( $specialization_arglist );
while (!$self->cause_of_death) { # Worker's lifespan loop (ends only when the worker dies for any reason)
......@@ -508,16 +507,16 @@ sub run {
if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_worker_id( $self->dbID ) }) ) {
my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
warn "$msg";
$self->worker_say( $msg );
$self->cause_of_death('CONTAMINATED');
$job_adaptor->release_undone_jobs_from_worker($self, $msg);
} elsif( $self->job_limiter->reached()) {
print "job_limit reached (".$self->work_done." jobs completed)\n";
$self->worker_say( "job_limit reached (".$self->work_done." jobs completed)" );
$self->cause_of_death('JOB_LIMIT');
} elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
print "life_span limit reached (alive for $alive_for_secs secs)\n";
$self->worker_say( "life_span limit reached (alive for $alive_for_secs secs)" );
$self->cause_of_death('LIFESPAN');
} else {
......@@ -584,10 +583,12 @@ sub run {
$self->adaptor->register_worker_death($self);
$self->analysis->stats->print_stats if($self->debug);
if($self->debug) {
$self->worker_say( 'AnalysisStats :'.$self->analysis->stats->toString );
$self->worker_say( 'dbc '.$self->adaptor->db->dbc->disconnect_count. ' disconnect cycles' );
}
printf("dbc %d disconnect cycles\n", $self->adaptor->db->dbc->disconnect_count);
print("total jobs completed : ", $self->work_done, "\n");
$self->worker_say( "Having completed ".$self->work_done." jobs the Worker exits : ".$self->cause_of_death );
if( $self->log_dir ) {
$self->get_stdout_redirector->pop();
......@@ -603,17 +604,19 @@ sub specialize_and_compile_wrapper {
$self->enter_status('SPECIALIZATION');
my $respecialization_from = $self->analysis_id && $self->analysis->logic_name.'('.$self->analysis_id.')';
$self->adaptor->specialize_new_worker( $self, $specialization_arglist ? @$specialization_arglist : () );
my $specialization_to = $self->analysis->logic_name.'('.$self->analysis_id.')';
if($respecialization_from) {
my $respecialization_to = $self->analysis->logic_name.'('.$self->analysis_id.')';
my $msg = "Respecialization from $respecialization_from to $respecialization_to";
warn "\n$msg\n";
my $msg = "respecializing from $respecialization_from to $specialization_to";
$self->worker_say( $msg );
$self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self->dbID, $msg, 0 );
} else {
$self->worker_say( "specializing to $specialization_to" );
}
$self->print_worker();
1;
} or do {
my $msg = $@;
warn "Could not specialize worker:\n\t$msg\n";
chomp $msg;
$self->worker_say( "[re]specialization failed:\t$msg" );
$self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self->dbID, $msg, 1 );
$self->cause_of_death('SEE_MSG') unless($self->cause_of_death()); # some specific causes could have been set prior to die "...";
......@@ -635,7 +638,7 @@ sub specialize_and_compile_wrapper {
1;
} or do {
my $msg = $@;
warn "Could not compile Runnable '".$self->analysis->module."' :\n\t$msg\n";
$self->worker_say( "runnable '".$self->analysis->module."' compilation failed :\t$msg" );
$self->adaptor->db->get_LogMessageAdaptor()->store_worker_message($self->dbID, $msg, 1 );
$self->cause_of_death('SEE_MSG') unless($self->cause_of_death()); # some specific causes could have been set prior to die "...";
......@@ -656,14 +659,14 @@ sub run_one_batch {
$self->adaptor->safe_synchronize_AnalysisStats($self->analysis->stats);
if($self->debug) {
$self->analysis->stats->print_stats;
print "claimed ".scalar(@{$jobs})." jobs to process\n";
$self->worker_say( "Stats : ".$self->analysis->stats->toString );
$self->worker_say( 'claimed '.scalar(@{$jobs}).' jobs to process' );
}
my $job_partial_timing;
ONE_BATCH: while(my $job = shift @$jobs) { # to make sure jobs go out of scope without undue delay
$job->print_job if($self->debug);
$self->worker_say( $job->toString ) if($self->debug);
my $job_stopwatch = Bio::EnsEMBL::Hive::Utils::Stopwatch->new();
$job_partial_timing = {};
......@@ -686,9 +689,7 @@ sub run_one_batch {
$accu_adaptor->fetch_structures_for_job_id( $job->dbID ), # FIXME: or should we pass in the original hash to be extended by pushing?
);
if($self->debug()) {
print "\nunsubstituted_param_hash = ".stringify($job->{'_unsubstituted_param_hash'})."\n";
}
$self->worker_say( 'Job '.$job->dbID." unsubstituted_params= ".stringify($job->{'_unsubstituted_param_hash'}) ) if($self->debug());
$runnable_object->input_job( $job ); # "take" the job
$job_partial_timing = $runnable_object->life_cycle();
......@@ -702,18 +703,18 @@ sub run_one_batch {
$job->query_count( $self->adaptor->db->dbc->query_count );
my $job_id = $job->dbID();
my $job_completion_line = "\njob $job_id : complete\n";
my $job_completion_line = "Job $job_id : complete";
if($msg_thrown) { # record the message - whether it was a success or failure:
my $job_status_at_the_moment = $job->status();
my $action = $job->incomplete ? 'died' : 'exited';
$job_completion_line = "\njob $job_id : $action in status '$job_status_at_the_moment' for the following reason: $msg_thrown\n";
$job_completion_line = "Job $job_id : $action in status '$job_status_at_the_moment' for the following reason: $msg_thrown";
$self->adaptor->db->get_LogMessageAdaptor()->store_job_message($job_id, $msg_thrown, $job->incomplete );
}
print STDERR $job_completion_line if($self->log_dir and ($self->debug or $job->incomplete)); # one copy goes to the job's STDERR
print STDERR "\n$job_completion_line\n" if($self->log_dir and ($self->debug or $job->incomplete)); # one copy goes to the job's STDERR
$self->stop_job_output_redirection($job); # and then we switch back to worker's STDERR
print STDERR $job_completion_line; # one copy goes to the worker's STDERR
$self->worker_say( $job_completion_line ); # one copy goes to the worker's STDERR
if($job->incomplete) {
# If the job specifically said what to do next, respect that last wish.
......@@ -727,7 +728,7 @@ sub run_one_batch {
or $job->lethal_for_worker ) { # trust the job's expert knowledge
my $reason = $self->prev_job_error ? 'two failed jobs in a row'
: 'suggested by job itself';
warn "Job's error has contaminated the Worker ($reason), so the Worker will now die\n";
$self->worker_say( "Job's error has contaminated the Worker ($reason), so the Worker will now die" );
$self->cause_of_death('CONTAMINATED');
last ONE_BATCH;
}
......@@ -741,7 +742,7 @@ sub run_one_batch {
}
if($job->lethal_for_worker) {
warn "The Job, although complete, wants the Worker to die\n";
$self->worker_say( "The Job, although complete, wants the Worker to die" );
$self->cause_of_death('CONTAMINATED');
last ONE_BATCH;
}
......@@ -756,15 +757,14 @@ sub run_one_batch {
sub enter_status {
my ($self, $status, $job) = @_;
my ($self, $status, $msg) = @_;
$msg ||= ": $status";
if($self->debug) {
print STDERR "\n". ($job ? 'job '.$job->dbID : 'worker'). " : $status\n";
$self->worker_say( $msg );
}
if($job) {
$job->update_status( $status );
}
$self->status( $status );
$self->adaptor->check_in_worker( $self );
}
......@@ -794,12 +794,12 @@ sub stop_job_output_redirection {
my $force_cleanup = !($self->debug || $job->incomplete);
if($force_cleanup or -z $job->stdout_file) {
warn "Deleting '".$job->stdout_file."' file\n";
$self->worker_say( "Deleting '".$job->stdout_file."' file" );
unlink $job->stdout_file;
$job->stdout_file(undef);
}
if($force_cleanup or -z $job->stderr_file) {
warn "Deleting '".$job->stderr_file."' file\n";
$self->worker_say( "Deleting '".$job->stderr_file."' file" );
unlink $job->stderr_file;
$job->stderr_file(undef);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment