11 changes: 11 additions & 0 deletions .abi-check/7.1.0/postgres.symbols.ignore
Original file line number Diff line number Diff line change
@@ -1 +1,12 @@
pgarch_start
ConfigureNamesInt_gp
child_triggers
has_update_triggers
ConfigureNamesBool_gp
aocs_beginscan
AppendOnlyBlockDirectory_GetEntry
ConfigureNamesString_gp
gp_pause_on_restore_point_replay
ConfigureNamesReal_gp
TableAmRoutine
MainLWLockNames
5 changes: 5 additions & 0 deletions GNUmakefile.in
@@ -212,6 +212,11 @@ installcheck-gpcheckcat:
$(call recurse,installcheck-world,gpcontrib/gp_replica_check,installcheck)
$(call recurse,installcheck-world,src/bin/pg_upgrade,check)

.PHONY: installcheck-hot-standby
installcheck-hot-standby: submake-generated-headers
$(MAKE) -C src/test/regress installcheck-hot-standby
$(MAKE) -C src/test/isolation2 installcheck-hot-standby

# Run mock tests, that don't require a running server. Arguably these should
# be part of [install]check-world, but we treat them more like part of
# compilation than regression testing, in the CI. But they are too heavy-weight
8 changes: 8 additions & 0 deletions src/backend/access/heap/heapam.c
@@ -530,6 +530,14 @@ heapgetpage(TableScanDesc sscan, BlockNumber page)

LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

#ifdef FAULT_INJECTOR
FaultInjector_InjectFaultIfSet(
"heapgetpage_after_unlock_buffer",
DDLNotSpecified,
"", /* databaseName */
RelationGetRelationName(scan->rs_base.rs_rd)); /* tableName */
#endif

Assert(ntup <= MaxHeapTuplesPerPage);
scan->rs_ntuples = ntup;
}
11 changes: 11 additions & 0 deletions src/backend/access/rmgrdesc/standbydesc.c
@@ -66,6 +66,14 @@ standby_desc(StringInfo buf, XLogReaderState *record)
xlrec->dbId, xlrec->tsId,
xlrec->relcacheInitFileInval);
}
else if (info == XLOG_LATESTCOMPLETED_GXID)
{
DistributedTransactionId gxid;

gxid = *((DistributedTransactionId *) rec);
appendStringInfo(buf, UINT64_FORMAT, gxid);
}

}

const char *
@@ -84,6 +92,9 @@ standby_identify(uint8 info)
case XLOG_INVALIDATIONS:
id = "INVALIDATIONS";
break;
case XLOG_LATESTCOMPLETED_GXID:
id = "XLOG_LATESTCOMPLETED_GXID";
break;
}

return id;
45 changes: 45 additions & 0 deletions src/backend/access/transam/README
@@ -897,3 +897,48 @@ yet simplifies emulation of subtransactions considerably.

Further details on locking mechanics in recovery are given in comments
with the Lock rmgr code.

Distributed Transaction Emulation during Recovery
-------------------------------------------------

In GPDB, the MVCC snapshot also includes distributed transactions (aka dtx).
Accordingly, on a hot standby we also emulate running dtx. We do that by
re-using the shmCommittedGxidArray, which is used on a primary for dtx
recovery: it tracks all the 2PC dtx that have finished their PREPARE phase
but not their COMMIT phase (i.e. the window between the
XLOG_XACT_DISTRIBUTED_COMMIT record and the XLOG_XACT_DISTRIBUTED_FORGET
record being written on the QD). On a hot standby, any dtx in that array is
regarded as in-progress. The MVCC snapshot does not really need to account
for dtx not in that array: for a dtx that hasn't done PREPARE, we know no
segment has committed any data yet; for a dtx that has finished its COMMIT
phase, we know all segments have committed their data.

