@@ -288,15 +288,14 @@ static void update_progress_entry(Oid target_node_id,
288288static void check_and_update_progress (XLogRecPtr last_received_lsn ,
289289 TimestampTz timestamp );
290290
291- static ApplyReplayEntry *
292- apply_replay_entry_create (int r , char * buf );
293- static void
294- apply_replay_entry_free (ApplyReplayEntry * entry );
295- static void
296- apply_replay_queue_reset (void );
297-
291+ static ApplyReplayEntry * apply_replay_entry_create (int r , char * buf );
292+ static void apply_replay_entry_free (ApplyReplayEntry * entry );
293+ static void apply_replay_queue_reset (void );
294+ static void maybe_send_feedback (PGconn * applyconn , XLogRecPtr lsn_to_send ,
295+ TimestampTz * last_receive_timestamp );
298296static void append_feedback_position (XLogRecPtr recvpos );
299- static void get_feedback_position (XLogRecPtr * recvpos , XLogRecPtr * writepos , XLogRecPtr * flushpos , XLogRecPtr * max_recvpos );
297+ static void get_feedback_position (XLogRecPtr * recvpos , XLogRecPtr * writepos ,
298+ XLogRecPtr * flushpos , XLogRecPtr * max_recvpos );
300299
301300
302301/*
@@ -1868,7 +1867,9 @@ spock_apply_worker_shmem_exit(int code, Datum arg)
18681867 * on_proc_exit because the backend may also clean up the origin
18691868 * in certain cases, and we want to avoid duplicate cleanup.
18701869 */
1871- replorigin_session_reset ();
1870+ replorigin_session_origin = InvalidRepOriginId ;
1871+ replorigin_session_origin_lsn = InvalidXLogRecPtr ;
1872+ replorigin_session_origin_timestamp = 0 ;
18721873}
18731874
18741875/*
@@ -2740,7 +2741,6 @@ apply_work(PGconn *streamConn)
27402741 TimestampTz last_receive_timestamp = GetCurrentTimestamp ();
27412742 bool need_replay ;
27422743 ErrorData * edata ;
2743- RepOriginId originid ;
27442744
27452745 applyconn = streamConn ;
27462746 fd = PQsocket (applyconn );
@@ -2763,7 +2763,6 @@ apply_work(PGconn *streamConn)
27632763 if (MyApplyWorker -> apply_group == NULL )
27642764 spock_apply_worker_attach (); /* Attach this worker. */
27652765
2766- originid = replorigin_session_origin ;
27672766stream_replay :
27682767
27692768 need_replay = false;
@@ -2921,38 +2920,26 @@ apply_work(PGconn *streamConn)
29212920 {
29222921 XLogRecPtr start_lsn ;
29232922 XLogRecPtr end_lsn ;
2924- static int w_message_count = 0 ;
29252923
29262924 start_lsn = pq_getmsgint64 (msg );
29272925 end_lsn = pq_getmsgint64 (msg );
29282926 pq_getmsgint64 (msg ); /* sendTime */
29292927
2928+ /*
2929+ * Call maybe_send_feedback before last_received is updated.
2930+ * This ordering guarantees that feedback LSN never advertises
2931+ * a position beyond what has actually been received and processed.
2932+ * Prevents skipping over unapplied changes due to premature flush LSN.
2933+ */
2934+ maybe_send_feedback (applyconn , last_received ,
2935+ & last_receive_timestamp );
2936+
29302937 if (last_received < start_lsn )
29312938 last_received = start_lsn ;
29322939
29332940 if (last_received < end_lsn )
29342941 last_received = end_lsn ;
29352942
2936- w_message_count ++ ;
2937-
2938- /*
2939- * Send feedback if wal_sender_timeout/2 has passed or after 10 'w' messages.
2940- */
2941- if (TimestampDifferenceExceeds (last_receive_timestamp , GetCurrentTimestamp (), wal_sender_timeout / 2 ) ||
2942- w_message_count >= 10 )
2943- {
2944- elog (DEBUG2 , "SPOCK %s: force sending feedback after %d 'w' messages or timeout" ,
2945- MySubscription -> name , w_message_count );
2946- /*
2947- * We need to send feedback to the walsender process
2948- * to avoid remote wal_sender_timeout.
2949- */
2950- send_feedback (applyconn , last_received , GetCurrentTimestamp (), true);
2951- last_receive_timestamp = GetCurrentTimestamp ();
2952- w_message_count = 0 ;
2953- }
2954-
2955-
29562943 /*
29572944 * Append the entry to the end of the replay queue
29582945 * if we read it from the stream but check for overflow.
@@ -3162,8 +3149,6 @@ apply_work(PGconn *streamConn)
31623149 MemoryContextSwitchTo (MessageContext );
31633150 elog (LOG , "SPOCK: caught initial exception - %s" , edata -> message );
31643151
3165- /* reset replication session to avoid reuse of it after error. */
3166- replorigin_session_reset ();
31673152 FlushErrorState ();
31683153
31693154 MemoryContextReset (MessageContext );
@@ -3184,10 +3169,6 @@ apply_work(PGconn *streamConn)
31843169 {
31853170 MyApplyWorker -> use_try_block = true;
31863171
3187- /* Its possible that origin session may have been reset above */
3188- replorigin_session_setup (originid );
3189- replorigin_session_origin = originid ;
3190-
31913172 goto stream_replay ;
31923173 }
31933174
@@ -4173,3 +4154,34 @@ apply_replay_queue_reset(void)
41734154
41744155 MemoryContextReset (ApplyReplayContext );
41754156}
4157+
4158+ /*
4159+ * Check if we should send feedback based on message count or timeout.
4160+ * Resets internal state if feedback is sent.
4161+ */
4162+ static void
4163+ maybe_send_feedback (PGconn * applyconn , XLogRecPtr lsn_to_send ,
4164+ TimestampTz * last_receive_timestamp )
4165+ {
4166+ static int w_message_count = 0 ;
4167+ TimestampTz now = GetCurrentTimestamp ();
4168+
4169+ w_message_count ++ ;
4170+
4171+ /*
4172+ * Send feedback if wal_sender_timeout/2 has passed or after 10 'w' messages.
4173+ */
4174+ if (TimestampDifferenceExceeds (* last_receive_timestamp , now , wal_sender_timeout / 2 ) ||
4175+ w_message_count >= 10 )
4176+ {
4177+ elog (DEBUG2 , "SPOCK %s: force sending feedback after %d 'w' messages or timeout" ,
4178+ MySubscription -> name , w_message_count );
4179+ /*
4180+ * We need to send feedback to the walsender process
4181+ * to avoid remote wal_sender_timeout.
4182+ */
4183+ send_feedback (applyconn , lsn_to_send , now , true);
4184+ * last_receive_timestamp = now ;
4185+ w_message_count = 0 ;
4186+ }
4187+ }
0 commit comments