diff --git a/sqlx-core/src/pool/connection.rs b/sqlx-core/src/pool/connection.rs index 7912b12aa1..ad044e9f2e 100644 --- a/sqlx-core/src/pool/connection.rs +++ b/sqlx-core/src/pool/connection.rs @@ -1,5 +1,6 @@ use std::fmt::{self, Debug, Formatter}; use std::future::{self, Future}; +use std::io; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -15,6 +16,33 @@ use crate::pool::options::PoolConnectionMetadata; const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5); +/// Helper function to execute a ping with an optional timeout. +/// +/// If `timeout` is `Some(Duration::ZERO)`, immediately returns a timeout error +/// for deterministic testing behavior. +async fn ping_with_timeout(timeout: Option, ping: F) -> Result<(), Error> +where + F: Future>, +{ + match timeout { + Some(timeout) if timeout.is_zero() => { + // Duration::ZERO means "always timeout immediately" + // This provides deterministic behavior for testing + Err(Error::Io(io::Error::new( + io::ErrorKind::TimedOut, + "ping timed out", + ))) + } + Some(timeout) => crate::rt::timeout(timeout, ping).await.unwrap_or_else(|_| { + Err(Error::Io(io::Error::new( + io::ErrorKind::TimedOut, + "ping timed out", + ))) + }), + None => ping.await, + } +} + /// A connection managed by a [`Pool`][crate::pool::Pool]. /// /// Will be returned to the pool on-drop. @@ -311,7 +339,9 @@ impl Floating> { // returned to the pool; also of course, if it was dropped due to an error // this is simply a band-aid as SQLx-next connections should be able // to recover from cancellations - if let Err(error) = self.raw.ping().await { + let ping_result = + ping_with_timeout(self.guard.pool.options.ping_timeout, self.raw.ping()).await; + if let Err(error) = ping_result { tracing::warn!( %error, "error occurred while testing the connection on-release", @@ -370,7 +400,7 @@ impl Floating> { } pub async fn ping(&mut self) -> Result<(), Error> { - self.live.raw.ping().await + ping_with_timeout(self.guard.pool.options.ping_timeout, self.live.raw.ping()).await } pub fn into_live(self) -> Floating> { diff --git a/sqlx-core/src/pool/options.rs b/sqlx-core/src/pool/options.rs index 3d048f1795..f1475dbe4c 100644 --- a/sqlx-core/src/pool/options.rs +++ b/sqlx-core/src/pool/options.rs @@ -82,6 +82,7 @@ pub struct PoolOptions { pub(crate) min_connections: u32, pub(crate) max_lifetime: Option, pub(crate) idle_timeout: Option, + pub(crate) ping_timeout: Option, pub(crate) fair: bool, pub(crate) parent_pool: Option>, @@ -105,6 +106,7 @@ impl Clone for PoolOptions { min_connections: self.min_connections, max_lifetime: self.max_lifetime, idle_timeout: self.idle_timeout, + ping_timeout: self.ping_timeout, fair: self.fair, parent_pool: self.parent_pool.clone(), } @@ -160,6 +162,7 @@ impl PoolOptions { acquire_timeout: Duration::from_secs(30), idle_timeout: Some(Duration::from_secs(10 * 60)), max_lifetime: Some(Duration::from_secs(30 * 60)), + ping_timeout: None, fair: true, parent_pool: None, } @@ -307,6 +310,22 @@ impl PoolOptions { self.idle_timeout } + /// Set the ping (health check) timeout. Ping is used when a connection is returned to the pool + /// or when a connection is acquired and test_before_acquire is true. + /// + /// If the ping takes longer than this, the connection is closed and a warning is logged. + /// + /// When set to `None` (the default), there is no timeout. + pub fn ping_timeout(mut self, timeout: impl Into>) -> Self { + self.ping_timeout = timeout.into(); + self + } + + /// Get the ping (health check) timeout. + pub fn get_ping_timeout(&self) -> Option { + self.ping_timeout + } + /// If true, the health of a connection will be verified by a call to [`Connection::ping`] /// before returning the connection. /// @@ -590,6 +609,7 @@ impl Debug for PoolOptions { .field("connect_timeout", &self.acquire_timeout) .field("max_lifetime", &self.max_lifetime) .field("idle_timeout", &self.idle_timeout) + .field("ping_timeout", &self.ping_timeout) .field("test_before_acquire", &self.test_before_acquire) .finish() } diff --git a/tests/any/pool.rs b/tests/any/pool.rs index a4849940b8..3221bfbac4 100644 --- a/tests/any/pool.rs +++ b/tests/any/pool.rs @@ -268,3 +268,107 @@ async fn test_connection_maintenance() -> anyhow::Result<()> { Ok(()) } + +#[sqlx_macros::test] +async fn pool_ping_timeout_on_return() -> anyhow::Result<()> { + sqlx::any::install_default_drivers(); + + // With a reasonable timeout, connections should be returned to the pool + let pool = AnyPoolOptions::new() + .ping_timeout(Duration::from_secs(10)) + .max_connections(1) + .connect(&dotenvy::var("DATABASE_URL")?) + .await?; + + let mut conn = pool.acquire().await?; + sqlx::query("SELECT 1").fetch_one(&mut *conn).await?; + conn.return_to_pool().await; + + assert_eq!(pool.num_idle(), 1); + drop(pool); + + // With a zero timeout, connections should be discarded on return + let pool = AnyPoolOptions::new() + .ping_timeout(Duration::ZERO) + .max_connections(1) + .connect(&dotenvy::var("DATABASE_URL")?) + .await?; + + let mut conn = pool.acquire().await?; + sqlx::query("SELECT 1").fetch_one(&mut *conn).await?; + conn.return_to_pool().await; + + assert_eq!(pool.num_idle(), 0); + + Ok(()) +} + +#[sqlx_macros::test] +async fn pool_ping_timeout_on_acquire() -> anyhow::Result<()> { + sqlx::any::install_default_drivers(); + + // Helper to wait for idle connections + async fn wait_for_idle(pool: &sqlx::AnyPool, expected: usize) { + for _ in 0..100 { + if pool.num_idle() == expected { + return; + } + sqlx_core::rt::sleep(Duration::from_millis(50)).await; + } + panic!( + "timed out waiting for {} idle connections, got {}", + expected, + pool.num_idle() + ); + } + + // With a reasonable timeout, idle connections should be used + let connect_count = Arc::new(AtomicUsize::new(0)); + let connect_count_ = connect_count.clone(); + let pool = AnyPoolOptions::new() + .ping_timeout(Duration::from_secs(10)) + .test_before_acquire(true) + .min_connections(1) + .max_connections(1) + .after_connect(move |_conn, _meta| { + connect_count_.fetch_add(1, Ordering::SeqCst); + Box::pin(async { Ok(()) }) + }) + .connect(&dotenvy::var("DATABASE_URL")?) + .await?; + + wait_for_idle(&pool, 1).await; + assert_eq!(connect_count.load(Ordering::SeqCst), 1); + + // Acquire should reuse the same connection + let _conn = pool.acquire().await?; + assert_eq!(connect_count.load(Ordering::SeqCst), 1); + drop(pool); + + // With a zero timeout, idle connections should fail ping and be replaced + let connect_count = Arc::new(AtomicUsize::new(0)); + let connect_count_ = connect_count.clone(); + let pool = AnyPoolOptions::new() + .ping_timeout(Duration::ZERO) + .test_before_acquire(true) + .min_connections(1) + .max_connections(1) + // Disable timeouts to prevent the reaper from interfering + .idle_timeout(None) + .max_lifetime(None) + .after_connect(move |_conn, _meta| { + connect_count_.fetch_add(1, Ordering::SeqCst); + Box::pin(async { Ok(()) }) + }) + .connect_lazy(&dotenvy::var("DATABASE_URL")?)?; + + wait_for_idle(&pool, 1).await; + assert_eq!(connect_count.load(Ordering::SeqCst), 1); + + // Acquire - ping will fail and the caller will go ahead and open a new + // connection. Importantly, the caller won't observe any error. + let _conn = pool.acquire().await?; + assert_eq!(connect_count.load(Ordering::SeqCst), 2); + + Ok(()) +}