-# $Header: /cvsroot/pgsql/contrib/dblink/Makefile,v 1.4 2001/09/06 10:49:29 petere Exp $
+# $Header: /cvsroot/pgsql/contrib/dblink/Makefile,v 1.5 2002/06/23 21:58:07 momjian Exp $
subdir = contrib/dblink
top_builddir = ../..
--- /dev/null
+
+CREATE TRIGGER "MyTableName_Trig" AFTER INSERT OR DELETE OR UPDATE
+ON "MyTableName" FOR EACH ROW EXECUTE PROCEDURE
+"recordchange" ();
+
--- /dev/null
+ GNU GENERAL PUBLIC LICENSE
+ Version 2, June 1991
+
+ Copyright (C) 1989, 1991 Free Software Foundation, Inc.
+ 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+ Preamble
+
+ The licenses for most software are designed to take away your
+freedom to share and change it. By contrast, the GNU General Public
+License is intended to guarantee your freedom to share and change free
+software--to make sure the software is free for all its users. This
+General Public License applies to most of the Free Software
+Foundation's software and to any other program whose authors commit to
+using it. (Some other Free Software Foundation software is covered by
+the GNU Library General Public License instead.) You can apply it to
+your programs, too.
+
+ When we speak of free software, we are referring to freedom, not
+price. Our General Public Licenses are designed to make sure that you
+have the freedom to distribute copies of free software (and charge for
+this service if you wish), that you receive source code or can get it
+if you want it, that you can change the software or use pieces of it
+in new free programs; and that you know you can do these things.
+
+ To protect your rights, we need to make restrictions that forbid
+anyone to deny you these rights or to ask you to surrender the rights.
+These restrictions translate to certain responsibilities for you if you
+distribute copies of the software, or if you modify it.
+
+ For example, if you distribute copies of such a program, whether
+gratis or for a fee, you must give the recipients all the rights that
+you have. You must make sure that they, too, receive or can get the
+source code. And you must show them these terms so they know their
+rights.
+
+ We protect your rights with two steps: (1) copyright the software, and
+(2) offer you this license which gives you legal permission to copy,
+distribute and/or modify the software.
+
+ Also, for each author's protection and ours, we want to make certain
+that everyone understands that there is no warranty for this free
+software. If the software is modified by someone else and passed on, we
+want its recipients to know that what they have is not the original, so
+that any problems introduced by others will not reflect on the original
+authors' reputations.
+
+ Finally, any free program is threatened constantly by software
+patents. We wish to avoid the danger that redistributors of a free
+program will individually obtain patent licenses, in effect making the
+program proprietary. To prevent this, we have made it clear that any
+patent must be licensed for everyone's free use or not licensed at all.
+
+ The precise terms and conditions for copying, distribution and
+modification follow.
+\f
+ GNU GENERAL PUBLIC LICENSE
+ TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
+
+ 0. This License applies to any program or other work which contains
+a notice placed by the copyright holder saying it may be distributed
+under the terms of this General Public License. The "Program", below,
+refers to any such program or work, and a "work based on the Program"
+means either the Program or any derivative work under copyright law:
+that is to say, a work containing the Program or a portion of it,
+either verbatim or with modifications and/or translated into another
+language. (Hereinafter, translation is included without limitation in
+the term "modification".) Each licensee is addressed as "you".
+
+Activities other than copying, distribution and modification are not
+covered by this License; they are outside its scope. The act of
+running the Program is not restricted, and the output from the Program
+is covered only if its contents constitute a work based on the
+Program (independent of having been made by running the Program).
+Whether that is true depends on what the Program does.
+
+ 1. You may copy and distribute verbatim copies of the Program's
+source code as you receive it, in any medium, provided that you
+conspicuously and appropriately publish on each copy an appropriate
+copyright notice and disclaimer of warranty; keep intact all the
+notices that refer to this License and to the absence of any warranty;
+and give any other recipients of the Program a copy of this License
+along with the Program.
+
+You may charge a fee for the physical act of transferring a copy, and
+you may at your option offer warranty protection in exchange for a fee.
+
+ 2. You may modify your copy or copies of the Program or any portion
+of it, thus forming a work based on the Program, and copy and
+distribute such modifications or work under the terms of Section 1
+above, provided that you also meet all of these conditions:
+
+ a) You must cause the modified files to carry prominent notices
+ stating that you changed the files and the date of any change.
+
+ b) You must cause any work that you distribute or publish, that in
+ whole or in part contains or is derived from the Program or any
+ part thereof, to be licensed as a whole at no charge to all third
+ parties under the terms of this License.
+
+ c) If the modified program normally reads commands interactively
+ when run, you must cause it, when started running for such
+ interactive use in the most ordinary way, to print or display an
+ announcement including an appropriate copyright notice and a
+ notice that there is no warranty (or else, saying that you provide
+ a warranty) and that users may redistribute the program under
+ these conditions, and telling the user how to view a copy of this
+ License. (Exception: if the Program itself is interactive but
+ does not normally print such an announcement, your work based on
+ the Program is not required to print an announcement.)
+\f
+These requirements apply to the modified work as a whole. If
+identifiable sections of that work are not derived from the Program,
+and can be reasonably considered independent and separate works in
+themselves, then this License, and its terms, do not apply to those
+sections when you distribute them as separate works. But when you
+distribute the same sections as part of a whole which is a work based
+on the Program, the distribution of the whole must be on the terms of
+this License, whose permissions for other licensees extend to the
+entire whole, and thus to each and every part regardless of who wrote it.
+
+Thus, it is not the intent of this section to claim rights or contest
+your rights to work written entirely by you; rather, the intent is to
+exercise the right to control the distribution of derivative or
+collective works based on the Program.
+
+In addition, mere aggregation of another work not based on the Program
+with the Program (or with a work based on the Program) on a volume of
+a storage or distribution medium does not bring the other work under
+the scope of this License.
+
+ 3. You may copy and distribute the Program (or a work based on it,
+under Section 2) in object code or executable form under the terms of
+Sections 1 and 2 above provided that you also do one of the following:
+
+ a) Accompany it with the complete corresponding machine-readable
+ source code, which must be distributed under the terms of Sections
+ 1 and 2 above on a medium customarily used for software interchange; or,
+
+ b) Accompany it with a written offer, valid for at least three
+ years, to give any third party, for a charge no more than your
+ cost of physically performing source distribution, a complete
+ machine-readable copy of the corresponding source code, to be
+ distributed under the terms of Sections 1 and 2 above on a medium
+ customarily used for software interchange; or,
+
+ c) Accompany it with the information you received as to the offer
+ to distribute corresponding source code. (This alternative is
+ allowed only for noncommercial distribution and only if you
+ received the program in object code or executable form with such
+ an offer, in accord with Subsection b above.)
+
+The source code for a work means the preferred form of the work for
+making modifications to it. For an executable work, complete source
+code means all the source code for all modules it contains, plus any
+associated interface definition files, plus the scripts used to
+control compilation and installation of the executable. However, as a
+special exception, the source code distributed need not include
+anything that is normally distributed (in either source or binary
+form) with the major components (compiler, kernel, and so on) of the
+operating system on which the executable runs, unless that component
+itself accompanies the executable.
+
+If distribution of executable or object code is made by offering
+access to copy from a designated place, then offering equivalent
+access to copy the source code from the same place counts as
+distribution of the source code, even though third parties are not
+compelled to copy the source along with the object code.
+\f
+ 4. You may not copy, modify, sublicense, or distribute the Program
+except as expressly provided under this License. Any attempt
+otherwise to copy, modify, sublicense or distribute the Program is
+void, and will automatically terminate your rights under this License.
+However, parties who have received copies, or rights, from you under
+this License will not have their licenses terminated so long as such
+parties remain in full compliance.
+
+ 5. You are not required to accept this License, since you have not
+signed it. However, nothing else grants you permission to modify or
+distribute the Program or its derivative works. These actions are
+prohibited by law if you do not accept this License. Therefore, by
+modifying or distributing the Program (or any work based on the
+Program), you indicate your acceptance of this License to do so, and
+all its terms and conditions for copying, distributing or modifying
+the Program or works based on it.
+
+ 6. Each time you redistribute the Program (or any work based on the
+Program), the recipient automatically receives a license from the
+original licensor to copy, distribute or modify the Program subject to
+these terms and conditions. You may not impose any further
+restrictions on the recipients' exercise of the rights granted herein.
+You are not responsible for enforcing compliance by third parties to
+this License.
+
+ 7. If, as a consequence of a court judgment or allegation of patent
+infringement or for any other reason (not limited to patent issues),
+conditions are imposed on you (whether by court order, agreement or
+otherwise) that contradict the conditions of this License, they do not
+excuse you from the conditions of this License. If you cannot
+distribute so as to satisfy simultaneously your obligations under this
+License and any other pertinent obligations, then as a consequence you
+may not distribute the Program at all. For example, if a patent
+license would not permit royalty-free redistribution of the Program by
+all those who receive copies directly or indirectly through you, then
+the only way you could satisfy both it and this License would be to
+refrain entirely from distribution of the Program.
+
+If any portion of this section is held invalid or unenforceable under
+any particular circumstance, the balance of the section is intended to
+apply and the section as a whole is intended to apply in other
+circumstances.
+
+It is not the purpose of this section to induce you to infringe any
+patents or other property right claims or to contest validity of any
+such claims; this section has the sole purpose of protecting the
+integrity of the free software distribution system, which is
+implemented by public license practices. Many people have made
+generous contributions to the wide range of software distributed
+through that system in reliance on consistent application of that
+system; it is up to the author/donor to decide if he or she is willing
+to distribute software through any other system and a licensee cannot
+impose that choice.
+
+This section is intended to make thoroughly clear what is believed to
+be a consequence of the rest of this License.
+\f
+ 8. If the distribution and/or use of the Program is restricted in
+certain countries either by patents or by copyrighted interfaces, the
+original copyright holder who places the Program under this License
+may add an explicit geographical distribution limitation excluding
+those countries, so that distribution is permitted only in or among
+countries not thus excluded. In such case, this License incorporates
+the limitation as if written in the body of this License.
+
+ 9. The Free Software Foundation may publish revised and/or new versions
+of the General Public License from time to time. Such new versions will
+be similar in spirit to the present version, but may differ in detail to
+address new problems or concerns.
+
+Each version is given a distinguishing version number. If the Program
+specifies a version number of this License which applies to it and "any
+later version", you have the option of following the terms and conditions
+either of that version or of any later version published by the Free
+Software Foundation. If the Program does not specify a version number of
+this License, you may choose any version ever published by the Free Software
+Foundation.
+
+ 10. If you wish to incorporate parts of the Program into other free
+programs whose distribution conditions are different, write to the author
+to ask for permission. For software which is copyrighted by the Free
+Software Foundation, write to the Free Software Foundation; we sometimes
+make exceptions for this. Our decision will be guided by the two goals
+of preserving the free status of all derivatives of our free software and
+of promoting the sharing and reuse of software generally.
+
+ NO WARRANTY
+
+ 11. BECAUSE THE PROGRAM IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
+FOR THE PROGRAM, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN
+OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
+PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED
+OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE ENTIRE RISK AS
+TO THE QUALITY AND PERFORMANCE OF THE PROGRAM IS WITH YOU. SHOULD THE
+PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL NECESSARY SERVICING,
+REPAIR OR CORRECTION.
+
+ 12. IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
+WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
+REDISTRIBUTE THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES,
+INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING
+OUT OF THE USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED
+TO LOSS OF DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY
+YOU OR THIRD PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER
+PROGRAMS), EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE
+POSSIBILITY OF SUCH DAMAGES.
+
+ END OF TERMS AND CONDITIONS
+\f
+ How to Apply These Terms to Your New Programs
+
+ If you develop a new program, and you want it to be of the greatest
+possible use to the public, the best way to achieve this is to make it
+free software which everyone can redistribute and change under these terms.
+
+ To do so, attach the following notices to the program. It is safest
+to attach them to the start of each source file to most effectively
+convey the exclusion of warranty; and each file should have at least
+the "copyright" line and a pointer to where the full notice is found.
+
+
+ Copyright (C) 19yy
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+
+Also add information on how to contact you by electronic and paper mail.
+
+If the program is interactive, make it output a short notice like this
+when it starts in an interactive mode:
+
+ Gnomovision version 69, Copyright (C) 19yy name of author
+ Gnomovision comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
+ This is free software, and you are welcome to redistribute it
+ under certain conditions; type `show c' for details.
+
+The hypothetical commands `show w' and `show c' should show the appropriate
+parts of the General Public License. Of course, the commands you use may
+be called something other than `show w' and `show c'; they could even be
+mouse-clicks or menu items--whatever suits your program.
+
+You should also get your employer (if you work as a programmer) or your
+school, if any, to sign a "copyright disclaimer" for the program, if
+necessary. Here is a sample; alter the names:
+
+ Yoyodyne, Inc., hereby disclaims all copyright interest in the program
+ `Gnomovision' (which makes passes at compilers) written by James Hacker.
+
+ , 1 April 1989
+ Ty Coon, President of Vice
+
+This General Public License does not permit incorporating your program into
+proprietary programs. If your program is a subroutine library, you may
+consider it more useful to permit linking proprietary applications with the
+library. If this is what you want to do, use the GNU Library General
+Public License instead of this License.
--- /dev/null
+#!/usr/bin/perl
+#############################################################################
+#
+# DBMirror.pl
+# Contains the Database mirroring script.
+# This script queries the pending table off the database specified
+# (along with the associated schema) for updates that are pending on a
+# specific host. The database on that host is then updated with the changes.
+#
+#
+# (c) 2001-2002 Navtech Systems Support Inc.
+# Released under the GNU Public License version 2. See COPYING.
+#
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+##############################################################################
+# $Id: DBMirror.pl,v 1.1 2002/06/23 21:58:07 momjian Exp $
+#
+##############################################################################
+
+=head1 NAME
+
+DBMirror.pl - A Perl module to mirror database changes from a master database
+to a slave.
+
+=head1 SYNPOSIS
+
+
+DBMirror.pl slaveConfigfile.conf
+
+
+=head1 DESCRIPTION
+
+This Perl script will connect to the master database and query its pending
+table for a list of pending changes.
+
+The transactions of the original changes to the master will be preserved
+when sending things to the slave.
+
+=cut
+
+
+=head1 METHODS
+
+=over 4
+
+=cut
+
+
+BEGIN {
+ # add in a global path to files
+ # Pg should be included.
+}
+
+
+use strict;
+use Pg;
+use IO::Handle;
+sub mirrorCommand($$$$$$);
+sub mirrorInsert($$$$$);
+sub mirrorDelete($$$$$);
+sub mirrorUpdate($$$$$);
+sub sendQueryToSlaves($$);
+sub logErrorMessage($);
+sub openSlaveConnection($);
+sub updateMirrorHostTable($$);
+ sub extractData($$);
+local $::masterHost;
+local $::masterDb;
+local $::masterUser;
+local $::masterPassword;
+local $::errorThreshold=5;
+local $::errorEmailAddr=undef;
+
+my %slaveInfoHash;
+local $::slaveInfo = \%slaveInfoHash;
+
+my $lastErrorMsg;
+my $repeatErrorCount=0;
+
+my $lastXID;
+my $commandCount=0;
+
+my $masterConn;
+
+Main();
+
+sub Main() {
+
+#run the configuration file.
+ if ($#ARGV != 0) {
+ die "usage: DBMirror.pl configFile\n";
+ }
+ if( ! defined do $ARGV[0]) {
+ logErrorMessage("Invalid Configuration file $ARGV[0]");
+ die;
+ }
+
+
+ my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword";
+
+ $masterConn = Pg::connectdb($connectString);
+
+ unless($masterConn->status == PGRES_CONNECTION_OK) {
+ logErrorMessage("Can't connect to master database\n" .
+ $masterConn->errorMessage);
+ die;
+ }
+
+
+ my $firstTime = 1;
+ while(1) {
+ if($firstTime == 0) {
+ sleep 60;
+ }
+ $firstTime = 0;
+# Open up the connection to the slave.
+ if(! defined $::slaveInfo->{"status"} ||
+ $::slaveInfo->{"status"} == -1) {
+ openSlaveConnection($::slaveInfo);
+ }
+
+
+
+ sendQueryToSlaves(undef,"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+ sendQueryToSlaves(undef,"SET CONSTRAINTS ALL DEFERRED");
+
+
+ #Obtain a list of pending transactions using ordering by our approximation
+ #to the commit time. The commit time approximation is taken to be the
+ #SeqId of the last row edit in the transaction.
+ my $pendingTransQuery = "SELECT pd.\"XID\",MAX(\"SeqId\") FROM \"Pending\" pd";
+ $pendingTransQuery .= " LEFT JOIN \"MirroredTransaction\" mt INNER JOIN";
+ $pendingTransQuery .= " \"MirrorHost\" mh ON mt.\"MirrorHostId\" = ";
+ $pendingTransQuery .= " mh.\"MirrorHostId\" AND mh.\"HostName\"=";
+ $pendingTransQuery .= " '$::slaveInfo->{\"slaveHost\"}' ";
+ $pendingTransQuery .= " ON pd.\"XID\"";
+ $pendingTransQuery .= " = mt.\"XID\" WHERE mt.\"XID\" is null ";
+ $pendingTransQuery .= " GROUP BY pd.\"XID\" ";
+ $pendingTransQuery .= " ORDER BY MAX(pd.\"SeqId\")";
+
+
+ my $pendingTransResults = $masterConn->exec($pendingTransQuery);
+ unless($pendingTransResults->resultStatus==PGRES_TUPLES_OK) {
+ logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage);
+ die;
+ }
+
+ my $numPendingTrans = $pendingTransResults->ntuples;
+ my $curTransTuple = 0;
+
+
+ #
+ # This loop loops through each pending transaction in the proper order.
+ # The Pending row edits for that transaction will be queried from the
+ # master and sent + committed to the slaves.
+ while($curTransTuple < $numPendingTrans) {
+ my $XID = $pendingTransResults->getvalue($curTransTuple,0);
+ my $maxSeqId = $pendingTransResults->getvalue($curTransTuple,1);
+ my $seqId;
+
+ my $pendingQuery = "SELECT pnd.\"SeqId\",pnd.\"TableName\",";
+ $pendingQuery .= " pnd.\"Op\",pnddata.\"IsKey\", pnddata.\"Data\" AS \"Data\" ";
+ $pendingQuery .= " FROM \"Pending\" pnd, \"PendingData\" pnddata ";
+ $pendingQuery .= " WHERE pnd.\"SeqId\" = pnddata.\"SeqId\" AND ";
+
+ $pendingQuery .= " pnd.\"XID\"=$XID ORDER BY \"SeqId\", \"IsKey\" DESC";
+
+
+ my $pendingResults = $masterConn->exec($pendingQuery);
+ unless($pendingResults->resultStatus==PGRES_TUPLES_OK) {
+ logErrorMessage("Can't query pending table\n" . $masterConn->errorMessage);
+ die;
+ }
+
+
+
+ my $numPending = $pendingResults->ntuples;
+ my $curTuple = 0;
+ sendQueryToSlaves(undef,"BEGIN");
+ while ($curTuple < $numPending) {
+ $seqId = $pendingResults->getvalue($curTuple,0);
+ my $tableName = $pendingResults->getvalue($curTuple,1);
+ my $op = $pendingResults->getvalue($curTuple,2);
+
+ $curTuple = mirrorCommand($seqId,$tableName,$op,$XID,
+ $pendingResults,$curTuple) +1;
+ if($::slaveInfo->{"status"}==-1) {
+ last;
+ }
+
+ }
+ #Now commit the transaction.
+ if($::slaveInfo->{"status"}==-1) {
+ last;
+ }
+ sendQueryToSlaves(undef,"COMMIT");
+ updateMirrorHostTable($XID,$seqId);
+ if($commandCount > 5000) {
+ $commandCount = 0;
+ $::slaveInfo->{"status"} = -1;
+ $::slaveInfo->{"slaveConn"}->reset;
+ #Open the connection right away.
+ openSlaveConnection($::slaveInfo);
+
+ }
+
+ $pendingResults = undef;
+ $curTransTuple = $curTransTuple +1;
+ }#while transactions left.
+
+ $pendingTransResults = undef;
+
+ }#while(1)
+}#Main
+
+
+
+=item mirrorCommand(SeqId,tableName,op,transId,pendingResults,curTuple)
+
+Mirrors a single SQL Command(change to a single row) to the slave.
+
+=over 4
+
+=item * SeqId
+
+The id number of the change to mirror. This is the
+primary key of the pending table.
+
+
+=item * tableName
+
+The name of the table the transaction takes place on.
+
+=item * op
+
+The type of operation this transaction is. 'i' for insert, 'u' for update or
+'d' for delete.
+
+=item * transId
+
+The Transaction of of the Transaction that this command is part of.
+
+=item * pendingResults
+
+A Results set structure returned from Pg::execute that contains the
+join of the Pending and PendingData tables for all of the pending row
+edits in this transaction.
+
+=item * currentTuple
+
+
+The tuple(or row) number of the pendingRow for the command that is about
+to be edited. If the command is an update then this points to the row
+with IsKey equal to true. The next row, curTuple+1 is the contains the
+PendingData with IsKey false for the update.
+
+
+=item returns
+
+
+The tuple number of last tuple for this command. This might be equal to
+currentTuple or it might be larger (+1 in the case of an Update).
+
+
+=back
+
+=cut
+
+
+sub mirrorCommand($$$$$$) {
+ my $seqId = $_[0];
+ my $tableName = $_[1];
+ my $op = $_[2];
+ my $transId = $_[3];
+ my $pendingResults = $_[4];
+ my $currentTuple = $_[5];
+
+ if($op eq 'i') {
+ $currentTuple = mirrorInsert($seqId,$tableName,$transId,$pendingResults
+ ,$currentTuple);
+ }
+ if($op eq 'd') {
+ $currentTuple = mirrorDelete($seqId,$tableName,$transId,$pendingResults,
+ $currentTuple);
+ }
+ if($op eq 'u') {
+ $currentTuple = mirrorUpdate($seqId,$tableName,$transId,$pendingResults,
+ $currentTuple);
+ }
+ $commandCount = $commandCount +1;
+ if($commandCount % 100 == 0) {
+ # print "Sent 100 commmands on SeqId $seqId \n";
+ # flush STDOUT;
+ }
+ return $currentTuple
+ }
+
+
+=item mirrorInsert(transId,tableName,transId,pendingResults,currentTuple)
+
+Mirrors an INSERT operation to the slave database. A new row is placed
+in the slave database containing the primary key from pendingKeys along with
+the data fields contained in the row identified by sourceOid.
+
+=over 4
+
+=item * transId
+
+The sequence id of the INSERT operation being mirrored. This is the primary
+key of the pending table.
+
+=item * tableName
+
+
+The name of the table the transaction takes place on.
+
+=item * sourceOid
+
+The OID of the row in the master database for which this transaction effects.
+If the transaction is a delete then the operation is not valid.
+
+=item * transId
+
+The Transaction Id of transaction that this insert is part of.
+
+
+
+=item * pendingResults
+
+A Results set structure returned from Pg::execute that contains the
+join of the Pending and PendingData tables for all of the pending row
+edits in this transaction.
+
+=item * currentTuple
+
+
+The tuple(or row) number of the pendingRow for the command that is about
+to be edited. In the case of an insert this should point to the one
+row for the row edit.
+
+=item returns
+
+The tuple number of the last tuple for the row edit. This should be
+currentTuple.
+
+
+=back
+
+=cut
+
+
+sub mirrorInsert($$$$$) {
+ my $seqId = $_[0];
+ my $tableName = $_[1];
+ my $transId = $_[2];
+ my $pendingResults = $_[3];
+ my $currentTuple = $_[4];
+ my $counter;
+ my $column;
+
+ my $firstIteration=1;
+ my %recordValues = extractData($pendingResults,$currentTuple);
+
+
+ #Now build the insert query.
+ my $insertQuery = "INSERT INTO \"$tableName\" (";
+ my $valuesQuery = ") VALUES (";
+ foreach $column (keys (%recordValues)) {
+ if($firstIteration==0) {
+ $insertQuery .= " ,";
+ $valuesQuery .= " ,";
+ }
+ $insertQuery .= "\"$column\"";
+ if(defined $recordValues{$column}) {
+ my $quotedValue = $recordValues{$column};
+ $quotedValue =~ s/\\/\\\\/g;
+ $quotedValue =~ s/'/\\'/g;
+ $valuesQuery .= "'$quotedValue'";
+ }
+ else {
+ $valuesQuery .= "null";
+ }
+ $firstIteration=0;
+ }
+ $valuesQuery .= ")";
+ sendQueryToSlaves(undef,$insertQuery . $valuesQuery);
+ return $currentTuple;
+}
+
+=item mirrorDelete(SeqId,tableName,transId,pendingResult,currentTuple)
+
+Deletes a single row from the slave database. The row is identified by the
+primary key for the transaction in the pendingKeys table.
+
+=over 4
+
+=item * SeqId
+
+The Sequence id for this delete request.
+
+=item * tableName
+
+The name of the table to delete the row from.
+
+=item * transId
+
+The Transaction Id of the transaction that this command is part of.
+
+
+
+=item * pendingResults
+
+A Results set structure returned from Pg::execute that contains the
+join of the Pending and PendingData tables for all of the pending row
+edits in this transaction.
+
+=item * currentTuple
+
+
+The tuple(or row) number of the pendingRow for the command that is about
+to be edited. In the case of a delete this should point to the one
+row for the row edit.
+
+=item returns
+
+The tuple number of the last tuple for the row edit. This should be
+currentTuple.
+
+
+=back
+
+=cut
+
+
+sub mirrorDelete($$$$$) {
+ my $seqId = $_[0];
+ my $tableName = $_[1];
+ my $transId = $_[2];
+ my $pendingResult = $_[3];
+ my $currentTuple = $_[4];
+ my %dataHash;
+ my $currentField;
+ my $firstField=1;
+ %dataHash = extractData($pendingResult,$currentTuple);
+
+ my $counter=0;
+ my $deleteQuery = "DELETE FROM \"$tableName\" WHERE ";
+ foreach $currentField (keys %dataHash) {
+ if($firstField==0) {
+ $deleteQuery .= " AND ";
+ }
+ my $currentValue = $dataHash{$currentField};
+ $deleteQuery .= "\"";
+ $deleteQuery .= $currentField;
+ if(defined $currentValue) {
+ $deleteQuery .= "\"='";
+ $deleteQuery .= $currentValue;
+ $deleteQuery .= "'";
+ }
+ else {
+ $deleteQuery .= " is null ";
+ }
+ $counter++;
+ $firstField=0;
+ }
+
+ sendQueryToSlaves($transId,$deleteQuery);
+ return $currentTuple;
+}
+
+
+=item mirrorUpdate(seqId,tableName,transId,pendingResult,currentTuple)
+
+Mirrors over an edit request to a single row of the database.
+The primary key from before the edit is used to determine which row in the
+slave should be changed.
+
+After the edit takes place on the slave its primary key will match the primary
+key the master had immediatly following the edit. All other fields will be set
+to the current values.
+
+Data integrity is maintained because the mirroring is performed in an
+SQL transcation so either all pending changes are made or none are.
+
+=over 4
+
+=item * seqId
+
+The Sequence id of the update.
+
+=item * tableName
+
+The name of the table to perform the update on.
+
+=item * transId
+
+The transaction Id for the transaction that this command is part of.
+
+
+=item * pendingResults
+
+A Results set structure returned from Pg::execute that contains the
+join of the Pending and PendingData tables for all of the pending row
+edits in this transaction.
+
+=item * currentTuple
+
+
+The tuple(or row) number of the pendingRow for the command that is about
+to be edited. In the case of a delete this should point to the one
+row for the row edit.
+
+=item returns
+
+The tuple number of the last tuple for the row edit. This should be
+currentTuple +1. Which points to the non key row of the update.
+
+
+=back
+
+=cut
+
+sub mirrorUpdate($$$$$) {
+ my $seqId = $_[0];
+ my $tableName = $_[1];
+ my $transId = $_[2];
+ my $pendingResult = $_[3];
+ my $currentTuple = $_[4];
+
+ my $counter;
+ my $quotedValue;
+ my $updateQuery = "UPDATE \"$tableName\" SET ";
+ my $currentField;
+
+
+
+ my %keyValueHash;
+ my %dataValueHash;
+ my $firstIteration=1;
+
+ #Extract the Key values. This row contains the values of the
+ # key fields before the update occours(the WHERE clause)
+ %keyValueHash = extractData($pendingResult,$currentTuple);
+
+
+ #Extract the data values. This is a SET clause that contains
+ #values for the entire row AFTER the update.
+ %dataValueHash = extractData($pendingResult,$currentTuple+1);
+
+ $firstIteration=1;
+ foreach $currentField (keys (%dataValueHash)) {
+ if($firstIteration==0) {
+ $updateQuery .= ", ";
+ }
+ $updateQuery .= " \"$currentField\"=";
+ my $currentValue = $dataValueHash{$currentField};
+ if(defined $currentValue ) {
+ $quotedValue = $currentValue;
+ $quotedValue =~ s/\\/\\\\/g;
+ $quotedValue =~ s/'/\\'/g;
+ $updateQuery .= "'$quotedValue'";
+ }
+ else {
+ $updateQuery .= "null ";
+ }
+ $firstIteration=0;
+ }
+
+
+ $updateQuery .= " WHERE ";
+ $firstIteration=1;
+ foreach $currentField (keys (%keyValueHash)) {
+ my $currentValue;
+ if($firstIteration==0) {
+ $updateQuery .= " AND ";
+ }
+ $updateQuery .= "\"$currentField\"=";
+ $currentValue = $keyValueHash{$currentField};
+ if(defined $currentValue) {
+ $quotedValue = $currentValue;
+ $quotedValue =~ s/\\/\\\\/g;
+ $quotedValue =~ s/'/\\'/g;
+ $updateQuery .= "'$quotedValue'";
+ }
+ else {
+ $updateQuery .= " null ";
+ }
+ $firstIteration=0;
+ }
+
+ sendQueryToSlaves($transId,$updateQuery);
+ return $currentTuple+1;
+}
+
+
+
+=item sendQueryToSlaves(seqId,sqlQuery)
+
+Sends an SQL query to the slave.
+
+
+=over 4
+
+=item * seqId
+
+The sequence Id of the command being sent. Undef if no command is associated
+with the query being sent.
+
+=item * sqlQuery
+
+
+SQL operation to perform on the slave.
+
+=back
+
+=cut
+
+sub sendQueryToSlaves($$) {
+ my $seqId = $_[0];
+ my $sqlQuery = $_[1];
+
+ if($::slaveInfo->{"status"} == 0) {
+ my $queryResult = $::slaveInfo->{"slaveConn"}->exec($sqlQuery);
+ unless($queryResult->resultStatus == PGRES_COMMAND_OK) {
+ my $errorMessage;
+ $errorMessage = "Error sending query $seqId to " ;
+ $errorMessage .= $::slaveInfo->{"slaveHost"};
+ $errorMessage .=$::slaveInfo->{"slaveConn"}->errorMessage;
+ $errorMessage .= "\n" . $sqlQuery;
+ logErrorMessage($errorMessage);
+ $::slaveInfo->{"slaveConn"}->exec("ROLLBACK");
+ $::slaveInfo->{"status"} = -1;
+ }
+ }
+
+}
+
+
+=item logErrorMessage(error)
+
+Mails an error message to the users specified $errorEmailAddr
+The error message is also printed to STDERR.
+
+=over 4
+
+=item * error
+
+The error message to log.
+
+=back
+
+=cut
+
+sub logErrorMessage($) {
+ my $error = $_[0];
+
+ if(defined $lastErrorMsg and $error eq $lastErrorMsg) {
+ if($repeatErrorCount<$::errorThreshold) {
+ $repeatErrorCount++;
+ warn($error);
+ return;
+ }
+
+ }
+ $repeatErrorCount=0;
+ if(defined $::errorEmailAddr) {
+ my $mailPipe;
+ open (mailPipe, "|/bin/mail -s DBMirror.pl $::errorEmailAddr");
+ print mailPipe "=====================================================\n";
+ print mailPipe " DBMirror.pl \n";
+ print mailPipe "\n";
+ print mailPipe " The DBMirror.pl script has encountred an error. \n";
+ print mailPipe " It might indicate that either the master database has\n";
+ print mailPipe " gone down or that the connection to a slave database can\n";
+ print mailPipe " not be made. \n";
+ print mailPipe " Process-Id: $$ on $::masterHost database $::masterDb\n";
+ print mailPipe "\n";
+ print mailPipe $error;
+ print mailPipe "\n\n\n=================================================\n";
+ close mailPipe;
+ }
+ warn($error);
+
+ $lastErrorMsg = $error;
+
+}
+
+sub openSlaveConnection($) {
+ my $slavePtr = $_[0];
+ my $slaveConn;
+
+
+ my $slaveConnString = "host=" . $slavePtr->{"slaveHost"};
+ $slaveConnString .= " dbname=" . $slavePtr->{"slaveDb"};
+ $slaveConnString .= " user=" . $slavePtr->{"slaveUser"};
+ $slaveConnString .= " password=" . $slavePtr->{"slavePassword"};
+
+ $slaveConn = Pg::connectdb($slaveConnString);
+
+ if($slaveConn->status !=PGRES_CONNECTION_OK) {
+ my $errorMessage = "Can't connect to slave database " ;
+ $errorMessage .= $slavePtr->{"slaveHost"} . "\n";
+ $errorMessage .= $slaveConn->errorMessage;
+ logErrorMessage($errorMessage);
+ $slavePtr->{"status"} = -1;
+ }
+ else {
+ $slavePtr->{"slaveConn"} = $slaveConn;
+ $slavePtr->{"status"} = 0;
+ #Determine the MirrorHostId for the slave from the master's database
+ my $resultSet = $masterConn->exec('SELECT "MirrorHostId" FROM '
+ . ' "MirrorHost" WHERE "HostName"'
+ . '=\'' . $slavePtr->{"slaveHost"}
+ . '\'');
+ if($resultSet->ntuples !=1) {
+ my $errorMessage .= $slavePtr->{"slaveHost"} ."\n";
+ $errorMessage .= "Has no MirrorHost entry on master\n";
+ logErrorMessage($errorMessage);
+ $slavePtr->{"status"}=-1;
+ return;
+
+ }
+ $slavePtr->{"MirrorHostId"} = $resultSet->getvalue(0,0);
+
+
+
+ }
+
+}
+
+
+=item updateMirrorHostTable(lastTransId,lastSeqId)
+
+Updates the MirroredTransaction table to reflect the fact that
+this transaction has been sent to the current slave.
+
+=over 4
+
+=item * lastTransId
+
+The Transaction id for the last transaction that has been succesfully mirrored to
+the currently open slaves.
+
+=item * lastSeqId
+
+The Sequence Id of the last command that has been succefully mirrored
+
+
+=back
+
+
+=cut
+
+sub updateMirrorHostTable($$) {
+ my $lastTransId = shift;
+ my $lastSeqId = shift;
+
+ if($::slaveInfo->{"status"}==0) {
+ my $deleteTransactionQuery;
+ my $deleteResult;
+ my $updateMasterQuery = "INSERT INTO \"MirroredTransaction\" ";
+ $updateMasterQuery .= " (\"XID\",\"LastSeqId\",\"MirrorHostId\")";
+ $updateMasterQuery .= " VALUES ($lastTransId,$lastSeqId,$::slaveInfo->{\"MirrorHostId\"}) ";
+
+ my $updateResult = $masterConn->exec($updateMasterQuery);
+ unless($updateResult->resultStatus == PGRES_COMMAND_OK) {
+ my $errorMessage = $masterConn->errorMessage . "\n";
+ $errorMessage .= $updateMasterQuery;
+ logErrorMessage($errorMessage);
+ die;
+ }
+# print "Updated slaves to transaction $lastTransId\n" ;
+# flush STDOUT;
+
+ #If this transaction has now been mirrored to all mirror hosts
+ #then it can be deleted.
+ $deleteTransactionQuery = 'DELETE FROM "Pending" WHERE "XID"='
+ . $lastTransId . ' AND (SELECT COUNT(*) FROM "MirroredTransaction"'
+ . ' WHERE "XID"=' . $lastTransId . ')=(SELECT COUNT(*) FROM'
+ . ' "MirrorHost")';
+
+ $deleteResult = $masterConn->exec($deleteTransactionQuery);
+ if($deleteResult->resultStatus!=PGRES_COMMAND_OK) {
+ logErrorMessage($masterConn->errorMessage . "\n" .
+ $deleteTransactionQuery);
+ die;
+ }
+
+ }
+
+}
+
+
+sub extractData($$) {
+ my $pendingResult = $_[0];
+ my $currentTuple = $_[1];
+ my $fnumber;
+ my %valuesHash;
+ $fnumber = 4;
+ my $dataField = $pendingResult->getvalue($currentTuple,$fnumber);
+
+ while(length($dataField)>0) {
+ # Extract the field name that is surronded by double quotes
+ $dataField =~ m/(\".*?\")/s;
+ my $fieldName = $1;
+ $dataField = substr $dataField ,length($fieldName);
+ $fieldName =~ s/\"//g; #Remove the surronding " signs.
+
+ if($dataField =~ m/(^= )/s) {
+ #Matched null
+ $dataField = substr $dataField , length($1);
+ $valuesHash{$fieldName}=undef;
+ }
+ elsif ($dataField =~ m/(^=\')/s) {
+ #Has data.
+ my $value;
+ $dataField = substr $dataField ,2; #Skip the ='
+ LOOP: { #This is to allow us to use last from a do loop.
+ #Recommended in perlsyn manpage.
+ do {
+ my $matchString;
+ #Find the substring ending with the first ' or first \
+ $dataField =~ m/(.*?[\'\\])?/s;
+ $matchString = $1;
+ $value .= substr $matchString,0,length($matchString)-1;
+
+ if($matchString =~ m/(\'$)/s) {
+ # $1 runs to the end of the field value.
+ $dataField = substr $dataField,length($matchString)+1;
+ last;
+
+ }
+ else {
+ #deal with the escape character.
+ #It The character following the escape gets appended.
+ $dataField = substr $dataField,length($matchString);
+ $dataField =~ s/(^.)//s;
+ $value .= $1;
+
+
+
+ }
+
+
+ } until(length($dataField)==0);
+ }
+ $valuesHash{$fieldName} = $value;
+
+
+ }#else if
+ else {
+
+ logErrorMessage "Error in PendingData Sequence Id " .
+ $pendingResult->getvalue($currentTuple,0);
+ die;
+ }
+
+
+
+ } #while
+ return %valuesHash;
+
+}
--- /dev/null
+# $Header: /cvsroot/pgsql/contrib/dbmirror/Attic/Makefile,v 1.1 2002/06/23 21:58:07 momjian Exp $
+
+subdir = contrib/dbmirror
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+
+MODULES = pending
+DOCS = README.dbmirror
+
+include $(top_srcdir)/contrib/contrib-global.mk
--- /dev/null
+
+CREATE FUNCTION "recordchange" () RETURNS opaque AS
+'/usr/local/pgsql/lib/pending.so', 'recordchange' LANGUAGE 'C';
+
+CREATE TABLE "MirrorHost" (
+"MirrorHostId" serial,
+"HostName" varchar NOT NULL
+);
+
+
+
+
+
+CREATE TABLE "Pending" (
+"SeqId" serial,
+"TableName" varchar NOT NULL,
+"Op" character,
+"XID" int4 NOT NULL,
+PRIMARY KEY ("SeqId")
+
+);
+
+CREATE INDEX "Pending_XID_Index" ON "Pending" ("XID");
+
+CREATE TABLE "PendingData" (
+"SeqId" int4 NOT NULL,
+"IsKey" bool NOT NULL,
+"Data" varchar,
+PRIMARY KEY ("SeqId", "IsKey") ,
+FOREIGN KEY ("SeqId") REFERENCES "Pending" ("SeqId") ON UPDATE CASCADE ON DELETE CASCADE
+);
+
+
+CREATE TABLE "MirroredTransaction" (
+"XID" int4 NOT NULL,
+"LastSeqId" int4 NOT NULL,
+"MirrorHostId" int4 NOT NULL,
+PRIMARY KEY ("XID","MirrorHostId"),
+FOREIGN KEY ("MirrorHostId") REFERENCES "MirrorHost" ("MirrorHostId") ON UPDATE CASCADE ON DELETE CASCADE,
+FOREIGN KEY ("LastSeqId") REFERENCES "Pending" ("SeqId") ON UPDATE
+CASCADE ON DELETE CASCADE
+);
--- /dev/null
+DBMirror - Postgres Database Mirroring
+===================================================
+
+
+DBMirror is a database mirroring system developed for the Postgres
+
+
+(c) 2001-2002 Navtech Systems Support Inc.
+Released under the GNU Public License version 2. See COPYING.
+
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+
+
+Overrview
+--------------------------------------------------------------------
+
+The mirroring system is trigger based and provides the following key features:
+
+-Support for multiple mirror slaves
+-Transactions are maintained
+-Per table selection of what gets mirrored.
+
+
+The system is based on the idea that a master database exist where all
+edits are made to the tables being mirrored. A trigger attatched to the
+tables being mirrored runs logging information about the edit to
+the Pending table and PendingData table.
+
+A perl script(DBMirror.pl) runs continiously for each slave database(A database
+that the change is supposed to be mirrored to) examining the Pending
+table; searching for transactions that need to be sent to that particular slave
+database. Those transactions are then mirrored to the slave database and
+the MirroredTransaction table is updated to reflect that the transaction has
+been sent.
+
+If the transaction has been sent to all know slave hosts (All entries
+in the MirrorHost table) then all records of it are purged from the
+Pending tables.
+
+Installation Instructions
+------------------------------------------------------------------------
+
+1) Compile pending.c
+
+The file pending.c contains the recordchange trigger. This runs every
+time a row inside of a table being mirrored changes.
+
+
+To build the trigger run make on the "Makefile" in the DBMirror directory.
+
+The Makefile supplied assumes that the postgres include files are in
+/usr/local/pgsql/include/server.
+
+Postgres-7.1.x installations should change this to
+/usr/local/pgsql/include (The server part is for 7.2+)
+
+If you have installed the postgres include files to another location then
+modify the Makefile to reflect this.
+
+The trigger requires that all postgres headers be installed, this is
+accomplished in postgresql(7.1 or 7.2) by running "make install-all-headers"
+in the postgres source directory.
+
+The Makefile should create a file named pending.so that contains the trigger.
+
+Install this file in /usr/local/pgsql/lib (or another suitable location).
+
+If you choose a different location the MirrorSetup.sql script will need
+to be modified to reflect your new location. The CREATE FUNCTION command
+in the MirrorSetup.sql script associates the trigger function with the
+pending.so shared library. Modify the arguments to this command if you
+choose to install the trigger elsewhere.
+
+2) Run MirroSetup.sql
+
+This file contains SQL commands to setup the Mirroring environment.
+This includes
+
+-Telling Postgres about the "recordchange" trigger function.
+-Creating the Pending,PendingData, MirrorHost, MirroredTransaction tables
+
+
+To execute the script use psql as follows
+
+"psql -f MirrorSetup.sql MyDatabaseName"
+
+where MyDatabaseName is the name of the database you wish to install mirroring
+on(Your master).
+
+
+3) Create slaveDatabase.conf files.
+
+Each slave database needs its own configuration file for the
+DBMirror.pl script. See slaveDatabase.conf for a sample.
+
+The master settings refer to the master database(The one that is
+being mirrored).
+
+The slave settings refer to the database that the data is being mirrored to.
+The slaveHost parameter must refer to the machine name of the slave (Either
+a resolvable hostname or an IP address). The value for slave host
+must match the Hostname field in the MirrorHost table(See step 6).
+
+The master user must have sufficient permissions to modify the Pending
+tables and to read all of the tables being mirrored.
+
+The slave user must have enough permissions on the slave database to
+modify(INSERT,UPDATE,DELETE) any tables on the slave system that are being
+mirrored.
+
+4) Add the trigger to tables.
+
+Execute the SQL code in AddTrigger.sql once for each table that should
+be mirrored. Replace MyTableName with the name of the table that should
+be mirrored.
+
+5) Create the slave database.
+
+The DBMirror system keeps the contents of mirrored tables identical on the
+master and slave databases. When you first install the mirror triggers the
+master and slave databases must be the same.
+
+If you are starting with an empty master database then the slave should
+be empty as well. Otherwise use pg_dump to ensure that the slave database
+tables are initially identical to the master.
+
+6) Add entries in the MirrorHost table.
+
+Each slave database must have an entry in the MirrorHost table.
+
+The name of the host in the MirrorHost table must exactly match the
+slaveHost variable for that slave in the configuration file.
+
+For example
+INSERT INTO "MirrorHost" ("HostName") VALUES ('mySlaveMachine.mycompany.com');
+
+
+6) Start DBMirror.pl
+
+
+DBMirror.pl is the perl script that handles the mirroring.
+
+It requires the Perl library Pg(See src/interfaces/perl5 in the postgres
+source distribution).
+
+It takes its configuration file as an argument(The one from step 3)
+One instance of DBMirror.pl runs for each slave machine that is receiving
+mirrored data.
+
+Any errors are printed to standard out and emailed to the address specified in
+the configuration file.
+
+DBMirror can be run from the master, the slave, or a third machine as long
+as it is able to access both the master and slave databases.
+
+7) Periodically run clean_pending.pl
+clean_pending.pl cleans out any entries from the Pending tables that
+have already been mirrored to all hosts in the MirrorHost table.
+It uses the same configuration file as DBMirror.pl.
+
+Normally DBMirror.pl will clean these tables as it goes but in some
+circumstances this will not happen.
+
+For example if a transaction has been mirrored to all slaves except for
+one, then that host is removed from the MirrorHost table(It stops being
+a mirror slave) the transactions that had already been mirrored to
+all the other hosts will not be deleted from the Pending tables by
+DBMirror.pl since DBMirror.pl will run against these transactions again
+since they have already been sent to all the other hosts.
+
+clean_pending.pl will remove these transactions.
+
+TODO(Current Limitations)
+----------
+-Support for selective mirroring based on the content of data.
+-Support for BLOB's.
+-Support for conflict resolution.
+-Batching SQL commands in DBMirror for better performance over WAN's.
+-Better support for dealing with Schema changes.
+
+Tested Platforms:
+------------------
+
+DBMirror has been tested on the following configurations but should
+work on any platform with Postgres >= 7.1 and Perl 5.6.
+
+RedHat Linux 7.1 & 6.2
+ -Postgres 7.1.2
+ -Perl 5.6
+
+Mandrake Linux 8.0(Limited Testing)
+ -Postgres 7.2
+ -Perl 5.6
+
+
+Steven Singer
+Navtech Systems Support Inc.
--- /dev/null
+#!/usr/bin/perl
+# clean_pending.pl
+# This perl script removes entries from the pending,pendingKeys,
+# pendingDeleteData tables that have already been mirrored to all hosts.
+#
+#
+#
+# (c) 2001-2002 Navtech Systems Support Inc.
+# Released under the GNU Public License version 2. See COPYING.
+#
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+##############################################################################
+# $Id: clean_pending.pl,v 1.1 2002/06/23 21:58:08 momjian Exp $
+##############################################################################
+
+
+
+=head1 NAME
+
+clean_pending.pl - A Perl script to remove old entries from the
+pending, pendingKeys, and pendingDeleteData tables.
+
+
+=head1 SYNPOSIS
+
+
+clean_pending.pl databasename
+
+
+=head1 DESCRIPTION
+
+
+This Perl script connects to the database specified as a command line argument
+on the local system. It uses a hard-coded username and password.
+It then removes any entries from the pending, pendingDeleteData, and
+pendingKeys tables that have already been sent to all hosts in mirrorHosts.
+
+
+=cut
+
+BEGIN {
+ # add in a global path to files
+ #Ensure that Pg is in the path.
+}
+
+
+use strict;
+use Pg;
+if ($#ARGV != 0) {
+ die "usage: clean_pending.pl configFile\n";
+}
+
+if( ! defined do $ARGV[0]) {
+ die("Invalid Configuration file $ARGV[0]");
+}
+
+#connect to the database.
+
+my $connectString = "host=$::masterHost dbname=$::masterDb user=$::masterUser password=$::masterPassword";
+
+my $dbConn = Pg::connectdb($connectString);
+unless($dbConn->status == PGRES_CONNECTION_OK) {
+ printf("Can't connect to database\n");
+ die;
+}
+my $result = $dbConn->exec("BEGIN");
+unless($result->resultStatus == PGRES_COMMAND_OK) {
+ die $dbConn->errorMessage;
+}
+
+
+#delete all transactions that have been sent to all mirrorhosts
+#or delete everything if no mirror hosts are defined.
+# Postgres takes the "SELECT COUNT(*) FROM "MirrorHost" and makes it into
+# an InitPlan. EXPLAIN show's this.
+my $deletePendingQuery = 'DELETE FROM "Pending" WHERE (SELECT ';
+$deletePendingQuery .= ' COUNT(*) FROM "MirroredTransaction" WHERE ';
+$deletePendingQuery .= ' "XID"="Pending"."XID") = (SELECT COUNT(*) FROM ';
+$deletePendingQuery .= ' "MirrorHost") OR (SELECT COUNT(*) FROM ';
+$deletePendingQuery .= ' "MirrorHost") = 0';
+
+my $result = $dbConn->exec($deletePendingQuery);
+unless ($result->resultStatus == PGRES_COMMAND_OK ) {
+ printf($dbConn->errorMessage);
+ die;
+}
+$dbConn->exec("COMMIT");
+$result = $dbConn->exec('VACUUM "Pending"');
+unless ($result->resultStatus == PGRES_COMMAND_OK) {
+ printf($dbConn->errorMessage);
+}
+$result = $dbConn->exec('VACUUM "PendingData"');
+unless($result->resultStatus == PGRES_COMMAND_OK) {
+ printf($dbConn->errorMessage);
+}
+$result = $dbConn->exec('VACUUM "MirroredTransaction"');
+unless($result->resultStatus == PGRES_COMMAND_OK) {
+ printf($dbConn->errorMessage);
+}
+
--- /dev/null
+/****************************************************************************
+ * pending.c
+ * $Id: pending.c,v 1.1 2002/06/23 21:58:08 momjian Exp $
+ *
+ * This file contains a trigger for Postgresql-7.x to record changes to tables
+ * to a pending table for mirroring.
+ * All tables that should be mirrored should have this trigger hooked up to it.
+ *
+ * (c) 2001-2002 Navtech Systems Support Inc.
+ * Released under the GNU Public License version 2. See COPYING.
+ *
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ *
+ ***************************************************************************/
+#include
+#include
+
+enum FieldUsage {PRIMARY=0,NONPRIMARY,ALL,NUM_FIELDUSAGE};
+
+int storePending(char * cpTableName, HeapTuple tBeforeTuple,
+ HeapTuple tAfterTuple,
+ TupleDesc tTupdesc,
+ TriggerData * tpTrigdata,char cOp);
+
+int storeKeyInfo(char * cpTableName, HeapTuple tTupleData, TupleDesc tTuplDesc,
+ TriggerData * tpTrigdata);
+int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc,
+ TriggerData * tpTrigData,int iIncludeKeyData);
+
+int2vector * getPrimaryKey(Oid tblOid);
+
+char * packageData(HeapTuple tTupleData, TupleDesc tTupleDecs,
+ TriggerData * tTrigData,
+ enum FieldUsage eKeyUsage );
+
+#define BUFFER_SIZE 256
+#define MAX_OID_LEN 10
+
+
+extern Datum recordchange(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(recordchange);
+
+
+/*****************************************************************************
+ * The entry point for the trigger function.
+ * The Trigger takes a single SQL 'text' argument indicating the name of the
+ * table the trigger was applied to. If this name is incorrect so will the
+ * mirroring.
+ ****************************************************************************/
+Datum recordchange(PG_FUNCTION_ARGS) {
+ TriggerData * trigdata;
+ TupleDesc tupdesc;
+ HeapTuple beforeTuple=NULL;
+ HeapTuple afterTuple=NULL;
+ HeapTuple retTuple=NULL;
+ char * tblname;
+ char op;
+ if(fcinfo->context!=NULL) {
+
+ if(SPI_connect() < 0) {
+ elog(NOTICE,"storePending could not connect to SPI");
+ return -1;
+ }
+ trigdata = (TriggerData*)fcinfo->context;
+ /* Extract the table name */
+ tblname = SPI_getrelname(trigdata->tg_relation);
+ tupdesc = trigdata->tg_relation->rd_att;
+ if(TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) {
+ retTuple = trigdata->tg_newtuple;
+ beforeTuple = trigdata->tg_trigtuple;
+ afterTuple = trigdata->tg_newtuple;
+ op='u';
+
+ }
+ else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) {
+ retTuple = trigdata->tg_trigtuple;
+ afterTuple = trigdata->tg_trigtuple;
+ op = 'i';
+ }
+ else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event)) {
+ retTuple = trigdata->tg_trigtuple;
+ beforeTuple = trigdata->tg_trigtuple;
+ op = 'd';
+ }
+
+ if(storePending(tblname,beforeTuple,afterTuple,tupdesc,trigdata,op)) {
+ /* An error occoured. Skip the operation. */
+ elog(ERROR,"Operation could not be mirrored");
+ return PointerGetDatum(NULL);
+
+ }
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,"Returning on success");
+#endif
+ SPI_finish();
+ return PointerGetDatum(retTuple);
+ }
+ else {
+ /*
+ * Not being called as a trigger.
+ */
+ return PointerGetDatum(NULL);
+ }
+}
+
+
+/*****************************************************************************
+ * Constructs and executes an SQL query to write a record of this tuple change
+ * to the pending table.
+ *****************************************************************************/
+int storePending(char * cpTableName, HeapTuple tBeforeTuple,
+ HeapTuple tAfterTuple,
+ TupleDesc tTupDesc,
+ TriggerData * tpTrigData,char cOp) {
+ char * cpQueryBase = "INSERT INTO \"Pending\" (\"TableName\",\"Op\",\"XID\") VALUES ($1,$2,$3)";
+
+ int iResult=0;
+ HeapTuple tCurTuple; // Points the current tuple(before or after)
+ Datum saPlanData[4];
+ Oid taPlanArgTypes[3] = {NAMEOID,CHAROID,INT4OID};
+ void * vpPlan;
+
+ tCurTuple = tBeforeTuple ? tBeforeTuple : tAfterTuple;
+
+
+
+
+ vpPlan = SPI_prepare(cpQueryBase,3,taPlanArgTypes);
+ if(vpPlan==NULL) {
+ elog(NOTICE,"Error creating plan");
+ }
+ // SPI_saveplan(vpPlan);
+
+ saPlanData[0] = PointerGetDatum(cpTableName);
+ saPlanData[1] = CharGetDatum(cOp);
+ saPlanData[2] = Int32GetDatum(GetCurrentTransactionId());
+
+
+ iResult = SPI_execp(vpPlan,saPlanData,NULL,1);
+ if(iResult < 0) {
+ elog(NOTICE,"storedPending fired (%s) returned %d",cpQueryBase,iResult);
+ }
+
+
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,"row successfully stored in pending table");
+#endif
+
+ if(cOp=='d') {
+ /**
+ * This is a record of a delete operation.
+ * Just store the key data.
+ */
+ iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData);
+ }
+ else if (cOp=='i') {
+ /**
+ * An Insert operation.
+ * Store all data
+ */
+ iResult = storeData(cpTableName,tAfterTuple,tTupDesc,tpTrigData,TRUE);
+
+ }
+ else {
+ /* op must be an update. */
+ iResult = storeKeyInfo(cpTableName,tBeforeTuple,tTupDesc,tpTrigData);
+ iResult = iResult ? iResult : storeData(cpTableName,tAfterTuple,tTupDesc,
+ tpTrigData,TRUE);
+ }
+
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,"DOne storing keyinfo");
+#endif
+
+ return iResult;
+
+}
+
+int storeKeyInfo(char * cpTableName, HeapTuple tTupleData,
+ TupleDesc tTupleDesc,
+ TriggerData * tpTrigData) {
+
+ Oid saPlanArgTypes[1] = {NAMEOID};
+ char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'t',$1)";
+ void * pplan;
+ Datum saPlanData[1];
+ char * cpKeyData;
+ int iRetCode;
+
+ pplan = SPI_prepare(insQuery,1,saPlanArgTypes);
+ if(pplan==NULL) {
+ elog(NOTICE,"Could not prepare INSERT plan");
+ return -1;
+ }
+
+ // pplan = SPI_saveplan(pplan);
+ cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,PRIMARY);
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,cpKeyData);
+#endif
+ saPlanData[0] = PointerGetDatum(cpKeyData);
+
+ iRetCode = SPI_execp(pplan,saPlanData,NULL,1);
+
+ if(cpKeyData!=NULL) {
+ SPI_pfree(cpKeyData);
+ }
+
+ if(iRetCode != SPI_OK_INSERT ) {
+ elog(NOTICE,"Error inserting row in pendingDelete");
+ return -1;
+ }
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,"INSERT SUCCESFULL");
+#endif
+
+ return 0;
+
+}
+
+
+
+
+int2vector * getPrimaryKey(Oid tblOid) {
+ char * queryBase;
+ char * query;
+ bool isNull;
+ int2vector * resultKey;
+ int2vector * tpResultKey;
+ HeapTuple resTuple;
+ Datum resDatum;
+ int ret;
+ queryBase = "SELECT indkey FROM pg_index WHERE indisprimary='t' AND indrelid=";
+ query = SPI_palloc(strlen(queryBase) + MAX_OID_LEN+1);
+ sprintf(query,"%s%d",queryBase,tblOid);
+ ret = SPI_exec(query,1);
+ if(ret != SPI_OK_SELECT || SPI_processed != 1 ) {
+ elog(NOTICE,"Could not select primary index key");
+ return NULL;
+ }
+
+ resTuple = SPI_tuptable->vals[0];
+ resDatum = SPI_getbinval(resTuple,SPI_tuptable->tupdesc,1,&isNull);
+
+ tpResultKey = (int2vector*) DatumGetPointer(resDatum);
+ resultKey = SPI_palloc(sizeof(int2vector));
+ memcpy(resultKey,tpResultKey,sizeof(int2vector));
+
+ SPI_pfree(query);
+ return resultKey;
+}
+
+/******************************************************************************
+ * Stores a copy of the non-key data for the row.
+ *****************************************************************************/
+int storeData(char * cpTableName,HeapTuple tTupleData,TupleDesc tTupleDesc,
+ TriggerData * tpTrigData,int iIncludeKeyData) {
+
+ Oid planArgTypes[1] = {NAMEOID};
+ char * insQuery = "INSERT INTO \"PendingData\" (\"SeqId\",\"IsKey\",\"Data\") VALUES(currval('\"Pending_SeqId_seq\"'),'f',$1)";
+ void * pplan;
+ Datum planData[1];
+ char * cpKeyData;
+ int iRetValue;
+
+ pplan = SPI_prepare(insQuery,1,planArgTypes);
+ if(pplan==NULL) {
+ elog(NOTICE,"Could not prepare INSERT plan");
+ return -1;
+ }
+
+ // pplan = SPI_saveplan(pplan);
+ if(iIncludeKeyData==0) {
+ cpKeyData = packageData(tTupleData, tTupleDesc,tpTrigData,NONPRIMARY);
+ }
+ else {
+ cpKeyData = packageData(tTupleData,tTupleDesc,tpTrigData,ALL);
+ }
+
+ planData[0] = PointerGetDatum(cpKeyData);
+ iRetValue = SPI_execp(pplan,planData,NULL,1);
+
+ if(cpKeyData!=0) {
+ SPI_pfree(cpKeyData);
+ }
+
+ if(iRetValue != SPI_OK_INSERT ) {
+ elog(NOTICE,"Error inserting row in pendingDelete");
+ return -1;
+ }
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,"INSERT SUCCESFULL");
+#endif
+
+ return 0;
+
+}
+
+/**
+ * Packages the data in tTupleData into a string of the format
+ * FieldName='value text' where any quotes inside of value text
+ * are escaped with a backslash and any backslashes in value text
+ * are esacped by a second back slash.
+ *
+ * tTupleDesc should be a description of the tuple stored in
+ * tTupleData.
+ *
+ * eFieldUsage specifies which fields to use.
+ * PRIMARY implies include only primary key fields.
+ * NONPRIMARY implies include only non-primary key fields.
+ * ALL implies include all fields.
+ */
+char * packageData(HeapTuple tTupleData, TupleDesc tTupleDesc,
+ TriggerData * tpTrigData,
+ enum FieldUsage eKeyUsage ) {
+ int iNumCols;
+ int2vector * tpPKeys=NULL;
+ int iColumnCounter;
+ char * cpDataBlock;
+ int iDataBlockSize;
+ int iUsedDataBlock;
+
+ iNumCols = tTupleDesc->natts;
+
+ if(eKeyUsage!=ALL) {
+ tpPKeys = getPrimaryKey(tpTrigData->tg_relation->rd_id);
+ if(tpPKeys==NULL) {
+ return NULL;
+ }
+ }
+#if defined DEBUG_OUTPUT
+ if(tpPKeys!=NULL) {
+ elog(NOTICE,"Have primary keys");
+ }
+#endif
+ cpDataBlock = SPI_palloc(BUFFER_SIZE);
+ iDataBlockSize = BUFFER_SIZE;
+ iUsedDataBlock = 0; /* To account for the null */
+
+ for(iColumnCounter=1; iColumnCounter <=iNumCols; iColumnCounter++) {
+ int iIsPrimaryKey;
+ int iPrimaryKeyIndex;
+ char * cpUnFormatedPtr;
+ char * cpFormatedPtr;
+
+ char * cpFieldName;
+ char * cpFieldData;
+ if(eKeyUsage!=ALL) {
+ //Determine if this is a primary key or not.
+ iIsPrimaryKey=0;
+ for(iPrimaryKeyIndex=0; (*tpPKeys)[iPrimaryKeyIndex]!=0;
+ iPrimaryKeyIndex++) {
+ if((*tpPKeys)[iPrimaryKeyIndex]==iColumnCounter) {
+ iIsPrimaryKey=1;
+ break;
+ }
+ }
+ if( iIsPrimaryKey ? (eKeyUsage!=PRIMARY) : (eKeyUsage!=NONPRIMARY)) {
+ /**
+ * Don't use.
+ */
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,"Skipping column");
+#endif
+ continue;
+ }
+ } /* KeyUsage!=ALL */
+ cpFieldName = DatumGetPointer(NameGetDatum(&tTupleDesc->attrs
+ [iColumnCounter-1]->attname));
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,cpFieldName);
+#endif
+ while(iDataBlockSize - iUsedDataBlock < strlen(cpFieldName) +4) {
+ cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize + BUFFER_SIZE);
+ iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
+ }
+ sprintf(cpDataBlock+iUsedDataBlock,"\"%s\"=",cpFieldName);
+ iUsedDataBlock = iUsedDataBlock + strlen(cpFieldName)+3;
+ cpFieldData=SPI_getvalue(tTupleData,tTupleDesc,iColumnCounter);
+
+ cpUnFormatedPtr = cpFieldData;
+ cpFormatedPtr = cpDataBlock + iUsedDataBlock;
+ if(cpFieldData!=NULL) {
+ *cpFormatedPtr='\'';
+ iUsedDataBlock++;
+ cpFormatedPtr++;
+ }
+ else {
+ *cpFormatedPtr=' ';
+ iUsedDataBlock++;
+ cpFormatedPtr++;
+ continue;
+
+ }
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,cpFieldData);
+ elog(NOTICE,"Starting format loop");
+#endif
+ while(*cpUnFormatedPtr!=0) {
+ while(iDataBlockSize - iUsedDataBlock < 2) {
+ cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE);
+ iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
+ cpFormatedPtr = cpDataBlock + iUsedDataBlock;
+ }
+ if(*cpUnFormatedPtr=='\\' || *cpUnFormatedPtr=='\'') {
+ *cpFormatedPtr='\\';
+ cpFormatedPtr++;
+ iUsedDataBlock++;
+ }
+ *cpFormatedPtr=*cpUnFormatedPtr;
+ cpFormatedPtr++;
+ cpUnFormatedPtr++;
+ iUsedDataBlock++;
+ }
+
+ SPI_pfree(cpFieldData);
+
+ while(iDataBlockSize - iUsedDataBlock < 3) {
+ cpDataBlock = SPI_repalloc(cpDataBlock,iDataBlockSize+BUFFER_SIZE);
+ iDataBlockSize = iDataBlockSize + BUFFER_SIZE;
+ cpFormatedPtr = cpDataBlock + iUsedDataBlock;
+ }
+ sprintf(cpFormatedPtr,"' ");
+ iUsedDataBlock = iUsedDataBlock +2;
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,cpDataBlock);
+#endif
+
+ } /* for iColumnCounter */
+ if(tpPKeys!=NULL) {
+ SPI_pfree(tpPKeys);
+ }
+#if defined DEBUG_OUTPUT
+ elog(NOTICE,"Returning");
+#endif
+ memset(cpDataBlock + iUsedDataBlock,0,iDataBlockSize - iUsedDataBlock);
+
+ return cpDataBlock;
+
+}
--- /dev/null
+#########################################################################
+# Config file for DBMirror.pl
+# This file contains a sample configuration file for DBMirror.pl
+# It contains configuration information to mirror data from
+# the master database to a single slave system.
+#
+# $Id: slaveDatabase.conf,v 1.1 2002/06/23 21:58:08 momjian Exp $
+#######################################################################
+
+$masterHost = "masterMachine.mydomain.com";
+$masterDb = "myDatabase";
+$masterUser = "postgres";
+$masterPassword = "postgrespassword";
+
+# Where to email Error messages to
+
+$slaveInfo->{"slaveHost"} = "backupMachine.mydomain.com";
+$slaveInfo->{"slaveDb"} = "myDatabase";
+$slaveInfo->{"slaveUser"} = "postgres";
+$slaveInfo->{"slavePassword"} = "postgrespassword";
+