Skip to content

Commit 26ef7c5

Browse files
committed
Keep synchronizing slots when others are lagging on primary.
Instead of blocking indefinitely until a replication slot becomes syncable, introduce a new GUC, pg_failover_slots.sync_timeout, after which we move on to the next slot. To avoid restarting the wait from scratch, we create the replication slots as temporary slots instead of ephemeral ones, allowing them to keep their state between runs. When a slot is finally synced, we persist it to disk. Since we no longer block in the waiting state, we need to clean up the inconsistent slots after promotion.
1 parent 4c95505 commit 26ef7c5

File tree

1 file changed

+146
-75
lines changed

1 file changed

+146
-75
lines changed

pg_failover_slots.c

Lines changed: 146 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ XLogRecPtr standby_slot_names_oldest_flush_lsn = InvalidXLogRecPtr;
107107
/* Slots to sync */
108108
char *pg_failover_slots_dsn;
109109
char *pg_failover_slot_names;
110+
int pg_failover_slots_sync_timeout;
110111
static char *pg_failover_slot_names_str = NULL;
111112
static List *pg_failover_slot_names_list = NIL;
112113
static bool pg_failover_slots_drop = true;
@@ -116,6 +117,12 @@ char *pg_failover_slots_version_str;
116117
void _PG_init(void);
117118
PGDLLEXPORT void pg_failover_slots_main(Datum main_arg);
118119

120+
typedef enum SlotCatchupState {
121+
CatchupSlotDrop = 0,
122+
CatchupSlotSucceeded = 1,
123+
CatchupSlotDirty = 2,
124+
} SlotCatchupState;
125+
119126
static bool
120127
check_failover_slot_names(char **newval, void **extra, GucSource source)
121128
{
@@ -531,14 +538,16 @@ remote_connect(const char *connstr, const char *appname)
531538
* relies on us having already reserved the WAL for the old position of
532539
* `remote_slot` so `slot` can't continue to advance.
533540
*/
534-
static bool
541+
static SlotCatchupState
535542
wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
536543
{
537544
List *slots;
538545
PGconn *conn;
539546
StringInfoData connstr;
540547
TimestampTz cb_wait_start =
541548
0; /* first invocation should happen immediately */
549+
TimestampTz wait_start = GetCurrentTimestamp();
550+
TimestampTz now = 0;
542551

543552
elog(
544553
LOG,
@@ -581,7 +590,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
581590
"replication slot sync wait for slot %s interrupted by promotion",
582591
remote_slot->name)));
583592
PQfinish(conn);
584-
return false;
593+
return CatchupSlotDrop;
585594
}
586595

587596
filter->key = FAILOVERSLOT_FILTER_NAME;
@@ -592,7 +601,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
592601
{
593602
/* Slot on provider vanished */
594603
PQfinish(conn);
595-
return false;
604+
return CatchupSlotDrop;
596605
}
597606

598607
receivePtr = GetWalRcvFlushRecPtr(NULL, NULL);
@@ -613,14 +622,29 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
613622
remote_slot->confirmed_lsn = new_slot->confirmed_lsn;
614623
remote_slot->catalog_xmin = new_slot->catalog_xmin;
615624
PQfinish(conn);
616-
return true;
625+
return CatchupSlotSucceeded;
617626
}
618627

