#!/usr/bin/perl
#-------------------------------------------------------------------------
# pg_dbms_job
#
#	Perl scheduler daemon to emulate Oracle DBMS_JOB for PostgreSQL.
#
# Author: Gilles Darold <gilles@darold.net>
# Licence: PostgreSQL
# Copyright (c) 2021-2022, MigOps Inc,
#-------------------------------------------------------------------------
use vars qw($VERSION $PROGRAM);
use strict;

use IO::File;
use Getopt::Long qw(:config bundling no_ignore_case_always);
use POSIX qw(locale_h sys_wait_h _exit strftime);
setlocale(LC_NUMERIC, '');
setlocale(LC_ALL, 'C');
use DBI;
use DBD::Pg;
use Time::HiRes;
use Time::Piece;

$| = 1;

$VERSION = '1.5';
$PROGRAM = 'pg_dbms_job';

$SIG{'CHLD'} = 'DEFAULT';

# Global variables
my $CONFIG_FILE  = "/etc/pg_dbms_job/pg_dbms_job.conf";
my %CONFIG = (
	'debug' => 0,
	'pidfile' => '/tmp/pg_dbms_job.pid',
	'logfile' => '',
);
my $ABORT        = 0;
my $KILL         = 0;
my $RELOAD       = 0;
my %RUNNING_PIDS = ();
my %DBINFO       = ();
my $HELP         = 0;
my $SHOWVER      = 0;
my %SCHEDULED_JOBS   = ();
my %ASYNC_JOBS   = ();
my $SINGLE       = 0;

my %PQSTATUS = (
	0 => "Empty query string",
	1 => "Query returning no data success",
	2 => "Query returning data success",
	3 => "COPY OUT transfer in progress",
	4 => "COPY IN transfer is in progress",
	5 => "Unexpected response received",
	6 => "A nonfatal error occurred",
	7 => "The query failed",
	8 => "COPY IN/OUT transfer in progress",
	9 => "Single tuple from larger resultset",
	10 => "Pipeline synchronization point",
	11 => "Command didn't run because of an abort earlier in a pipeline"
);

# Time to wait in the main loop before each runs to free some CPU cycle
my $naptime = 0.1;

# Main database connection handler
my $dbh = undef;

# Flag to terminate all actions and exit when SIGINT is received
my $fini         = 0;

# Flag to signal that the configuration need to be reread
my $config_invalidated = 0;

# Flag for initial import of jobs
my $startup = 1;

# Maximum number of job processed at the same time
my $job_queue_processes = 1000;

# Interval to force a poll of the job queue in case there was no
# notification to execute old scheduled jobs. Default 5 seconds.
my $job_queue_interval = 5;

# Global variable to register current log file for rotation
my $old_log_file = '';

# Process command line options
my $result = GetOptions
(
        "c|config=s"  => \$CONFIG_FILE,
        "d|debug!"    => \$CONFIG{ 'debug' },
        "h|help!"     => \$HELP,
        "k|kill!"     => \$KILL,
        "m|immediate!"=> \$ABORT,
        "r|reload!"   => \$RELOAD,
        "s|single!"   => \$SINGLE,
        "v|version!"  => \$SHOWVER,
);

# Display usage if help is asked and exit
usage() if $HELP;

# Show version if asked end exit
if ($SHOWVER)
{
	print "Version: $VERSION\n";
	exit 0;
}

# Look at configuration file
read_config();

####
# The daemon should be stopped, send signal to the process and exit
####
if ($KILL) {
	signal_handling('TERM');
} elsif ($ABORT) {
	signal_handling('INT');
} elsif ($RELOAD) {
	signal_handling('HUP');
}

# Verify that an other process is not running
if (-e $CONFIG{ 'pidfile' }) {
	die "FATAL: pid file already exists at $CONFIG{ 'pidfile' }, does another pg_dbms_job process is running?\n";
}

####
# Method used to fork a subprocess
####
sub spawn
{
        my $coderef = shift;

        unless (@_ == 0 and $coderef and ref($coderef) eq 'CODE') {
                print "usage: spawn CODEREF";
                exit 0;
        }

	my $pid;
        if (!defined($pid = fork)) {
                dprint('ERROR', "cannot fork: $!\n");
                return;
        } elsif ($pid) {
		$RUNNING_PIDS{$pid} = 1;
                return; # the parent
        }
        # the child -- go spawn
        $< = $>;
        $( = $); # suid progs only

        exit &$coderef();
}

####
# Die cleanly on signal
####
sub terminate
{
	my $sig = shift;

	$fini = 1;

	dprint('LOG', "Received terminating signal $sig.");

	$SIG{INT}  = \&terminate;
	$SIG{TERM} = \&terminate;
	$SIG{HUP}  = \&reload;

	unlink("$CONFIG{ 'pidfile' }") if (-f $CONFIG{ 'pidfile' });

	# Wait for all child processes to die
	&wait_all_childs();

	$dbh->disconnect() if (defined $dbh);

	_exit(0);
}

