From 23b09f68d48709e8fcf777f1dc0ac598bd0c457c Mon Sep 17 00:00:00 2001 From: Pierpaolo Follia Date: Mon, 7 Jul 2025 16:33:03 +0200 Subject: [PATCH 1/3] Make backoff optional --- lib/src/config.rs | 100 +++++++++++++++++++ lib/src/graph.rs | 38 ++++--- lib/src/pool.rs | 23 ++--- lib/src/routing/connection_registry.rs | 2 + lib/src/routing/routed_connection_manager.rs | 10 +- 5 files changed, 139 insertions(+), 34 deletions(-) diff --git a/lib/src/config.rs b/lib/src/config.rs index 8027f870..b90e0a05 100644 --- a/lib/src/config.rs +++ b/lib/src/config.rs @@ -1,5 +1,6 @@ use crate::auth::{ClientCertificate, ConnectionTLSConfig, MutualTLS}; use crate::errors::{Error, Result}; +use backon::ExponentialBuilder; #[cfg(feature = "unstable-bolt-protocol-impl-v2")] use serde::{Deserialize, Deserializer, Serialize}; use std::path::Path; @@ -67,6 +68,87 @@ pub struct LiveConfig { pub(crate) fetch_size: usize, } +#[derive(Debug, Clone, PartialEq)] +pub struct BackoffConfig { + pub(crate) multiplier: Option, + pub(crate) min_delay_ms: Option, + pub(crate) max_delay_ms: Option, + pub(crate) total_delay_ms: Option, +} + +impl Default for BackoffConfig { + fn default() -> Self { + BackoffConfig { + multiplier: Some(2.0), + min_delay_ms: Some(1), + max_delay_ms: Some(10000), + total_delay_ms: Some(60000), + } + } +} + +impl BackoffConfig { + pub fn to_exponential_builder(&self) -> ExponentialBuilder { + ExponentialBuilder::new() + .with_jitter() + .with_factor(self.multiplier.unwrap_or(2.0)) + .without_max_times() + .with_min_delay(std::time::Duration::from_millis( + self.min_delay_ms.unwrap_or(1), + )) + .with_max_delay(std::time::Duration::from_millis( + self.max_delay_ms.unwrap_or(10_000), + )) + .with_total_delay(Some(std::time::Duration::from_millis( + self.total_delay_ms.unwrap_or(60_000), + ))) + } +} + +#[derive(Default)] +pub struct BackoffConfigBuilder { + multiplier: Option, + min_delay_ms: Option, + max_delay_ms: Option, + total_delay_ms: Option, +} + +#[allow(dead_code)] +impl BackoffConfigBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn with_multiplier(mut self, multiplier: f32) -> Self { + self.multiplier = Some(multiplier); + self + } + + pub fn with_min_delay_ms(mut self, min_delay_ms: u64) -> Self { + self.min_delay_ms = Some(min_delay_ms); + self + } + + pub fn with_max_delay_ms(mut self, max_delay_ms: u64) -> Self { + self.max_delay_ms = Some(max_delay_ms); + self + } + + pub fn with_total_delay_ms(mut self, max_total_delay_ms: Option) -> Self { + self.total_delay_ms = max_total_delay_ms; + self + } + + pub fn build(self) -> BackoffConfig { + BackoffConfig { + multiplier: self.multiplier, + min_delay_ms: self.min_delay_ms, + max_delay_ms: self.max_delay_ms, + total_delay_ms: self.total_delay_ms, + } + } +} + /// The configuration used to connect to the database, see [`crate::Graph::connect`]. #[derive(Debug, Clone)] pub struct Config { @@ -77,6 +159,7 @@ pub struct Config { pub(crate) db: Option, pub(crate) fetch_size: usize, pub(crate) tls_config: ConnectionTLSConfig, + pub(crate) backoff: Option, } impl Config { @@ -97,6 +180,7 @@ pub struct ConfigBuilder { fetch_size: usize, max_connections: usize, tls_config: ConnectionTLSConfig, + backoff_config: Option, } impl ConfigBuilder { @@ -178,6 +262,11 @@ impl ConfigBuilder { self } + pub fn with_backoff(mut self, backoff: Option) -> Self { + self.backoff_config = backoff; + self + } + pub fn build(self) -> Result { if let (Some(uri), Some(user), Some(password)) = (self.uri, self.user, self.password) { Ok(Config { @@ -188,6 +277,7 @@ impl ConfigBuilder { max_connections: self.max_connections, db: self.db, tls_config: self.tls_config, + backoff: self.backoff_config, }) } else { Err(Error::InvalidConfig) @@ -205,6 +295,7 @@ impl Default for ConfigBuilder { max_connections: DEFAULT_MAX_CONNECTIONS, fetch_size: DEFAULT_FETCH_SIZE, tls_config: ConnectionTLSConfig::None, + backoff_config: Some(BackoffConfig::default()), } } } @@ -222,6 +313,7 @@ mod tests { .db("some_db") .fetch_size(10) .max_connections(5) + .with_backoff(None) .build() .unwrap(); assert_eq!(config.uri, "127.0.0.1:7687"); @@ -231,6 +323,7 @@ mod tests { assert_eq!(config.fetch_size, 10); assert_eq!(config.max_connections, 5); assert_eq!(config.tls_config, ConnectionTLSConfig::None); + assert_eq!(config.backoff, None); } #[test] @@ -248,6 +341,8 @@ mod tests { assert_eq!(config.fetch_size, 200); assert_eq!(config.max_connections, 16); assert_eq!(config.tls_config, ConnectionTLSConfig::None); + assert!(config.backoff.is_some()); + assert_eq!(config.backoff.as_ref().unwrap(), &BackoffConfig::default()); } #[test] @@ -257,6 +352,9 @@ mod tests { .user("some_user") .password("some_password") .skip_ssl_validation() + .with_backoff(Some( + BackoffConfigBuilder::new().with_multiplier(2.0).build(), + )) .build() .unwrap(); assert_eq!(config.uri, "127.0.0.1:7687"); @@ -266,6 +364,8 @@ mod tests { assert_eq!(config.fetch_size, 200); assert_eq!(config.max_connections, 16); assert_eq!(config.tls_config, ConnectionTLSConfig::NoSSLValidation); + assert!(config.backoff.is_some()); + assert_eq!(config.backoff.as_ref().unwrap().multiplier.unwrap(), 2.0); } #[test] diff --git a/lib/src/graph.rs b/lib/src/graph.rs index d928c642..a58b08bd 100644 --- a/lib/src/graph.rs +++ b/lib/src/graph.rs @@ -46,7 +46,7 @@ impl ConnectionPoolManager { } } - fn backoff(&self) -> ExponentialBuilder { + fn backoff(&self) -> Option { match self { #[cfg(feature = "unstable-bolt-protocol-impl-v2")] Routed(manager) => manager.backoff(), @@ -233,13 +233,17 @@ impl Graph { ) -> Result { let query = query.into_retryable(db, operation, &self.pool, None); - let (query, result) = RetryableQuery::retry_run - .retry(self.pool.backoff()) - .sleep(tokio::time::sleep) - .context(query) - .when(|e| matches!(e, Retry::Yes(_))) - .notify(Self::log_retry) - .await; + let (query, result) = if let Some(exponential_backoff) = self.pool.backoff() { + RetryableQuery::retry_run + .retry(exponential_backoff) + .sleep(tokio::time::sleep) + .context(query) + .when(|e| matches!(e, Retry::Yes(_))) + .notify(Self::log_retry) + .await + } else { + query.retry_run().await + }; match result { Ok(result) => { @@ -331,13 +335,17 @@ impl Graph { ) -> Result { let query = query.into_retryable(db, operation, &self.pool, Some(self.config.fetch_size)); - let (query, result) = RetryableQuery::retry_execute - .retry(self.pool.backoff()) - .sleep(tokio::time::sleep) - .context(query) - .when(|e| matches!(e, Retry::Yes(_))) - .notify(Self::log_retry) - .await; + let (_, result) = if let Some(exponential_backoff) = self.pool.backoff() { + RetryableQuery::retry_execute + .retry(exponential_backoff) + .sleep(tokio::time::sleep) + .context(query) + .when(|e| matches!(e, Retry::Yes(_))) + .notify(Self::log_retry) + .await + } else { + query.retry_execute().await + }; result.map_err(Retry::into_inner) } diff --git a/lib/src/pool.rs b/lib/src/pool.rs index 35ad638b..73fa9d82 100644 --- a/lib/src/pool.rs +++ b/lib/src/pool.rs @@ -1,6 +1,5 @@ -use std::time::Duration; - use crate::auth::ConnectionTLSConfig; +use crate::config::BackoffConfig; use crate::{ config::Config, connection::{Connection, ConnectionInfo}, @@ -15,7 +14,7 @@ pub type ManagedConnection = Object; pub struct ConnectionManager { info: ConnectionInfo, - backoff: ExponentialBuilder, + backoff: Option, } impl ConnectionManager { @@ -24,27 +23,18 @@ impl ConnectionManager { user: &str, password: &str, tls_config: &ConnectionTLSConfig, + backoff_config: Option<&BackoffConfig>, ) -> Result { let info = ConnectionInfo::new(uri, user, password, tls_config)?; - let backoff = backoff(); + let backoff = backoff_config.map(|backoff_config| backoff_config.to_exponential_builder()); Ok(ConnectionManager { info, backoff }) } - pub fn backoff(&self) -> ExponentialBuilder { - self.backoff + pub fn backoff(&self) -> Option { + self.backoff.clone() } } -pub(crate) fn backoff() -> ExponentialBuilder { - ExponentialBuilder::new() - .with_jitter() - .with_factor(2.0) - .without_max_times() - .with_min_delay(Duration::from_millis(1)) - .with_max_delay(Duration::from_secs(10)) - .with_total_delay(Some(Duration::from_secs(60))) -} - impl Manager for ConnectionManager { type Type = Connection; type Error = Error; @@ -66,6 +56,7 @@ pub fn create_pool(config: &Config) -> Result { &config.user, &config.password, &config.tls_config, + config.backoff.as_ref(), )?; info!( "creating connection pool for node {} with max size {}", diff --git a/lib/src/routing/connection_registry.rs b/lib/src/routing/connection_registry.rs index afae2fc4..79c9ab0c 100644 --- a/lib/src/routing/connection_registry.rs +++ b/lib/src/routing/connection_registry.rs @@ -418,6 +418,7 @@ mod tests { db: None, fetch_size: 200, tls_config: ConnectionTLSConfig::None, + backoff: None, }; let registry = Arc::new(ConnectionRegistry::default()); let ttl = refresh_all_routing_tables( @@ -539,6 +540,7 @@ mod tests { db: None, fetch_size: 200, tls_config: ConnectionTLSConfig::None, + backoff: None, }; let registry = Arc::new(ConnectionRegistry::default()); // get registry for db1 amd refresh routing table diff --git a/lib/src/routing/routed_connection_manager.rs b/lib/src/routing/routed_connection_manager.rs index c43252ed..e97e0585 100644 --- a/lib/src/routing/routed_connection_manager.rs +++ b/lib/src/routing/routed_connection_manager.rs @@ -1,3 +1,4 @@ +use crate::config::BackoffConfig; use crate::pool::ManagedConnection; use crate::routing::connection_registry::{ start_background_updater, BoltServer, ConnectionRegistry, RegistryCommand, @@ -19,7 +20,7 @@ pub struct RoutedConnectionManager { load_balancing_strategy: Arc, connection_registry: Arc, bookmarks: Arc>>, - backoff: ExponentialBuilder, + backoff: Option, channel: Sender, } @@ -27,7 +28,10 @@ const ROUTING_TABLE_MAX_WAIT_TIME_MS: i32 = 5000; impl RoutedConnectionManager { pub fn new(config: &Config, provider: Arc) -> Result { - let backoff = crate::pool::backoff(); + let backoff = config + .backoff + .clone() + .map(|config| config.to_exponential_builder()); let connection_registry = Arc::new(ConnectionRegistry::default()); let channel = start_background_updater(config, connection_registry.clone(), provider); Ok(RoutedConnectionManager { @@ -131,7 +135,7 @@ impl RoutedConnectionManager { } } - pub(crate) fn backoff(&self) -> ExponentialBuilder { + pub(crate) fn backoff(&self) -> Option { self.backoff } From 6cbfcf8290831dd1f9435b68a99cc327e90786d3 Mon Sep 17 00:00:00 2001 From: Pierpaolo Follia Date: Fri, 11 Jul 2025 11:43:15 +0200 Subject: [PATCH 2/3] Turn backoff off for routed connections --- lib/src/pool.rs | 2 +- lib/src/routing/connection_registry.rs | 1 + lib/src/routing/routed_connection_manager.rs | 4 +++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/src/pool.rs b/lib/src/pool.rs index 73fa9d82..c01fedcd 100644 --- a/lib/src/pool.rs +++ b/lib/src/pool.rs @@ -31,7 +31,7 @@ impl ConnectionManager { } pub fn backoff(&self) -> Option { - self.backoff.clone() + self.backoff } } diff --git a/lib/src/routing/connection_registry.rs b/lib/src/routing/connection_registry.rs index 79c9ab0c..0ad26c37 100644 --- a/lib/src/routing/connection_registry.rs +++ b/lib/src/routing/connection_registry.rs @@ -176,6 +176,7 @@ async fn refresh_routing_table( server.clone(), create_pool(&Config { uri, + backoff: None, ..config.clone() })?, ); diff --git a/lib/src/routing/routed_connection_manager.rs b/lib/src/routing/routed_connection_manager.rs index e97e0585..23c9a4ba 100644 --- a/lib/src/routing/routed_connection_manager.rs +++ b/lib/src/routing/routed_connection_manager.rs @@ -1,4 +1,3 @@ -use crate::config::BackoffConfig; use crate::pool::ManagedConnection; use crate::routing::connection_registry::{ start_background_updater, BoltServer, ConnectionRegistry, RegistryCommand, @@ -28,6 +27,9 @@ const ROUTING_TABLE_MAX_WAIT_TIME_MS: i32 = 5000; impl RoutedConnectionManager { pub fn new(config: &Config, provider: Arc) -> Result { + // backoff config should be set to None here, since the routing table updater will handle retries + // We could provide some configuration to "force" the retry mechanism in a clustered env, + // but for now we will turn it off let backoff = config .backoff .clone() From 3b28545fe92ac6846cda00308c83194ea0738924 Mon Sep 17 00:00:00 2001 From: Pierpaolo Follia Date: Sun, 20 Jul 2025 11:58:18 +0200 Subject: [PATCH 3/3] Expose new structures --- lib/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 4f90dce7..4edec695 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -509,7 +509,7 @@ mod types; mod version; pub use crate::auth::ClientCertificate; -pub use crate::config::{Config, ConfigBuilder, Database}; +pub use crate::config::{BackoffConfig, BackoffConfigBuilder, Config, ConfigBuilder, Database}; pub use crate::errors::{ Error, Neo4jClientErrorKind, Neo4jError, Neo4jErrorKind, Neo4jSecurityErrorKind, Result, };