Note: dtxes that are preparing will not be tracked in this array, and thus will
not be included in this snapshot. This is slightly different from a primary QD,
where such transactions would have been included in the distributed snapshot's
inProgressXidArray (as we construct the inProgressXidArray from the PGXACTs that
would contain the dummy entries for prepared transactions). However, as
mentioned in CreateDistributedSnapshot, including these is not a requirement for
correctness.

Note: aborted/aborting dtxes are not accounted for by the standby either. Those
are the dtxes that encountered an error while preparing. As with the previous
point, the standby does not need to be aware of them for correctness. It is
also worth noting that if a dtx encounters an error after being prepared, it
cannot be aborted anymore and must be committed by the dtx recovery process.
Until committed, such a dtx is seen as in-progress by the standby.

For 1PC dtx, however, there is a known limitation: the hot standby won't see
the last 1PC (or the last few, if they are all 1PC). Because a 1PC writes no
WAL on the QD, the standby QD won't advance its latestCompletedGxid, so its
distributed snapshot horizon does not include the last 1PC - the standby would
view it as not yet started or, at best, still in progress. Only when another
2PC arrives does the standby advance its latestCompletedGxid so that its
distributed snapshot includes the previous 1PC.

We don't emulate the full architecture of "running transaction" for dtx
because that is unnecessary, at least at the moment. For example, we don't
create a dtx-version
of XLOG_RUNNING_XACTS, because we already have that information as part of the
extended checkpoint (see TMGXACT_CHECKPOINT). We also don't need to emulate
other members in RunningTransactionsData, like subxid or xid-pruning related
variables because those do not apply to dtx.
67 changes: 36 additions & 31 deletions src/backend/access/transam/xact.c
@@ -2475,11 +2475,10 @@ StartTransaction(void)

/*
* Transactions may be started while recovery is in progress, if
* hot standby is enabled. This mode is not supported in
* Cloudberry yet.
* hot standby is enabled.
*/
AssertImply(DistributedTransactionContext != DTX_CONTEXT_LOCAL_ONLY,
!s->startedInRecovery);
EnableHotStandby || !s->startedInRecovery);
/*
* MPP Modification
*
@@ -2526,20 +2525,39 @@ StartTransaction(void)

case DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER:
case DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER:
/*
* Sanity check for the global xid.
*
* Note for hot standby dispatch: the standby QEs are still
* writers, just like primary QEs for SELECT queries. But
* hot standby dispatch never has a valid gxid, so we skip
* the gxid checks for the standby QEs.
*/
if (!IS_HOT_STANDBY_QE())
{
if (QEDtxContextInfo.distributedXid == InvalidDistributedTransactionId)
elog(ERROR,
"distributed transaction id is invalid in context %s",
DtxContextToString(DistributedTransactionContext));

/*
* Update distributed XID info, this is only used for
* debugging.
*/
LocalDistribXactData *ele = &MyProc->localDistribXactData;
ele->distribXid = QEDtxContextInfo.distributedXid;
ele->state = LOCALDISTRIBXACT_STATE_ACTIVE;
}
else
Assert(QEDtxContextInfo.distributedXid == InvalidDistributedTransactionId);

