
Commit 5340365

feat: receive config via params and log errors
1 parent 5794215 commit 5340365

File tree

4 files changed: +39 additions, −49 deletions


aggregation_mode/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default.

aggregation_mode/db/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -7,6 +7,7 @@ edition = "2021"
 tokio = { version = "1"}
 sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" ] }
 backon = "1.2.0"
+tracing = { version = "0.1", features = ["log"] }
 
 [[bin]]
 name = "migrate"
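
The new dependency backs the tracing::warn! call added in orchestrator.rs below. With the "log" feature enabled, tracing events are also emitted as `log` records when no tracing subscriber has been installed, so an existing log-based logger can surface them. A minimal sketch of that wiring, assuming env_logger as the logger (env_logger is not part of this commit and is used here only for illustration):

// Sketch only: env_logger is an assumed example dependency, not added by this commit.
// With tracing's "log" feature and no tracing subscriber installed, tracing events
// are forwarded as `log` records, which env_logger prints.
fn main() {
    // env_logger defaults to the error level; run with RUST_LOG=warn to see warnings.
    env_logger::init();

    // The warning added in orchestrator.rs would surface through this logger.
    tracing::warn!(node_index = 0, "database query failed; retrying");
}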

aggregation_mode/db/src/orchestrator.rs

Lines changed: 27 additions & 9 deletions
@@ -3,7 +3,7 @@ use std::{future::Future, time::Duration};
 use backon::{ExponentialBuilder, Retryable};
 use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
 
-use crate::retry::RetryError;
+use crate::retry::{RetryConfig, RetryError};
 
 #[derive(Clone, Debug)]
 struct DbNode {
@@ -12,29 +12,45 @@ struct DbNode {
 
 pub struct DbOrchestartor {
     nodes: Vec<DbNode>,
+    retry_config: RetryConfig,
+}
+
+pub enum DbOrchestartorError {
+    InvalidNumberOfConnectionUrls,
+    Sqlx(sqlx::Error),
 }
 
 impl DbOrchestartor {
-    pub fn try_new(connection_urls: Vec<String>) -> Result<Self, sqlx::Error> {
-        // TODO: validate at least one connection url
+    pub fn try_new(
+        connection_urls: Vec<String>,
+        retry_config: RetryConfig,
+    ) -> Result<Self, DbOrchestartorError> {
+        if connection_urls.is_empty() {
+            return Err(DbOrchestartorError::InvalidNumberOfConnectionUrls);
+        }
+
         let nodes = connection_urls
             .into_iter()
             .map(|url| {
                 let pool = PgPoolOptions::new().max_connections(5).connect_lazy(&url)?;
 
                 Ok(DbNode { pool })
             })
-            .collect::<Result<Vec<_>, sqlx::Error>>()?;
+            .collect::<Result<Vec<_>, sqlx::Error>>()
+            .map_err(|e| DbOrchestartorError::Sqlx(e))?;
 
-        Ok(Self { nodes })
+        Ok(Self {
+            nodes,
+            retry_config,
+        })
     }
 
     fn backoff_builder(&self) -> ExponentialBuilder {
         ExponentialBuilder::default()
-            .with_min_delay(Duration::from_millis(0))
-            .with_max_times(0)
-            .with_factor(0.0)
-            .with_max_delay(Duration::from_secs(0))
+            .with_min_delay(Duration::from_millis(self.retry_config.min_delay_millis))
+            .with_max_times(self.retry_config.max_times)
+            .with_factor(self.retry_config.factor)
+            .with_max_delay(Duration::from_secs(self.retry_config.max_delay_seconds))
     }
 
     pub async fn query<T, E, Q, Fut>(&self, query_fn: Q) -> Result<T, sqlx::Error>
@@ -51,6 +67,7 @@ impl DbOrchestartor {
                 Ok(res) => return Ok(res),
                 Err(err) => {
                     if Self::is_connection_error(&err) {
+                        tracing::warn!(node_index = idx, error = ?err, "database query failed; retrying");
                         last_error = Some(err);
                     } else {
                         return Err(RetryError::Permanent(err));
@@ -81,6 +98,7 @@ impl DbOrchestartor {
             | sqlx::Error::PoolClosed
             | sqlx::Error::WorkerCrashed
             | sqlx::Error::BeginFailed
+            | sqlx::Error::Database(_)
         )
     }
 }
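
After this change, DbOrchestartor::try_new takes the retry settings explicitly and returns the new DbOrchestartorError. A minimal call-site sketch, assuming it lives in the same crate; the connection URL and the concrete retry values are illustrative, not prescribed by this commit:

// Sketch only: example values and a made-up caller, not code from this commit.
use crate::orchestrator::{DbOrchestartor, DbOrchestartorError};
use crate::retry::RetryConfig;

fn build_orchestrator() -> Result<DbOrchestartor, DbOrchestartorError> {
    // Illustrative values; the commit does not define defaults.
    let retry_config = RetryConfig {
        min_delay_millis: 100,
        factor: 2.0,
        max_times: 3,
        max_delay_seconds: 5,
    };

    // Fails with InvalidNumberOfConnectionUrls if the vector is empty,
    // or with Sqlx(_) if a lazy pool cannot be created for a URL.
    DbOrchestartor::try_new(
        vec!["postgres://user:pass@localhost:5432/db".to_string()],
        retry_config,
    )
}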

aggregation_mode/db/src/retry.rs

Lines changed: 10 additions & 40 deletions
@@ -1,7 +1,3 @@
-use backon::ExponentialBuilder;
-use backon::Retryable;
-use std::{future::Future, time::Duration};
-
 #[derive(Debug)]
 pub enum RetryError<E> {
     Transient(E),
@@ -28,40 +24,14 @@ impl<E> RetryError<E> {
 
 impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {}
 
-/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function
-/// Runs with `jitter: false`.
-///
-/// # Parameters
-/// * `function` - The async function to retry
-/// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds)
-/// * `factor` - Exponential backoff multiplier for retry delays
-/// * `max_times` - Maximum number of retry attempts
-/// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds)
-pub async fn retry_function<FutureFn, Fut, T, E, A>(
-    mut function: FutureFn,
-    min_delay_millis: u64,
-    factor: f32,
-    max_times: usize,
-    max_delay_seconds: u64,
-) -> Result<T, RetryError<E>>
-where
-    Fut: Future<Output = Result<T, RetryError<E>>>,
-    FutureFn: FnMut() -> Fut,
-{
-    let backoff = ExponentialBuilder::default()
-        .with_min_delay(Duration::from_millis(min_delay_millis))
-        .with_max_times(max_times)
-        .with_factor(factor)
-        .with_max_delay(Duration::from_secs(max_delay_seconds));
-
-    let func = async || match function().await {
-        Ok(res) => Ok(res),
-        Err(e) => Err(e),
-    };
-
-    function
-        .retry(backoff)
-        .sleep(tokio::time::sleep)
-        .when(|e| matches!(e, RetryError::Transient(_)))
-        .await
+#[derive(Debug)]
+pub struct RetryConfig {
+    /// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds)
+    pub min_delay_millis: u64,
+    /// * `factor` - Exponential backoff multiplier for retry delays
+    pub factor: f32,
+    /// * `max_times` - Maximum number of retry attempts
+    pub max_times: usize,
+    /// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds)
+    pub max_delay_seconds: u64,
 }
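
The removed retry_function took min_delay_millis, factor, max_times and max_delay_seconds as positional arguments and retried only RetryError::Transient errors via backon's when predicate. With this commit those knobs travel together as a RetryConfig value (consumed by backoff_builder in orchestrator.rs), while the transient/permanent split remains the intended retry criterion. A small sketch of that classification, recorded here only as design intent:

// Sketch only: mirrors the `.when(|e| matches!(e, RetryError::Transient(_)))`
// predicate used by the deleted retry_function; not code added by this commit.
fn should_retry<E>(err: &RetryError<E>) -> bool {
    // Transient errors are retried; permanent ones bubble up to the caller.
    matches!(err, RetryError::Transient(_))
}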