####
# Instruct the program to reread configuration
# file and refresh all the jobs definitions.
####
sub reload
{
	my $sig = shift;

	dprint('LOG', "Received reload signal $sig.");

	$SIG{INT}  = \&terminate;
	$SIG{TERM} = \&terminate;
	$SIG{HUP}  = \&reload;

	my $old_pidfile = $CONFIG{ 'pidfile' };
	&read_config(1);

	# In case the pid file path have change rename
	# it or keep the old one in case of failure
	if ($CONFIG{ 'pidfile' } and $CONFIG{ 'pidfile' } ne $old_pidfile)
	{
		unless (rename($old_pidfile, "$CONFIG{ 'pidfile' }"))
		{
			dprint('ERROR', "can't change path to pid keeping old one $old_pidfile, $!");
			$CONFIG{ 'pidfile' } = $old_pidfile
		}
		else
		{
			dprint('LOG', "path to pid file has changed, rename $old_pidfile into $CONFIG{ 'pidfile' }");
		}
	}

	# instruct main loop that it have to rebuild jobs cache
	$config_invalidated = 1;
}

# Die on kill -2, -3 or -15
$SIG{'INT'} = $SIG{'TERM'} = \&terminate;
$SIG{'HUP'} = \&reload;

# Detach from terminal if we are not running in single mode
if (!$SINGLE)
{
	my $pid = fork;
	exit 0 if ($pid);
	die "FATAL: Couldn't fork: $!" unless defined($pid);
	POSIX::setsid() or die "Can't detach: \$!";
	&dprint('DEBUG', "Detach from terminal with pid: $$");
	open(STDIN , '<', "/dev/null");
	open(STDOUT, '>', "/dev/null");
}

# Set name of the program without path
my $orig_name = $0;
$0 = $PROGRAM;

# Create pid file
my $fhp = IO::File->new($CONFIG{ 'pidfile' }, 'w');
if (not defined $fhp) {
	die "FATAL: can't create pid file $CONFIG{ 'pidfile' }, $!\n";
}
print $fhp $$;
close($fhp);

####
# Entering main loop
####
dprint('LOG', "Entering main loop.");
my $previous_async_exec = 0;
my $previous_scheduled_exec = 0;
# exit loop when SIGTERM is received or we are running in single mode
while (!$fini)
{
	# Stores loop start time
	my $t0 = time;

	####
	# look if there are some child processes dead to register
	####
	foreach my $k (keys %RUNNING_PIDS)
	{
		my $kid = waitpid(-1, WNOHANG);
		if ($kid > 0) {
			delete $RUNNING_PIDS{$kid};
		}
	}

	# When the configuration is invalidated we must also disconnect
	# from the database in case the connection settings have changed
	if ($config_invalidated)
	{
		# We need to reconnect in the main loop
		$dbh->disconnect() if (defined $dbh);
		$dbh = undef;
	}

	# Connect to the database if this is not already the case
	$dbh = connect_db() if (not defined $dbh);

	####
	# Look if we have received some notification from the database
	####
	my $async_count = 0;
	my $scheduled_count = 0;
	if (defined $dbh)
	{
		$config_invalidated = 0;
		while (my $notify = $dbh->pg_notifies)
		{
			my ($topic, $pid, $payload) = @$notify;
			dprint('DEBUG', "Received notification: ($topic, $pid, $payload)");
			$async_count++ if ($topic eq 'dbms_job_async_notify');
			$scheduled_count++ if ($topic eq 'dbms_job_scheduled_notify');
		}
	}
	else
	{
		# In case we can not connect to the database or we
		# are on a standby wait three seconds and try again.
		sleep(3);
		$startup = 1;
		$config_invalidated = 1;
		next;
	}

	####
	# In case we do not receive notification in the job_queue_interval
	# interval, force the collect of the async job to execute in case
	# there is an old one with a next_date that has expired.
	####
	if ( !$async_count and !$startup and $t0 >= ($previous_async_exec+$job_queue_interval) ) {
		dprint('DEBUG', "job_queue_interval reached, forcing collect of asynchronous jobs");
		$async_count = 1;
	}
	if ( !$scheduled_count and !$startup and $t0 >= ($previous_scheduled_exec+$job_queue_interval) ) {
		dprint('DEBUG', "job_queue_interval reached, forcing collect of scheduledhronous jobs");
		$scheduled_count = 1;
	}

	####
	# Get a list of asynchrous queued jobs to execute
	####
	if ($async_count || $startup)
	{
		%ASYNC_JOBS = get_async_jobs($async_count);
		# Register last execution time
		$previous_async_exec = time;
	}

	####
	# Get jobs defined in the remote database if they are not
	# already cached or that the cache have been invalidated.
	####
	if ($scheduled_count || $startup)
	{
		%SCHEDULED_JOBS = get_scheduled_jobs();
		# Register last execution time
		$previous_scheduled_exec = time;
		# If we lost the connection 
		if ($config_invalidated) {
			sleep(3);
			$startup = 1;
			next;
		}
	}

	# Init flags
	$config_invalidated = 0;
	$startup = 0;

	####
	# Process all the scheduled jobs in a dedicated process each.
	####
	foreach my $job (sort { $a <=> $b } keys %SCHEDULED_JOBS)
	{
		# If we have forked too much processes wait one second until a process die
		while (scalar keys %RUNNING_PIDS >= $job_queue_processes) {
			dprint('WARNING', "max job queue size is reached ($job_queue_processes) waiting the end of an other job");
			sleep(1);
		}

		spawn sub
		{
			&subprocess_scheduled_jobs( $job );
		};
	}
	%SCHEDULED_JOBS = ();

	####
	# Process all the asynchronous jobs in a dedicated process each.
	####
	foreach my $job (sort { $a <=> $b } keys %ASYNC_JOBS)
	{
		# If we have forked too much processes wait one second until a process die
		while (scalar keys %RUNNING_PIDS >= $job_queue_processes) {
			dprint('WARNING', "max job queue size is reached ($job_queue_processes) waiting the end of an other job");
			sleep(1);
		}

		spawn sub
		{
			&subprocess_asynchronous_jobs( $job );
		};
	}
	%ASYNC_JOBS = ();

	last if ($SINGLE);

	# We can not loop quicker than naptime, 0.1 second by default
	Time::HiRes::sleep($naptime);
}

