#!/usr/bin/perl
####################################################################################################################################
# pgaudit_analyze - Log analyzer for pgaudit
####################################################################################################################################

####################################################################################################################################
# Perl includes
####################################################################################################################################
use strict;
use warnings FATAL => qw(all);
use Carp qw(confess);

use DBI;
use File::Basename qw(dirname);
use Getopt::Long qw(GetOptions);
use Pod::Usage;
use POSIX qw(setsid);

use lib dirname($0) . '/../lib';
use PgAudit::CSV;
use PgAudit::Wait;

####################################################################################################################################
# Usage
####################################################################################################################################
=head1 SYNOPSIS

pgaudit_analyze [options] <pg-log-path>

 Configuration Options:
   --daemon             run as a daemon (consider running under upstart or systemctl)
   --port               port that PostgreSQL is running on (defaults to 5432)
   --socket-path        PostgreSQL host or socket directory (defaults to system default directory)
   --log-file           location of the log file for pgaudit_analyze (defaults to /var/log/pgaudit_analyze.log)
   --user               specify postgres user instead of using pgaudit_analyze invoker

 General Options:
   --help               display usage and exit
=cut

####################################################################################################################################
# Handle die as a confess instead to get more detailed error information
####################################################################################################################################
# Route every die through Carp::confess so the resulting error carries a full stack trace
$SIG{__DIE__} = sub
{
    Carp::confess(@_);
};

####################################################################################################################################
# Define column positions of log and audit data
####################################################################################################################################
# Zero-based positions of the fields in a parsed log CSV row, listed in column order
use constant
{
    LOG_FIELD_LOG_TIME                  => 0,
    LOG_FIELD_USER_NAME                 => 1,       # session unique
    LOG_FIELD_DATABASE_NAME             => 2,       # session unique
    LOG_FIELD_PROCESS_ID                => 3,       # session unique
    LOG_FIELD_CONNECTION_FROM           => 4,       # session unique
    LOG_FIELD_SESSION_ID                => 5,       # session unique
    LOG_FIELD_SESSION_LINE_NUM          => 6,
    LOG_FIELD_COMMAND_TAG               => 7,
    LOG_FIELD_SESSION_START_TIME        => 8,       # session unique
    LOG_FIELD_VIRTUAL_TRANSACTION_ID    => 9,
    LOG_FIELD_TRANSACTION_ID            => 10,
    LOG_FIELD_ERROR_SEVERITY            => 11,
    LOG_FIELD_SQL_STATE_CODE            => 12,
    LOG_FIELD_MESSAGE                   => 13,
    LOG_FIELD_DETAIL                    => 14,
    LOG_FIELD_HINT                      => 15,
    LOG_FIELD_INTERNAL_QUERY            => 16,
    LOG_FIELD_INTERNAL_QUERY_POS        => 17,
    LOG_FIELD_CONTEXT                   => 18,
    LOG_FIELD_QUERY                     => 19,
    LOG_FIELD_QUERY_POS                 => 20,
    LOG_FIELD_LOCATION                  => 21,
    LOG_FIELD_APPLICATION_NAME          => 22
};

# Zero-based positions of the fields in the CSV payload that follows the "AUDIT: " message prefix
use constant
{
    AUDIT_FIELD_AUDIT_TYPE          => 0,
    AUDIT_FIELD_STATEMENT_ID        => 1,
    AUDIT_FIELD_SUBSTATEMENT_ID     => 2,
    AUDIT_FIELD_CLASS               => 3,
    AUDIT_FIELD_COMMAND             => 4,
    AUDIT_FIELD_OBJECT_TYPE         => 5,
    AUDIT_FIELD_OBJECT_NAME         => 6,
    AUDIT_FIELD_STATEMENT           => 7
};

# Command tag (lower case) that identifies an authentication log row
use constant
{
    COMMAND_TAG_AUTHENTICATION  => 'authentication'
};

# Error severities (lower case) that are treated as failures
use constant
{
    ERROR_SEVERITY_ERROR  => 'error',
    ERROR_SEVERITY_FATAL  => 'fatal',
    ERROR_SEVERITY_PANIC  => 'panic'
};

# Values stored in the session state column
use constant
{
    STATE_OK    => 'ok',
    STATE_ERROR => 'error'
};

# Readable boolean literals
use constant
{
    true    => 1,
    false   => 0
};

####################################################################################################################################
# Parse options
####################################################################################################################################
# Option defaults, each of which may be overridden on the command line
my $bHelp = false;
my $bDaemon = false;
my $iPort = 5432;
my $strSocketPath;                                      # undef means no host is added to the connection string
my $strLogOutFile = '/var/log/pgaudit_analyze.log';
my $strDbUser = getpwuid($<);                           # default to the name of the invoking OS user

# Parse the command line, leaving the positional log path in @ARGV. Invalid options print usage and exit with status 2.
GetOptions ('help' => \$bHelp,
            'daemon' => \$bDaemon,
            'port=s' => \$iPort,
            'socket-path=s' => \$strSocketPath,
            'log-file=s' => \$strLogOutFile,
            'user=s' => \$strDbUser)
    or pod2usage(2);

# Display usage and exit if requested
if ($bHelp)
{
    syswrite(*STDOUT, "PostgreSQL Audit Log Analyzer\n\n");

    # A bare pod2usage() exits with status 2 and writes to STDERR, which reports an explicit --help request as a failure.
    # Request a successful exit so the usage goes to STDOUT; -verbose => 0 keeps the printed text limited to the SYNOPSIS.
    pod2usage(-exitval => 0, -verbose => 0);
}

####################################################################################################################################
# Connect to Postgres
####################################################################################################################################
# Per-database cache keyed by database name. Each entry holds the connection (hDb), whether the database has an audit
# schema (log), and the prepared statement handles used to load audit data.
my %oDbHash;
my $strAuditUserName = 'pgaudit_etl';
my $strAuditSchemaName = 'pgaudit';