619628
/*
620629
* Invoke any callbacks that will help move the slots along
621630
*/
631+
now = GetCurrentTimestamp();
632+
if (pg_failover_slots_sync_timeout >= 0 && TimestampDifferenceExceeds(
633+
wait_start, now, pg_failover_slots_sync_timeout))
634+
{
635+
elog(
636+
LOG,
637+
"Give up on waiting for remote slot %s lsn (%X/%X) and catalog xmin (%u) to pass local slot lsn (%X/%X) and catalog xmin (%u)",
638+
remote_slot->name, (uint32) (new_slot->restart_lsn >> 32),
639+
(uint32) (new_slot->restart_lsn), new_slot->catalog_xmin,
640+
(uint32) (slot->data.restart_lsn >> 32),
641+
(uint32) (slot->data.restart_lsn),
642+
slot->data.catalog_xmin);
643+
return CatchupSlotDirty;
644+
}
645+
622646
if (TimestampDifferenceExceeds(
623-
cb_wait_start, GetCurrentTimestamp(),
647+
cb_wait_start, now,
624648
Min(wal_retrieve_retry_interval * 5, PG_WAIT_EXTENSION)))
625649
{
626650
if (cb_wait_start > 0)
@@ -636,6 +660,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
636660
cb_wait_start = GetCurrentTimestamp();
637661
}
638662

663+
639664
rc =
640665
WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
641666
wal_retrieve_retry_interval, PG_WAIT_EXTENSION);
@@ -645,6 +670,16 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
645670

646671

647672
ResetLatch(MyLatch);
673+
674+
/*
675+
* The user may change pg_failover_slots_sync_timeout, so update it if needed.
676+
*/
677+
if (ConfigReloadPending)
678+
{
679+
ConfigReloadPending = false;
680+
ProcessConfigFile(PGC_SIGHUP);
681+
}
682+
648683
}
649684
}
650685

@@ -666,6 +701,7 @@ synchronize_one_slot(RemoteSlot *remote_slot)
666701
{
667702
int i;
668703
bool found = false;
704+
SlotCatchupState slot_state = CatchupSlotDirty;
669705

670706
if (!RecoveryInProgress())
671707
{
@@ -711,42 +747,10 @@ synchronize_one_slot(RemoteSlot *remote_slot)
711747
if (found)
712748
{
713749
ReplicationSlotAcquire(remote_slot->name, true);
714-
715-
/*
716-
* We can't satisfy this remote slot's requirements with our known-safe
717-
* local restart_lsn, catalog_xmin and xmin.
718-
*
719-
* This shouldn't happen for existing slots unless someone else messed
720-
* with our physical replication slot on the master.
721-
*/
722-
if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
723-
TransactionIdPrecedes(remote_slot->catalog_xmin,
724-
MyReplicationSlot->data.catalog_xmin))
725-
{
726-
elog(
727-
WARNING,
728-
"not synchronizing slot %s; synchronization would move it backward",
729-
remote_slot->name);
730-
731-
ReplicationSlotRelease();
732-
PopActiveSnapshot();
733-
CommitTransactionCommand();
734-
return;
735-
}
736-
737-
LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
738-
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
739-
remote_slot->catalog_xmin);
740-
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
741-
remote_slot->restart_lsn);
742-
ReplicationSlotMarkDirty();
743-
ReplicationSlotSave();
744-
745-
elog(
746-
DEBUG2,
747-
"synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)",
748-
remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
749-
(uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
750+
if (MyReplicationSlot->data.persistency == RS_PERSISTENT)
751+
slot_state = CatchupSlotSucceeded;
752+
else
753+
slot_state = CatchupSlotDirty;
750754
}
751755
/*
752756
* Otherwise create the local slot and initialize it to the state of the
@@ -764,10 +768,10 @@ synchronize_one_slot(RemoteSlot *remote_slot)
764768
* don't want it to persist if we fail.
765769
*/
766770
#if PG_VERSION_NUM >= 140000
767-
ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL,
771+
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
768772
remote_slot->two_phase);
769773
#else
770-
ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL);
774+
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY);
771775
#endif
772776
slot = MyReplicationSlot;
773777

@@ -790,56 +794,83 @@ synchronize_one_slot(RemoteSlot *remote_slot)
790794
slot->data.catalog_xmin = xmin_horizon;
791795
ReplicationSlotsComputeRequiredXmin(true);
792796
LWLockRelease(ProcArrayLock);
797+
slot_state = CatchupSlotDirty;
798+
}
793799