# Wait for last child stop
&wait_all_childs();

if (-f $CONFIG{ 'pidfile' }) {
        unlink("$CONFIG{ 'pidfile' }") or dprint('ERROR', "Unable to remove pid file $CONFIG{ 'pidfile' }, $!");
}

dprint('LOG', "pg_dbms_job scheduler stopped.");

exit 0;


#------------------------------------------------------------------------------
#                                   METHODS
#------------------------------------------------------------------------------

####
# Show help
####
sub usage
{
	print qq{
usage: $PROGRAM [options]

options:

  -c, --config  file  configuration file. Default: $CONFIG_FILE
  -d, --debug         run in debug mode.
  -k, --kill          stop current running daemon gracefully waiting
                      for all job completion.
  -m, --immediate     stop running daemon and jobs immediatly.
  -r, --reload        reload configuration file and jobs definition.
  -s, --single        do not detach and run in single loop mode and exit.
};

	exit 0;
}

####
# Same as local_die but with pid file cleanup
####
sub local_die
{
	unlink("$CONFIG{ 'pidfile' }") if (-f $CONFIG{ 'pidfile' });
	die "$_[0]";
}

####
# Send a signal to the pg_dbms_job daemon
####
sub signal_handling
{
	my $sig = shift;

        my $proc = '';
        if (-e "$CONFIG{ 'pidfile' }") {
                $proc = `cat $CONFIG{ 'pidfile' }`;
        } else {
                $proc = `ps h -opid -C$PROGRAM | head -1`;
        }
        chomp($proc);
        $proc =~ s/ //g;

        if (!$proc) {
                die "ERROR: can't find a pid to send SIG$sig, is $PROGRAM running?\n";
        }

        kill "-$sig", $proc;
        if ($? == -1) {
                print "FATAL: failed to execute: $!\n";
        } elsif ($? & 127) {
                printf "ERROR: child died with signal %d, %s coredump\n", ($? & 127),  ($? & 128) ? 'with' : 'without';
        } else {
                printf "OK: $PROGRAM exited with value %d\n", $? >> 8;
        }
        exit 0;
}

