#!/usr/bin/env perl

our $VERSION = "1.09";

=head1 NAME

pg_sample - extract a small, sample dataset from a larger PostgreSQL
database while maintaining referential integrity.

=head1 SYNOPSIS

pg_sample [ option... ] [ dbname ]

=head1 DESCRIPTION

pg_sample is a utility for exporting a small, sample dataset from a
larger PostgreSQL database. The output and command-line options closely
resemble the pg_dump backup utility (although only the plain-text format
is supported).

The sample database produced includes all tables from the original,
maintains referential integrity, and supports circular dependencies.

To build an actual instance of the sample database, the output of this script
can be piped to the psql utility. For example, assuming we have an existing
PostgreSQL database named "mydb", a sample database could be constructed with:

  createdb sampledb
  pg_sample mydb | psql sampledb

=head2 Requirements

* PostgreSQL 8.1 or later

* pg_dump should be in your search path (in order to dump the schema)

* Perl DBI and DBD::Pg (>= 2.0) modules

=head2 Command-line Options

=over

=item I<dbname>

Specifies the database to sample. If not specified, uses the
environment variable PGDATABASE, if defined; otherwise, uses
the username of the user executing the script.

=item B<-a>

=item B<--data-only>

Output only the data, not the schema (data definitions).

=item B<--help>

Output detailed options and exit.

=item B<-E> I<encoding>

=item B<--encoding=>I<encoding>

Use the specified character set encoding. If not specified, uses the
environment variable PGCLIENTENCODING, if defined; otherwise, uses
the encoding of the database.

=item B<-f> I<file>

=item B<--file=>I<file>

Send output to the specified file. If omitted, standard output is used.

=item B<--force>

Drop the sample schema if it exists.

=item B<--keep>

Don't delete the sample schema when the script finishes.

=item B<--limit=>I<limit>

As a numeric value, specifies the default number of rows to copy from
each table (defaults to 100). Note that sample tables may end up with
significantly more rows in order to satisfy foreign key constraints.

If the value is a string, it is interpreted as a pattern/rule pair to
apply to matching tables. Examples:

  # include all rows from the users table
  --limit="users = *"
 
  # include 1,000 rows from users table
  --limit="users = 1000"

  # include all users where deactivated column is false
  --limit="users = NOT deactivated"

  # include all rows from all tables in the forums schema
  --limit="forums.* = *"

The limit option may be specified multiple times. Multiple pattern/rule
pairs can also be specified as a single comma-separated value. For example:

  # include all rows from the ads table; otherwise default to 300 rows
  --limit="ads=*,*=300"

Rules are applied in order with the first match taking precedence.

=item B<--random>

Randomize the rows initially selected from each table. May significantly
increase the running time of the script.

=item B<--schema=>I<name>

The schema name to use for the sample database (defaults to _pg_sample).

=item B<--trace>

Turn on Perl DBI tracing. See the DBI module documentation for details.

=item B<--verbose>

Output status information to standard error.

=back

The following options control the database connection parameters.

=over

=item B<-h> I<host>

=item B<--host>=I<host>

The host name to connect to. Defaults to the PGHOST environment
variable if not specified.

=item B<-p> I<port>

=item B<--port=>I<port>

The database port to connect to. Defaults to the PGPORT environment
variable, if set; otherwise, the default port is used.

=item B<-U> I<username>

=item B<--username=>I<username>

User name to connect as.

=item B<-W> I<password>

=item B<-password=>I<password>

Password to connect with.

=back

=head1 LICENSE

This code is released under the Artistic License. See L<perlartistic>.

=head1 SEE ALSO

L<createdb(1)>, L<pg_dump(1)>, L<psql(1)>

=head1 AUTHOR

Maurice Aubrey <maurice.aubrey@gmail.com>

=cut

# Algorithm: we create a special sample schema in the original database
# (named _pg_sample by default).
#
# For each table in the original database, a similarly named table is created
# in the sample schema, along with a subset of rows (100 rows maximum, by
# default).
#
# Foreign key constraints are then checked, and any rows needed to satisfy them
# are copied into the sample tables.
#
# Finally, the database schema and sample data are output, renaming the sample
# table names to reflect the original database.

