Skip to content

Commit de83218

Browse files
committed
Add guc gp_hot_standby_snapshot_restore_point_name
For a hot standby cluster, a consistent state can only be reached at a restore point, so the snapshot on the hot standby should be taken at the restore point. Add the GUC gp_hot_standby_snapshot_restore_point_name, which sets the name of the restore point at which the snapshot should be taken. Add a new xlog record type, XLOG_RESTORE_POINT_RUNNING_XACTS, which records the running xids and the restore point name. On the hot standby, during recovery, if the xlog record's restore point name equals gp_hot_standby_snapshot_restore_point_name, we update the hot standby snapshot. Besides XLOG_RESTORE_POINT_RUNNING_XACTS, I think the hot standby cluster is also consistent at a shutdown-checkpoint xlog record, so we can update the snapshot there too. TODO: I have added the test hot_standby/snapshot, but the test fails. The query on table hs_sh on the hot standby is expected to fail before the restore point "rp1" is created. This means the commit does not work correctly yet; I must be missing something.
1 parent 1ee2fcc commit de83218

File tree

10 files changed

+205
-7
lines changed

10 files changed

+205
-7
lines changed

src/backend/access/rmgrdesc/standbydesc.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ standby_desc_running_xacts(StringInfo buf, xl_running_xacts *xlrec)
3636
appendStringInfoString(buf, "; subxid ovf");
3737
}
3838

39+
static void
40+
standby_desc_rp_running_xacts(StringInfo buf, xl_running_xacts *xlrec, const char *rpName)
41+
{
42+
standby_desc_running_xacts(buf, xlrec);
43+
appendStringInfo(buf, "restore_point %s", rpName);
44+
}
45+
3946
void
4047
standby_desc(StringInfo buf, XLogReaderState *record)
4148
{
@@ -73,6 +80,24 @@ standby_desc(StringInfo buf, XLogReaderState *record)
7380
gxid = *((DistributedTransactionId *) rec);
7481
appendStringInfo(buf, UINT64_FORMAT, gxid);
7582
}
83+
else if (info == XLOG_RESTORE_POINT_RUNNING_XACTS)
84+
{
85+
xl_restore_point_running_xacts *xlrec
86+
= (xl_restore_point_running_xacts *) rec;
87+
xl_running_xacts *xlrecRunning= (xl_running_xacts*) palloc(offsetof(xl_running_xacts, xids) +
88+
(xlrec->xcnt + xlrec->subxcnt) * sizeof(TransactionId));
89+
xlrecRunning->xcnt = xlrec->xcnt;
90+
xlrecRunning->subxcnt = xlrec->subxcnt;
91+
xlrecRunning->subxid_overflow = xlrec->subxid_overflow;
92+
xlrecRunning->nextXid = xlrec->nextXid;
93+
xlrecRunning->oldestRunningXid = xlrec->oldestRunningXid;
94+
xlrecRunning->latestCompletedXid = xlrec->latestCompletedXid;
95+
memcpy(xlrecRunning->xids, xlrec->xids,
96+
(xlrec->xcnt + xlrec->subxcnt) * sizeof(TransactionId));
97+
98+
standby_desc_rp_running_xacts(buf, xlrecRunning, xlrec->rpName);
99+
pfree(xlrecRunning);
100+
}
76101

77102
}
78103

@@ -95,6 +120,9 @@ standby_identify(uint8 info)
95120
case XLOG_LATESTCOMPLETED_GXID:
96121
id = "XLOG_LATESTCOMPLETED_GXID";
97122
break;
123+
case XLOG_RESTORE_POINT_RUNNING_XACTS:
124+
id = "XLOG_RESTORE_POINT_RUNNING_XACTS";
125+
break;
98126
}
99127

100128
return id;

src/backend/access/transam/xlog.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10757,6 +10757,9 @@ XLogRestorePoint(const char *rpName)
1075710757
xlrec.rp_time = GetCurrentTimestamp();
1075810758
strlcpy(xlrec.rp_name, rpName, MAXFNAMELEN);
1075910759