####
# Read configuration file
####
sub read_config
{
	my $nodie = shift; # When reload is called, set to 1

	if (!-e $CONFIG_FILE) {
		if (!$nodie) {
			die("FATAL: can not find the configuration file $CONFIG_FILE\n");
		} else {
			dprint('ERROR', "can not find the configuration file $CONFIG_FILE");
			return;
		}
	}

	my $curfh = IO::File->new($CONFIG_FILE, 'r');
	if (defined $curfh)
	{
		my @content = <$curfh>;
		$curfh->close();

		# Set the logfile first to be able to log the configuratioin changes
		foreach my $l ( @content )
		{
			chomp($l);
			# cleanup the line
			$l =~ s/\r//s;
			$l =~ s/\#.*//;
			$l =~ s/^\s+//;
			$l =~ s/\s+$//;
			next if (!$l);

			my ($var, $val) = split(/\s*=\s*/, $l);
			$var = lc($var);
			if ($var eq 'logfile')
			{
				if ($CONFIG{ 'logfile' } ne $val)
				{
					$CONFIG{ 'logfile' } = $val;
					dprint('LOG', "Setting logfile from configuration file to $CONFIG{ 'logfile' }");
				}
			}
		}

		foreach my $l ( @content )
		{
			chomp($l);
			# cleanup the line
			$l =~ s/\r//s;
			$l =~ s/\#.*//;
			$l =~ s/^\s+//;
			$l =~ s/\s+$//;
			next if (!$l);

			my ($var, $val) = split(/\s*=\s*/, $l);
			$var = lc($var);
			if ($var eq 'pidfile')
			{
				if ($CONFIG{ 'pidfile' } ne $val)
				{
					$CONFIG{ 'pidfile' } = $val;
					dprint('LOG', "Setting pidfile from configuration file to $CONFIG{ 'pidfile' }");
				}
			}
			elsif ($var eq 'debug')
			{
				$val = int($val);
				if ($CONFIG{ 'debug' } ne $val)
				{
					$CONFIG{ 'debug' } = $val;
					dprint('LOG', "Setting debug from configuration file to $CONFIG{ 'debug' }");
				}
			}
			elsif ($var eq 'job_queue_interval')
			{
				$val = int($val);
				if ($job_queue_interval != $val)
				{
					$job_queue_interval = $val;
					dprint('LOG', "Setting job_queue_interval from configuration file to $job_queue_interval");
				}
			}
			elsif ($var eq 'job_queue_processes')
			{
				$val = int($val);
				if ($job_queue_processes != $val)
				{
					$job_queue_processes = $val;
					dprint('LOG', "Setting job_queue_processes from configuration file to $job_queue_processes");
				}
			}
			elsif ($var =~ /^(host|database|user|passwd|port)$/)
			{
				$val = int($val) if ($var eq 'port');
				if (not exists $DBINFO{$var} or $DBINFO{$var} ne $val)
				{
					$DBINFO{$var} = $val;
					dprint('LOG', "Setting $var from configuration file to $DBINFO{$var}");
				}
			}
		}
	}
}

####
# Wait for all subprocesses die
####
sub wait_all_childs
{
	while (scalar keys %RUNNING_PIDS > 0)
	{
		my $kid = waitpid(-1, WNOHANG);
		if ($kid > 0)
		{
			delete $RUNNING_PIDS{$kid};
		}
		sleep(1);
	}
}

####
# Log messages to file 
####
sub dprint
{
	my ($level, $msg) = @_;

	my ($package, $filename, $line) = caller;

	return if ($KILL or (uc($level) eq 'DEBUG' and !$CONFIG{ 'debug' }));

	my $t = strftime('%Y-%m-%d %H:%M:%S', localtime);

	my $fname = $CONFIG{ 'logfile' } || '';

	# Apply the strftime formatting if required
	$fname = localtime->strftime($fname) if ($fname =~ /\%/);
	# If required we truncate the log file on rotation
	if ($CONFIG{ 'log_truncate_on_rotation' } && $old_log_file)
	{
		if ($fname ne $old_log_file && -e $fname) {
			unlink($fname);
		}
	}
	$old_log_file = $fname;

	if ($fname && open(my $out, '>>', $fname))
	{
		flock($out, 2) or return;
		print $out "$t [$$]: [$line] $level:  $msg\n";
		close($out);
	} else {
		print STDERR "ERROR: can't write to log file $fname, $!\n";
		print STDERR "$t [$$]: [$line] $level:  $msg\n";
	}
}

####
# Connect to the database and return the global database connection handler
####
sub connect_db
{
	$0 = 'pg_dbms_job:main';
	my $ldbh = DBI->connect("dbi:Pg:dbname=$DBINFO{database};host=$DBINFO{host};port=$DBINFO{port}",
						$DBINFO{user},
						$DBINFO{passwd},
						{AutoInactiveDestroy => 1, PrintError => 0, AutoCommit => 1});
	# Check for connection failure
	if (not defined $ldbh)
	{
		dprint('FATAL', "can't connect to \"dbi:Pg:dbname=$DBINFO{database};host=$DBINFO{host};port=$DBINFO{port}\", $DBI::errstr");
		return undef;
	}

	# Set application name to pg_dbms_job:main
	if (not $ldbh->do("SET application_name TO 'pg_dbms_job:main'"))
	{
		dprint('ERROR', "can not set application_name, reason: " . $ldbh->errstr);
		$ldbh->disconnect() if (defined $ldbh);
		return undef;
	}

	####
	# Now that we are connected verify that there is no other
	# scheduler already runing on this database by looking for
	# entries with the same application_name in pg_stat_activity.
	# We additionaly detect if we are running on a standby server.
	####
	my $query = "SELECT count(*), pg_is_in_recovery() FROM pg_catalog.pg_stat_activity WHERE datname='$DBINFO{database}' AND application_name='pg_dbms_job:main'";
	my $sth = $ldbh->prepare($query);
	if (!defined $sth)
	{
		dprint('ERROR', "can't prepare statement, $DBI::errstr");
		$ldbh->disconnect() if (defined $ldbh);
		return undef;
	}

	$sth->execute;
	if ($sth->err)
	{
		dprint('ERROR', "can't execute statement, $DBI::errstr");
		$ldbh->disconnect() if (defined $ldbh);
		return undef;
	}
	my @row = $sth->fetchrow;
	if ($row[0] > 1)
	{
		dprint('FATAL', "another pg_dbms_job process is running on this database! Aborting.");
		$ldbh->disconnect() if (defined $ldbh);
		local_die("FATAL: another pg_dbms_job process is running on this database! Aborting.\n");
	}

	# Close the connection if we are running on a standby,
	# there is nothing to do on a standby it is readonly.
	if ($row[1])
	{
		$ldbh->disconnect() if (defined $ldbh);
		return undef;
	}

	# Subscribe to pg_dbms_job notification channels
	$ldbh->do("LISTEN dbms_job_cache_invalidate");
	$ldbh->do("LISTEN dbms_job_async_notify");

	return $ldbh;
}

####
# Get all jobs defined in the remote database.
####
sub get_scheduled_jobs
{
	dprint('DEBUG', 'Get scheduled jobs to run');

	my %alljobs = ();

	# Get all scheduled jobs from table ALL_JOBS that
	# must be run and set the next execution date.
	my $query  = "UPDATE dbms_job.all_scheduled_jobs SET";
	$query .= " this_date = current_timestamp,";
       	$query .= " next_date = dbms_job.get_next_date(interval),";
       	$query .= " instance = instance+1"; # internal used only to not be notified for this change
        $query .= " WHERE interval IS NOT NULL AND NOT broken AND this_date IS NULL";
        $query .= " AND next_date <= current_timestamp RETURNING *";
	my $sth = $dbh->prepare($query);
	if (!defined $sth)
	{
		dprint('ERROR', "can't prepare statement, $DBI::errstr");
		$config_invalidated = 1 if (!$SINGLE);
		return %alljobs;
	}

	$sth->execute;
	if ($sth->err)
	{
		dprint('ERROR', "can't execute statement, $DBI::errstr");
		$config_invalidated = 1 if (!$SINGLE);
		return %alljobs;
	}

	while (my $row = $sth->fetchrow_hashref)
	{
		# Register the jobs information into a hash by job id
		foreach my $k (keys %$row) {
			$alljobs{$row->{job}}{$k} = $row->{$k}; 
		}
	}
	$sth->finish();

	dprint('DEBUG', "Found " . (scalar keys %alljobs) . " scheduled jobs to run");

	return %alljobs;
}

####
# Get asynchronous jobs to execute and delete them from the queue.
####
sub get_async_jobs
{
	my %asyncjobs = ();
	my $limit = '';

	# Get all jobs to be executed asap from table ALL_ASYNC_JOBS
	# we change this_date to avoid reading it again
	my $query  = "UPDATE dbms_job.all_async_jobs SET";
	$query .= " this_date = current_timestamp";
        $query .= " WHERE this_date IS NULL RETURNING *";
	my $sth = $dbh->prepare($query);
	if (!defined $sth)
	{
		dprint('ERROR', "can't prepare statement, $DBI::errstr");
		return %asyncjobs;
	}

	$sth->execute;
	if ($sth->err)
	{
		dprint('ERROR', "can't execute statement, $DBI::errstr");
		return %asyncjobs;
	}
	while (my $row = $sth->fetchrow_hashref)
	{
		# Register the jobs information into a hash by job id
		foreach my $k (keys %$row) {
			$asyncjobs{$row->{job}}{$k} = $row->{$k}; 
		}
	}
	$sth->finish();

	# Get all jobs with no interval from table ALL_SCHEDULEd_JOBS
	# where the next_date value is lower or equal to current timestamp
	# as they also need to be executed immediately and removed from the
	# dbms_job.all_scheduled_jobs table as they are no more used.
	$query  = "UPDATE dbms_job.all_scheduled_jobs SET";
	$query .= " this_date = current_timestamp";
        $query .= " WHERE this_date IS NULL AND interval IS NULL AND next_date <= current_timestamp RETURNING *";
	$sth = $dbh->prepare($query);
	if (!defined $sth)
	{
		dprint('ERROR', "can't prepare statement, $DBI::errstr");
		return %asyncjobs;
	}

	$sth->execute;
	if ($sth->err)
	{
		dprint('ERROR', "can't execute statement, $DBI::errstr");
		return %asyncjobs;
	}
	while (my $row = $sth->fetchrow_hashref)
	{
		# Register the jobs information into a hash by job id
		foreach my $k (keys %$row) {
			$asyncjobs{$row->{job}}{$k} = $row->{$k}; 
		}
	}
	$sth->finish();

	dprint('DEBUG', "Found " . (scalar keys %asyncjobs) . " asynchronous jobs to run");

	return %asyncjobs;
}