/* fall through */
case DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT:
{
/* If we're running in test-mode insert a delay in writer. */
if (gp_enable_slow_writer_testmode)
pg_usleep(500000);

if (DistributedTransactionContext != DTX_CONTEXT_QE_AUTO_COMMIT_IMPLICIT &&
QEDtxContextInfo.distributedXid == InvalidDistributedTransactionId)
{
elog(ERROR,
"distributed transaction id is invalid in context %s",
DtxContextToString(DistributedTransactionContext));
}

/*
* Snapshot must not be created before setting transaction
* isolation level.
@@ -2552,28 +2570,14 @@
XactReadOnly = isMppTxOptions_ReadOnly(
QEDtxContextInfo.distributedTxnOptions);

/* a hot standby transaction must be read-only */
AssertImply(IS_HOT_STANDBY_QE(), XactReadOnly);

/*
* MPP: we're a QE Writer.
*/
MyTmGxact->gxid = QEDtxContextInfo.distributedXid;

if (DistributedTransactionContext ==
DTX_CONTEXT_QE_TWO_PHASE_EXPLICIT_WRITER ||
DistributedTransactionContext ==
DTX_CONTEXT_QE_TWO_PHASE_IMPLICIT_WRITER)
{
Assert(QEDtxContextInfo.distributedXid !=
InvalidDistributedTransactionId);

/*
* Update distributed XID info, this is only used for
* debugging.
*/
LocalDistribXactData *ele = &MyProc->localDistribXactData;
ele->distribXid = QEDtxContextInfo.distributedXid;
ele->state = LOCALDISTRIBXACT_STATE_ACTIVE;
}

if (SharedLocalSnapshotSlot != NULL)
{
LWLockAcquire(SharedLocalSnapshotSlot->slotLock, LW_EXCLUSIVE);
@@ -6880,8 +6884,8 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_distrib xl_distrib;
xl_xact_deldbs xl_deldbs;
XLogRecPtr recptr;
bool isOnePhaseQE = (Gp_role == GP_ROLE_EXECUTE && MyTmGxactLocal->isOnePhaseCommit);
bool isDtxPrepared = isPreparedDtxTransaction();
DistributedTransactionId distrib_xid = getDistributedTransactionId();
[Review thread on this line]

Member: Can we use EnableHotStandby to control distrib_xid usage?

Contributor (author): Actually no. EnableHotStandby is only true on the
hot-standby cluster; on the primary cluster it is always false, but
distrib_xid is written to the xlog by the primary cluster.

Discussed with Max. We prefer to use a variable, maybe named
'enable_distri_xid', similar to 'NEXTGXID': it would be stored both in a
configuration file and in the xlog, and then synced to the standby cluster.

The standby cluster would check 'enable_distri_xid'; if it is true, the
cluster cannot be configured as hot_standby.

What do you think about this idea?

Member: Maybe we can optimize the WAL to counteract this influence.

Member: We should resolve conflicts, then add an installcheck-hot-standby
pipeline.

Contributor (author): Yes, I'm thinking about adding a job to run
installcheck-hot-standby in the pipeline. @edespino Could you help to add it?

Member: Add something like the below in .github/workflows/build-cloudberry.yml:

{"test":"ic-isolation2",
 "make_configs":["src/test/isolation2:installcheck-isolation2"]
},


uint8 info;

@@ -6971,10 +6975,11 @@
xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
}

if (isDtxPrepared || isOnePhaseQE)
/* include distributed xid if there's one */
if (distrib_xid != InvalidDistributedTransactionId)
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_DISTRIB;
xl_distrib.distrib_xid = getDistributedTransactionId();
xl_distrib.distrib_xid = distrib_xid;
}

#if 0
87 changes: 76 additions & 11 deletions src/backend/access/transam/xlog.c
@@ -133,7 +133,14 @@ bool track_wal_io_timing = false;
int FileEncryptionEnabled = false;

/* GPDB specific */
bool gp_pause_on_restore_point_replay = false;
char *gp_pause_on_restore_point_replay = "";

/*
* GPDB: Have we reached a specific continuous recovery target? We set this to
* true if WAL replay has found a restore point matching the GPDB-specific GUC
* gp_pause_on_restore_point_replay and a promotion has been requested.
*/
static bool reachedContinuousRecoveryTarget = false;

#ifdef WAL_DEBUG
bool XLOG_DEBUG = false;
@@ -6012,6 +6019,59 @@ recoveryStopsBefore(XLogReaderState *record)
return stopsHere;
}