# databaseGet
#
# Make sure a connection and the prepared statements exist for the named database.
#
# Parameters:
#   $strDatabaseName - name of the database the log row belongs to
#
# Returns true when the database contains the audit schema (so rows should be loaded), else false. Connection and SQL
# errors are raised by DBI because the connection is opened with RaiseError.
sub databaseGet
{
    my ($strDatabaseName) = @_;

    # A cached entry already records whether this database is audited, so just report that
    if (defined($oDbHash{$strDatabaseName}))
    {
        return $oDbHash{$strDatabaseName}{log} ? true : false;
    }

    # Build the connection string, adding the host only when a socket path was supplied
    my $strDsn = "dbi:Pg:dbname=${strDatabaseName};port=${iPort};";

    if (defined($strSocketPath))
    {
        $strDsn .= "host=${strSocketPath}";
    }

    # Connect with manual commits so each log row is written as a unit by the caller
    my $hDb = DBI->connect($strDsn, $strDbUser, undef, {AutoCommit => 0, RaiseError => 1});
    $oDbHash{$strDatabaseName}{hDb} = $hDb;

    # Look for the audit schema (as the audit user) to decide whether this database should be logged
    my $hSqlSchemaSelect = $hDb->prepare(
        "select count(*) = 1\n" .
        "  from pg_namespace\n" .
        " where nspname = ?");

    $hDb->do("set session authorization ${strAuditUserName}");

    $hSqlSchemaSelect->execute($strAuditSchemaName);

    my ($bSchemaExists) = $hSqlSchemaSelect->fetchrow_array();
    $oDbHash{$strDatabaseName}{log} = $bSchemaExists ? true : false;
    undef($hSqlSchemaSelect);

    # Without an audit schema there is nothing to load, so drop the connection but keep the cache entry as a marker
    if (!$oDbHash{$strDatabaseName}{log})
    {
        $hDb->disconnect();
        undef($oDbHash{$strDatabaseName}{hDb});
        return false;
    }

    # Statements to prepare, as [cache key, SQL] pairs in the order they are prepared
    my @oyStatement =
    (
        [hSqlSessionInsert =>
            "insert into pgaudit.session (session_id, process_id, session_start_time, user_name, application_name,\n" .
            "                             connection_from, state)\n" .
            "                     values (?, ?, ?, ?, ?, ?, ?)"],

        [hSqlSessionSelect =>
            "with session_line_num as\n" .
            "(\n" .
            "    select session_id,\n" .
            "           max(session_line_num) as session_line_num_max\n" .
            "      from pgaudit.log_event\n" .
            "     where session_id = ?\n" .
            "     group by session_id\n" .
            "),\n" .
            "audit as\n" .
            "(\n" .
            "    select session_id,\n" .
            "           max(statement_id) as statement_id_max,\n" .
            "           max(substatement_id) as substatement_id_max\n" .
            "      from pgaudit.audit_substatement\n" .
            "     where session_id = ?\n" .
            "     group by session_id\n" .
            ")\n" .
            "select session.application_name,\n" .
            "       session.state,\n" .
            "       coalesce(session_line_num.session_line_num_max, 0) as session_line_num_max,\n" .
            "       coalesce(audit.statement_id_max, 0) as statement_id_max,\n" .
            "       coalesce(audit.substatement_id_max, 0) as substatement_id_max\n" .
            " from pgaudit.session\n" .
            "      left outer join session_line_num\n" .
            "           on session_line_num.session_id = session.session_id\n" .
            "      left outer join audit\n" .
            "           on audit.session_id = session.session_id\n" .
            " where session.session_id = ?"],

        [hSqlSessionUpdate =>
            "update pgaudit.session\n" .
            "   set application_name = ?\n" .
            " where session_id = ?"],

        [hSqlLogonSelect =>
            "select last_success,\n" .
            "       current_success,\n" .
            "       last_failure,\n" .
            "       failures_since_last_success\n".
            "  from pgaudit.logon\n" .
            " where user_name = ?"],

        [hSqlLogonInsert =>
            "insert into pgaudit.logon (user_name, current_success, last_failure, failures_since_last_success)\n" .
            "                   values (?, ?, ?, ?)"],

        [hSqlLogonUpdate =>
            "update pgaudit.logon\n" .
            "   set last_success = ?,\n" .
            "       current_success = ?,\n" .
            "       last_failure = ?,\n" .
            "       failures_since_last_success = ?\n".
            " where user_name = ?"],

        [hSqlLogInsert =>
            "insert into pgaudit.log_event (session_id, log_time, session_line_num, command, error_severity, sql_state_code,\n" .
            "                               virtual_transaction_id, transaction_id, message, detail, hint, query, query_pos,\n" .
            "                               internal_query, internal_query_pos, context, location)\n" .
            "                       values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"],

        [hSqlLogUpdate =>
            "update pgaudit.log_event\n" .
            "   set user_name = ?,\n" .
            "       database_name = ?,\n" .
            "       connection_from = ?,\n" .
            "       state = ?\n" .
            " where id = ?"],

        [hSqlAuditStmtInsert =>
            "insert into pgaudit.audit_statement (session_id, statement_id)\n" .
            "                             values (?, ?)"],

        [hSqlAuditStmtErrorUpdate =>
            "update pgaudit.audit_statement as audit_statement_update\n" .
            "   set state = 'error',\n" .
            "       error_session_line_num = ?" .
            " where exists\n" .
            "(\n" .
            "    select true\n" .
            "      from pgaudit.log_event\n" .
            "           inner join pgaudit.audit_substatement_detail\n" .
            "                on audit_substatement_detail.session_id = log_event.session_id\n" .
            "               and audit_substatement_detail.session_line_num = log_event.session_line_num\n" .
            "               and audit_substatement_detail.statement_id = audit_statement_update.statement_id\n" .
            "               and audit_substatement_detail.session_id = audit_statement_update.session_id\n" .
            "           inner join pgaudit.audit_statement\n" .
            "                on audit_statement.session_id = audit_substatement_detail.session_id\n" .
            "               and audit_statement.statement_id = audit_substatement_detail.statement_id\n" .
            "     where log_event.session_id = ?\n" .
            "       and log_event.virtual_transaction_id = ?\n" .
            ")"],

        [hSqlAuditSubStmtInsert =>
            "insert into pgaudit.audit_substatement (session_id, statement_id, substatement_id, substatement)\n" .
            "                                values (?, ?, ?, ?)"],

        [hSqlAuditSubStmtDetailInsert =>
            "insert into pgaudit.audit_substatement_detail (session_id, statement_id, substatement_id, session_line_num,\n" .
            "                                               audit_type, class, command, object_type, object_name)\n" .
            "                                       values (?, ?, ?, ?, ?, ?, ?, ?, ?)"],
    );

    # Prepare each statement and cache its handle under the matching key
    foreach my $oStatement (@oyStatement)
    {
        my ($strKey, $strSql) = @{$oStatement};

        $oDbHash{$strDatabaseName}{$strKey} = $hDb->prepare($strSql);
    }

    return true;
}

####################################################################################################################################
# sessionGet
####################################################################################################################################
# Session cache keyed by session id: application_name, state, the highest session_line_num/statement_id/substatement_id
# already loaded, and the time of the last log row seen for the session
my %oSessionHash;
my %oLogonHash;

# sessionGet
#
# Make sure the session for a log row exists in both the cache and the database, recording logon history the first time a
# session is inserted and keeping the stored application name up to date.
#
# Parameters:
#   $strSessionId        - session id from the log row
#   $lSessionLineNum     - session line number of the log row
#   $strProcessId        - backend process id
#   $strSessionStartTime - session start time
#   $strUserName         - user name
#   $strDatabaseName     - database name (databaseGet() must already have prepared the statements for it)
#   $strApplicationName  - application name, may be undef
#   $strConnectionFrom   - connection source, may be undef
#   $strCommandTag       - command tag, may be undef
#   $strErrorSeverity    - error severity, may be undef
sub sessionGet
{
    my ($strSessionId, $lSessionLineNum, $strProcessId, $strSessionStartTime, $strUserName, $strDatabaseName,
        $strApplicationName, $strConnectionFrom, $strCommandTag, $strErrorSeverity) = @_;

    my $oDb = $oDbHash{$strDatabaseName};

    # Default the application name and the connection source when the log row did not supply them
    $strApplicationName = '[unknown]'
        if (!defined($strApplicationName));

    $strConnectionFrom = '[unknown]'
        if (!defined($strConnectionFrom));

    # The session is OK unless this row is a fatal authentication failure
    my $bAuthFailure =
        defined($strCommandTag) && lc($strCommandTag) eq COMMAND_TAG_AUTHENTICATION &&
        defined($strErrorSeverity) && lc($strErrorSeverity) eq ERROR_SEVERITY_FATAL;

    my $strState = $bAuthFailure ? STATE_ERROR : STATE_OK;

    # When the session is not cached, load it from the database or create it
    if (!defined($oSessionHash{$strSessionId}))
    {
        # Attempt to select the session (the id is bound once for each of the three placeholders)
        my $hSqlSessionSelect = $oDb->{hSqlSessionSelect};
        $hSqlSessionSelect->execute(($strSessionId) x 3);

        @{$oSessionHash{$strSessionId}}{qw(application_name state session_line_num statement_id substatement_id)} =
            $hSqlSessionSelect->fetchrow_array();

        my $oCache = $oSessionHash{$strSessionId};

        # A defined state means the session row was found
        if (defined($oCache->{state}))
        {
            print timestampGet() . ": session select ${strSessionId}:" .
                  " session_line_num $oCache->{session_line_num}" .
                  ", statement_id $oCache->{statement_id}" .
                  ", substatement_id $oCache->{substatement_id}\n";
        }
        # Else the session is new and must be inserted
        else
        {
            $oDb->{hSqlSessionInsert}->execute(
                $strSessionId, $strProcessId, $strSessionStartTime, $strUserName, $strApplicationName,
                $strConnectionFrom, $strState);

            # Seed the cache so the session does not have to be queried for every row
            @{$oCache}{qw(application_name state session_line_num statement_id substatement_id)} =
                ($strApplicationName, $strState, 0, 0, 0);

            # Fetch the logon history for the user
            $oDb->{hSqlLogonSelect}->execute($strUserName);

            my ($strLastSuccess, $strCurrentSuccess, $strLastFailure, $iFailuresSinceLastSuccess) =
                $oDb->{hSqlLogonSelect}->fetchrow_array();

            # A defined failure count means the user already has a logon row
            my $bLogonExists = defined($iFailuresSinceLastSuccess);

            if ($strState eq STATE_OK)
            {
                if (!$bLogonExists)
                {
                    $iFailuresSinceLastSuccess = 0;
                }
                # The previous logon also succeeded, so it becomes the last success and the failure history is cleared
                elsif (defined($strCurrentSuccess))
                {
                    $strLastSuccess = $strCurrentSuccess;
                    undef($strLastFailure);
                    $iFailuresSinceLastSuccess = 0;
                }

                $strCurrentSuccess = $strSessionStartTime;
            }
            else
            {
                if ($bLogonExists)
                {
                    undef($strCurrentSuccess);
                    $iFailuresSinceLastSuccess += 1;
                }
                else
                {
                    $iFailuresSinceLastSuccess = 1;
                }

                $strLastFailure = $strSessionStartTime;
            }

            # Update the existing logon row or insert the first one for this user
            if ($bLogonExists)
            {
                $oDb->{hSqlLogonUpdate}->execute(
                    $strLastSuccess, $strCurrentSuccess, $strLastFailure, $iFailuresSinceLastSuccess, $strUserName);
            }
            else
            {
                $oDb->{hSqlLogonInsert}->execute(
                    $strUserName, $strCurrentSuccess, $strLastFailure, $iFailuresSinceLastSuccess);
            }

            $oDb->{hDb}->commit();

            print timestampGet() . ": session insert =  ${strSessionId}\n";
        }
    }

    my $oSession = $oSessionHash{$strSessionId};

    # Store a changed application name, but only for rows not yet loaded (only the last application name is preserved)
    if ($lSessionLineNum > $oSession->{session_line_num} && $strApplicationName ne $oSession->{application_name})
    {
        $oDb->{hSqlSessionUpdate}->execute($strApplicationName, $strSessionId);
        $oSession->{application_name} = $strApplicationName;

        print timestampGet() . ": session update = ${strSessionId}, application = ${strApplicationName}\n";
    }

    # Record when the session was last seen
    $oSession->{last_log} = time();
}

####################################################################################################################################
# logWrite
####################################################################################################################################
# logWrite
#
# Store one log row. The message is first offered to auditWrite(); when it is an audit record the message text is not stored
# again in the log event. Rows at or below the session's cached session_line_num are treated as already loaded and skipped.
# An error, fatal or panic row also marks the matching audit statements as failed.
#
# Parameters (in order): session id, database name, log time, session line number, command tag, error severity, SQL state
# code, virtual transaction id, transaction id, message, detail, hint, query, query position, internal query, internal query
# position, context, location.
sub logWrite
{
    my ($strSessionId, $strDatabaseName, $strLogTime, $lSessionLineNum, $strCommandTag, $strErrorSeverity, $strSqlStateCode,
        $strVirtualTransactionId, $lTransactionId, $strMessage, $strDetail, $strHint, $strQuery, $iQueryPos,
        $strInternalQuery, $iInternalQueryPos, $strContext, $strLocation) = @_;

    # Audit records are stored in the audit tables, so their message is dropped from the log event
    $strMessage = undef
        if (auditWrite($strSessionId, $strDatabaseName, $lSessionLineNum, $strMessage));

    # Only rows beyond the last loaded line are written
    if ($lSessionLineNum > $oSessionHash{$strSessionId}{session_line_num})
    {
        my $oDb = $oDbHash{$strDatabaseName};

        $oDb->{hSqlLogInsert}->execute(
            $strSessionId, $strLogTime, $lSessionLineNum, $strCommandTag, $strErrorSeverity, $strSqlStateCode,
            $strVirtualTransactionId, $lTransactionId, $strMessage, $strDetail, $strHint, $strQuery, $iQueryPos,
            $strInternalQuery, $iInternalQueryPos, $strContext, $strLocation);

        $oSessionHash{$strSessionId}{session_line_num} = $lSessionLineNum;

        # Flag the audit statements of this virtual transaction as failed when the row reports a failure
        my $bFailure =
            defined($strErrorSeverity) &&
            grep({$strErrorSeverity eq $_} ERROR_SEVERITY_ERROR, ERROR_SEVERITY_FATAL, ERROR_SEVERITY_PANIC);

        if ($bFailure)
        {
            $oDb->{hSqlAuditStmtErrorUpdate}->execute($lSessionLineNum, $strSessionId, $strVirtualTransactionId);
        }
    }
}

####################################################################################################################################
# auditWrite
####################################################################################################################################
# CSV parser for the audit payload (created, and recreated after an error, by the main loop)
my $oAuditCSV;