use strict;
use warnings;
use Carp;
use Getopt::Long qw/ GetOptions :config no_ignore_case /;
use DBI;
use DBD::Pg 2.0.0;

$SIG{TERM} = $SIG{INT} = $SIG{QUIT} = $SIG{HUP} = sub {
  my $signal = shift;
  die "Received signal '$signal'; cleaning up and terminating.\n";
};

BEGIN {
  # Encapsulate a fully-qualified Postgresql table name
  package Table;
  
  use strict;
  use Carp;
  use overload '""' => 'quoted'; # interpolation results in quoted version
 
  *dbh = \&main::connect_db;
  
  sub new {
    my $class = shift;
  
    $class = ref($class) || $class;
    return bless {
      schema => shift,
      table => shift,
    };
  }
  
  sub quoted {
    my $self = shift;
  
    return $self->dbh->quote_identifier(
      undef,
      $self->{schema},
      $self->{table},
    );
  }

  # only used for command-line matching to keep it simple
  sub unquoted {
    my $self = shift;

    return join '.', $self->{schema}, $self->{table};
  }
  
  sub schema {
    my $self = shift;
  
    $self->{schema} = shift if @_;
    return $self->{schema};
  }
  
  sub table {
    my $self = shift;
  
    $self->{table} = shift if @_;
    return $self->{table};
  }
  
  sub DESTROY {}
  
  sub AUTOLOAD {
    my $self = shift;
  
    our $AUTOLOAD;
    $AUTOLOAD =~ s/.*:://g;
  
    if ($AUTOLOAD =~ /^quoted_(.+)$/) {
      my $method = $1;
      return $self->dbh->quote_identifier($self->$method);
    }
  
    croak "Can't locate object method \"$AUTOLOAD\" via package " .
          __PACKAGE__;
  }
}

sub list {
  map { ref $_ eq 'ARRAY' ? @$_ : $_ } @_
}

my %opt; # closure; all functions have access to options

{
  my $dbh; # cached db handle

  sub connect_db {
    return $dbh if $dbh;

    my $dsn = join ';',
      "dbi:Pg:dbname=$opt{db_name}",
      $opt{db_host} ? "host=$opt{db_host}" : (),
      $opt{db_port} ? "port=$opt{db_port}" : (),
    ;
  
    $dbh = DBI->connect(
      $dsn,
      $opt{db_user},
      $opt{db_pass},
      {
        RaiseError       => 1,
        AutoCommit       => 1,
        # FetchHashKeyName => 'NAME_lc',
        PrintError       => 0,
        HandleError      => sub { confess( shift ) },
      },
    ) or croak "db connection failed!";
  
    $dbh->trace(1) if defined $opt{trace};
  
    return $dbh;
  }
}

# Retrieve PostgreSQL server version.
# Expect something like:
#   PostgreSQL 8.4.8 on x86_64-pc-linux-gnu, compiled by GCC gcc-4.6.real (Ubuntu/Linaro 4.6.0-7ubuntu1) 4.6.1, 64-bit
sub pg_version {
  my ($ver) = connect_db()->selectrow_array("SELECT version()")
    or croak "request to select postgresql server version failed";
  $ver =~ /([\d.]+)/ or croak "unable to parse postgresql version from '$ver'";

  return version->declare($1);
}

# Return a copy of a hash with all keys lowercased.
sub lower_keys($) {
  my $hash = shift or return;

  ref $hash eq 'HASH' or croak "not a hash reference";

  my %lower = map { lc($_) => $hash->{$_} } keys %$hash;
  return \%lower;
}

