Skip to content

Commit 8de7b89

Browse files
committed
Make backoff optional
1 parent 6748145 commit 8de7b89

File tree

5 files changed

+139
-34
lines changed

5 files changed

+139
-34
lines changed

lib/src/config.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::auth::{ClientCertificate, ConnectionTLSConfig};
22
use crate::errors::{Error, Result};
3+
use backon::ExponentialBuilder;
34
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
45
use serde::{Deserialize, Deserializer, Serialize};
56
use std::path::Path;
@@ -67,6 +68,87 @@ pub struct LiveConfig {
6768
pub(crate) fetch_size: usize,
6869
}
6970

71+
#[derive(Debug, Clone, PartialEq)]
72+
pub struct BackoffConfig {
73+
pub(crate) multiplier: Option<f32>,
74+
pub(crate) min_delay_ms: Option<u64>,
75+
pub(crate) max_delay_ms: Option<u64>,
76+
pub(crate) total_delay_ms: Option<u64>,
77+
}
78+
79+
impl Default for BackoffConfig {
80+
fn default() -> Self {
81+
BackoffConfig {
82+
multiplier: Some(2.0),
83+
min_delay_ms: Some(1),
84+
max_delay_ms: Some(10000),
85+
total_delay_ms: Some(60000),
86+
}
87+
}
88+
}
89+
90+
impl BackoffConfig {
91+
pub fn to_exponential_builder(&self) -> ExponentialBuilder {
92+
ExponentialBuilder::new()
93+
.with_jitter()
94+
.with_factor(self.multiplier.unwrap_or(2.0))
95+
.without_max_times()
96+
.with_min_delay(std::time::Duration::from_millis(
97+
self.min_delay_ms.unwrap_or(1),
98+
))
99+
.with_max_delay(std::time::Duration::from_millis(
100+
self.max_delay_ms.unwrap_or(10_000),
101+
))
102+
.with_total_delay(Some(std::time::Duration::from_millis(
103+
self.total_delay_ms.unwrap_or(60_000),
104+
)))
105+
}
106+
}
107+
108+
#[derive(Default)]
109+
pub struct BackoffConfigBuilder {
110+
multiplier: Option<f32>,
111+
min_delay_ms: Option<u64>,
112+
max_delay_ms: Option<u64>,
113+
total_delay_ms: Option<u64>,
114+
}
115+
116+
#[allow(dead_code)]
117+
impl BackoffConfigBuilder {
118+
pub fn new() -> Self {
119+
Self::default()
120+
}
121+
122+
pub fn with_multiplier(mut self, multiplier: f32) -> Self {
123+
self.multiplier = Some(multiplier);
124+
self
125+
}
126+
127+
pub fn with_min_delay_ms(mut self, min_delay_ms: u64) -> Self {
128+
self.min_delay_ms = Some(min_delay_ms);
129+
self
130+
}
131+
132+
pub fn with_max_delay_ms(mut self, max_delay_ms: u64) -> Self {
133+
self.max_delay_ms = Some(max_delay_ms);
134+
self
135+
}
136+
137+
pub fn with_total_delay_ms(mut self, max_total_delay_ms: Option<u64>) -> Self {
138+
self.total_delay_ms = max_total_delay_ms;
139+
self
140+
}
141+
142+
pub fn build(self) -> BackoffConfig {
143+
BackoffConfig {
144+
multiplier: self.multiplier,
145+
min_delay_ms: self.min_delay_ms,
146+
max_delay_ms: self.max_delay_ms,
147+
total_delay_ms: self.total_delay_ms,
148+
}
149+
}
150+
}
151+
70152
/// The configuration used to connect to the database, see [`crate::Graph::connect`].
71153
#[derive(Debug, Clone)]
72154
pub struct Config {
@@ -77,6 +159,7 @@ pub struct Config {
77159
pub(crate) db: Option<Database>,
78160
pub(crate) fetch_size: usize,
79161
pub(crate) tls_config: ConnectionTLSConfig,
162+
pub(crate) backoff: Option<BackoffConfig>,
80163
}
81164

82165
impl Config {
@@ -97,6 +180,7 @@ pub struct ConfigBuilder {
97180
fetch_size: usize,
98181
max_connections: usize,
99182
tls_config: ConnectionTLSConfig,
183+
backoff_config: Option<BackoffConfig>,
100184
}
101185

102186
impl ConfigBuilder {
@@ -166,6 +250,11 @@ impl ConfigBuilder {
166250
self
167251
}
168252

253+
pub fn with_backoff(mut self, backoff: Option<BackoffConfig>) -> Self {
254+
self.backoff_config = backoff;
255+
self
256+
}
257+
169258
pub fn build(self) -> Result<Config> {
170259
if let (Some(uri), Some(user), Some(password)) = (self.uri, self.user, self.password) {
171260
Ok(Config {
@@ -176,6 +265,7 @@ impl ConfigBuilder {
176265
max_connections: self.max_connections,
177266
db: self.db,
178267
tls_config: self.tls_config,
268+
backoff: self.backoff_config,
179269
})
180270
} else {
181271
Err(Error::InvalidConfig)
@@ -193,6 +283,7 @@ impl Default for ConfigBuilder {
193283
max_connections: DEFAULT_MAX_CONNECTIONS,
194284
fetch_size: DEFAULT_FETCH_SIZE,
195285
tls_config: ConnectionTLSConfig::None,
286+
backoff_config: Some(BackoffConfig::default()),
196287
}
197288
}
198289
}
@@ -210,6 +301,7 @@ mod tests {
210301
.db("some_db")
211302
.fetch_size(10)
212303
.max_connections(5)
304+
.with_backoff(None)
213305
.build()
214306
.unwrap();
215307
assert_eq!(config.uri, "127.0.0.1:7687");
@@ -219,6 +311,7 @@ mod tests {
219311
assert_eq!(config.fetch_size, 10);
220312
assert_eq!(config.max_connections, 5);
221313
assert_eq!(config.tls_config, ConnectionTLSConfig::None);
314+
assert_eq!(config.backoff, None);
222315
}
223316

224317
#[test]
@@ -236,6 +329,8 @@ mod tests {
236329
assert_eq!(config.fetch_size, 200);
237330
assert_eq!(config.max_connections, 16);
238331
assert_eq!(config.tls_config, ConnectionTLSConfig::None);
332+
assert!(config.backoff.is_some());
333+
assert_eq!(config.backoff.as_ref().unwrap(), &BackoffConfig::default());
239334
}
240335

241336
#[test]
@@ -245,6 +340,9 @@ mod tests {
245340
.user("some_user")
246341
.password("some_password")
247342
.skip_ssl_validation()
343+
.with_backoff(Some(
344+
BackoffConfigBuilder::new().with_multiplier(2.0).build(),
345+
))
248346
.build()
249347
.unwrap();
250348
assert_eq!(config.uri, "127.0.0.1:7687");
@@ -254,6 +352,8 @@ mod tests {
254352
assert_eq!(config.fetch_size, 200);
255353
assert_eq!(config.max_connections, 16);
256354
assert_eq!(config.tls_config, ConnectionTLSConfig::NoSSLValidation);
355+
assert!(config.backoff.is_some());
356+
assert_eq!(config.backoff.as_ref().unwrap().multiplier.unwrap(), 2.0);
257357
}
258358

259359
#[test]

lib/src/graph.rs

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl ConnectionPoolManager {
4646
}
4747
}
4848

49-
fn backoff(&self) -> ExponentialBuilder {
49+
fn backoff(&self) -> Option<ExponentialBuilder> {
5050
match self {
5151
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
5252
Routed(manager) => manager.backoff(),
@@ -233,13 +233,17 @@ impl Graph {
233233
) -> Result<RunResult> {
234234
let query = query.into_retryable(db, operation, &self.pool, None);
235235

236-
let (query, result) = RetryableQuery::retry_run
237-
.retry(self.pool.backoff())
238-
.sleep(tokio::time::sleep)
239-
.context(query)
240-
.when(|e| matches!(e, Retry::Yes(_)))
241-
.notify(Self::log_retry)
242-
.await;
236+
let (query, result) = if let Some(exponential_backoff) = self.pool.backoff() {
237+
RetryableQuery::retry_run
238+
.retry(exponential_backoff)
239+
.sleep(tokio::time::sleep)
240+
.context(query)
241+
.when(|e| matches!(e, Retry::Yes(_)))
242+
.notify(Self::log_retry)
243+
.await
244+
} else {
245+
query.retry_run().await
246+
};
243247

244248
match result {
245249
Ok(result) => {
@@ -331,13 +335,17 @@ impl Graph {
331335
) -> Result<DetachedRowStream> {
332336
let query = query.into_retryable(db, operation, &self.pool, Some(self.config.fetch_size));
333337

334-
let (query, result) = RetryableQuery::retry_execute
335-
.retry(self.pool.backoff())
336-
.sleep(tokio::time::sleep)
337-
.context(query)
338-
.when(|e| matches!(e, Retry::Yes(_)))
339-
.notify(Self::log_retry)
340-
.await;
338+
let (_, result) = if let Some(exponential_backoff) = self.pool.backoff() {
339+
RetryableQuery::retry_execute
340+
.retry(exponential_backoff)
341+
.sleep(tokio::time::sleep)
342+
.context(query)
343+
.when(|e| matches!(e, Retry::Yes(_)))
344+
.notify(Self::log_retry)
345+
.await
346+
} else {
347+
query.retry_execute().await
348+
};
341349

342350
result.map_err(Retry::into_inner)
343351
}

lib/src/pool.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use std::time::Duration;
2-
31
use crate::auth::ConnectionTLSConfig;
2+
use crate::config::BackoffConfig;
43
use crate::{
54
config::Config,
65
connection::{Connection, ConnectionInfo},
@@ -15,7 +14,7 @@ pub type ManagedConnection = Object<ConnectionManager>;
1514

1615
pub struct ConnectionManager {
1716
info: ConnectionInfo,
18-
backoff: ExponentialBuilder,
17+
backoff: Option<ExponentialBuilder>,
1918
}
2019

2120
impl ConnectionManager {
@@ -24,27 +23,18 @@ impl ConnectionManager {
2423
user: &str,
2524
password: &str,
2625
tls_config: &ConnectionTLSConfig,
26+
backoff_config: Option<&BackoffConfig>,
2727
) -> Result<Self> {
2828
let info = ConnectionInfo::new(uri, user, password, tls_config)?;
29-
let backoff = backoff();
29+
let backoff = backoff_config.map(|backoff_config| backoff_config.to_exponential_builder());
3030
Ok(ConnectionManager { info, backoff })
3131
}
3232

33-
pub fn backoff(&self) -> ExponentialBuilder {
34-
self.backoff
33+
pub fn backoff(&self) -> Option<ExponentialBuilder> {
34+
self.backoff.clone()
3535
}
3636
}
3737

38-
pub(crate) fn backoff() -> ExponentialBuilder {
39-
ExponentialBuilder::new()
40-
.with_jitter()
41-
.with_factor(2.0)
42-
.without_max_times()
43-
.with_min_delay(Duration::from_millis(1))
44-
.with_max_delay(Duration::from_secs(10))
45-
.with_total_delay(Some(Duration::from_secs(60)))
46-
}
47-
4838
impl Manager for ConnectionManager {
4939
type Type = Connection;
5040
type Error = Error;
@@ -66,6 +56,7 @@ pub fn create_pool(config: &Config) -> Result<ConnectionPool> {
6656
&config.user,
6757
&config.password,
6858
&config.tls_config,
59+
config.backoff.as_ref(),
6960
)?;
7061
info!(
7162
"creating connection pool for node {} with max size {}",

lib/src/routing/connection_registry.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ mod tests {
418418
db: None,
419419
fetch_size: 200,
420420
tls_config: ConnectionTLSConfig::None,
421+
backoff: None,
421422
};
422423
let registry = Arc::new(ConnectionRegistry::default());
423424
let ttl = refresh_all_routing_tables(
@@ -539,6 +540,7 @@ mod tests {
539540
db: None,
540541
fetch_size: 200,
541542
tls_config: ConnectionTLSConfig::None,
543+
backoff: None,
542544
};
543545
let registry = Arc::new(ConnectionRegistry::default());
544546
// get registry for db1 amd refresh routing table

lib/src/routing/routed_connection_manager.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::config::BackoffConfig;
12
use crate::pool::ManagedConnection;
23
use crate::routing::connection_registry::{
34
start_background_updater, BoltServer, ConnectionRegistry, RegistryCommand,
@@ -19,15 +20,18 @@ pub struct RoutedConnectionManager {
1920
load_balancing_strategy: Arc<dyn LoadBalancingStrategy>,
2021
connection_registry: Arc<ConnectionRegistry>,
2122
bookmarks: Arc<Mutex<Vec<String>>>,
22-
backoff: ExponentialBuilder,
23+
backoff: Option<ExponentialBuilder>,
2324
channel: Sender<RegistryCommand>,
2425
}
2526

2627
const ROUTING_TABLE_MAX_WAIT_TIME_MS: i32 = 5000;
2728

2829
impl RoutedConnectionManager {
2930
pub fn new(config: &Config, provider: Arc<dyn RoutingTableProvider>) -> Result<Self, Error> {
30-
let backoff = crate::pool::backoff();
31+
let backoff = config
32+
.backoff
33+
.clone()
34+
.map(|config| config.to_exponential_builder());
3135
let connection_registry = Arc::new(ConnectionRegistry::default());
3236
let channel = start_background_updater(config, connection_registry.clone(), provider);
3337
Ok(RoutedConnectionManager {
@@ -131,7 +135,7 @@ impl RoutedConnectionManager {
131135
}
132136
}
133137

134-
pub(crate) fn backoff(&self) -> ExponentialBuilder {
138+
pub(crate) fn backoff(&self) -> Option<ExponentialBuilder> {
135139
self.backoff
136140
}
137141

0 commit comments

Comments
 (0)