/*
* GPDB: Restore point records can act as a point of synchronization to ensure
* cluster-wide consistency during WAL replay. If a restore point is specified
* in the gp_pause_on_restore_point_replay GUC, WAL replay will be paused at
* that restore point until replay is explicitly resumed.
*/
static void
pauseRecoveryOnRestorePoint(XLogReaderState *record)
{
uint8 info;
uint8 rmid;

/*
* Ignore recovery target settings when not in archive recovery (meaning
* we are in crash recovery).
*/
if (!ArchiveRecoveryRequested)
return;

info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
rmid = XLogRecGetRmid(record);

if (rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
xl_restore_point *recordRestorePointData;

recordRestorePointData = (xl_restore_point *) XLogRecGetData(record);

if (strcmp(recordRestorePointData->rp_name, gp_pause_on_restore_point_replay) == 0)
{
ereport(LOG,
(errmsg("setting recovery pause at restore point \"%s\", time %s",
recordRestorePointData->rp_name,
timestamptz_to_str(recordRestorePointData->rp_time))));

SetRecoveryPause(true);
recoveryPausesHere(false);

/*
* If we've unpaused and there is a promotion request, then we've
* reached our continuous recovery target and need to immediately
* promote. We piggyback on the existing recovery target logic to
* do this. See recoveryStopsAfter().
*/
if (CheckForStandbyTrigger())
{
reachedContinuousRecoveryTarget = true;
recoveryTargetAction = RECOVERY_TARGET_ACTION_PROMOTE;
}
}
}
}

/*
* Same as recoveryStopsBefore, but called after applying the record.
*
@@ -6039,15 +6099,19 @@ recoveryStopsAfter(XLogReaderState *record)
/*
* There can be many restore points that share the same name; we stop at
* the first one.
*
* GPDB: If we've reached the continuous recovery target, we'll use the
* below logic to immediately stop recovery.
*/
if (recoveryTarget == RECOVERY_TARGET_NAME &&
if ((reachedContinuousRecoveryTarget || recoveryTarget == RECOVERY_TARGET_NAME) &&
rmid == RM_XLOG_ID && info == XLOG_RESTORE_POINT)
{
xl_restore_point *recordRestorePointData;

recordRestorePointData = (xl_restore_point *) XLogRecGetData(record);

if (strcmp(recordRestorePointData->rp_name, recoveryTargetName) == 0)
if (reachedContinuousRecoveryTarget ||
strcmp(recordRestorePointData->rp_name, recoveryTargetName) == 0)
{
recoveryStopAfter = true;
recoveryStopXid = InvalidTransactionId;
@@ -7900,6 +7964,9 @@ StartupXLOG(void)
WalSndWakeup();
}

if (gp_pause_on_restore_point_replay)
pauseRecoveryOnRestorePoint(xlogreader);

/* Exit loop if we reached inclusive recovery target */
if (recoveryStopsAfter(xlogreader))
{
@@ -8331,6 +8398,8 @@
*/
InRecovery = false;

SIMPLE_FAULT_INJECTOR("out_of_recovery_in_startupxlog");

/*
* Hook for plugins to do additional startup works.
*
@@ -9801,8 +9870,11 @@ CreateCheckPoint(int flags)
* recovery we don't need to write running xact data.
*/
if (!shutdown && XLogStandbyInfoActive())
{
LogStandbySnapshot();

}

SIMPLE_FAULT_INJECTOR("checkpoint_after_redo_calculated");

START_CRIT_SECTION();
@@ -11126,14 +11198,7 @@ xlog_redo(XLogReaderState *record)
}
else if (info == XLOG_RESTORE_POINT)
{
/*
* GPDB: Restore point records can act as a point of
* synchronization to ensure cluster-wide consistency during WAL
* replay. WAL replay is paused at each restore point until it is
* explicitly resumed.
*/
if (gp_pause_on_restore_point_replay)
SetRecoveryPause(true);
/* nothing to do here */
}
else if (info == XLOG_FPI || info == XLOG_FPI_FOR_HINT)
{
1 change: 1 addition & 0 deletions src/backend/catalog/.gitignore
@@ -8,3 +8,4 @@
/pg_*_d.h
/gp_*_d.h
/bki-stamp
/system_views_gp.sql