10760+
/* LogHotStandby for the restore here */
10761+
LogRestorePointStandbySnapshot(rpName);
10762+
1076010763
XLogBeginInsert();
1076110764
XLogRegisterData((char *) &xlrec, sizeof(xl_restore_point));
1076210765

src/backend/replication/logical/decode.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,9 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
374374
case XLOG_LATESTCOMPLETED_GXID:
375375
/* FIXME: need to decode this part? */
376376
break;
377+
case XLOG_RESTORE_POINT_RUNNING_XACTS:
378+
/* FIXME: same as XLOG_LATESTCOMPLETED_GXID */
379+
break;
377380
default:
378381
elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
379382
}

src/backend/storage/ipc/standby.c

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ int vacuum_defer_cleanup_age;
4242
int max_standby_archive_delay = 30 * 1000;
4343
int max_standby_streaming_delay = 30 * 1000;
4444
bool log_recovery_conflict_waits = false;
45+
char *gp_hot_standby_snapshot_restore_point_name = NULL;
4546

4647
static HTAB *RecoveryLockLists;
4748

@@ -55,7 +56,9 @@ static void ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlis
5556
uint32 wait_event_info,
5657
bool report_waiting);
5758
static void SendRecoveryConflictWithBufferPin(ProcSignalReason reason);
59+
static XLogRecPtr LogStandbySnapshotImpl(const char *rpName);
5860
static XLogRecPtr LogCurrentRunningXacts(RunningTransactions CurrRunningXacts);
61+
static XLogRecPtr CbdbLogRestorePointRunningXacts(RunningTransactions CurrRunningXacts, const char *rpName);
5962
static void LogAccessExclusiveLocks(int nlocks, xl_standby_lock *locks);
6063
static const char *get_recovery_conflict_desc(ProcSignalReason reason);
6164

