Commit fa6a1518 authored by Leo Gordon's avatar Leo Gordon
Browse files

separating the bacct parser into a subroutine

parent 6163a7c3
......@@ -23,6 +23,67 @@ main();
exit(0);
sub parse_bacct_output {
my $bacct_source_line = shift @_;
local $/ = "------------------------------------------------------------------------------\n\n";
my %units_converter = ( 'K' => 1.0/1024, 'M' => 1, 'G' => 1024, 'T' => 1024*1024 );
open(my $bacct_fh, $bacct_source_line);
my $record = <$bacct_fh>; # skip the header
my %report_entry = ();
for my $record (<$bacct_fh>) {
chomp $record;
# warn "RECORD:\n$record";
my @lines = split(/\n/, $record);
if( my ($process_id) = $lines[0]=~/^Job <(\d+(?:\[\d+\])?)>/) {
my ($exit_status, $exception_status) = ('' x 2);
my ($completion_datetime, $cod);
foreach (@lines) {
if( /^(\w+\s+\w+\s+\d+\s+\d+:\d+:\d+):\s+Completed\s<(\w+)>(?:\.|;\s+(\w+))/ ) {
$completion_datetime = $1;
$cod = $3;
$exit_status = $2 . ($3 ? "/$3" : '');
}
elsif(/^\s*EXCEPTION STATUS:\s*(.*?)\s*$/) {
$exception_status = $1;
$exception_status =~s/\s+/;/g;
}
}
my (@keys) = split(/\s+/, ' '.$lines[@lines-2]);
my (@values) = split(/\s+/, ' '.$lines[@lines-1]);
my %usage; @usage{@keys} = @values;
#warn join(', ', map {sprintf('%s=%s', $_, $usage{$_})} (sort keys %usage)), "\n";
my ($mem_in_units, $mem_unit) = $usage{'MEM'} =~ /^([\d\.]+)([KMGT])$/;
my ($swap_in_units, $swap_unit) = $usage{'SWAP'} =~ /^([\d\.]+)([KMGT])$/;
$report_entry{ $process_id } = {
'completion_datetime' => $completion_datetime,
'cod' => $cod,
'exit_status' => $exit_status,
'exception_status' => $exception_status,
'mem_megs' => $mem_in_units * $units_converter{$mem_unit},
'swap_megs' => $swap_in_units * $units_converter{$swap_unit},
'pending_sec' => $usage{'WAIT'},
'cpu_sec' => $usage{'CPU_T'},
'lifespan_sec' => $usage{'TURNAROUND'},
};
}
}
close $bacct_fh;
return \%report_entry;
}
sub main {
my ($url, $reg_conf, $reg_type, $reg_alias, $nosqlvc, $bacct_source_line, $lsf_user, $help, $start_date, $end_date);
......@@ -108,58 +169,23 @@ sub main {
warn 'Will run the following command to obtain '.($tee ? 'and dump ' : '')."bacct information: '$bacct_source_line' (may take a few minutes)\n";
}
my $processid_2_workerid = $hive_dba->get_WorkerAdaptor()->fetch_by_meadow_type_AND_meadow_name_HASHED_FROM_process_id_TO_worker_id( 'LSF', $this_lsf_farm );
my $sth_replace = $dbc->prepare( 'REPLACE INTO worker_resource_usage (worker_id, exit_status, mem_megs, swap_megs, pending_sec, cpu_sec, lifespan_sec, exception_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)' );
{
local $/ = "------------------------------------------------------------------------------\n\n";
my %units_converter = ( 'K' => 1.0/1024, 'M' => 1, 'G' => 1024, 'T' => 1024*1024 );
open(my $bacct_fh, $bacct_source_line);
my $record = <$bacct_fh>; # skip the header
for my $record (<$bacct_fh>) {
chomp $record;
# warn "RECORD:\n$record";
my @lines = split(/\n/, $record);
if( my ($process_id) = $lines[0]=~/^Job <(\d+(?:\[\d+\])?)>/) {
my $exception_status = '';
foreach (@lines) {
if(/^\s*EXCEPTION STATUS:\s*(.*?)\s*$/) {
$exception_status = $1;
$exception_status =~s/\s+/;/g;
}
}
my (@keys) = split(/\s+/, ' '.$lines[@lines-2]);
my (@values) = split(/\s+/, ' '.$lines[@lines-1]);
my %usage = map { ($keys[$_] => $values[$_]) } (0..@keys-1);
my $report_entries = parse_bacct_output( $bacct_source_line );
my ($mem_in_units, $mem_unit) = $usage{'MEM'} =~ /^([\d\.]+)([KMGT])$/;
my ($swap_in_units, $swap_unit) = $usage{'SWAP'} =~ /^([\d\.]+)([KMGT])$/;
my $processid_2_workerid = $hive_dba->get_WorkerAdaptor()->fetch_by_meadow_type_AND_meadow_name_HASHED_FROM_process_id_TO_worker_id( 'LSF', $this_lsf_farm );
my $mem_megs = $mem_in_units * $units_converter{$mem_unit};
my $swap_megs = $swap_in_units * $units_converter{$swap_unit};
my $sth_replace = $dbc->prepare( 'REPLACE INTO worker_resource_usage (worker_id, exit_status, mem_megs, swap_megs, pending_sec, cpu_sec, lifespan_sec, exception_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)' );
#warn join(', ', map {sprintf('%s=%s', $_, $usage{$_})} (sort keys %usage)), "\n";
while( my ($process_id, $report_entry) = each %$report_entries ) {
if( my $worker_id = $processid_2_workerid->{$process_id} ) {
$sth_replace->execute( $worker_id, $usage{STATUS}, $mem_megs, $swap_megs, $usage{WAIT}, $usage{CPU_T}, $usage{TURNAROUND}, $exception_status );
} else {
warn "\tDiscarding process_id=$process_id as probably not ours because it could not be mapped to a Worker\n";
}
}
if( my $worker_id = $processid_2_workerid->{$process_id} ) {
$sth_replace->execute( $worker_id, @$report_entry{'exit_status', 'mem_megs', 'swap_megs', 'pending_sec', 'cpu_sec', 'lifespan_sec', 'exception_status'} ); # slicing hashref
} else {
warn "\tDiscarding process_id=$process_id as probably not ours because it could not be mapped to a Worker\n";
}
close $bacct_fh;
}
$sth_replace->finish();
warn "\nReport has been loaded into pipeline's lsf_report table. Enjoy.\n";
}
__DATA__
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment