@@ -399,29 +399,35 @@ def create_sub(self, node_dsn: str, subscription_name: str, provider_dsn: str,
399399 if self .verbose :
400400 self .info (f"Subscription { subscription_name } created remotely" )
401401
402- def create_replication_slot (self , node_dsn : str , slot_name : str , plugin : str = "spock_output" ):
403- """Create a logical replication slot on a remote node"""
402+ def create_replication_slot (self , node_dsn : str , slot_name : str , plugin : str = "spock_output" ) -> Optional [ str ] :
403+ """Create a logical replication slot on a remote node and return the LSN """
404404 # Check if slot already exists
405405 sql = f"SELECT count(*) FROM pg_replication_slots WHERE slot_name = '{ slot_name } ';"
406406 count = self .run_psql (node_dsn , sql , fetch = True , return_single = True )
407407
408408 if count and int (count .strip ()) > 0 :
409409 if self .verbose :
410410 self .info (f"Replication slot '{ slot_name } ' already exists. Skipping creation." )
411- return
412-
411+ return None
412+
413413 sql = f"SELECT slot_name, lsn FROM pg_create_logical_replication_slot('{ slot_name } ', '{ plugin } ');"
414414
415415 if self .verbose :
416416 self .info (f"[QUERY] { sql } " )
417-
418- result = self .run_psql (node_dsn , sql )
417+
418+ # Fetch the result to get the LSN
419+ result = self .run_psql (node_dsn , sql , fetch = True , return_single = True )
419420 if result is None :
420421 if self .verbose :
421422 self .info (f"Replication slot '{ slot_name } ' may already exist or creation failed." )
423+ return None
422424 else :
425+ # Parse the result to extract LSN (format: "slot_name|lsn")
426+ parts = result .split ('|' )
427+ lsn = parts [1 ].strip () if len (parts ) > 1 else None
423428 if self .verbose :
424- self .info (f"Created replication slot '{ slot_name } ' with plugin '{ plugin } ' on remote node." )
429+ self .info (f"Created replication slot '{ slot_name } ' with plugin '{ plugin } ' on remote node (LSN: { lsn } )." )
430+ return lsn
425431
426432 def create_disable_subscriptions_and_slots (self , src_node_name : str , src_dsn : str ,
427433 new_node_name : str , new_node_dsn : str ):
@@ -438,7 +444,7 @@ def create_disable_subscriptions_and_slots(self, src_node_name: str, src_dsn: st
438444 if rec ['node_name' ] == src_node_name :
439445 continue
440446
441- # Create replication slot
447+ # Create replication slot and capture the commit LSN
442448 dbname = "pgedge" # Default database name
443449 if "dbname=" in rec ['dsn' ]:
444450 dbname = rec ['dsn' ].split ("dbname=" )[1 ].split ()[0 ]
@@ -447,12 +453,17 @@ def create_disable_subscriptions_and_slots(self, src_node_name: str, src_dsn: st
447453 if len (slot_name ) > 64 :
448454 slot_name = slot_name [:64 ]
449455
450- self .create_replication_slot (rec ['dsn' ], slot_name )
451- self .notice (f" OK: Creating replication slot { slot_name } on node { rec ['node_name' ]} " )
456+ commit_lsn = self .create_replication_slot (rec ['dsn' ], slot_name )
457+ self .notice (f" OK: Creating replication slot { slot_name } (LSN: { commit_lsn } ) on node { rec ['node_name' ]} " )
452458
453459 # Trigger sync event on origin node and store LSN for later use
454460 sync_lsn = self .sync_event (rec ['dsn' ])
455- self .sync_lsns [rec ['node_name' ]] = sync_lsn
461+
462+ # Store both sync_lsn and commit_lsn
463+ self .sync_lsns [rec ['node_name' ]] = {
464+ 'sync_lsn' : sync_lsn ,
465+ 'commit_lsn' : commit_lsn
466+ }
456467 self .notice (f" OK: Triggering sync event on node { rec ['node_name' ]} (LSN: { sync_lsn } )" )
457468
458469 # Create disabled subscription
@@ -576,42 +587,38 @@ def get_commit_timestamp(self, node_dsn: str, origin: str, receiver: str) -> str
576587 result = self .run_psql (node_dsn , sql , fetch = True , return_single = True )
577588 return result
578589
579- def advance_replication_slot (self , node_dsn : str , slot_name : str , sync_timestamp : str ):
580- """Advance a replication slot to a specific timestamp """
581- if not sync_timestamp :
590+ def advance_replication_slot (self , node_dsn : str , slot_name : str , target_lsn : str ):
591+ """Advance a replication slot to a specific LSN """
592+ if not target_lsn :
582593 if self .verbose :
583- self .info (f"Commit timestamp is NULL, skipping slot advance for slot '{ slot_name } '." )
594+ self .info (f"Target LSN is NULL, skipping slot advance for slot '{ slot_name } '." )
584595 return
585-
586- sql = f"""
587- WITH lsn_cte AS (
588- SELECT spock.get_lsn_from_commit_ts('{ slot_name } ', '{ sync_timestamp } ') AS lsn
589- )
590- SELECT pg_replication_slot_advance('{ slot_name } ', lsn) FROM lsn_cte;
591- """
592-
596+
597+ sql = f"SELECT pg_replication_slot_advance('{ slot_name } ', '{ target_lsn } '::pg_lsn);"
598+
593599 if self .verbose :
594600 self .info (f"[QUERY] { sql } " )
595601
596602 self .run_psql (node_dsn , sql )
597603
598- def check_commit_timestamp_and_advance_slot (self , src_node_name : str , src_dsn : str ,
604+ def check_commit_timestamp_and_advance_slot (self , src_node_name : str , src_dsn : str ,
599605 new_node_name : str , new_node_dsn : str ):
600- """Phase 7: Check commit timestamp and advance replication slot"""
601- self .notice ("Phase 7: Checking commit timestamp and advancing replication slot" )
602-
606+ """Phase 7: Check commit LSN and advance replication slot"""
607+ self .notice ("Phase 7: Checking commit LSN and advancing replication slot" )
608+
603609 # Get all nodes from source cluster
604610 nodes = self .get_spock_nodes (src_dsn )
605611
606612 for rec in nodes :
607613 if rec ['node_name' ] == src_node_name :
608614 continue
609-
610- # Get commit timestamp
611- sync_timestamp = self .get_commit_timestamp (new_node_dsn , src_node_name , rec ['node_name' ])
612- if sync_timestamp :
613- self .notice (f" OK: Found commit timestamp for { src_node_name } ->{ rec ['node_name' ]} : { sync_timestamp } " )
614-
615+
616+ # Get the stored commit LSN from when subscription was created
617+ commit_lsn = self .sync_lsns [rec ['node_name' ]]['commit_lsn' ]
618+
619+ if commit_lsn :
620+ self .notice (f" OK: Found commit LSN for { rec ['node_name' ]} (LSN: { commit_lsn } )..." )
621+
615622 # Advance replication slot
616623 dbname = "pgedge"
617624 if "dbname=" in rec ['dsn' ]:
@@ -625,18 +632,15 @@ def check_commit_timestamp_and_advance_slot(self, src_node_name: str, src_dsn: s
625632 self .info (f"[QUERY] { sql } " )
626633
627634 current_lsn = self .run_psql (rec ['dsn' ], sql , fetch = True , return_single = True )
628-
629- # Get target LSN
630- sql = f"SELECT spock.get_lsn_from_commit_ts('{ slot_name } ', '{ sync_timestamp } ')"
631- if self .verbose :
632- self .info (f"[QUERY] { sql } " )
633-
634- target_lsn = self .run_psql (rec ['dsn' ], sql , fetch = True , return_single = True )
635-
635+
636+ target_lsn = commit_lsn
637+
636638 if current_lsn and target_lsn and current_lsn >= target_lsn :
637639 self .notice (f" - Slot { slot_name } already at or beyond target LSN (current: { current_lsn } , target: { target_lsn } )" )
638640 else :
639- self .advance_replication_slot (rec ['dsn' ], slot_name , sync_timestamp )
641+ self .advance_replication_slot (rec ['dsn' ], slot_name , target_lsn )
642+ else :
643+ self .notice (f" - No commit LSN found for { rec ['node_name' ]} ->{ new_node_name } " )
640644
641645 def enable_sub (self , node_dsn : str , sub_name : str , immediate : bool = True ):
642646 """Enable a subscription on a remote node"""
@@ -689,7 +693,8 @@ def enable_disabled_subscriptions(self, src_node_name: str, src_dsn: str,
689693 # Wait for the sync event that was captured when subscription was created
690694 # This ensures the subscription starts replicating from the correct sync point
691695 timeout_ms = 1200 # 20 minutes
692- sync_lsn = self .sync_lsns .get (rec ['node_name' ]) # Use stored sync LSN from Phase 3
696+ sync_lsn = self .sync_lsns [rec ['node_name' ]]['sync_lsn' ] # Use stored sync LSN from Phase 3
697+
693698 if sync_lsn :
694699 self .notice (f" OK: Using stored sync event from origin node { rec ['node_name' ]} (LSN: { sync_lsn } )..." )
695700
0 commit comments