Commit 94ef7f09 authored by Leo Gordon's avatar Leo Gordon
Browse files

move the bacct output parser into Meadow::LSF module and also call it from find_out_causes

parent 6f757efc
......@@ -153,25 +153,93 @@ sub kill_worker {
}
sub find_out_causes {
my $self = shift @_;
sub parse_report_source_line {
my $bacct_source_line = shift @_;
my %lsf_2_hive = (
my %status_2_cod = (
'TERM_MEMLIMIT' => 'MEMLIMIT',
'TERM_RUNLIMIT' => 'RUNLIMIT',
'TERM_OWNER' => 'KILLED_BY_USER',
);
my %units_2_megs = (
'K' => 1.0/1024,
'M' => 1,
'G' => 1024,
'T' => 1024*1024,
);
local $/ = "------------------------------------------------------------------------------\n\n";
open(my $bacct_fh, $bacct_source_line);
my $record = <$bacct_fh>; # skip the header
my %report_entry = ();
for my $record (<$bacct_fh>) {
chomp $record;
# warn "RECORD:\n$record";
my @lines = split(/\n/, $record);
if( my ($process_id) = $lines[0]=~/^Job <(\d+(?:\[\d+\])?)>/) {
my ($exit_status, $exception_status) = ('' x 2);
my ($completion_datetime, $cod);
foreach (@lines) {
if( /^(\w+\s+\w+\s+\d+\s+\d+:\d+:\d+):\s+Completed\s<(\w+)>(?:\.|;\s+(\w+))/ ) {
$completion_datetime = $1;
$cod = $status_2_cod{$3};
$exit_status = $2 . ($3 ? "/$3" : '');
}
elsif(/^\s*EXCEPTION STATUS:\s*(.*?)\s*$/) {
$exception_status = $1;
$exception_status =~s/\s+/;/g;
}
}
my (@keys) = split(/\s+/, ' '.$lines[@lines-2]);
my (@values) = split(/\s+/, ' '.$lines[@lines-1]);
my %usage; @usage{@keys} = @values;
#warn join(', ', map {sprintf('%s=%s', $_, $usage{$_})} (sort keys %usage)), "\n";
my ($mem_in_units, $mem_unit) = $usage{'MEM'} =~ /^([\d\.]+)([KMGT])$/;
my ($swap_in_units, $swap_unit) = $usage{'SWAP'} =~ /^([\d\.]+)([KMGT])$/;
$report_entry{ $process_id } = {
'completion_datetime' => $completion_datetime,
'cod' => $cod,
'exit_status' => $exit_status,
'exception_status' => $exception_status,
'mem_megs' => $mem_in_units * $units_2_megs{$mem_unit},
'swap_megs' => $swap_in_units * $units_2_megs{$swap_unit},
'pending_sec' => $usage{'WAIT'},
'cpu_sec' => $usage{'CPU_T'},
'lifespan_sec' => $usage{'TURNAROUND'},
};
}
}
close $bacct_fh;
return \%report_entry;
}
sub find_out_causes {
my $self = shift @_;
my %cod = ();
while (my $pid_batch = join(' ', map { "'$_'" } splice(@_, 0, 20))) { # can't fit too many pids on one shell cmdline
my $cmd = "bacct -l $pid_batch";
my $cmd = "bacct -l $pid_batch |";
# warn "LSF::find_out_causes() running cmd:\n\t$cmd\n";
foreach my $section (split(/\-{10,}\s+/, `$cmd`)) {
if($section=~/^Job <(\d+(?:\[\d+\])?)>.+(TERM_MEMLIMIT|TERM_RUNLIMIT|TERM_OWNER): job killed/is) {
$cod{$1} = $lsf_2_hive{$2};
my $report_entries = parse_report_source_line( $cmd );
while( my ($process_id, $report_entry) = each %$report_entries ) {
if(my $entry_cod = $report_entry->{'cod'}) {
$cod{ $process_id } = $entry_cod;
}
}
}
......
#!/usr/bin/env perl
# Obtain bacct data for your pipeline from the LSF and store it in lsf_report table
# Obtain bacct data for your pipeline from the LSF and store it in 'worker_resource_usage' table
use strict;
use warnings;
......@@ -23,67 +23,6 @@ main();
exit(0);
sub parse_bacct_output {
my $bacct_source_line = shift @_;
local $/ = "------------------------------------------------------------------------------\n\n";
my %units_converter = ( 'K' => 1.0/1024, 'M' => 1, 'G' => 1024, 'T' => 1024*1024 );
open(my $bacct_fh, $bacct_source_line);
my $record = <$bacct_fh>; # skip the header
my %report_entry = ();
for my $record (<$bacct_fh>) {
chomp $record;
# warn "RECORD:\n$record";
my @lines = split(/\n/, $record);
if( my ($process_id) = $lines[0]=~/^Job <(\d+(?:\[\d+\])?)>/) {
my ($exit_status, $exception_status) = ('' x 2);
my ($completion_datetime, $cod);
foreach (@lines) {
if( /^(\w+\s+\w+\s+\d+\s+\d+:\d+:\d+):\s+Completed\s<(\w+)>(?:\.|;\s+(\w+))/ ) {
$completion_datetime = $1;
$cod = $3;
$exit_status = $2 . ($3 ? "/$3" : '');
}
elsif(/^\s*EXCEPTION STATUS:\s*(.*?)\s*$/) {
$exception_status = $1;
$exception_status =~s/\s+/;/g;
}
}
my (@keys) = split(/\s+/, ' '.$lines[@lines-2]);
my (@values) = split(/\s+/, ' '.$lines[@lines-1]);
my %usage; @usage{@keys} = @values;
#warn join(', ', map {sprintf('%s=%s', $_, $usage{$_})} (sort keys %usage)), "\n";
my ($mem_in_units, $mem_unit) = $usage{'MEM'} =~ /^([\d\.]+)([KMGT])$/;
my ($swap_in_units, $swap_unit) = $usage{'SWAP'} =~ /^([\d\.]+)([KMGT])$/;
$report_entry{ $process_id } = {
'completion_datetime' => $completion_datetime,
'cod' => $cod,
'exit_status' => $exit_status,
'exception_status' => $exception_status,
'mem_megs' => $mem_in_units * $units_converter{$mem_unit},
'swap_megs' => $swap_in_units * $units_converter{$swap_unit},
'pending_sec' => $usage{'WAIT'},
'cpu_sec' => $usage{'CPU_T'},
'lifespan_sec' => $usage{'TURNAROUND'},
};
}
}
close $bacct_fh;
return \%report_entry;
}
sub main {
my ($url, $reg_conf, $reg_type, $reg_alias, $nosqlvc, $bacct_source_line, $lsf_user, $help, $start_date, $end_date);
......@@ -170,7 +109,7 @@ sub main {
warn 'Will run the following command to obtain '.($tee ? 'and dump ' : '')."bacct information: '$bacct_source_line' (may take a few minutes)\n";
}
my $report_entries = parse_bacct_output( $bacct_source_line );
my $report_entries = Bio::EnsEMBL::Hive::Meadow::LSF::parse_report_source_line( $bacct_source_line );
my $processid_2_workerid = $hive_dba->get_WorkerAdaptor()->fetch_by_meadow_type_AND_meadow_name_HASHED_FROM_process_id_TO_worker_id( 'LSF', $this_lsf_farm );
......@@ -185,7 +124,7 @@ sub main {
}
}
$sth_replace->finish();
warn "\nReport has been loaded into pipeline's lsf_report table. Enjoy.\n";
warn "\nReport has been loaded into pipeline's 'worker_resource_usage' table. Enjoy.\n";
}
__DATA__
......@@ -203,28 +142,28 @@ __DATA__
Based on the command-line parameters 'start_date' and 'end_date', or on the start time of the first
worker and end time of the last worker (as recorded in pipeline DB), it pulls the relevant data out
of LSF's 'bacct' database, parses it and stores in 'lsf_report' table.
of LSF's 'bacct' database, parses it and stores in 'worker_resource_usage' table.
You can join this table to 'worker' table USING(meadow_name,process_id) in the usual MySQL way
to filter by analysis_id, do various stats, etc.
You can optionally ask the script to dump the 'bacct' database in a dump file,
or fill in the 'lsf_report' table from an existing dump file (most time is taken by querying bacct).
or fill in the 'worker_resource_usage' table from an existing dump file (most time is taken by querying bacct).
Please note the script may additionally pull information about LSF processes that you ran simultaneously
with running the pipeline. It is easy to ignore them by joining into 'worker' table.
=head1 USAGE EXAMPLES
# Just run it the usual way: query 'bacct' and load the relevant data into 'lsf_report' table:
# Just run it the usual way: query 'bacct' and load the relevant data into 'worker_resource_usage' table:
lsf_report.pl -url mysql://username:secret@hostname:port/long_mult_test
# The same, but assuming LSF user someone_else ran the pipeline:
lsf_report.pl -url mysql://username:secret@hostname:port/long_mult_test -lsf_user someone_else
# Assuming the dump file existed. Load the dumped bacct data into 'lsf_report' table:
# Assuming the dump file existed. Load the dumped bacct data into 'worker_resource_usage' table:
lsf_report.pl -url mysql://username:secret@hostname:port/long_mult_test -dump long_mult.bacct
# Assuming the dump file did not exist. Query 'bacct', dump the data into a file and load it into 'lsf_report':
# Assuming the dump file did not exist. Query 'bacct', dump the data into a file and load it into 'worker_resource_usage' table:
lsf_report.pl -url mysql://username:secret@hostname:port/long_mult_test -dump long_mult_again.bacct
=head1 OPTIONS
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment