From 8a4e17b1202b3ae02bc98500f27b59c01a548d89 Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Thu, 6 Nov 2025 20:51:01 -0800 Subject: [PATCH] fix: make Postgres source change listening connection more stable --- src/ops/sources/postgres.rs | 42 +++++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/src/ops/sources/postgres.rs b/src/ops/sources/postgres.rs index 194af8e9..e45b8333 100644 --- a/src/ops/sources/postgres.rs +++ b/src/ops/sources/postgres.rs @@ -12,6 +12,7 @@ use std::fmt::Write; type PgValueDecoder = fn(&sqlx::postgres::PgRow, usize) -> Result; +const LISTENER_HEARTBEAT_INTERVAL: std::time::Duration = std::time::Duration::from_secs(45); #[derive(Clone)] struct FieldSchemaInfo { schema: FieldSchema, @@ -527,12 +528,41 @@ impl SourceExecutor for PostgresSourceExecutor { listener.listen(¬ification_ctx.channel_name).await?; let stream = stream! { - while let Ok(notification) = listener.recv().await { - let change = self.parse_notification_payload(¬ification); - yield change.map(|change| SourceChangeMessage { - changes: vec![change], - ack_fn: None, - }); + loop { + let mut heartbeat = tokio::time::interval(LISTENER_HEARTBEAT_INTERVAL); + loop { + tokio::select! { + notification = listener.recv() => { + let notification = match notification { + Ok(notification) => notification, + Err(e) => { + warn!("Failed to receive notification from channel {}: {e:?}", notification_ctx.channel_name); + break; + } + }; + let change = self.parse_notification_payload(¬ification); + yield change.map(|change| SourceChangeMessage { + changes: vec![change], + ack_fn: None, + }); + } + + _ = heartbeat.tick() => { + let ok = tokio::time::timeout(std::time::Duration::from_secs(5), + sqlx::query("SELECT 1").execute(&mut listener) + ).await.is_ok(); + if !ok { + warn!("Listener heartbeat failed for channel {}", notification_ctx.channel_name); + break; + } + + } + } + } + std::mem::drop(listener); + info!("Reconnecting to listener {}", notification_ctx.channel_name); + listener = PgListener::connect_with(&self.db_pool).await?; + listener.listen(¬ification_ctx.channel_name).await?; } };