# auditWrite
#
# Store the audit record carried by a log message, if there is one.
#
# Parameters:
#   $strSessionId    - session id from the log row
#   $strDatabaseName - database name (used to find the prepared statements)
#   $lSessionLineNum - session line number of the log row
#   $strMessage      - log message, may be undef
#
# Returns true when the message is an audit record (it starts with "AUDIT: "), else false.
sub auditWrite
{
    my $strSessionId = shift;
    my $strDatabaseName = shift;
    my $lSessionLineNum = shift;
    my $strMessage = shift;

    # The log CSV is read with empty_is_undef, so an empty message arrives as undef. Matching undef would raise a fatal
    # warning, and since the failing row is never committed the main loop would hit it again on every retry.
    return false
        if (!defined($strMessage) || $strMessage !~ /^AUDIT\:\ /);

    # Parse the CSV payload that follows the 7 character "AUDIT: " prefix
    $oAuditCSV->parse(substr($strMessage, 7));
    my @stryRow = $oAuditCSV->fields();
    my $lStatementId = $stryRow[AUDIT_FIELD_STATEMENT_ID];
    my $lSubStatementId = $stryRow[AUDIT_FIELD_SUBSTATEMENT_ID];

    # A higher statement id starts a new statement, which also restarts the substatement counter
    if ($lStatementId > $oSessionHash{$strSessionId}{statement_id})
    {
        $oDbHash{$strDatabaseName}{hSqlAuditStmtInsert}->execute($strSessionId, $lStatementId);
        $oSessionHash{$strSessionId}{statement_id} = $lStatementId;
        $oSessionHash{$strSessionId}{substatement_id} = 0;
    }

    # A higher substatement id within the current statement is stored along with its statement text
    if ($lStatementId == $oSessionHash{$strSessionId}{statement_id} &&
        $lSubStatementId > $oSessionHash{$strSessionId}{substatement_id})
    {
        $oDbHash{$strDatabaseName}{hSqlAuditSubStmtInsert}->execute(
            $strSessionId, $lStatementId, $lSubStatementId, $stryRow[AUDIT_FIELD_STATEMENT]);
        $oSessionHash{$strSessionId}{substatement_id} = $lSubStatementId;
    }

    # The detail row is only written for log lines that have not been loaded yet
    if ($lSessionLineNum > $oSessionHash{$strSessionId}{session_line_num})
    {
        $oDbHash{$strDatabaseName}{hSqlAuditSubStmtDetailInsert}->execute(
            $strSessionId, $lStatementId, $lSubStatementId, $lSessionLineNum, lc($stryRow[AUDIT_FIELD_AUDIT_TYPE]),
            lc($stryRow[AUDIT_FIELD_CLASS]), lc($stryRow[AUDIT_FIELD_COMMAND]),
            defined($stryRow[AUDIT_FIELD_OBJECT_TYPE]) ? lc($stryRow[AUDIT_FIELD_OBJECT_TYPE]) : undef,
            defined($stryRow[AUDIT_FIELD_OBJECT_NAME]) ? lc($stryRow[AUDIT_FIELD_OBJECT_NAME]) : undef);
    }

    return true;
}

####################################################################################################################################
# nextLogFile
#
# Find next log file to be analyzed.
####################################################################################################################################
# Parameters:
#   $strPath        - directory that contains the csv log files
#   $strLastLogFile - name of the log file most recently analyzed (optional)
#
# Returns the alphabetically first csv file when $strLastLogFile is not defined, else the first csv file that sorts after
# $strLastLogFile, or undef when there is no later file. Confesses when the directory cannot be read or holds no csv files.
sub nextLogFile
{
    my $strPath = shift;
    my $strLastLogFile = shift;

    # Open the log file path, reporting which path failed and why
    opendir(my $hPath, $strPath)
        or confess "unable to open database log directory '${strPath}': $!";

    # Get the sorted list of log files (any name ending in .csv, case-insensitive) and release the directory handle
    my @stryFileList = sort(grep(/\.csv$/i, readdir($hPath)));
    closedir($hPath);

    # Make sure there are some log files
    if (@stryFileList == 0)
    {
        confess "no csv log files found in '${strPath}'";
    }

    # If there is no last log file return the first log in the list
    return $stryFileList[0]
        if (!defined($strLastLogFile));

    # Else return the first log file alphabetically greater than the last log
    foreach my $strFile (@stryFileList)
    {
        return $strFile
            if ($strFile gt $strLastLogFile);
    }

    # Else there is no next file (an explicit undef is kept so the result is a single value in any context)
    return undef;
}


