Skip to content

Commit 5794215

Browse files
committed
feat: db orchestrator design
1 parent d623ed0 commit 5794215

File tree

5 files changed

+157
-2
lines changed

5 files changed

+157
-2
lines changed

aggregation_mode/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aggregation_mode/db/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,8 @@ edition = "2021"
55

66
[dependencies]
77
tokio = { version = "1"}
8-
# TODO: enable tls
98
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" ] }
10-
9+
backon = "1.2.0"
1110

1211
[[bin]]
1312
name = "migrate"

aggregation_mode/db/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1+
pub mod orchestrator;
2+
mod retry;
13
pub mod types;
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
use std::{future::Future, time::Duration};
2+
3+
use backon::{ExponentialBuilder, Retryable};
4+
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
5+
6+
use crate::retry::RetryError;
7+
8+
#[derive(Clone, Debug)]
9+
struct DbNode {
10+
pool: Pool<Postgres>,
11+
}
12+
13+
pub struct DbOrchestartor {
14+
nodes: Vec<DbNode>,
15+
}
16+
17+
impl DbOrchestartor {
18+
pub fn try_new(connection_urls: Vec<String>) -> Result<Self, sqlx::Error> {
19+
// TODO: validate at least one connection url
20+
let nodes = connection_urls
21+
.into_iter()
22+
.map(|url| {
23+
let pool = PgPoolOptions::new().max_connections(5).connect_lazy(&url)?;
24+
25+
Ok(DbNode { pool })
26+
})
27+
.collect::<Result<Vec<_>, sqlx::Error>>()?;
28+
29+
Ok(Self { nodes })
30+
}
31+
32+
fn backoff_builder(&self) -> ExponentialBuilder {
33+
ExponentialBuilder::default()
34+
.with_min_delay(Duration::from_millis(0))
35+
.with_max_times(0)
36+
.with_factor(0.0)
37+
.with_max_delay(Duration::from_secs(0))
38+
}
39+
40+
pub async fn query<T, E, Q, Fut>(&self, query_fn: Q) -> Result<T, sqlx::Error>
41+
where
42+
Q: Fn(Pool<Postgres>) -> Fut,
43+
Fut: Future<Output = Result<T, sqlx::Error>>,
44+
{
45+
let func = async || {
46+
let mut last_error = None;
47+
48+
for idx in 0..self.nodes.len() {
49+
let pool = self.nodes[idx].pool.clone();
50+
match query_fn(pool).await {
51+
Ok(res) => return Ok(res),
52+
Err(err) => {
53+
if Self::is_connection_error(&err) {
54+
last_error = Some(err);
55+
} else {
56+
return Err(RetryError::Permanent(err));
57+
}
58+
}
59+
};
60+
}
61+
62+
Err(RetryError::Transient(
63+
last_error.expect("write_op attempted without database nodes"),
64+
))
65+
};
66+
67+
func.retry(self.backoff_builder())
68+
.sleep(tokio::time::sleep)
69+
.when(|e| matches!(e, RetryError::Transient(_)))
70+
.await
71+
.map_err(|e| e.inner())
72+
}
73+
74+
fn is_connection_error(error: &sqlx::Error) -> bool {
75+
matches!(
76+
error,
77+
sqlx::Error::Io(_)
78+
| sqlx::Error::Tls(_)
79+
| sqlx::Error::Protocol(_)
80+
| sqlx::Error::PoolTimedOut
81+
| sqlx::Error::PoolClosed
82+
| sqlx::Error::WorkerCrashed
83+
| sqlx::Error::BeginFailed
84+
)
85+
}
86+
}

aggregation_mode/db/src/retry.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
use backon::ExponentialBuilder;
2+
use backon::Retryable;
3+
use std::{future::Future, time::Duration};
4+
5+
#[derive(Debug)]
6+
pub enum RetryError<E> {
7+
Transient(E),
8+
Permanent(E),
9+
}
10+
11+
impl<E: std::fmt::Display> std::fmt::Display for RetryError<E> {
12+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
13+
match self {
14+
RetryError::Transient(e) => write!(f, "{e}"),
15+
RetryError::Permanent(e) => write!(f, "{e}"),
16+
}
17+
}
18+
}
19+
20+
impl<E> RetryError<E> {
21+
pub fn inner(self) -> E {
22+
match self {
23+
RetryError::Transient(e) => e,
24+
RetryError::Permanent(e) => e,
25+
}
26+
}
27+
}
28+
29+
impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {}
30+
31+
/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function
32+
/// Runs with `jitter: false`.
33+
///
34+
/// # Parameters
35+
/// * `function` - The async function to retry
36+
/// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds)
37+
/// * `factor` - Exponential backoff multiplier for retry delays
38+
/// * `max_times` - Maximum number of retry attempts
39+
/// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds)
40+
pub async fn retry_function<FutureFn, Fut, T, E, A>(
41+
mut function: FutureFn,
42+
min_delay_millis: u64,
43+
factor: f32,
44+
max_times: usize,
45+
max_delay_seconds: u64,
46+
) -> Result<T, RetryError<E>>
47+
where
48+
Fut: Future<Output = Result<T, RetryError<E>>>,
49+
FutureFn: FnMut() -> Fut,
50+
{
51+
let backoff = ExponentialBuilder::default()
52+
.with_min_delay(Duration::from_millis(min_delay_millis))
53+
.with_max_times(max_times)
54+
.with_factor(factor)
55+
.with_max_delay(Duration::from_secs(max_delay_seconds));
56+
57+
let func = async || match function().await {
58+
Ok(res) => Ok(res),
59+
Err(e) => Err(e),
60+
};
61+
62+
function
63+
.retry(backoff)
64+
.sleep(tokio::time::sleep)
65+
.when(|e| matches!(e, RetryError::Transient(_)))
66+
.await
67+
}

0 commit comments

Comments
 (0)