Handle DROP DATABASE getting interrupted
authorAndres Freund
Thu, 13 Jul 2023 20:03:31 +0000 (13:03 -0700)
committerAndres Freund
Thu, 13 Jul 2023 20:04:45 +0000 (13:04 -0700)
Until now, when DROP DATABASE got interrupted in the wrong moment, the removal
of the pg_database row would also roll back, even though some irreversible
steps have already been taken. E.g. DropDatabaseBuffers() might have thrown
out dirty buffers, or files could have been unlinked. But we continued to
allow connections to such a corrupted database.

To fix this, mark databases invalid with an in-place update, just before
starting to perform irreversible steps. As we can't add a new column in the
back branches, we use pg_database.datconnlimit = -2 for this purpose.

An invalid database cannot be connected to anymore, but can still be
dropped.

Unfortunately we can't easily add output to psql's \l to indicate that some
database is invalid, it doesn't fit in any of the existing columns.

Add tests verifying that a interrupted DROP DATABASE is handled correctly in
the backend and in various tools.

Reported-by: Evgeny Morozov
Author: Andres Freund 
Reviewed-by: Daniel Gustafsson
Reviewed-by: Thomas Munro
Discussion: https://postgr.es/m/20230509004637[email protected]
Discussion: https://postgr.es/m/20230314174521[email protected]
Backpatch: 11-, bug present in all supported versions

19 files changed:
doc/src/sgml/catalogs.sgml
src/backend/commands/dbcommands.c
src/backend/commands/vacuum.c
src/backend/postmaster/autovacuum.c
src/backend/utils/init/postinit.c
src/bin/pg_amcheck/pg_amcheck.c
src/bin/pg_amcheck/t/002_nonesuch.pl
src/bin/pg_dump/pg_dumpall.c
src/bin/pg_dump/t/002_pg_dump.pl
src/bin/pg_upgrade/t/002_pg_upgrade.pl
src/bin/scripts/clusterdb.c
src/bin/scripts/reindexdb.c
src/bin/scripts/t/011_clusterdb_all.pl
src/bin/scripts/t/050_dropdb.pl
src/bin/scripts/t/091_reindexdb_all.pl
src/bin/scripts/t/101_vacuumdb_all.pl
src/bin/scripts/vacuumdb.c
src/include/catalog/pg_database.h
src/test/recovery/t/037_invalid_database.pl [new file with mode: 0644]

index 1985705546f9988bfbeb8c0149e59fc50e4ff2b7..7aaf72966abb598798dd86880277b43739bab62e 100644 (file)
@@ -3003,7 +3003,8 @@ SCRAM-SHA-256$<iteration count>:&l
       
       
        Sets maximum number of concurrent connections that can be made
-       to this database.  -1 means no limit.
+       to this database.  -1 means no limit, -2 indicates the database is
+       invalid.
       
      
 
index ef737356f2762cdc4a2e9891a442af35fa1a0e79..93f0c739e5519711a44f3e1d6e9e9540891b598a 100644 (file)
@@ -715,7 +715,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
    int         encoding = -1;
    bool        dbistemplate = false;
    bool        dballowconnections = true;
-   int         dbconnlimit = -1;
+   int         dbconnlimit = DATCONNLIMIT_UNLIMITED;
    char       *dbcollversion = NULL;
    int         notherbackends;
    int         npreparedxacts;
@@ -914,7 +914,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
    if (dconnlimit && dconnlimit->arg)
    {
        dbconnlimit = defGetInt32(dconnlimit);
-       if (dbconnlimit < -1)
+       if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("invalid connection limit: %d", dbconnlimit)));
@@ -965,6 +965,16 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
                 errmsg("template database \"%s\" does not exist",
                        dbtemplate)));
 