####################################################################################################################################
# Daemonize this process
####################################################################################################################################
# Detach from the invoking terminal: change to the root directory, redirect the standard streams, fork (the parent exits) and
# start a new session in the child. Confesses if any step fails.
sub daemonInit
{
    # Run from the root directory rather than the directory the process was started in
    chdir '/'
        or confess "chdir() failed: $!";

    # Standard input is never read, so point it at /dev/null
    open STDIN, '<', '/dev/null'
        or confess "Couldn't close standard input: $!";

    # Send standard output to the analyzer log. The file is opened for append because the main program already holds an
    # append handle on the same file; truncating it here would erase the existing log and let the two handles overwrite
    # each other's output.
    open STDOUT, '>>', $strLogOutFile
        or confess "Couldn't close standard output: $!";

    # Discard standard error. /dev/null must be opened for writing; a handle opened for reading rejects every write.
    open STDERR, '>', '/dev/null'
        or confess "Couldn't close standard error: $!";

    # Create the new process and let the parent exit
    my $pid = fork;

    defined($pid)
        or confess "fork() failed: $!";

    exit if $pid;

    # Create new session group
    setsid() or confess("setsid() failed: $!");
}

####################################################################################################################################
# Get current timestamp as a string (used for logging)
####################################################################################################################################
# Returns the current local time formatted as "YYYY-MM-DD HH:MM:SS"
sub timestampGet
{
    # localtime() returns (sec, min, hour, mday, mon, year, ...); mon is zero-based and year is an offset from 1900
    my @iyTime = localtime(time);

    return sprintf("%04d-%02d-%02d %02d:%02d:%02d",
                   $iyTime[5] + 1900, $iyTime[4] + 1, $iyTime[3], $iyTime[2], $iyTime[1], $iyTime[0]);
}

####################################################################################################################################
# Main loop
####################################################################################################################################
# The log path is the first positional argument left in @ARGV after option parsing
my $strLogPath = $ARGV[0];

if (!defined($strLogPath))
{
    confess "log path must be passed";
}

my $hFile;          # Handle of the log file currently being read
my $strLogFile;     # Name of the log file currently being read
my $oLogCSV;        # CSV parser for the current log file
my $bDone = false;  # Set when the log path is no longer a directory, which ends the main loop

# Find the first log file
my $strNextLogFile = nextLogFile($strLogPath);

# Open the analyzer's own log file for append
open(my $hLog, '>>', $strLogOutFile)
    or confess "unable to open pgAudit Analyze log file $strLogOutFile: $!";

# Daemonize the process
daemonInit()
    if ($bDaemon);

