diff --git a/sqlx-postgres/src/listener.rs b/sqlx-postgres/src/listener.rs index 639ec95441..4dbda011e4 100644 --- a/sqlx-postgres/src/listener.rs +++ b/sqlx-postgres/src/listener.rs @@ -1,5 +1,4 @@ use std::fmt::{self, Debug}; -use std::io; use std::str::from_utf8; use futures_channel::mpsc; @@ -258,10 +257,44 @@ impl PgListener { /// /// [`eager_reconnect`]: PgListener::eager_reconnect pub async fn try_recv(&mut self) -> Result, Error> { + match self.recv_without_recovery().await { + Ok(notification) => Ok(Some(notification)), + + // The connection is dead, ensure that it is dropped, + // update self state + Err(Error::Io(_)) => { + if let Some(mut conn) = self.connection.take() { + self.buffer_tx = conn.inner.stream.notifications.take(); + // Close the connection in a background task, so we can continue. + conn.close_on_drop(); + } + + if self.eager_reconnect { + // If reconnecting fails due to an IO error - retry + // if the pool is unable to get a new working connection + // it will eventually error out with Error::PoolTimedOut and + // end this loop + loop { + match self.connection().await { + Ok(_) => break, + Err(Error::Io(_)) => continue, + Err(e) => return Err(e), + } + } + } + + // lost connection + Ok(None) + } + Err(e) => Err(e), + } + } + + async fn recv_without_recovery(&mut self) -> Result { // Flush the buffer first, if anything // This would only fill up if this listener is used as a connection if let Some(notification) = self.next_buffered() { - return Ok(Some(notification)); + return Ok(notification); } // Fetch our `CloseEvent` listener, if applicable. @@ -278,45 +311,12 @@ impl PgListener { next_message.await }; - let message = match res { - Ok(message) => message, - - // The connection is dead, ensure that it is dropped, - // update self state, and loop to try again. - Err(Error::Io(err)) - if matches!( - err.kind(), - io::ErrorKind::ConnectionAborted | - io::ErrorKind::UnexpectedEof | - // see ERRORS section in tcp(7) man page (https://man7.org/linux/man-pages/man7/tcp.7.html) - io::ErrorKind::TimedOut | - io::ErrorKind::BrokenPipe - ) => - { - if let Some(mut conn) = self.connection.take() { - self.buffer_tx = conn.inner.stream.notifications.take(); - // Close the connection in a background task, so we can continue. - conn.close_on_drop(); - } - - if self.eager_reconnect { - self.connect_if_needed().await?; - } - - // lost connection - return Ok(None); - } - - // Forward other errors - Err(error) => { - return Err(error); - } - }; + let message = res?; match message.format { // We've received an async notification, return it. BackendMessageFormat::NotificationResponse => { - return Ok(Some(PgNotification(message.decode()?))); + return Ok(PgNotification(message.decode()?)); } // Mark the connection as ready for another query