@@ -1129,6 +1132,12 @@ standby_redo(XLogReaderState *record)
11291132
}
11301133
else if (info == XLOG_RUNNING_XACTS)
11311134
{
1135+
if (EnableHotStandby && gp_hot_standby_snapshot_restore_point_name)
1136+
{
1137+
elog(LOG, "not calling redo");
1138+
return;
1139+
}
1140+
11321141
xl_running_xacts *xlrec = (xl_running_xacts *) XLogRecGetData(record);
11331142
RunningTransactionsData running;
11341143

@@ -1169,6 +1178,31 @@ standby_redo(XLogReaderState *record)
11691178
LWLockRelease(ProcArrayLock);
11701179
}
11711180
}
1181+
else if (info == XLOG_RESTORE_POINT_RUNNING_XACTS)
1182+
{
1183+
if (!gp_hot_standby_snapshot_restore_point_name)
1184+
return;
1185+
xl_restore_point_running_xacts *xlrec =
1186+
(xl_restore_point_running_xacts *) XLogRecGetData(record);
1187+
char *rpName = gp_hot_standby_snapshot_restore_point_name;
1188+
1189+
if (EnableHotStandby && rpName &&
1190+
strcmp(xlrec->rpName, rpName) == 0)
1191+
{
1192+
RunningTransactionsData running;
1193+
1194+
running.xcnt = xlrec->xcnt;
1195+
running.subxcnt = xlrec->subxcnt;
1196+
running.subxid_overflow = xlrec->subxid_overflow;
1197+
running.nextXid = xlrec->nextXid;
1198+
running.latestCompletedXid = xlrec->latestCompletedXid;
1199+
running.oldestRunningXid = xlrec->oldestRunningXid;
1200+
running.xids = xlrec->xids;
1201+
1202+
elog(LOG, "update running xact %s", rpName);
1203+
ProcArrayApplyRecoveryInfo(&running);
1204+
}
1205+
}
11721206
else
11731207
elog(PANIC, "standby_redo: unknown op code %u", info);
11741208
}
@@ -1241,6 +1275,12 @@ standby_redo(XLogReaderState *record)
12411275
*/
12421276
XLogRecPtr
12431277
LogStandbySnapshot(void)
1278+
{
1279+
return LogStandbySnapshotImpl(NULL);
1280+
}
1281+
1282+
static XLogRecPtr
1283+
LogStandbySnapshotImpl(const char *rpName)
12441284
{
12451285
XLogRecPtr recptr;
12461286
RunningTransactions running;
@@ -1278,7 +1318,15 @@ LogStandbySnapshot(void)
12781318
if (wal_level < WAL_LEVEL_LOGICAL)
12791319
LWLockRelease(ProcArrayLock);
12801320

1281-
recptr = LogCurrentRunningXacts(running);
1321+
if (rpName == NULL)
1322+
{
1323+
recptr = LogCurrentRunningXacts(running);
1324+
}
1325+
else
1326+
{
1327+
recptr = CbdbLogRestorePointRunningXacts(running, rpName);
1328+
}
1329+
12821330

12831331
/* Release lock if we kept it longer ... */
12841332
if (wal_level >= WAL_LEVEL_LOGICAL)
@@ -1289,9 +1337,9 @@ LogStandbySnapshot(void)
12891337
if (IS_QUERY_DISPATCHER())
12901338
{
12911339
/*
1292-
* GPDB: write latestCompletedGxid too, because the standby needs this
1340+
* GPDB: write latestCompletedGxid too, because the standby needs this
12931341
* value for creating distributed snapshot. The standby cannot rely on
1294-
* the nextGxid value to set latestCompletedGxid during restart (which
1342+
* the nextGxid value to set latestCompletedGxid during restart (which
12951343
* the primary does) because nextGxid was bumped in the checkpoint.
12961344
*/
12971345
LWLockAcquire(ProcArrayLock, LW_SHARED);
@@ -1370,6 +1418,69 @@ LogCurrentRunningXacts(RunningTransactions CurrRunningXacts)
13701418
return recptr;
13711419
}
13721420

1421+
XLogRecPtr
1422+
LogRestorePointStandbySnapshot(const char *rpName)
1423+
{
1424+
return LogStandbySnapshotImpl(rpName);
1425+
}
1426+
1427+
static XLogRecPtr
1428+
CbdbLogRestorePointRunningXacts(RunningTransactions CurrRunningXacts, const char *rpName)
1429+
{
1430+
xl_restore_point_running_xacts xlrec;
1431+
XLogRecPtr recptr;
1432+
1433+
xlrec.xcnt = CurrRunningXacts->xcnt;
1434+
xlrec.subxcnt = CurrRunningXacts->subxcnt;
1435+
xlrec.subxid_overflow = CurrRunningXacts->subxid_overflow;
1436+
xlrec.nextXid = CurrRunningXacts->nextXid;
1437+
xlrec.oldestRunningXid = CurrRunningXacts->oldestRunningXid;
1438+
xlrec.latestCompletedXid = CurrRunningXacts->latestCompletedXid;
1439+
strlcpy(xlrec.rpName, rpName, MAXFNAMELEN);
1440+
1441+
/* Header */
1442+
XLogBeginInsert();
1443+
XLogSetRecordFlags(XLOG_MARK_UNIMPORTANT);
1444+
XLogRegisterData((char *) (&xlrec), MinSizeOfXactRpRunningXacts);
1445+
1446+
/* array of TransactionIds */
1447+
if (xlrec.xcnt > 0)
1448+
XLogRegisterData((char *) CurrRunningXacts->xids,
1449+
(xlrec.xcnt + xlrec.subxcnt) * sizeof(TransactionId));
1450+
1451+
recptr = XLogInsert(RM_STANDBY_ID, XLOG_RESTORE_POINT_RUNNING_XACTS);
1452+
1453+
/* report error if subxid_overflow */
1454+
if (CurrRunningXacts->subxid_overflow)
1455+
elog(ERROR,
1456+
"snapshot of %u running transactions overflowed (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
1457+
CurrRunningXacts->xcnt,
1458+
LSN_FORMAT_ARGS(recptr),
1459+
CurrRunningXacts->oldestRunningXid,
1460+
CurrRunningXacts->latestCompletedXid,
1461+
CurrRunningXacts->nextXid);
1462+
else
1463+
elog(trace_recovery(DEBUG2),
1464+
"snapshot of %u+%u running transaction ids (lsn %X/%X oldest xid %u latest complete %u next xid %u)",
1465+
CurrRunningXacts->xcnt, CurrRunningXacts->subxcnt,
1466+
LSN_FORMAT_ARGS(recptr),
1467+
CurrRunningXacts->oldestRunningXid,
1468+
CurrRunningXacts->latestCompletedXid,
1469+
CurrRunningXacts->nextXid);
1470+
1471+
/*
1472+
* Ensure running_xacts information is synced to disk not too far in the
1473+
* future. We don't want to stall anything though (i.e. use XLogFlush()),
1474+
* so we let the wal writer do it during normal operation.
1475+
* XLogSetAsyncXactLSN() conveniently will mark the LSN as to-be-synced
1476+
* and nudge the WALWriter into action if sleeping. Check
1477+
* XLogBackgroundFlush() for details why a record might not be flushed
1478+
* without it.
1479+
*/
1480+
XLogSetAsyncXactLSN(recptr);
1481+
1482+
return recptr;
1483+
}
13731484
/*
13741485
* Wholesale logging of AccessExclusiveLocks. Other lock types need not be
13751486
* logged, as described in backend/storage/lmgr/README.

src/backend/utils/misc/guc_gp.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include "postmaster/postmaster.h"
4848
#include "replication/walsender.h"
4949
#include "storage/proc.h"
50+
#include "storage/standby.h"
5051
#include "task/pg_cron.h"
5152
#include "tcop/idle_resource_cleaner.h"
5253
#include "utils/builtins.h"
@@ -5006,6 +5007,16 @@ struct config_string ConfigureNamesString_gp[] =
50065007
NULL, NULL, NULL
50075008
},
50085009

5010+
{
	/*
	 * The boot value is NULL rather than "": the redo code tests the backing
	 * variable for NULL to decide whether the feature is enabled, and an
	 * empty-string boot value would leave the pointer non-NULL, making that
	 * test always true.  The long description is NULL instead of
	 * gettext_noop(""), because translating an empty msgid yields the
	 * message catalog header rather than an empty string.
	 */
	{"gp_hot_standby_snapshot_restore_point_name", PGC_SUSET, DEVELOPER_OPTIONS,
		gettext_noop("Specifies the hot standby snapshot's restore point name."),
		NULL,
		GUC_NOT_IN_SAMPLE
	},
	&gp_hot_standby_snapshot_restore_point_name,
	NULL,
	NULL, NULL, NULL
},
50095020
/* End-of-list marker */
50105021
{
50115022
{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL

src/include/storage/standby.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ extern int vacuum_defer_cleanup_age;
2525
extern int max_standby_archive_delay;
2626
extern int max_standby_streaming_delay;
2727
extern bool log_recovery_conflict_waits;
28+
extern char *gp_hot_standby_snapshot_restore_point_name;
2829

2930
extern void InitRecoveryTransactionEnvironment(void);
3031
extern void ShutdownRecoveryTransactionEnvironment(void);
@@ -60,6 +61,7 @@ extern void StandbyReleaseAllLocks(void);
6061
extern void StandbyReleaseOldLocks(TransactionId oldxid);
6162

6263
#define MinSizeOfXactRunningXacts offsetof(xl_running_xacts, xids)
64+
#define MinSizeOfXactRpRunningXacts offsetof(xl_restore_point_running_xacts, xids)
6365

6466

6567
/*
@@ -84,6 +86,7 @@ typedef struct RunningTransactionsData
8486
TransactionId latestCompletedXid; /* so we can set xmax */
8587

8688
TransactionId *xids; /* array of (sub)xids still running */
89+
char *restore_point_name;
8790
} RunningTransactionsData;
8891

8992
typedef RunningTransactionsData *RunningTransactions;
@@ -95,4 +98,6 @@ extern XLogRecPtr LogStandbySnapshot(void);
9598
extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
9699
bool relcacheInitFileInval);
97100