# Each pass either switches to the next log file or polls for one, then loads whatever rows can be read from the current
# file. Any error raised inside the eval is logged and followed by a full reset.
while(!$bDone)
{
    # If the audit CSV object is undefined (e.g. reset if an error occurred or never created) then create it
    if (!defined($oAuditCSV))
    {
        $oAuditCSV = new PgAudit::CSV({binary => 1, empty_is_undef => 1});
    }

    eval
    {
        # No next file is known yet, so look for one that sorts after the current file
        if (!defined($strNextLogFile))
        {
            if (-d $strLogPath)
            {
                $strNextLogFile = nextLogFile($strLogPath, $strLogFile);

                # Pause briefly between polls (presumably 0.1 seconds -- waitHiRes comes from PgAudit::Wait; confirm the unit)
                waitHiRes(.1);
            }
            # The log path is no longer a directory, so stop after this pass
            else
            {
                $bDone = true;
            }
        }
        # Else switch to the next file
        else
        {
            if (defined($hFile))
            {
                close($hFile);
            }

            $strLogFile = $strNextLogFile;
            undef($strNextLogFile);

            syswrite($hLog, timestampGet() . ": reading ${strLogFile}\n");

            # Read updating file
            # http://stackoverflow.com/questions/1425223/how-do-i-read-a-file-which-is-constantly-updating

            open($hFile, '<', "${strLogPath}/${strLogFile}")
                or confess "unable to open ${strLogPath}/${strLogFile}";

            # Create a fresh CSV parser for the newly opened log file
            $oLogCSV = new PgAudit::CSV({binary => 1, empty_is_undef => 1});
        }

        # Perl 5.36 fixed an issue where you could continue reading a file that was being appended to after hitting EOF, see:
        # https://groups.google.com/g/linux.debian.bugs.dist/c/ZaxLI8YufO8. Therefore we must seek to the current location to
        # continue reading if the log is being appended to.
        seek($hFile, 0, 1)
            or confess "unable to seek to current position in ${strLogPath}/${strLogFile}";

        # Parse all rows in the file into CSV
        while (my $stryRow = $oLogCSV->getline($hFile))
        {
            my $strSessionId = $$stryRow[LOG_FIELD_SESSION_ID];
            my $lSessionLineNum = $$stryRow[LOG_FIELD_SESSION_LINE_NUM];
            my $strUserName = $$stryRow[LOG_FIELD_USER_NAME];
            my $strDatabaseName = $$stryRow[LOG_FIELD_DATABASE_NAME];

            # Load the row only when it has a user other than the audit user, has a database that contains the audit schema,
            # and is beyond the last line already loaded for the session (or the session is not cached yet)
            if (defined($strUserName) && $strAuditUserName ne $strUserName &&
                defined($strDatabaseName) && databaseGet($strDatabaseName) &&
                (!defined($oSessionHash{$strSessionId}) || !defined($oSessionHash{$strSessionId}{session_line_num}) ||
                 $lSessionLineNum > $oSessionHash{$strSessionId}{session_line_num}))
            {
                # Make sure the session exists before writing rows that reference it
                sessionGet($strSessionId, $lSessionLineNum, $$stryRow[LOG_FIELD_PROCESS_ID], $$stryRow[LOG_FIELD_SESSION_START_TIME],
                           $strUserName, $strDatabaseName, $$stryRow[LOG_FIELD_APPLICATION_NAME], $$stryRow[LOG_FIELD_CONNECTION_FROM],
                           $$stryRow[LOG_FIELD_COMMAND_TAG], $$stryRow[LOG_FIELD_ERROR_SEVERITY]);

                # Write the log event (and any audit record in its message), lower-casing the tag, severity and state code
                logWrite($strSessionId, $strDatabaseName, $$stryRow[LOG_FIELD_LOG_TIME], $lSessionLineNum,
                         defined($$stryRow[LOG_FIELD_COMMAND_TAG]) ? lc($$stryRow[LOG_FIELD_COMMAND_TAG]) : undef,
                         defined($$stryRow[LOG_FIELD_ERROR_SEVERITY]) ? lc($$stryRow[LOG_FIELD_ERROR_SEVERITY]) : undef,
                         defined($$stryRow[LOG_FIELD_SQL_STATE_CODE]) ? lc($$stryRow[LOG_FIELD_SQL_STATE_CODE]) : undef,
                         $$stryRow[LOG_FIELD_VIRTUAL_TRANSACTION_ID],
                         $$stryRow[LOG_FIELD_TRANSACTION_ID], $$stryRow[LOG_FIELD_MESSAGE], $$stryRow[LOG_FIELD_DETAIL],
                         $$stryRow[LOG_FIELD_HINT], $$stryRow[LOG_FIELD_QUERY], $$stryRow[LOG_FIELD_QUERY_POS],
                         $$stryRow[LOG_FIELD_INTERNAL_QUERY], $$stryRow[LOG_FIELD_INTERNAL_QUERY_POS], $$stryRow[LOG_FIELD_CONTEXT],
                         $$stryRow[LOG_FIELD_LOCATION]);

                # Commit after every row so the stored session_line_num always reflects what has been loaded
                $oDbHash{$strDatabaseName}{hDb}->commit();
            }
        }
    };

    # If there was an error then log it and reset
    if ($@)
    {
        # NOTE(review): $strMessage is assigned but never used; the log write below reads $@ directly
        my $strMessage = $@;

        syswrite($hLog, timestampGet() . ": $@\n");
        sleep(5);

        # Reset everything and start again from the first log file; rows that were already loaded are skipped using the
        # line numbers reloaded from the database by sessionGet()
        undef($oAuditCSV);
        undef(%oDbHash);
        undef(%oSessionHash);
        undef(%oLogonHash);

        # NOTE(review): this call is outside the eval, so an error raised here ends the program
        $strNextLogFile = nextLogFile($strLogPath)
    }
}