+   /*
+    * If the source database was in the process of being dropped, we can't
+    * use it as a template.
+    */
+   if (database_is_invalid_oid(src_dboid))
+       ereport(ERROR,
+               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+               errmsg("cannot use invalid database \"%s\" as template", dbtemplate),
+               errhint("Use DROP DATABASE to drop invalid databases."));
+
    /*
     * Permission check: to copy a DB that's not marked datistemplate, you
     * must be superuser or the owner thereof.
@@ -1513,6 +1523,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
    bool        db_istemplate;
    Relation    pgdbrel;
    HeapTuple   tup;
+   Form_pg_database datform;
    int         notherbackends;
    int         npreparedxacts;
    int         nslots,
@@ -1628,17 +1639,6 @@ dropdb(const char *dbname, bool missing_ok, bool force)
                        dbname),
                 errdetail_busy_db(notherbackends, npreparedxacts)));
 
-   /*
-    * Remove the database's tuple from pg_database.
-    */
-   tup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(db_id));
-   if (!HeapTupleIsValid(tup))
-       elog(ERROR, "cache lookup failed for database %u", db_id);
-
-   CatalogTupleDelete(pgdbrel, &tup->t_self);
-
-   ReleaseSysCache(tup);
-
    /*
     * Delete any comments or security labels associated with the database.
     */
@@ -1655,6 +1655,37 @@ dropdb(const char *dbname, bool missing_ok, bool force)
     */
    dropDatabaseDependencies(db_id);
 
+   /*
+    * Tell the cumulative stats system to forget it immediately, too.
+    */
+   pgstat_drop_database(db_id);
+
+   tup = SearchSysCacheCopy1(DATABASEOID, ObjectIdGetDatum(db_id));
+   if (!HeapTupleIsValid(tup))
+       elog(ERROR, "cache lookup failed for database %u", db_id);
+   datform = (Form_pg_database) GETSTRUCT(tup);
+
+   /*
+    * Except for the deletion of the catalog row, subsequent actions are not
+    * transactional (consider DropDatabaseBuffers() discarding modified
+    * buffers). But we might crash or get interrupted below. To prevent
+    * accesses to a database with invalid contents, mark the database as
+    * invalid using an in-place update.
+    *
+    * We need to flush the WAL before continuing, to guarantee the
+    * modification is durable before performing irreversible filesystem
+    * operations.
+    */
+   datform->datconnlimit = DATCONNLIMIT_INVALID_DB;
+   heap_inplace_update(pgdbrel, tup);
+   XLogFlush(XactLastRecEnd);
+
+   /*
+    * Also delete the tuple - transactionally. If this transaction commits,
+    * the row will be gone, but if we fail, dropdb() can be invoked again.
+    */
+   CatalogTupleDelete(pgdbrel, &tup->t_self);
+
    /*
     * Drop db-specific replication slots.
     */
@@ -1667,11 +1698,6 @@ dropdb(const char *dbname, bool missing_ok, bool force)
     */
    DropDatabaseBuffers(db_id);
 
