From 40e5883877da898cef0ece99c9e5ee0ede3b58c8 Mon Sep 17 00:00:00 2001 From: PonasKovas Date: Tue, 2 Sep 2025 19:55:42 +0300 Subject: [PATCH 1/5] remove specific IO error kind matching in PgListener::try_recv --- sqlx-postgres/src/listener.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/sqlx-postgres/src/listener.rs b/sqlx-postgres/src/listener.rs index 639ec95441..5c6a7e7f2a 100644 --- a/sqlx-postgres/src/listener.rs +++ b/sqlx-postgres/src/listener.rs @@ -283,15 +283,7 @@ impl PgListener { // The connection is dead, ensure that it is dropped, // update self state, and loop to try again. - Err(Error::Io(err)) - if matches!( - err.kind(), - io::ErrorKind::ConnectionAborted | - io::ErrorKind::UnexpectedEof | - // see ERRORS section in tcp(7) man page (https://man7.org/linux/man-pages/man7/tcp.7.html) - io::ErrorKind::TimedOut | - io::ErrorKind::BrokenPipe - ) => + Err(Error::Io(err)) => { if let Some(mut conn) = self.connection.take() { self.buffer_tx = conn.inner.stream.notifications.take(); From 59e3ca5af93a3b4cc2baf4cc141ae00081f594ad Mon Sep 17 00:00:00 2001 From: PonasKovas Date: Tue, 2 Sep 2025 21:10:29 +0300 Subject: [PATCH 2/5] add dbg! to identify where the error is coming from --- sqlx-postgres/src/listener.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sqlx-postgres/src/listener.rs b/sqlx-postgres/src/listener.rs index 5c6a7e7f2a..1c16e676d4 100644 --- a/sqlx-postgres/src/listener.rs +++ b/sqlx-postgres/src/listener.rs @@ -268,12 +268,12 @@ impl PgListener { let mut close_event = (!self.ignore_close_event).then(|| self.pool.close_event()); loop { - let next_message = self.connection().await?.inner.stream.recv_unchecked(); + let next_message = dbg!(self.connection().await)?.inner.stream.recv_unchecked(); let res = if let Some(ref mut close_event) = close_event { // cancels the wait and returns `Err(PoolClosed)` if the pool is closed // before `next_message` returns, or if the pool was already closed - close_event.do_until(next_message).await? + dbg!(close_event.do_until(next_message).await)? } else { next_message.await }; @@ -292,7 +292,7 @@ impl PgListener { } if self.eager_reconnect { - self.connect_if_needed().await?; + dbg!(self.connect_if_needed().await)?; } // lost connection @@ -313,7 +313,7 @@ impl PgListener { // Mark the connection as ready for another query BackendMessageFormat::ReadyForQuery => { - self.connection().await?.inner.pending_ready_for_query_count -= 1; + dbg!(self.connection().await)?.inner.pending_ready_for_query_count -= 1; } // Ignore unexpected messages From 964b672de66dda033c3d28e4cb5f0cad0bdf093d Mon Sep 17 00:00:00 2001 From: PonasKovas Date: Thu, 4 Sep 2025 16:11:31 +0300 Subject: [PATCH 3/5] rewrite PgListener recv and try_recv methods --- sqlx-postgres/src/listener.rs | 86 ++++++++++++++++++++++------------- 1 file changed, 54 insertions(+), 32 deletions(-) diff --git a/sqlx-postgres/src/listener.rs b/sqlx-postgres/src/listener.rs index 1c16e676d4..8552235ddc 100644 --- a/sqlx-postgres/src/listener.rs +++ b/sqlx-postgres/src/listener.rs @@ -1,5 +1,4 @@ use std::fmt::{self, Debug}; -use std::io; use std::str::from_utf8; use futures_channel::mpsc; @@ -197,6 +196,30 @@ impl PgListener { Ok(self.connection.as_mut().unwrap()) } + // same as `connection` but if fails to connect retries 5 times + #[inline] + async fn connection_with_recovery(&mut self) -> Result<&mut PgConnection, Error> { + // retry max 5 times with these backoff durations + let backoff_times = [0, 100, 1000, 2000, 10_000]; // ms + + let mut last_err = None; + for backoff_ms in backoff_times { + match self.connect_if_needed().await { + Ok(()) => return Ok(self.connection.as_mut().unwrap()), + Err(err @ Error::Io(_)) => { + last_err = Some(err); + + crate::rt::sleep(std::time::Duration::from_millis(backoff_ms)).await; + continue; + }, + Err(other) => return Err(other), + } + } + + // if 5 retries later still got IO error, return the last one and stop + Err(last_err.unwrap()) + } + /// Receives the next notification available from any of the subscribed channels. /// /// If the connection to PostgreSQL is lost, it is automatically reconnected on the next @@ -258,62 +281,61 @@ impl PgListener { /// /// [`eager_reconnect`]: PgListener::eager_reconnect pub async fn try_recv(&mut self) -> Result, Error> { + match self.recv_without_recovery().await { + Ok(notification) => return Ok(Some(notification)), + + // The connection is dead, ensure that it is dropped, + // update self state, and loop to try again. + Err(Error::Io(_)) => { + if let Some(mut conn) = self.connection.take() { + self.buffer_tx = conn.inner.stream.notifications.take(); + // Close the connection in a background task, so we can continue. + conn.close_on_drop(); + } + + if self.eager_reconnect { + self.connection_with_recovery().await?; + } + + // lost connection + return Ok(None); + }, + Err(e) => return Err(e), + } + } + + async fn recv_without_recovery(&mut self) -> Result { // Flush the buffer first, if anything // This would only fill up if this listener is used as a connection if let Some(notification) = self.next_buffered() { - return Ok(Some(notification)); + return Ok(notification); } // Fetch our `CloseEvent` listener, if applicable. let mut close_event = (!self.ignore_close_event).then(|| self.pool.close_event()); loop { - let next_message = dbg!(self.connection().await)?.inner.stream.recv_unchecked(); + let next_message = self.connection().await?.inner.stream.recv_unchecked(); let res = if let Some(ref mut close_event) = close_event { // cancels the wait and returns `Err(PoolClosed)` if the pool is closed // before `next_message` returns, or if the pool was already closed - dbg!(close_event.do_until(next_message).await)? + close_event.do_until(next_message).await? } else { next_message.await }; - let message = match res { - Ok(message) => message, - - // The connection is dead, ensure that it is dropped, - // update self state, and loop to try again. - Err(Error::Io(err)) => - { - if let Some(mut conn) = self.connection.take() { - self.buffer_tx = conn.inner.stream.notifications.take(); - // Close the connection in a background task, so we can continue. - conn.close_on_drop(); - } - - if self.eager_reconnect { - dbg!(self.connect_if_needed().await)?; - } - - // lost connection - return Ok(None); - } - - // Forward other errors - Err(error) => { - return Err(error); - } - }; + let message = res?; match message.format { // We've received an async notification, return it. BackendMessageFormat::NotificationResponse => { - return Ok(Some(PgNotification(message.decode()?))); + return Ok(PgNotification(message.decode()?)); } // Mark the connection as ready for another query BackendMessageFormat::ReadyForQuery => { - dbg!(self.connection().await)?.inner.pending_ready_for_query_count -= 1; + self.connection().await?.inner.pending_ready_for_query_count -= 1; } // Ignore unexpected messages From 23ee1ce69406637c441e3a6944c1ba535b93fc2a Mon Sep 17 00:00:00 2001 From: PonasKovas Date: Thu, 4 Sep 2025 17:44:10 +0300 Subject: [PATCH 4/5] fix fmt and clippy --- sqlx-postgres/src/listener.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sqlx-postgres/src/listener.rs b/sqlx-postgres/src/listener.rs index 8552235ddc..776fceae33 100644 --- a/sqlx-postgres/src/listener.rs +++ b/sqlx-postgres/src/listener.rs @@ -211,11 +211,11 @@ impl PgListener { crate::rt::sleep(std::time::Duration::from_millis(backoff_ms)).await; continue; - }, + } Err(other) => return Err(other), } } - + // if 5 retries later still got IO error, return the last one and stop Err(last_err.unwrap()) } @@ -282,7 +282,7 @@ impl PgListener { /// [`eager_reconnect`]: PgListener::eager_reconnect pub async fn try_recv(&mut self) -> Result, Error> { match self.recv_without_recovery().await { - Ok(notification) => return Ok(Some(notification)), + Ok(notification) => Ok(Some(notification)), // The connection is dead, ensure that it is dropped, // update self state, and loop to try again. @@ -298,9 +298,9 @@ impl PgListener { } // lost connection - return Ok(None); - }, - Err(e) => return Err(e), + Ok(None) + } + Err(e) => Err(e), } } From ec9cfa77f5c7a87d7a1c51cfee904986a322c613 Mon Sep 17 00:00:00 2001 From: PonasKovas Date: Tue, 9 Sep 2025 12:28:04 +0300 Subject: [PATCH 5/5] remove the redundant backoff logic --- sqlx-postgres/src/listener.rs | 38 +++++++++++------------------------ 1 file changed, 12 insertions(+), 26 deletions(-) diff --git a/sqlx-postgres/src/listener.rs b/sqlx-postgres/src/listener.rs index 776fceae33..4dbda011e4 100644 --- a/sqlx-postgres/src/listener.rs +++ b/sqlx-postgres/src/listener.rs @@ -196,30 +196,6 @@ impl PgListener { Ok(self.connection.as_mut().unwrap()) } - // same as `connection` but if fails to connect retries 5 times - #[inline] - async fn connection_with_recovery(&mut self) -> Result<&mut PgConnection, Error> { - // retry max 5 times with these backoff durations - let backoff_times = [0, 100, 1000, 2000, 10_000]; // ms - - let mut last_err = None; - for backoff_ms in backoff_times { - match self.connect_if_needed().await { - Ok(()) => return Ok(self.connection.as_mut().unwrap()), - Err(err @ Error::Io(_)) => { - last_err = Some(err); - - crate::rt::sleep(std::time::Duration::from_millis(backoff_ms)).await; - continue; - } - Err(other) => return Err(other), - } - } - - // if 5 retries later still got IO error, return the last one and stop - Err(last_err.unwrap()) - } - /// Receives the next notification available from any of the subscribed channels. /// /// If the connection to PostgreSQL is lost, it is automatically reconnected on the next @@ -285,7 +261,7 @@ impl PgListener { Ok(notification) => Ok(Some(notification)), // The connection is dead, ensure that it is dropped, - // update self state, and loop to try again. + // update self state Err(Error::Io(_)) => { if let Some(mut conn) = self.connection.take() { self.buffer_tx = conn.inner.stream.notifications.take(); @@ -294,7 +270,17 @@ impl PgListener { } if self.eager_reconnect { - self.connection_with_recovery().await?; + // If reconnecting fails due to an IO error - retry + // if the pool is unable to get a new working connection + // it will eventually error out with Error::PoolTimedOut and + // end this loop + loop { + match self.connection().await { + Ok(_) => break, + Err(Error::Io(_)) => continue, + Err(e) => return Err(e), + } + } } // lost connection