Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions sqlx-core/src/pool/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::{self, Debug, Formatter};
use std::future::{self, Future};
use std::io;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand All @@ -15,6 +16,33 @@ use crate::pool::options::PoolConnectionMetadata;

const CLOSE_ON_DROP_TIMEOUT: Duration = Duration::from_secs(5);

/// Helper function to execute a ping with an optional timeout.
///
/// If `timeout` is `Some(Duration::ZERO)`, immediately returns a timeout error
/// for deterministic testing behavior.
async fn ping_with_timeout<F>(timeout: Option<Duration>, ping: F) -> Result<(), Error>
where
F: Future<Output = Result<(), Error>>,
{
match timeout {
Some(timeout) if timeout.is_zero() => {
// Duration::ZERO means "always timeout immediately"
// This provides deterministic behavior for testing
Err(Error::Io(io::Error::new(
io::ErrorKind::TimedOut,
"ping timed out",
)))
}
Some(timeout) => crate::rt::timeout(timeout, ping).await.unwrap_or_else(|_| {
Err(Error::Io(io::Error::new(
io::ErrorKind::TimedOut,
"ping timed out",
)))
}),
None => ping.await,
}
}

/// A connection managed by a [`Pool`][crate::pool::Pool].
///
/// Will be returned to the pool on-drop.
Expand Down Expand Up @@ -311,7 +339,9 @@ impl<DB: Database> Floating<DB, Live<DB>> {
// returned to the pool; also of course, if it was dropped due to an error
// this is simply a band-aid as SQLx-next connections should be able
// to recover from cancellations
if let Err(error) = self.raw.ping().await {
let ping_result =
ping_with_timeout(self.guard.pool.options.ping_timeout, self.raw.ping()).await;
if let Err(error) = ping_result {
tracing::warn!(
%error,
"error occurred while testing the connection on-release",
Expand Down Expand Up @@ -370,7 +400,7 @@ impl<DB: Database> Floating<DB, Idle<DB>> {
}

pub async fn ping(&mut self) -> Result<(), Error> {
self.live.raw.ping().await
ping_with_timeout(self.guard.pool.options.ping_timeout, self.live.raw.ping()).await
}

pub fn into_live(self) -> Floating<DB, Live<DB>> {
Expand Down
20 changes: 20 additions & 0 deletions sqlx-core/src/pool/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub struct PoolOptions<DB: Database> {
pub(crate) min_connections: u32,
pub(crate) max_lifetime: Option<Duration>,
pub(crate) idle_timeout: Option<Duration>,
pub(crate) ping_timeout: Option<Duration>,
pub(crate) fair: bool,

pub(crate) parent_pool: Option<Pool<DB>>,
Expand All @@ -105,6 +106,7 @@ impl<DB: Database> Clone for PoolOptions<DB> {
min_connections: self.min_connections,
max_lifetime: self.max_lifetime,
idle_timeout: self.idle_timeout,
ping_timeout: self.ping_timeout,
fair: self.fair,
parent_pool: self.parent_pool.clone(),
}
Expand Down Expand Up @@ -160,6 +162,7 @@ impl<DB: Database> PoolOptions<DB> {
acquire_timeout: Duration::from_secs(30),
idle_timeout: Some(Duration::from_secs(10 * 60)),
max_lifetime: Some(Duration::from_secs(30 * 60)),
ping_timeout: None,
fair: true,
parent_pool: None,
}
Expand Down Expand Up @@ -307,6 +310,22 @@ impl<DB: Database> PoolOptions<DB> {
self.idle_timeout
}

/// Set the ping (health check) timeout. Ping is used when a connection is returned to the pool
/// or when a connection is acquired and test_before_acquire is true.
///
/// If the ping takes longer than this, the connection is closed and a warning is logged.
///
/// When set to `None` (the default), there is no timeout.
pub fn ping_timeout(mut self, timeout: impl Into<Option<Duration>>) -> Self {
self.ping_timeout = timeout.into();
self
}

/// Get the ping (health check) timeout.
pub fn get_ping_timeout(&self) -> Option<Duration> {
self.ping_timeout
}

/// If true, the health of a connection will be verified by a call to [`Connection::ping`]
/// before returning the connection.
///
Expand Down Expand Up @@ -590,6 +609,7 @@ impl<DB: Database> Debug for PoolOptions<DB> {
.field("connect_timeout", &self.acquire_timeout)
.field("max_lifetime", &self.max_lifetime)
.field("idle_timeout", &self.idle_timeout)
.field("ping_timeout", &self.ping_timeout)
.field("test_before_acquire", &self.test_before_acquire)
.finish()
}
Expand Down
104 changes: 104 additions & 0 deletions tests/any/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,107 @@ async fn test_connection_maintenance() -> anyhow::Result<()> {

Ok(())
}

#[sqlx_macros::test]
async fn pool_ping_timeout_on_return() -> anyhow::Result<()> {
sqlx::any::install_default_drivers();

// With a reasonable timeout, connections should be returned to the pool
let pool = AnyPoolOptions::new()
.ping_timeout(Duration::from_secs(10))
.max_connections(1)
.connect(&dotenvy::var("DATABASE_URL")?)
.await?;

let mut conn = pool.acquire().await?;
sqlx::query("SELECT 1").fetch_one(&mut *conn).await?;
conn.return_to_pool().await;

assert_eq!(pool.num_idle(), 1);
drop(pool);

// With a zero timeout, connections should be discarded on return
let pool = AnyPoolOptions::new()
.ping_timeout(Duration::ZERO)
.max_connections(1)
.connect(&dotenvy::var("DATABASE_URL")?)
.await?;

let mut conn = pool.acquire().await?;
sqlx::query("SELECT 1").fetch_one(&mut *conn).await?;
conn.return_to_pool().await;

assert_eq!(pool.num_idle(), 0);

Ok(())
}

#[sqlx_macros::test]
async fn pool_ping_timeout_on_acquire() -> anyhow::Result<()> {
sqlx::any::install_default_drivers();

// Helper to wait for idle connections
async fn wait_for_idle(pool: &sqlx::AnyPool, expected: usize) {
for _ in 0..100 {
if pool.num_idle() == expected {
return;
}
sqlx_core::rt::sleep(Duration::from_millis(50)).await;
}
panic!(
"timed out waiting for {} idle connections, got {}",
expected,
pool.num_idle()
);
}

// With a reasonable timeout, idle connections should be used
let connect_count = Arc::new(AtomicUsize::new(0));
let connect_count_ = connect_count.clone();
let pool = AnyPoolOptions::new()
.ping_timeout(Duration::from_secs(10))
.test_before_acquire(true)
.min_connections(1)
.max_connections(1)
.after_connect(move |_conn, _meta| {
connect_count_.fetch_add(1, Ordering::SeqCst);
Box::pin(async { Ok(()) })
})
.connect(&dotenvy::var("DATABASE_URL")?)
.await?;

wait_for_idle(&pool, 1).await;
assert_eq!(connect_count.load(Ordering::SeqCst), 1);

// Acquire should reuse the same connection
let _conn = pool.acquire().await?;
assert_eq!(connect_count.load(Ordering::SeqCst), 1);
drop(pool);

// With a zero timeout, idle connections should fail ping and be replaced
let connect_count = Arc::new(AtomicUsize::new(0));
let connect_count_ = connect_count.clone();
let pool = AnyPoolOptions::new()
.ping_timeout(Duration::ZERO)
.test_before_acquire(true)
.min_connections(1)
.max_connections(1)
// Disable timeouts to prevent the reaper from interfering
.idle_timeout(None)
.max_lifetime(None)
.after_connect(move |_conn, _meta| {
connect_count_.fetch_add(1, Ordering::SeqCst);
Box::pin(async { Ok(()) })
})
.connect_lazy(&dotenvy::var("DATABASE_URL")?)?;

wait_for_idle(&pool, 1).await;
assert_eq!(connect_count.load(Ordering::SeqCst), 1);

// Acquire - ping will fail and the caller will go ahead and open a new
// connection. Importantly, the caller won't observe any error.
let _conn = pool.acquire().await?;
assert_eq!(connect_count.load(Ordering::SeqCst), 2);

Ok(())
}