####
# Deleting job from queue
####
sub delete_job
{
	my ($ldbh, $jobid) = @_;

	dprint('DEBUG', "Deleting asynchronous job $jobid from queue");

	my $query  = "DELETE FROM dbms_job.all_async_jobs WHERE job = $jobid RETURNING job";
	my $sth = $ldbh->prepare($query);
	if (!defined $sth) {
		dprint('ERROR', "can't prepare statement, $DBI::errstr");
		end_subprocess($ldbh, $jobid);
	}
	$sth->execute;
	if ($sth->err) {
		dprint('ERROR', "can't execute statement, $DBI::errstr");
		end_subprocess($ldbh, $jobid);
	}
	my $row = $sth->fetchrow;
	$sth->finish();

	# If no deleted job was returned, this was a scheduled job without interval
	if (not defined $row)
	{
		$query  = "DELETE FROM dbms_job.all_scheduled_jobs WHERE job = $jobid";
		my $sth = $ldbh->prepare($query);
		if (!defined $sth) {
			dprint('ERROR', "can't prepare statement, $DBI::errstr");
			end_subprocess($ldbh, $jobid);
		}
		$sth->execute;
		if ($sth->err) {
			dprint('ERROR', "can't execute statement, $DBI::errstr");
			end_subprocess($ldbh, $jobid);
		}
	}
}

####
# End a subprocess with some cleanup
####
sub end_subprocess
{
	my ($ldbh, $jobid) = @_;

	$ldbh->disconnect() if (defined $ldbh);
	dprint('LOG', "end of subprocess executing job $jobid");
	exit(0);
}

####
# Execute the asynchronous job's plpgsql code on the remote database
####
sub subprocess_asynchronous_jobs
{
	my $jobid = shift;

	my $start_t = strftime('%Y-%m-%d %H:%M:%S', localtime);

	$0 = "pg_dbms_job:async:$jobid";

	# Subprocess must completed their work in case
	# of interruption unless we received SIGINT
	$SIG{TERM} = 'IGNORE';
	$SIG{HUP}  = 'IGNORE';

	dprint('LOG', "executing job $jobid");

	if ($CONFIG{ 'debug' })
	{
		foreach my $k (sort { $a <=> $b } keys %{ $ASYNC_JOBS{ $jobid } })
		{
			dprint('DEBUG', "job $jobid with parameter $k: $ASYNC_JOBS{ $jobid }{ $k }");
		}
	}

	# Clone the connection to the database
	my $ldbh = $dbh->clone();

	if (defined $ldbh)
	{
		####
		# Set application name to pg_dbms_job:jobid
		####
		if (not $ldbh->do("SET application_name TO 'pg_dbms_job:async:$jobid'")) {
			dprint('ERROR', "can not set application_name, reason: " . $ldbh->errstr);
			end_subprocess($ldbh, $jobid);
		}

		####
		# Set role for the code execution
		####
		if ($ASYNC_JOBS{ $jobid }{ 'log_user' })
		{
			if (not $ldbh->do("SET ROLE $ASYNC_JOBS{ $jobid }{ 'log_user' }")) {
				dprint('ERROR', "can not change role, reason: " . $ldbh->errstr);
				end_subprocess($ldbh, $jobid);
			}
		}

		####
		# start a transaction
		####
		if (not $ldbh->do("BEGIN")) {
			dprint('ERROR', "can not start a transaction, reason: " . $ldbh->errstr);
			end_subprocess($ldbh, $jobid);
		}

		####
		# Set search_path for the code execution
		####
		if ($ASYNC_JOBS{ $jobid }{ 'schema_user' })
		{
			if (not $ldbh->do("SET LOCAL search_path TO $ASYNC_JOBS{ $jobid }{ 'schema_user' }")) {
				dprint('ERROR', "can not change the search_path, reason: " . $ldbh->errstr);
				end_subprocess($ldbh, $jobid);
			}
		}

		####
		# Execute the code of the WHAT column
		####
		my $success = 1;
		my $errstr = '';
		my $status = '';
		my $sqlstate = '';
		my $t0 = time;

		my $codetoexec = qq{DO \$\$
DECLARE
	job bigint := $jobid;
	next_date timestamp with time zone := current_timestamp;
	broken boolean := false;
BEGIN
	$ASYNC_JOBS{ $jobid }{ 'what' }
END;
\$\$;
};
		dprint('DEBUG', "executing code of job id $jobid: $ASYNC_JOBS{ $jobid }{ 'what' }");
		if (not $ldbh->do($codetoexec))
		{
			$success = 0;
			$errstr = $ldbh->errstr;
			$status = $PQSTATUS{$ldbh->err} || '';
			$sqlstate = $ldbh->state || '';
			dprint('ERROR', "job $jobid failure, reason: $errstr");
			# Rollback the transaction
			if (not $ldbh->do("ROLLBACK")) {
				dprint('ERROR', "can not rollback a transaction, reason: " . $ldbh->errstr);
				delete_job($ldbh, $jobid);
				end_subprocess($ldbh, $jobid);
			}
		}
		else
		{
			# Commit the transaction
			if (not $ldbh->do("COMMIT")) {
				dprint('ERROR', "can not commit a transaction, reason: " . $ldbh->errstr);
				delete_job($ldbh, $jobid);
				end_subprocess($ldbh, $jobid);
			}
		}
		delete_job($ldbh, $jobid);

		my $t1 = time;
		# Store the execution result in table all_job_run_details
		my @ret = store_job_execution_details(  $ldbh, $ASYNC_JOBS{ $jobid }{ 'log_user' },
							$jobid, $start_t, $t1 - $t0,
							$status, $errstr, $success, $sqlstate);

		$ldbh->disconnect() if (defined $ldbh);
		if ($#ret >= 0) {
			print STDERR "ERROR: can't execute statement: \"$ret[0]\", $ret[1]\n";
		}
	}

	end_subprocess($ldbh, $jobid);
}