800+
if (slot_state == CatchupSlotSucceeded)
801+
{
794802
/*
795-
* Our xmin and/or catalog_xmin may be > that required by one or more
796-
* of the slots we are trying to sync from the master, and/or we don't
797-
* have enough retained WAL for the slot's restart_lsn.
798-
*
799-
* If we persist the slot locally in that state it'll make a false
800-
* promise we can't satisfy.
801-
*
802-
* This can happen if this replica is fairly new or has only recently
803-
* started failover slot sync.
803+
* We can't satisfy this remote slot's requirements with our known-safe
804+
* local restart_lsn, catalog_xmin and xmin.
804805
*
805-
* TODO: Don't stop synchronization of other slots for this, we can't
806-
* add timeout because that could result in some slots never being
807-
* synchronized as they will always be behind the physical slot.
806+
* This shouldn't happen for existing slots unless someone else messed
807+
* with our physical replication slot on the master.
808808
*/
809809
if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
810810
TransactionIdPrecedes(remote_slot->catalog_xmin,
811811
MyReplicationSlot->data.catalog_xmin))
812812
{
813-
if (!wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot))
813+
elog(
814+
WARNING,
815+
"not synchronizing slot %s; synchronization would move it backward",
816+
remote_slot->name);
817+
ReplicationSlotRelease();
818+
PopActiveSnapshot();
819+
CommitTransactionCommand();
820+
return;
821+
}
822+
823+
LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
824+
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
825+
remote_slot->catalog_xmin);
826+
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
827+
remote_slot->restart_lsn);
828+
ReplicationSlotMarkDirty();
829+
ReplicationSlotSave();
830+
elog(
831+
DEBUG2,
832+
"synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)",
833+
remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
834+
(uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
835+
} else {
836+
if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
837+
TransactionIdPrecedes(remote_slot->catalog_xmin,
838+
MyReplicationSlot->data.catalog_xmin))
839+
{
840+
slot_state = wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot);
841+
if (slot_state == CatchupSlotDrop)
814842
{
815843
/* Provider slot didn't catch up to locally reserved position
816844
*/
817845
ReplicationSlotRelease();
846+
ReplicationSlotDrop(remote_slot->name, false);
818847
PopActiveSnapshot();
819848
CommitTransactionCommand();
820849
return;
821850
}
851+
} else {
852+
slot_state = CatchupSlotSucceeded;
822853
}
823854

824855
/*
825856
* We can locally satisfy requirements of remote slot's current
826857
* position now. Apply the new position if any and make it persistent.
827858
*/
828-
LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
829-
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
830-
remote_slot->catalog_xmin);
831-
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
832-
remote_slot->restart_lsn);
833-
ReplicationSlotMarkDirty();
834-
835-
ReplicationSlotPersist();
836-
859+
if (slot_state == CatchupSlotSucceeded)
860+
{
861+
LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn);
862+
LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn,
863+
remote_slot->catalog_xmin);
864+
LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn,
865+
remote_slot->restart_lsn);
866+
ReplicationSlotMarkDirty();
867+
ReplicationSlotPersist();
868+
}
837869
elog(DEBUG1,
838870
"synchronized new slot %s to lsn (%X/%X) and catalog xmin (%u)",
839871
remote_slot->name, (uint32) (remote_slot->restart_lsn >> 32),
840872
(uint32) (remote_slot->restart_lsn), remote_slot->catalog_xmin);
841873
}
842-
843874
ReplicationSlotRelease();
844875
PopActiveSnapshot();
845876
CommitTransactionCommand();
@@ -939,7 +970,7 @@ synchronize_failover_slots(long sleep_time)
939970
bool active;
940971
bool found = false;
941972

942-
active = (s->active_pid != 0);
973+
active = (s->active_pid != 0 && s->active_pid != MyProcPid);
943974