-   /*
-    * Tell the cumulative stats system to forget it immediately, too.
-    */
-   pgstat_drop_database(db_id);
-
    /*
     * Tell checkpointer to forget any pending fsync and unlink requests for
     * files in the database; else the fsyncs will fail at next checkpoint, or
@@ -2188,7 +2214,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
    ListCell   *option;
    bool        dbistemplate = false;
    bool        dballowconnections = true;
-   int         dbconnlimit = -1;
+   int         dbconnlimit = DATCONNLIMIT_UNLIMITED;
    DefElem    *distemplate = NULL;
    DefElem    *dallowconnections = NULL;
    DefElem    *dconnlimit = NULL;
@@ -2259,7 +2285,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
    if (dconnlimit && dconnlimit->arg)
    {
        dbconnlimit = defGetInt32(dconnlimit);
-       if (dbconnlimit < -1)
+       if (dbconnlimit < DATCONNLIMIT_UNLIMITED)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("invalid connection limit: %d", dbconnlimit)));
@@ -2286,6 +2312,14 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
    datform = (Form_pg_database) GETSTRUCT(tuple);
    dboid = datform->oid;
 
+   if (database_is_invalid_form(datform))
+   {
+       ereport(FATAL,
+               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+               errmsg("cannot alter invalid database \"%s\"", stmt->dbname),
+               errhint("Use DROP DATABASE to drop invalid databases."));
+   }
+
    if (!pg_database_ownercheck(dboid, GetUserId()))
        aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_DATABASE,
                       stmt->dbname);
@@ -3007,6 +3041,42 @@ get_database_name(Oid dbid)
    return result;
 }
 
+
+/*
+ * While dropping a database the pg_database row is marked invalid, but the
+ * catalog contents still exist. Connections to such a database are not
+ * allowed.
+ */
+bool
+database_is_invalid_form(Form_pg_database datform)
+{
+   return datform->datconnlimit == DATCONNLIMIT_INVALID_DB;
+}
+
+
+/*
+ * Convenience wrapper around database_is_invalid_form()
+ */
+bool
+database_is_invalid_oid(Oid dboid)
+{
+   HeapTuple   dbtup;
+   Form_pg_database dbform;
+   bool        invalid;
+
+   dbtup = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dboid));
+   if (!HeapTupleIsValid(dbtup))
+       elog(ERROR, "cache lookup failed for database %u", dboid);
+   dbform = (Form_pg_database) GETSTRUCT(dbtup);
+
+   invalid = database_is_invalid_form(dbform);
+
+   ReleaseSysCache(dbtup);
+
+   return invalid;
+}
+
+
 /*
  * recovery_create_dbdir()
  *
index fca5b6c855de25706e133a6bea31c7f80c27d5ad..75b0ca9534707dbf4a93fc3dcedf7e9a014e90a5 100644 (file)
@@ -1749,6 +1749,20 @@ vac_truncate_clog(TransactionId frozenXID,
        Assert(TransactionIdIsNormal(datfrozenxid));
        Assert(MultiXactIdIsValid(datminmxid));
 
+       /*
+        * If database is in the process of getting dropped, or has been
+        * interrupted while doing so, no connections to it are possible
+        * anymore. Therefore we don't need to take it into account here.
+        * Which is good, because it can't be processed by autovacuum either.
+        */
+       if (database_is_invalid_form((Form_pg_database) dbform))
+       {
+           elog(DEBUG2,
+                "skipping invalid database \"%s\" while computing relfrozenxid",
+                NameStr(dbform->datname));
+           continue;
+       }
+
        /*
         * If things are working properly, no database should have a
         * datfrozenxid or datminmxid that is "in the future".  However, such
index a3f7d8f0dcb6f5caab07e8ab4788a7d7914eacb9..72a5c4aefe99a17d3de7751404c773220ac7dca7 100644 (file)
@@ -1915,6 +1915,18 @@ get_database_list(void)
        avw_dbase  *avdb;
        MemoryContext oldcxt;
 
+       /*
+        * If database has partially been dropped, we can't, nor need to,
+        * vacuum it.
+        */
+       if (database_is_invalid_form(pgdatabase))
+       {
+           elog(DEBUG2,
+                "autovacuum: skipping invalid database \"%s\"",
+                NameStr(pgdatabase->datname));
+           continue;
+       }
+
        /*
         * Allocate our results in the caller's context, not the
         * transaction's. We do this inside the loop, and restore the original
index ae03706d84010b683f3836fcd96b637ae85f37c3..884cb2785f8241ddcf6b6d45d588eafa12c4c285 100644 (file)
@@ -1042,6 +1042,7 @@ InitPostgres(const char *in_dbname, Oid dboid,
    if (!bootstrap)
    {
        HeapTuple   tuple;
+       Form_pg_database datform;
 
        tuple = GetDatabaseTuple(dbname);
        if (!HeapTupleIsValid(tuple) ||
@@ -1051,6 +1052,15 @@ InitPostgres(const char *in_dbname, Oid dboid,
                    (errcode(ERRCODE_UNDEFINED_DATABASE),
                     errmsg("database \"%s\" does not exist", dbname),
                     errdetail("It seems to have just been dropped or renamed.")));
+
+       datform = (Form_pg_database) GETSTRUCT(tuple);
+       if (database_is_invalid_form(datform))
+       {
+           ereport(FATAL,
+                   errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                   errmsg("cannot connect to invalid database \"%s\"", dbname),
+                   errhint("Use DROP DATABASE to drop invalid databases."));
+       }
    }
 
    /*
index 3cff319f0257987b9b9c180681ecfa4679f2efef..6a83fae6174d17778680647e726aef7afa38aa6f 100644 (file)
@@ -1590,7 +1590,7 @@ compile_database_list(PGconn *conn, SimplePtrList *databases,
                         "FROM pg_catalog.pg_database d "
                         "LEFT OUTER JOIN exclude_raw e "
                         "ON d.datname ~ e.rgx "
-                        "\nWHERE d.datallowconn "
+                        "\nWHERE d.datallowconn AND datconnlimit != -2 "
                         "AND e.pattern_id IS NULL"
                         "),"
 
index 0c07016aa0c55053d8f2534cb3207480bdba28cd..6c89ead2fbdd010446065b6b6e2ab972996007d4 100644 (file)
@@ -291,6 +291,40 @@ $node->command_checks_all(
    'many unmatched patterns and one matched pattern under --no-strict-names'
 );
 
+
+#########################################
+# Test that an invalid / partially dropped database won't be targeted
+
+$node->safe_psql(
+   'postgres', q(
+   CREATE DATABASE regression_invalid;
+   UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+
+$node->command_checks_all(
+   [
+       'pg_amcheck', '-d', 'regression_invalid'
+   ],
+   1,
+   [qr/^$/],
+   [
+       qr/pg_amcheck: error: no connectable databases to check matching "regression_invalid"/,
+   ],
+   'checking handling of invalid database');
+
+$node->command_checks_all(
+   [
+       'pg_amcheck', '-d', 'postgres',
+       '-t', 'regression_invalid.public.foo',
+   ],
+   1,
+   [qr/^$/],
+   [
+       qr/pg_amcheck: error: no connectable databases to check matching "regression_invalid.public.foo"/,
+   ],
+   'checking handling of object in invalid database');
+
+
 #########################################
 # Test checking otherwise existent objects but in databases where they do not exist
 
index ae41a652d799e6337cff1eb79e0c168a0d1e0804..7e639f9aab9cd291f6abc1a24b9da1f289cbe691 100644 (file)
@@ -1178,7 +1178,7 @@ dropDBs(PGconn *conn)
    res = executeQuery(conn,
                       "SELECT datname "
                       "FROM pg_database d "
-                      "WHERE datallowconn "
+                      "WHERE datallowconn AND datconnlimit != -2 "
                       "ORDER BY datname");
 
    if (PQntuples(res) > 0)
@@ -1321,7 +1321,7 @@ dumpDatabases(PGconn *conn)
    res = executeQuery(conn,
                       "SELECT datname "
                       "FROM pg_database d "
-                      "WHERE datallowconn "
+                      "WHERE datallowconn AND datconnlimit != -2 "
                       "ORDER BY (datname <> 'template1'), datname");
 
    if (PQntuples(res) > 0)
index 695baccf841c7bb1cd5e186f9f0f23e32202447c..32c3e0666661c53512cb4d745c711b2477972694 100644 (file)
@@ -1575,6 +1575,17 @@ my %tests = (
        },
    },
 
+   'CREATE DATABASE regression_invalid...' => {
+       create_order => 1,
+       create_sql => q(
+           CREATE DATABASE regression_invalid;
+           UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid'),
+       regexp => qr/^CREATE DATABASE regression_invalid/m,
+       not_like => {
+           pg_dumpall_dbprivs => 1,
+       },
+   },
+
    'CREATE ACCESS METHOD gist2' => {
        create_order => 52,
        create_sql =>
@@ -4028,6 +4039,14 @@ command_fails_like(
    qr/pg_dump: error: connection to server .* failed: FATAL:  database "qqq" does not exist/,
    'connecting to a non-existent database');
 
+#########################################
+# Test connecting to an invalid database
+
+$node->command_fails_like(
+   [ 'pg_dump', '-d', 'regression_invalid' ],
+   qr/pg_dump: error: connection to server .* failed: FATAL:  cannot connect to invalid database "regression_invalid"/,
+   'connecting to an invalid database');
+
 #########################################
 # Test connecting with an unprivileged user
 
index 46bfc9a80335925cf0ed33ca8fe4ca7c27185794..4dcf5c01ce5ac538803c7d4d34284bcfb06e3abb 100644 (file)
@@ -254,6 +254,12 @@ if (defined($ENV{oldinstall}))
    }
 }
 
+# Create an invalid database, will be deleted below
+$oldnode->safe_psql('postgres', qq(
+  CREATE DATABASE regression_invalid;
+  UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+
 # In a VPATH build, we'll be started in the source directory, but we want
 # to run pg_upgrade in the build directory so that any files generated finish
 # in it, like delete_old_cluster.{sh,bat}.
@@ -282,6 +288,26 @@ ok(-d $newnode->data_dir . "/pg_upgrade_output.d",
    "pg_upgrade_output.d/ not removed after pg_upgrade failure");
 rmtree($newnode->data_dir . "/pg_upgrade_output.d");
 
+# Check that pg_upgrade aborts when encountering an invalid database
+command_checks_all(
+   [
+       'pg_upgrade', '--no-sync', '-d', $oldnode->data_dir,
+       '-D', $newnode->data_dir, '-b', $oldbindir,
+       '-B', $newbindir, '-s', $newnode->host,
+       '-p', $oldnode->port, '-P', $newnode->port,
+       '--check',
+   ],
+   1,
+   [qr/invalid/], # pg_upgrade prints errors on stdout :(
+   [qr//],
+   'invalid database causes failure');
+rmtree($newnode->data_dir . "/pg_upgrade_output.d");
+
+# And drop it, so we can continue
+$oldnode->start;
+$oldnode->safe_psql('postgres', 'DROP DATABASE regression_invalid');
+$oldnode->stop;
+
 # --check command works here, cleans up pg_upgrade_output.d.
 command_ok(
    [
index df1766679b5d0d614280bec718d25fdeaaef3a0c..bc5a109ab878c7d21866a1bdd5141391d7e13050 100644 (file)
@@ -234,7 +234,9 @@ cluster_all_databases(ConnParams *cparams, const char *progname,
    int         i;
 
    conn = connectMaintenanceDatabase(cparams, progname, echo);
-   result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", echo);
+   result = executeQuery(conn,
+                         "SELECT datname FROM pg_database WHERE datallowconn AND datconnlimit <> -2 ORDER BY 1;",
+                         echo);
    PQfinish(conn);
 
    for (i = 0; i < PQntuples(result); i++)
index f3b03ec3256557642cafe0beb406f7bb6d89871a..fc2ad9d508a955e9f8f1574112486ccfc12addbd 100644 (file)
@@ -730,7 +730,9 @@ reindex_all_databases(ConnParams *cparams,
    int         i;
 
    conn = connectMaintenanceDatabase(cparams, progname, echo);
-   result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", echo);
+   result = executeQuery(conn,
+                         "SELECT datname FROM pg_database WHERE datallowconn AND datconnlimit <> -2 ORDER BY 1;",
+                         echo);
    PQfinish(conn);
 
    for (i = 0; i < PQntuples(result); i++)
index 7818988976cbb0c31f556b24226963a4dd0058af..7a209cf64e53dd2b084eb8dc79c2d20632eae2be 100644 (file)
@@ -21,4 +21,18 @@ $node->issues_sql_like(
    qr/statement: CLUSTER.*statement: CLUSTER/s,
    'cluster all databases');
 
+$node->safe_psql(
+   'postgres', q(
+   CREATE DATABASE regression_invalid;
+   UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+$node->command_ok([ 'clusterdb', '-a' ],
+  'invalid database not targeted by clusterdb -a');
+
+# Doesn't quite belong here, but don't want to waste time by creating an
+# invalid database in 010_clusterdb.pl as well.
+$node->command_fails_like([ 'clusterdb', '-d', 'regression_invalid'],
+  qr/FATAL:  cannot connect to invalid database "regression_invalid"/,
+  'clusterdb cannot target invalid database');
+
 done_testing();
index 4e946667d5e193ba5a5717940db3a767bb8b792a..1ca9ab84d29be905c650804ae1677d56889145e2 100644 (file)
@@ -31,4 +31,13 @@ $node->issues_sql_like(
 $node->command_fails([ 'dropdb', 'nonexistent' ],
    'fails with nonexistent database');
 
+# check that invalid database can be dropped with dropdb
+$node->safe_psql(
+   'postgres', q(
+   CREATE DATABASE regression_invalid;
+   UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+$node->command_ok([ 'dropdb', 'regression_invalid' ],
+  'invalid database can be dropped');
+
 done_testing();
index 378bf9060e92114b091cf7f39c5ecb792c90287d..b197ef90ceae16803b8e12ed5bf6a9799ef1f232 100644 (file)
@@ -18,4 +18,18 @@ $node->issues_sql_like(
    qr/statement: REINDEX.*statement: REINDEX/s,
    'reindex all databases');
 
+$node->safe_psql(
+   'postgres', q(
+   CREATE DATABASE regression_invalid;
+   UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+$node->command_ok([ 'reindexdb', '-a' ],
+  'invalid database not targeted by reindexdb -a');
+
+# Doesn't quite belong here, but don't want to waste time by creating an
+# invalid database in 090_reindexdb.pl as well.
+$node->command_fails_like([ 'reindexdb', '-d', 'regression_invalid'],
+  qr/FATAL:  cannot connect to invalid database "regression_invalid"/,
+  'reindexdb cannot target invalid database');
+
 done_testing();
index 1dcf411767122133c77fc4f6b0359083bcabe9c8..d3fe59bacc9065a41eceef14b571ff9e51c0b6de 100644 (file)
@@ -16,4 +16,18 @@ $node->issues_sql_like(
    qr/statement: VACUUM.*statement: VACUUM/s,
    'vacuum all databases');
 
+$node->safe_psql(
+   'postgres', q(
+   CREATE DATABASE regression_invalid;
+   UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+$node->command_ok([ 'vacuumdb', '-a' ],
+  'invalid database not targeted by vacuumdb -a');
+
+# Doesn't quite belong here, but don't want to waste time by creating an
+# invalid database in 010_vacuumdb.pl as well.
+$node->command_fails_like([ 'vacuumdb', '-d', 'regression_invalid'],
+  qr/FATAL:  cannot connect to invalid database "regression_invalid"/,
+  'vacuumdb cannot target invalid database');
+
 done_testing();
index 92f1ffe14796787c186b0ee3873120147d1e71d6..2c5763450586cfa73cc9551b47d396d0292e4466 100644 (file)
@@ -741,7 +741,7 @@ vacuum_all_databases(ConnParams *cparams,
 
    conn = connectMaintenanceDatabase(cparams, progname, echo);
    result = executeQuery(conn,
-                         "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
+                         "SELECT datname FROM pg_database WHERE datallowconn AND datconnlimit <> -2 ORDER BY 1;",
                          echo);
    PQfinish(conn);
 
index 611c95656a92311739bbf58f7032498e209e8e18..48dec14acc39034e8617f62c9fa872e3aadff4c4 100644 (file)
@@ -49,7 +49,10 @@ CATALOG(pg_database,1262,DatabaseRelationId) BKI_SHARED_RELATION BKI_ROWTYPE_OID
    /* new connections allowed? */
    bool        datallowconn;
 