# See "Identifiers and Key Words" section 4.1.1
# http://www.postgresql.org/docs/8.2/static/sql-syntax-lexical.html
sub unquote_identifier (@) {
  my @unquoted;
  while (@_) {
    my $val = shift @_;
    if ($val =~ /^\s*"(.*)"\s*$/) { # looks quoted?
      $val = $1;
      $val =~ s/(["\\])\1/$1/g; # undo doubled chars
    }
    push @unquoted, $val;
  }

  return wantarray ? @unquoted : $unquoted[0];
}

# See "String Constants" section 4.1.2.1
# http://www.postgresql.org/docs/8.2/static/sql-syntax-lexical.html
sub quote_constant (@) {
  my @quoted;
  while (@_) {
    my $val = shift @_;
    $val =~ s/'/''/g;
    push @quoted, "'$val'";
  }

  return wantarray ? @quoted: $quoted[0];
}

# Encode the actual schema and table name into a new table
# name that lives under our sample schema. e.g., a table like
# users.details (schema users, table details) would be converted
# to something like _pg_sample.users_details (depending on the
# value of --schema).
#
# @param Table instance
sub sample_table ($) {
  my $table = shift;

  my $sample_table = join '_', $table->schema || 'public', $table->table;
  return Table->new($opt{schema}, $sample_table);
}

sub notice (@) {
  return unless $opt{verbose};
  print STDERR join "", @_;
}

%opt = (
  db_host => '',
  db_port => '',
  keep => 0,
  random => 0,
  schema => '_pg_sample',
  verbose => 0,
);

GetOptions(\%opt,
  "data-only|data_only|a",
  "db_name|db-name=s",
  "db_user|db-user|db_username|db-username|username|U=s",
  "db_pass|db-pass|db_password|db-password|password|W=s",
  "db_host|db-host|host=s",
  "db_port|db-port|port=i",
  "encoding|E=s",
  "file|f=s",
  "force",
  "help|h|?|usage",
  "keep",
  "limit=s@",
  "random",
  "schema=s",
  "trace",
  "verbose|v",
  "version|V",
);

if ($opt{version}) {
  print "$VERSION\n";
  exit 0;
}

if ($opt{help}) {
  require Pod::Usage;
  Pod::Usage::pod2usage(-verbose => 2);
  exit 0;
}

@ARGV or die "\nUsage: $0 [ option... ] [ dbname ]\n\n\t" .
             "$0 --help for detailed options\n";

push @{ $opt{limit} }, ".* = 100 "; # append default limit rule

$opt{db_user}  ||= $ENV{PGUSER} || scalar getpwuid($<);
$opt{db_name}  ||= shift() || $ENV{PGDATABASE} || $opt{db_user};
$opt{db_host}  ||= $ENV{PGHOST} if defined $ENV{PGHOST};
$opt{db_port}  ||= $ENV{PGPORT} if defined $ENV{PGPORT};
$opt{encoding} ||= $ENV{PGCLIENTENCODING} if defined $ENV{PGCLIENTENCODING};

my $dbh = connect_db(%opt) or croak "unable to connect to database";

my $pg_version = pg_version;

if ($opt{schema} eq 'public') {
  die "Error: refusing to use 'public' schema for sampling.\n";
}

my ($schema_oid) = $dbh->selectrow_array(qq{
  SELECT oid 
    FROM pg_catalog.pg_namespace
   WHERE nspname = ?
}, undef, $opt{schema});
if ($schema_oid && !$opt{force}) {
  die "Error: schema '$opt{schema}' already exists. " .
      "Use --force option to overwrite.\n";
}

$dbh->do(qq{ SET client_min_messages = warning }); # suppress notice messages
if ($opt{force}) {
  notice "Dropping sample schema $opt{schema}\n";
  $dbh->do(qq{ DROP SCHEMA IF EXISTS $opt{schema} CASCADE });
}

if ($opt{file}) {
  open STDOUT, '>', $opt{file} or croak "unable to redirect stdout: $!";
}

my ($server_encoding) = $dbh->selectrow_array(qq{ SHOW server_encoding });
notice "Server encoding is $server_encoding\n";

$opt{encoding} ||= $server_encoding;
notice "Client encoding is $opt{encoding}\n";
binmode STDOUT, ":encoding($opt{encoding})";

unless ($opt{'data-only'}) {
  notice "Exporting schema\n";

  local $ENV{PGUSER} = $opt{db_user};
  local $ENV{PGDATABASE} = $opt{db_name};
  local $ENV{PGHOST} = $opt{db_host};
  local $ENV{PGPORT} = $opt{db_port};
  local $ENV{PGCLIENTENCODING} = $opt{encoding};

  my $cmd = "pg_dump --schema-only";
  system($cmd) == 0 or croak "command '$cmd' failed: $?";
}

# If running PostgreSQL 9.1 or later, use UNLOGGED tables
my $unlogged = $pg_version >= version->declare('9.1') ? 'UNLOGGED' : '';

notice "Creating sample schema $opt{schema}\n";
$dbh->do(qq{ CREATE SCHEMA $opt{schema} });
my $created_schema = 1; # keep track that we actually did it; see END block

# parse limit rules
my @limits;
foreach my $limit (grep { /\S/ } map { split /\s*,\s*/ } list($opt{limit})) {
  $limit =~ s/^\s+|\s+$//g;
  $limit = ".* = $limit" if $limit =~ /^\d+$/;

  my ($match, $rule) = ($limit =~ /^([^=\s]+) \s* = \s* (.*)$/x)
    or croak "unexpected limit value '$limit'";
  $match = '.*' if $match eq '*';

  push @limits, [$match, $rule];
}
notice "[limit] $_->[0] = $_->[1]\n" foreach @limits;

# create copies of each table in a separate schema and insert no
# more than --limit rows initially.
my @tables;
my %sample_tables; # real table name -> sample table name
my $sth = $dbh->table_info(undef, undef, undef, 'TABLE');
while (my $row = lower_keys($sth->fetchrow_hashref)) {
  next unless uc $row->{table_type} eq 'TABLE'; # skip SYSTEM TABLE values
  next if $row->{table_schem} eq 'information_schema'; # special pg schema

  my $sname = $row->{pg_schema} || unquote_identifier($row->{TABLE_SCHEM})
    or die "no pg_schema or TABLE_SCHEM value?!";

  my $tname = $row->{pg_table} || unquote_identifier($row->{TABLE_NAME})
    or die "no pg_table or TABLE_NAME value?!";

  my $table = Table->new($sname, $tname);
  push @tables, $table;

  my $sample_table = sample_table($table);
  $sample_tables{ $table } = $sample_table;

  notice "Creating table $sample_table ";

  # find first matching limit rule
  my $where = 'TRUE';
  my $limit = '';
  foreach (@limits) {
    $table->unquoted =~ /^$_->[0]$/i || $table->table =~ /^$_->[0]$/i or next;

    if ($_->[1] eq '*') { # include all rows
      $limit = '';
    } elsif ($_->[1] =~ /^\d+$/) { # numeric value turned into LIMIT
      $limit = "LIMIT $_->[1]"; 
    } else { # otherwise treated as subselect
      $where = "($_->[1])";
    }

    last;
  }
  # warn "\n[LIMIT] $table WHERE $where $limit\n";

  my $order = $opt{random} ? 'ORDER BY random()' : '';

  $dbh->do(qq{
    CREATE $unlogged TABLE $sample_table AS
           SELECT *
             FROM $table
            WHERE $where
           $order
           $limit
  });

  if ($opt{verbose}) {
    my ($num_rows) =
      $dbh->selectrow_array(qq{ SELECT count(*) FROM $sample_table });
    notice "$num_rows\n";
  }
}

# Find foreign keys
my @fks;
foreach my $table (@tables) {
  # find all foreign keys referencing this table
  my $sth = $dbh->foreign_key_info(
    undef, $table->schema, $table->table, # target table
    undef, undef, undef,                  # fk table
  ) or next;

  # There can be multiple rows for a single FK (composite FK)
  my %fks;
  while (my $row = lower_keys($sth->fetchrow_hashref)) {
    my $target_col = $row->{uk_column_name} or croak "no uk_column_name?!";
    my $fk_table = Table->new(
      unquote_identifier($row->{fk_table_schem}),
      unquote_identifier($row->{fk_table_name}),
    );
    my $fk_name = "$fk_table.$row->{fk_name}"; # unique key to group FK rows
    my $fk_col = $row->{fk_column_name};

    $fks{ $fk_name } ||= [$fk_table, $table];
    push @{ $fks{ $fk_name } }, [$fk_col, $target_col];
  }
  push @fks, values %fks;
}

# create indexes on sample table FKs
my $idx = 0;
foreach my $fk (@fks) {
  my ($fk_table, $table, @pairs) = @$fk;

  my $sample_fk_table = $sample_tables{ $fk_table };
  my $idx_name = $dbh->quote_identifier($opt{schema} . '_idx' . ++$idx);
  my $fk_cols = join ', ', map { $_->[0] } @pairs;
  $dbh->do(qq{ CREATE INDEX $idx_name ON $sample_fk_table ($fk_cols) });
}

# Keep inserting rows to satisfy any fk constraints until no more
# are inserted. This should handle circular references.
my $num_rows = 1;
while ($num_rows) {
  $num_rows = 0;

  foreach my $fk (@fks) {
    my ($fk_table, $target_table, @pairs) = @$fk;
    my $target_sample_table = $sample_tables{ $target_table };
    my $sample_fk_table = $sample_tables{ $fk_table };

    my $fk_cols = join ', ', map { $_->[0] } @pairs;
    my $target_cols = join ', ', map { $_->[1] } @pairs;

    my $join1 = join ' AND ',
      map { "f1.$_->[0] = t1.$_->[1]" } @pairs;

    my $join2 = join ' AND ',
      map { "t1.$_->[1] = s1.$_->[1]" } @pairs;

    my $where = join ' AND ',
      map { "s1.$_->[1] IS NULL" } @pairs; 

    # Insert into the sample table all the rows needed to
    # satisfy the fk table, except those already present.
    my $query = qq{
      INSERT INTO $target_sample_table
           SELECT DISTINCT t1.*
             FROM $target_table t1
                  JOIN $sample_fk_table f1 ON ($join1)
                  LEFT JOIN $target_sample_table s1 ON ($join2)
            WHERE $where
    };

    #warn "\nQUERY:\n$query\n";
    # my $sth = $dbh->prepare(qq{ EXPLAIN $query });
    # $sth->execute;
    # while (my $row = $sth->fetchrow_arrayref) {
    #   warn @$row, "\n";
    # }
    # warn "\n\n";

    notice
      "Copying $target_table ($target_cols) rows referenced from " .
      "$sample_fk_table ($fk_cols)... "
    ;

    my $count = $dbh->do($query) || 0;
    notice "$count rows\n";

    $num_rows += $count;
  } 
}

# fetch all sequences and current values
$sth = $dbh->prepare(qq{
  SELECT sequence_schema
         ,sequence_name
    FROM information_schema.sequences
   WHERE sequence_schema NOT LIKE 'pg_%'
});
$sth->execute;
my %seq;
while (my $row = $sth->fetchrow_hashref) {
  my $name = Table->new($row->{sequence_schema}, $row->{sequence_name});
  $seq{ $name } = 0;
}
foreach my $name (keys %seq) {
  my ($lastval) = $dbh->selectrow_array(qq{ SELECT last_value FROM $name });
  $seq{ $name } = $lastval;
}

# Most of these were copied from the pg_dump output.
print <<EOF;

SET client_encoding = '$opt{encoding}';
SET standard_conforming_strings = off;
SET check_function_bodies = false;
SET client_min_messages = warning;
SET escape_string_warning = off;

EOF

notice "Exporting sequences\n";
print "\n";
foreach my $name (sort keys %seq) {
  my $constant = quote_constant($name);
  print "SELECT pg_catalog.setval($constant, $seq{$name});\n";
}
print "\n";

# Disable all foreign key constraints temporarily
print "\n";
foreach my $table (@tables) {
  print "ALTER TABLE $table DISABLE TRIGGER ALL;\n";
}
print "\n";

foreach my $table (@tables) {
  my $sample_table = sample_table($table);
  if ($opt{verbose}) {
    my ($count) = $dbh->selectrow_array("SELECT count(*) FROM $sample_table");
    notice "Exporting data from $sample_table ($count)\n";
  }
  print "COPY $table FROM stdin;\n";
  $dbh->do(qq{ COPY $sample_table TO STDOUT });
  my $buffer = '';
  print $buffer while $dbh->pg_getcopydata($buffer) >= 0;
  print "\\.\n\n";
}

# Re-enable all triggers
print "\n";
foreach my $table (@tables) {
  print "ALTER TABLE $table ENABLE TRIGGER ALL;\n";
}
print "\n";

END {
  # remove sample tables unless requested not to
  if ($created_schema && !$opt{keep}) {
    notice "Dropping sample schema $opt{schema}\n";
    $dbh->do("DROP SCHEMA $opt{schema} CASCADE");
  }

  notice "Done.\n";
  exit 0;
}