944975
/* Only check inactive slots. */
945976
if (!s->in_use || active)
@@ -1048,13 +1079,6 @@ synchronize_failover_slots(long sleep_time)
10481079
if (remote_slot->confirmed_lsn > receivePtr)
10491080
remote_slot->confirmed_lsn = receivePtr;
10501081

1051-
/*
1052-
* For simplicity we always move restart_lsn of all slots to the
1053-
* restart_lsn needed by the furthest-behind master slot.
1054-
*/
1055-
if (remote_slot->restart_lsn > lsn)
1056-
remote_slot->restart_lsn = lsn;
1057-
10581082
synchronize_one_slot(remote_slot);
10591083
nslots++;
10601084
}
@@ -1069,6 +1093,41 @@ synchronize_failover_slots(long sleep_time)
10691093
return sleep_time;
10701094
}
10711095

1096+
1097+
/*
1098+
* After a promotion, we need to clean up the unpersisted replication slots we created while in recovery.
1099+
* If they have never been persisted, it means they are in an inconsistent state.
1100+
*/
1101+
static void
1102+
cleanup_failover_slots_after_promotion()
1103+
{
1104+
int i;
1105+
for (;;)
1106+
{
1107+
char * dropslot = NULL;
1108+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1109+
for (i = 0; i < max_replication_slots; i++)
1110+
{
1111+
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1112+
1113+
if (s->active_pid == MyProcPid && s->data.persistency == RS_TEMPORARY)
1114+
{
1115+
dropslot = pstrdup(NameStr(s->data.name));
1116+
break;
1117+
}
1118+
}
1119+
LWLockRelease(ReplicationSlotControlLock);
1120+
if (dropslot)
1121+
{
1122+
elog(WARNING, "dropping inconsistent replication slot after promotion \"%s\"", dropslot);
1123+
ReplicationSlotDrop(dropslot, false);
1124+
pfree(dropslot);
1125+
}
1126+
else
1127+
break;
1128+
}
1129+
}
1130+
10721131
void
10731132
pg_failover_slots_main(Datum main_arg)
10741133
{
@@ -1096,9 +1155,14 @@ pg_failover_slots_main(Datum main_arg)
10961155
CHECK_FOR_INTERRUPTS();
10971156

10981157
if (RecoveryInProgress())
1158+
{
10991159
sleep_time = synchronize_failover_slots(WORKER_NAP_TIME);
1160+
}
11001161
else
1162+
{
1163+
cleanup_failover_slots_after_promotion();
11011164
sleep_time = WORKER_NAP_TIME * 10;
1165+
}
11021166

11031167
rc =
11041168
WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
@@ -1474,6 +1538,13 @@ _PG_init(void)
14741538
&pg_failover_slots_dsn, "", PGC_SIGHUP, GUC_SUPERUSER_ONLY, NULL, NULL,
14751539
NULL);
14761540

1541+
DefineCustomIntVariable(
1542+
"pg_failover_slots.sync_timeout",
1543+
"timeout when waiting for a slot to be persisted",
1544+
"If set to -1 (the default), we wait forever meaning we could hang up"
1545+
"up on one slot while other slots are ok to be synced.",
1546+
&pg_failover_slots_sync_timeout, -1, -1, INT_MAX, PGC_SIGHUP,
1547+
GUC_UNIT_MS, NULL, NULL, NULL);
14771548

14781549
if (IsBinaryUpgrade)
14791550
return;
@@ -1486,7 +1557,7 @@ _PG_init(void)
14861557
snprintf(bgw.bgw_library_name, BGW_MAXLEN, EXTENSION_NAME);
14871558
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "pg_failover_slots_main");
14881559
snprintf(bgw.bgw_name, BGW_MAXLEN, "pg_failover_slots worker");
1489-
bgw.bgw_restart_time = 60;
1560+
bgw.bgw_restart_time = 10;
14901561

14911562
RegisterBackgroundWorker(&bgw);
14921563

0 commit comments

Comments
 (0)