####
# Store the history of the job execution
####
sub store_job_execution_details
{
	my ($ldbh, $owner, $jobid, $start_date, $duration, $errstr, $pqstatus, $success, $sqlstate) = @_;

	$pqstatus =~ s/'/''/g;
	$errstr =~ s/'/''/g;
	$sqlstate =~ s/'/''/g;

	my $query = "INSERT INTO dbms_job.all_scheduler_job_run_details (owner, job_name, status, error, req_start_date, actual_start_date, run_duration, slave_pid, additional_info)";
	$query .= " VALUES ('$owner', '$jobid', ";
	$query .= "'$pqstatus', '$sqlstate', NULL, '$start_date', $duration, ";
	$query .= "$$, '$errstr')";

	if (not $ldbh->do($query))
	{
		dprint('ERROR', "can't execute statement: \"$query\", $DBI::errstr");
		return ($query, $DBI::errstr);;
	}
	return ();
}

####
# Execute the scheduled job's plpgsql code on the remote database
####
sub subprocess_scheduled_jobs
{
	my $jobid = shift;

	my $start_t = strftime('%Y-%m-%d %H:%M:%S', localtime);

	$0 = "pg_dbms_job:scheduled:$jobid";

	# Subprocess must completed their work in case
	# of interruption unless we received SIGINT
	$SIG{TERM} = 'IGNORE';
	$SIG{HUP}  = 'IGNORE';

	dprint('LOG', "executing job $jobid");
	if ($CONFIG{ 'debug' })
	{
		foreach my $k (sort { $a <=> $b } keys %{ $SCHEDULED_JOBS{ $jobid } })
		{
			dprint('DEBUG', "job $jobid with parameter $k: $SCHEDULED_JOBS{ $jobid }{ $k }");
		}
	}

	# Clone the connection to the database
	my $ldbh = $dbh->clone();

	if (defined $ldbh)
	{
		my $t0 = time;

		# Set application name to pg_dbms_job:jobid
		if (not $ldbh->do("SET application_name TO 'pg_dbms_job:scheduled:$jobid'"))
		{
			dprint('ERROR', "can not set application_name, reason: " . $ldbh->errstr);
			end_subprocess($ldbh, $jobid);
		}

		# Set role for the code execution
		if ($SCHEDULED_JOBS{ $jobid }{ 'log_user' })
		{
			if (not $ldbh->do("SET ROLE $SCHEDULED_JOBS{ $jobid }{ 'log_user' }"))
			{
				dprint('ERROR', "can not change role, reason: " . $ldbh->errstr);
				end_subprocess($ldbh, $jobid);
			}
		}

		# start a transaction
		if (not $ldbh->do("BEGIN"))
		{
			dprint('ERROR', "can not start a transaction, reason: " . $ldbh->errstr);
			end_subprocess($ldbh, $jobid);
		}

		# Set search_path for the code execution
		if ($SCHEDULED_JOBS{ $jobid }{ 'schema_user' })
		{
			if (not $ldbh->do("SET LOCAL search_path TO $SCHEDULED_JOBS{ $jobid }{ 'schema_user' }"))
			{
				dprint('ERROR', "can not change the search_path, reason: " . $ldbh->errstr);
				end_subprocess($ldbh, $jobid);
			}
		}

		my $success = 1;
		my $errstr = '';
		my $status = '';
		my $sqlstate = '';

		my $codetoexec = qq{DO \$\$
DECLARE
	job bigint := $jobid;
	next_date timestamp with time zone := current_timestamp;
	broken boolean := false;
BEGIN
	$SCHEDULED_JOBS{ $jobid }{ 'what' }
END;
\$\$;
};
		dprint('DEBUG', "executing code of job id $jobid: $SCHEDULED_JOBS{ $jobid }{ 'what' }");

		# -----------------------------------------------------------------------------
		# In 12c when a job fails dbms_jobs.last_date retains the previous date
		# of the last successful run and dbms_jobs.failures is incremented.
		# When the job completes successfully dbms_jobs.last_date is updated and
		# dbms_jobs.failures is set to zero. If a new job never runs successfully
		# dbms_jobs.last_date will remain null and dbms_jobs.failures increments.
		# 
		# In 19c when a job fails dbms_jobs.last_date is updated and dbms_jobs.failures
		# is incremented. When the job completes successfully dbms_jobs.last_date is
		# updated however dbms_jobs.failures does not get reset to zero. If a new job
		# never runs successfully dbms_jobs.last_date is updated and dbms_jobs.failures
		# increments.
		#
		# Here this is the 12c behavior who is retained.
		# -----------------------------------------------------------------------------

		# Execute the code of the WHAT column
		if (not $ldbh->do($codetoexec))
		{
			$success = 0;
			$errstr = $ldbh->errstr;
			$status = $PQSTATUS{$ldbh->err} || '';
			$sqlstate = $ldbh->state || '';
			dprint('ERROR', "job $jobid failure, reason: $errstr");
			# Rollback the transaction
			if (not $ldbh->do("ROLLBACK"))
			{
				dprint('ERROR', "can not rollback a transaction, reason: " . $ldbh->errstr);
				end_subprocess($ldbh, $jobid);
			}
			else
			{
				my $t1 = time;
				my $timediff = $t1 - $t0;
				# Update the begin execution date for this job and the total
				# number of times that the job has failed to complete since
				# it’s last successful execution.
				my $updt = "UPDATE dbms_job.all_scheduled_jobs SET";
				$updt   .= " this_date = NULL,";
				$updt   .= " failures = failures+1";
				$updt   .= " WHERE job = $jobid";
				if (not $ldbh->do($updt))
				{
					dprint('ERROR', "can not update dbms_job.all_scheduled_jobs for job id $jobid, reason: " . $ldbh->errstr);
					end_subprocess($ldbh, $jobid);
				}
			}
		}
		else
		{
			# Commit the transaction
			if (not $ldbh->do("COMMIT"))
			{
				dprint('ERROR', "can not commit a transaction, reason: " . $ldbh->errstr);
				end_subprocess($ldbh, $jobid);
			}
		}

		my $t1 = time;

		my $timediff = $t1 - $t0;
		# Update the begin execution date for this job, the last successful execution date
		# the total execution time of the job and reset the failure counter
		my $updt = "UPDATE dbms_job.all_scheduled_jobs SET";
		$updt   .= " this_date = NULL,";
		$updt   .= " last_date = current_timestamp,";
		$updt   .= " total_time = '$timediff seconds'::interval,";
		$updt   .= " failures = 0,";
		$updt   .= " instance = instance+1"; # internal used only to not be notified for this change
		$updt   .= " WHERE job = $jobid";
		if (not $ldbh->do($updt))
		{
			dprint('ERROR', "can not update dbms_job.all_scheduled_jobs for job id $jobid, reason: " . $ldbh->errstr);
			end_subprocess($ldbh, $jobid);
		}

		# Store the execution result in table all_job_run_details
		my @ret = store_job_execution_details(  $ldbh, $SCHEDULED_JOBS{ $jobid }{ 'log_user' },
							$jobid, $start_t, $t1 - $t0,
							$status, $errstr, $success, $sqlstate);

		$ldbh->disconnect() if (defined $ldbh);
		if ($#ret >= 0) {
			print STDERR "ERROR: can't execute statement: \"$ret[0]\", $ret[1]\n";
		}
	}
	end_subprocess($ldbh, $jobid);
}