-   /* max connections allowed (-1=no limit) */
+   /*
+    * Max connections allowed. Negative values have special meaning, see
+    * DATCONNLIMIT_* defines below.
+    */
    int32       datconnlimit;
 
    /* all Xids < this are frozen in this DB */
@@ -100,4 +103,19 @@ DECLARE_UNIQUE_INDEX_PKEY(pg_database_oid_index, 2672, DatabaseOidIndexId, on pg
 DECLARE_OID_DEFINING_MACRO(Template0DbOid, 4);
 DECLARE_OID_DEFINING_MACRO(PostgresDbOid, 5);
 
+/*
+ * Special values for pg_database.datconnlimit. Normal values are >= 0.
+ */
+#define          DATCONNLIMIT_UNLIMITED    -1  /* no limit */
+
+/*
+ * A database is set to invalid partway through being dropped.  Using
+ * datconnlimit=-2 for this purpose isn't particularly clean, but is
+ * backpatchable.
+ */
+#define          DATCONNLIMIT_INVALID_DB   -2
+
+extern bool database_is_invalid_form(Form_pg_database datform);
+extern bool database_is_invalid_oid(Oid dboid);
+
 #endif                         /* PG_DATABASE_H */
diff --git a/src/test/recovery/t/037_invalid_database.pl b/src/test/recovery/t/037_invalid_database.pl
new file mode 100644 (file)
index 0000000..a061fab
--- /dev/null
@@ -0,0 +1,157 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+#
+# Test we handle interrupted DROP DATABASE correctly.
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('node');
+$node->init;
+$node->append_conf(
+   "postgresql.conf", qq(
+autovacuum = off
+max_prepared_transactions=5
+log_min_duration_statement=0
+log_connections=on
+log_disconnections=on
+));
+
+$node->start;
+
+
+# First verify that we can't connect to or ALTER an invalid database. Just
+# mark the database as invalid ourselves, that's more reliable than hitting the
+# required race conditions (see testing further down)...
+
+$node->safe_psql(
+   "postgres", qq(
+CREATE DATABASE regression_invalid;
+UPDATE pg_database SET datconnlimit = -2 WHERE datname = 'regression_invalid';
+));
+
+my $psql_stdout = '';
+my $psql_stderr = '';
+
+is($node->psql('regression_invalid', '', stderr => \$psql_stderr),
+   2, "can't connect to invalid database - error code");
+like(
+   $psql_stderr,
+   qr/FATAL:\s+cannot connect to invalid database "regression_invalid"/,
+   "can't connect to invalid database - error message");
+
+is($node->psql('postgres', 'ALTER DATABASE regression_invalid CONNECTION LIMIT 10'),
+   2, "can't ALTER invalid database");
+
+# check invalid database can't be used as a template
+is( $node->psql('postgres', 'CREATE DATABASE copy_invalid TEMPLATE regression_invalid'),
+   3,
+   "can't use invalid database as template");
+
+
+# Verify that VACUUM ignores an invalid database when computing how much of
+# the clog is needed (vac_truncate_clog()). For that we modify the pg_database
+# row of the invalid database to have an outdated datfrozenxid.
+$psql_stderr = '';
+$node->psql(
+   'postgres',
+   qq(
+UPDATE pg_database SET datfrozenxid = '123456' WHERE datname = 'regression_invalid';
+DROP TABLE IF EXISTS foo_tbl; CREATE TABLE foo_tbl();
+VACUUM FREEZE;),
+   stderr => \$psql_stderr);
+unlike(
+   $psql_stderr,
+   qr/some databases have not been vacuumed in over 2 billion transactions/,
+   "invalid databases are ignored by vac_truncate_clog");
+
+
+# But we need to be able to drop an invalid database.
+is( $node->psql(
+       'postgres', 'DROP DATABASE regression_invalid',
+       stdout => \$psql_stdout,
+       stderr => \$psql_stderr),
+   0,
+   "can DROP invalid database");
+
+# Ensure database is gone
+is($node->psql('postgres', 'DROP DATABASE regression_invalid'),
+   3, "can't drop already dropped database");
+
+
+# Test that interruption of DROP DATABASE is handled properly. To ensure the
+# interruption happens at the appropriate moment, we lock pg_tablespace. DROP
+# DATABASE scans pg_tablespace once it has reached the "irreversible" part of
+# dropping the database, making it a suitable point to wait.
+my $bgpsql_in    = '';
+my $bgpsql_out   = '';
+my $bgpsql_err   = '';
+my $bgpsql_timer = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+my $bgpsql = $node->background_psql('postgres', \$bgpsql_in, \$bgpsql_out,
+   $bgpsql_timer, on_error_stop => 0);
+$bgpsql_out = '';
+$bgpsql_in .= "SELECT pg_backend_pid();\n";
+
+pump_until($bgpsql, $bgpsql_timer, \$bgpsql_out, qr/\d/);
+
+my $pid = $bgpsql_out;
+$bgpsql_out = '';
+
+# create the database, prevent drop database via lock held by a 2PC transaction
+$bgpsql_in .= qq(
+  CREATE DATABASE regression_invalid_interrupt;
+  BEGIN;
+  LOCK pg_tablespace;
+  PREPARE TRANSACTION 'lock_tblspc';
+  \\echo done
+);
+
+ok(pump_until($bgpsql, $bgpsql_timer, \$bgpsql_out, qr/done/),
+   "blocked DROP DATABASE completion");
+$bgpsql_out = '';
+
+# Try to drop. This will wait due to the still held lock.
+$bgpsql_in .= qq(
+  DROP DATABASE regression_invalid_interrupt;
+  \\echo DROP DATABASE completed
+);
+$bgpsql->pump_nb;
+
+# Ensure we're waiting for the lock
+$node->poll_query_until('postgres',
+   qq(SELECT EXISTS(SELECT * FROM pg_locks WHERE NOT granted AND relation = 'pg_tablespace'::regclass AND mode = 'AccessShareLock');)
+);
+
+# and finally interrupt the DROP DATABASE
+ok($node->safe_psql('postgres', "SELECT pg_cancel_backend($pid)"),
+   "canceling DROP DATABASE");
+
+# wait for cancellation to be processed
+ok( pump_until(
+       $bgpsql, $bgpsql_timer, \$bgpsql_out, qr/DROP DATABASE completed/),
+   "cancel processed");
+$bgpsql_out = '';
+
+# verify that connection to the database aren't allowed
+is($node->psql('regression_invalid_interrupt', ''),
+   2, "can't connect to invalid_interrupt database");
+
+# To properly drop the database, we need to release the lock previously preventing
+# doing so.
+$bgpsql_in .= qq(
+  ROLLBACK PREPARED 'lock_tblspc';
+  \\echo ROLLBACK PREPARED
+);
+ok(pump_until($bgpsql, $bgpsql_timer, \$bgpsql_out, qr/ROLLBACK PREPARED/),
+   "unblock DROP DATABASE");
+$bgpsql_out = '';
+
+is($node->psql('postgres', "DROP DATABASE regression_invalid_interrupt"),
+   0, "DROP DATABASE invalid_interrupt");
+
+$bgpsql_in .= "\\q\n";
+$bgpsql->finish();
+
+done_testing();