101+
extern XLogRecPtr LogRestorePointStandbySnapshot(const char *rpName);
102+
98103
#endif /* STANDBY_H */

src/include/storage/standbydefs.h

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ extern void standby_desc_invalidations(StringInfo buf,
3131
/*
3232
* XLOG message types
3333
*/
34-
#define XLOG_STANDBY_LOCK 0x00
35-
#define XLOG_RUNNING_XACTS 0x10
36-
#define XLOG_INVALIDATIONS 0x20
37-
#define XLOG_LATESTCOMPLETED_GXID 0xF0
34+
#define XLOG_STANDBY_LOCK 0x00
35+
#define XLOG_RUNNING_XACTS 0x10
36+
#define XLOG_INVALIDATIONS 0x20
37+
#define XLOG_RESTORE_POINT_RUNNING_XACTS 0xE0
38+
#define XLOG_LATESTCOMPLETED_GXID 0xF0
3839

3940
typedef struct xl_standby_locks
4041
{
@@ -57,6 +58,20 @@ typedef struct xl_running_xacts
5758
TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
5859
} xl_running_xacts;
5960

61+
#define MAXFNAMELEN 64
62+
typedef struct xl_restore_point_running_xacts
63+
{
64+
int xcnt; /* # of xact ids in xids[] */
65+
int subxcnt; /* # of subxact ids in xids[] */
66+
bool subxid_overflow; /* snapshot overflowed, subxids missing */
67+
TransactionId nextXid; /* xid from ShmemVariableCache->nextXid */
68+
TransactionId oldestRunningXid; /* *not* oldestXmin */
69+
TransactionId latestCompletedXid; /* so we can set xmax */
70+
char rpName[MAXFNAMELEN]; /* cbdr: log the running xact with restore_point_name */
71+
72+
TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
73+
} xl_restore_point_running_xacts;
74+
6075
/*
6176
* Invalidations for standby, currently only when transactions without an
6277
* assigned xid commit.

src/include/utils/unsync_guc_name.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@
239239
"gp_max_slices",
240240
"gp_motion_cost_per_row",
241241
"gp_pause_on_restore_point_replay",
242+
"gp_hot_standby_snapshot_restore_point_name",
242243
"gp_predicate_pushdown_sample_rows",
243244
"gp_print_create_gang_time",
244245
"gp_qd_hostname",

src/test/isolation2/hot_standby_schedule

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ test: hot_standby/transaction_isolation
44
test: hot_standby/query_conflict
55
test: hot_standby/faults
66
test: hot_standby/teardown
7+
test: hot_standby/snapshot
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
----------------------------------------------------------------
2+
-- Test guc gp_hot_standby_snapshot_restore_point_name
3+
-- When the GUC is set (for example, to rp1), a hot standby query can only see
4+
-- the data committed before rp1.
5+
----------------------------------------------------------------
6+
!\retcode gpconfig -c gp_hot_standby_snapshot_restore_point_name -v "rp1";
7+
!\retcode gpstop -ar;
8+
9+
1: show gp_hot_standby_snapshot_restore_point_name;
10+
1: create table hs_sh(a int);
11+
1: insert into hs_sh select * from generate_series(1,10);
12+
13+
-1S: select * from hs_sh;;
14+
15+
1: select gp_create_restore_point('rp1');
16+
17+
-1S: select * from hs_sh;
18+
19+
20+

0 commit comments

Comments
 (0)