@@ -107,6 +107,7 @@ XLogRecPtr standby_slot_names_oldest_flush_lsn = InvalidXLogRecPtr;
107107/* Slots to sync */
108108char * pg_failover_slots_dsn ;
109109char * pg_failover_slot_names ;
110+ int pg_failover_slots_sync_timeout ;
110111static char * pg_failover_slot_names_str = NULL ;
111112static List * pg_failover_slot_names_list = NIL ;
112113static bool pg_failover_slots_drop = true;
@@ -116,6 +117,12 @@ char *pg_failover_slots_version_str;
116117void _PG_init (void );
117118PGDLLEXPORT void pg_failover_slots_main (Datum main_arg );
118119
120+ typedef enum SlotCatchupState {
121+ CatchupSlotDrop = 0 ,
122+ CatchupSlotSucceeded = 1 ,
123+ CatchupSlotDirty = 2 ,
124+ } SlotCatchupState ;
125+
119126static bool
120127check_failover_slot_names (char * * newval , void * * extra , GucSource source )
121128{
@@ -531,14 +538,16 @@ remote_connect(const char *connstr, const char *appname)
531538 * relies on us having already reserved the WAL for the old position of
532539 * `remote_slot` so `slot` can't continue to advance.
533540 */
534- static bool
541+ static SlotCatchupState
535542wait_for_primary_slot_catchup (ReplicationSlot * slot , RemoteSlot * remote_slot )
536543{
537544 List * slots ;
538545 PGconn * conn ;
539546 StringInfoData connstr ;
540547 TimestampTz cb_wait_start =
541548 0 ; /* first invocation should happen immediately */
549+ TimestampTz wait_start = GetCurrentTimestamp ();
550+ TimestampTz now = 0 ;
542551
543552 elog (
544553 LOG ,
@@ -581,7 +590,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
581590 "replication slot sync wait for slot %s interrupted by promotion" ,
582591 remote_slot -> name )));
583592 PQfinish (conn );
584- return false ;
593+ return CatchupSlotDrop ;
585594 }
586595
587596 filter -> key = FAILOVERSLOT_FILTER_NAME ;
@@ -592,7 +601,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
592601 {
593602 /* Slot on provider vanished */
594603 PQfinish (conn );
595- return false ;
604+ return CatchupSlotDrop ;
596605 }
597606
598607 receivePtr = GetWalRcvFlushRecPtr (NULL , NULL );
@@ -613,14 +622,30 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
613622 remote_slot -> confirmed_lsn = new_slot -> confirmed_lsn ;
614623 remote_slot -> catalog_xmin = new_slot -> catalog_xmin ;
615624 PQfinish (conn );
616- return true ;
625+ return CatchupSlotSucceeded ;
617626 }
618627
619628 /*
620629 * Invoke any callbacks that will help move the slots along
621630 */
631+ now = GetCurrentTimestamp ();
632+ if (pg_failover_slots_sync_timeout >= 0 && TimestampDifferenceExceeds (
633+ wait_start , now , pg_failover_slots_sync_timeout ))
634+ {
635+ elog (
636+ LOG ,
637+ "Give up on waiting for remote slot %s lsn (%X/%X) and catalog xmin (%u) to pass local slot lsn (%X/%X) and catalog xmin (%u)" ,
638+ remote_slot -> name , (uint32 ) (new_slot -> restart_lsn >> 32 ),
639+ (uint32 ) (new_slot -> restart_lsn ), new_slot -> catalog_xmin ,
640+ (uint32 ) (slot -> data .restart_lsn >> 32 ),
641+ (uint32 ) (slot -> data .restart_lsn ),
642+ slot -> data .catalog_xmin );
643+ PQfinish (conn );
644+ return CatchupSlotDirty ;
645+ }
646+
622647 if (TimestampDifferenceExceeds (
623- cb_wait_start , GetCurrentTimestamp () ,
648+ cb_wait_start , now ,
624649 Min (wal_retrieve_retry_interval * 5 , PG_WAIT_EXTENSION )))
625650 {
626651 if (cb_wait_start > 0 )
@@ -636,6 +661,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
636661 cb_wait_start = GetCurrentTimestamp ();
637662 }
638663
664+
639665 rc =
640666 WaitLatch (MyLatch , WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
641667 wal_retrieve_retry_interval , PG_WAIT_EXTENSION );
@@ -645,6 +671,16 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot)
645671
646672
647673 ResetLatch (MyLatch );
674+
675+ /*
676+ * The user may change pg_failover_slots_sync_timeout, so update it if needed.
677+ */
678+ if (ConfigReloadPending )
679+ {
680+ ConfigReloadPending = false;
681+ ProcessConfigFile (PGC_SIGHUP );
682+ }
683+
648684 }
649685}
650686
@@ -666,6 +702,7 @@ synchronize_one_slot(RemoteSlot *remote_slot)
666702{
667703 int i ;
668704 bool found = false;
705+ SlotCatchupState slot_state = CatchupSlotDirty ;
669706
670707 if (!RecoveryInProgress ())
671708 {
@@ -711,42 +748,10 @@ synchronize_one_slot(RemoteSlot *remote_slot)
711748 if (found )
712749 {
713750 ReplicationSlotAcquire (remote_slot -> name , true);
714-
715- /*
716- * We can't satisfy this remote slot's requirements with our known-safe
717- * local restart_lsn, catalog_xmin and xmin.
718- *
719- * This shouldn't happen for existing slots unless someone else messed
720- * with our physical replication slot on the master.
721- */
722- if (remote_slot -> restart_lsn < MyReplicationSlot -> data .restart_lsn ||
723- TransactionIdPrecedes (remote_slot -> catalog_xmin ,
724- MyReplicationSlot -> data .catalog_xmin ))
725- {
726- elog (
727- WARNING ,
728- "not synchronizing slot %s; synchronization would move it backward" ,
729- remote_slot -> name );
730-
731- ReplicationSlotRelease ();
732- PopActiveSnapshot ();
733- CommitTransactionCommand ();
734- return ;
735- }
736-
737- LogicalConfirmReceivedLocation (remote_slot -> confirmed_lsn );
738- LogicalIncreaseXminForSlot (remote_slot -> confirmed_lsn ,
739- remote_slot -> catalog_xmin );
740- LogicalIncreaseRestartDecodingForSlot (remote_slot -> confirmed_lsn ,
741- remote_slot -> restart_lsn );
742- ReplicationSlotMarkDirty ();
743- ReplicationSlotSave ();
744-
745- elog (
746- DEBUG2 ,
747- "synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)" ,
748- remote_slot -> name , (uint32 ) (remote_slot -> restart_lsn >> 32 ),
749- (uint32 ) (remote_slot -> restart_lsn ), remote_slot -> catalog_xmin );
751+ if (MyReplicationSlot -> data .persistency == RS_PERSISTENT )
752+ slot_state = CatchupSlotSucceeded ;
753+ else
754+ slot_state = CatchupSlotDirty ;
750755 }
751756 /*
752757 * Otherwise create the local slot and initialize it to the state of the
@@ -764,10 +769,10 @@ synchronize_one_slot(RemoteSlot *remote_slot)
764769 * don't want it to persist if we fail.
765770 */
766771#if PG_VERSION_NUM >= 140000
767- ReplicationSlotCreate (remote_slot -> name , true, RS_EPHEMERAL ,
772+ ReplicationSlotCreate (remote_slot -> name , true, RS_TEMPORARY ,
768773 remote_slot -> two_phase );
769774#else
770- ReplicationSlotCreate (remote_slot -> name , true, RS_EPHEMERAL );
775+ ReplicationSlotCreate (remote_slot -> name , true, RS_TEMPORARY );
771776#endif
772777 slot = MyReplicationSlot ;
773778
@@ -790,56 +795,83 @@ synchronize_one_slot(RemoteSlot *remote_slot)
790795 slot -> data .catalog_xmin = xmin_horizon ;
791796 ReplicationSlotsComputeRequiredXmin (true);
792797 LWLockRelease (ProcArrayLock );
798+ slot_state = CatchupSlotDirty ;
799+ }
793800
801+ if (slot_state == CatchupSlotSucceeded )
802+ {
794803 /*
795- * Our xmin and/or catalog_xmin may be > that required by one or more
796- * of the slots we are trying to sync from the master, and/or we don't
797- * have enough retained WAL for the slot's restart_lsn.
798- *
799- * If we persist the slot locally in that state it'll make a false
800- * promise we can't satisfy.
801- *
802- * This can happen if this replica is fairly new or has only recently
803- * started failover slot sync.
804+ * We can't satisfy this remote slot's requirements with our known-safe
805+ * local restart_lsn, catalog_xmin and xmin.
804806 *
805- * TODO: Don't stop synchronization of other slots for this, we can't
806- * add timeout because that could result in some slots never being
807- * synchronized as they will always be behind the physical slot.
807+ * This shouldn't happen for existing slots unless someone else messed
808+ * with our physical replication slot on the master.
808809 */
809810 if (remote_slot -> restart_lsn < MyReplicationSlot -> data .restart_lsn ||
810811 TransactionIdPrecedes (remote_slot -> catalog_xmin ,
811812 MyReplicationSlot -> data .catalog_xmin ))
812813 {
813- if (!wait_for_primary_slot_catchup (MyReplicationSlot , remote_slot ))
814+ elog (
815+ WARNING ,
816+ "not synchronizing slot %s; synchronization would move it backward" ,
817+ remote_slot -> name );
818+ ReplicationSlotRelease ();
819+ PopActiveSnapshot ();
820+ CommitTransactionCommand ();
821+ return ;
822+ }
823+
824+ LogicalConfirmReceivedLocation (remote_slot -> confirmed_lsn );
825+ LogicalIncreaseXminForSlot (remote_slot -> confirmed_lsn ,
826+ remote_slot -> catalog_xmin );
827+ LogicalIncreaseRestartDecodingForSlot (remote_slot -> confirmed_lsn ,
828+ remote_slot -> restart_lsn );
829+ ReplicationSlotMarkDirty ();
830+ ReplicationSlotSave ();
831+ elog (
832+ DEBUG2 ,
833+ "synchronized existing slot %s to lsn (%X/%X) and catalog xmin (%u)" ,
834+ remote_slot -> name , (uint32 ) (remote_slot -> restart_lsn >> 32 ),
835+ (uint32 ) (remote_slot -> restart_lsn ), remote_slot -> catalog_xmin );
836+ } else {
837+ if (remote_slot -> restart_lsn < MyReplicationSlot -> data .restart_lsn ||
838+ TransactionIdPrecedes (remote_slot -> catalog_xmin ,
839+ MyReplicationSlot -> data .catalog_xmin ))
840+ {
841+ slot_state = wait_for_primary_slot_catchup (MyReplicationSlot , remote_slot );
842+ if (slot_state == CatchupSlotDrop )
814843 {
815844 /* Provider slot didn't catch up to locally reserved position
816845 */
817846 ReplicationSlotRelease ();
847+ ReplicationSlotDrop (remote_slot -> name , false);
818848 PopActiveSnapshot ();
819849 CommitTransactionCommand ();
820850 return ;
821851 }
852+ } else {
853+ slot_state = CatchupSlotSucceeded ;
822854 }
823855
824856 /*
825857 * We can locally satisfy requirements of remote slot's current
826858 * position now. Apply the new position if any and make it persistent.
827859 */
828- LogicalConfirmReceivedLocation (remote_slot -> confirmed_lsn );
829- LogicalIncreaseXminForSlot (remote_slot -> confirmed_lsn ,
830- remote_slot -> catalog_xmin );
831- LogicalIncreaseRestartDecodingForSlot (remote_slot -> confirmed_lsn ,
832- remote_slot -> restart_lsn );
833- ReplicationSlotMarkDirty ();
834-
835- ReplicationSlotPersist ();
836-
860+ if (slot_state == CatchupSlotSucceeded )
861+ {
862+ LogicalConfirmReceivedLocation (remote_slot -> confirmed_lsn );
863+ LogicalIncreaseXminForSlot (remote_slot -> confirmed_lsn ,
864+ remote_slot -> catalog_xmin );
865+ LogicalIncreaseRestartDecodingForSlot (remote_slot -> confirmed_lsn ,
866+ remote_slot -> restart_lsn );
867+ ReplicationSlotMarkDirty ();
868+ ReplicationSlotPersist ();
869+ }
837870 elog (DEBUG1 ,
838871 "synchronized new slot %s to lsn (%X/%X) and catalog xmin (%u)" ,
839872 remote_slot -> name , (uint32 ) (remote_slot -> restart_lsn >> 32 ),
840873 (uint32 ) (remote_slot -> restart_lsn ), remote_slot -> catalog_xmin );
841874 }
842-
843875 ReplicationSlotRelease ();
844876 PopActiveSnapshot ();
845877 CommitTransactionCommand ();
@@ -939,7 +971,7 @@ synchronize_failover_slots(long sleep_time)
939971 bool active ;
940972 bool found = false;
941973
942- active = (s -> active_pid != 0 );
974+ active = (s -> active_pid != 0 && s -> active_pid != MyProcPid );
943975
944976 /* Only check inactive slots. */
945977 if (!s -> in_use || active )
@@ -1048,13 +1080,6 @@ synchronize_failover_slots(long sleep_time)
10481080 if (remote_slot -> confirmed_lsn > receivePtr )
10491081 remote_slot -> confirmed_lsn = receivePtr ;
10501082
1051- /*
1052- * For simplicity we always move restart_lsn of all slots to the
1053- * restart_lsn needed by the furthest-behind master slot.
1054- */
1055- if (remote_slot -> restart_lsn > lsn )
1056- remote_slot -> restart_lsn = lsn ;
1057-
10581083 synchronize_one_slot (remote_slot );
10591084 nslots ++ ;
10601085 }
@@ -1069,6 +1094,41 @@ synchronize_failover_slots(long sleep_time)
10691094 return sleep_time ;
10701095}
10711096
1097+
1098+ /*
1099+ * After a promotion, we need to clean up the unpersisted replication slots we created while in recovery.
1100+ * If they have never been persisted, it means they are in an incosistent state.
1101+ */
1102+ static void
1103+ cleanup_failover_slots_after_promotion ()
1104+ {
1105+ int i ;
1106+ for (;;)
1107+ {
1108+ char * dropslot = NULL ;
1109+ LWLockAcquire (ReplicationSlotControlLock , LW_SHARED );
1110+ for (i = 0 ; i < max_replication_slots ; i ++ )
1111+ {
1112+ ReplicationSlot * s = & ReplicationSlotCtl -> replication_slots [i ];
1113+
1114+ if (s -> active_pid == MyProcPid && s -> data .persistency == RS_TEMPORARY )
1115+ {
1116+ dropslot = pstrdup (NameStr (s -> data .name ));
1117+ break ;
1118+ }
1119+ }
1120+ LWLockRelease (ReplicationSlotControlLock );
1121+ if (dropslot )
1122+ {
1123+ elog (WARNING , "dropping inconsistent replication slot after promotion \"%s\"" , dropslot );
1124+ ReplicationSlotDrop (dropslot , false);
1125+ pfree (dropslot );
1126+ }
1127+ else
1128+ break ;
1129+ }
1130+ }
1131+
10721132void
10731133pg_failover_slots_main (Datum main_arg )
10741134{
@@ -1096,9 +1156,14 @@ pg_failover_slots_main(Datum main_arg)
10961156 CHECK_FOR_INTERRUPTS ();
10971157
10981158 if (RecoveryInProgress ())
1159+ {
10991160 sleep_time = synchronize_failover_slots (WORKER_NAP_TIME );
1161+ }
11001162 else
1163+ {
1164+ cleanup_failover_slots_after_promotion ();
11011165 sleep_time = WORKER_NAP_TIME * 10 ;
1166+ }
11021167
11031168 rc =
11041169 WaitLatch (MyLatch , WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
@@ -1474,6 +1539,13 @@ _PG_init(void)
14741539 & pg_failover_slots_dsn , "" , PGC_SIGHUP , GUC_SUPERUSER_ONLY , NULL , NULL ,
14751540 NULL );
14761541
1542+ DefineCustomIntVariable (
1543+ "pg_failover_slots.sync_timeout" ,
1544+ "timeout when waiting for a slot to be persisted" ,
1545+ "If set to -1 (the default), we wait forever meaning we could hang up"
1546+ "up on one slot while other slots are ok to be synced." ,
1547+ & pg_failover_slots_sync_timeout , -1 , -1 , INT_MAX , PGC_SIGHUP ,
1548+ GUC_UNIT_MS , NULL , NULL , NULL );
14771549
14781550 if (IsBinaryUpgrade )
14791551 return ;
@@ -1486,7 +1558,7 @@ _PG_init(void)
14861558 snprintf (bgw .bgw_library_name , BGW_MAXLEN , EXTENSION_NAME );
14871559 snprintf (bgw .bgw_function_name , BGW_MAXLEN , "pg_failover_slots_main" );
14881560 snprintf (bgw .bgw_name , BGW_MAXLEN , "pg_failover_slots worker" );
1489- bgw .bgw_restart_time = 60 ;
1561+ bgw .bgw_restart_time = 10 ;
14901562
14911563 RegisterBackgroundWorker (& bgw );
14921564
0 commit comments