Skip to content

Commit 5e32525

Browse files
committed
feat: orchestrator write/read failure and preferred order by last successful
1 parent 5340365 commit 5e32525

File tree

2 files changed

+123
-43
lines changed

2 files changed

+123
-43
lines changed

aggregation_mode/db/src/orchestrator.rs

Lines changed: 123 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
11
use std::{future::Future, time::Duration};
22

3-
use backon::{ExponentialBuilder, Retryable};
43
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
54

65
use crate::retry::{RetryConfig, RetryError};
76

8-
#[derive(Clone, Debug)]
7+
/// Kind of database operation being routed. It selects which per-node
/// failure flag (`last_read_failed` or `last_write_failed`) is consulted
/// when ordering nodes and updated after each attempt.
#[derive(Debug, Clone, Copy)]
8+
enum Operation {
9+
/// Passed to `query` by `read`.
Read,
10+
/// Passed to `query` by `write`.
Write,
11+
}
12+
13+
/// One database node: its connection pool plus the outcome of the most
/// recent read and write attempted against it.
#[derive(Debug)]
914
struct DbNode {
1015
/// Connection pool for this node; cloned for every query attempt.
pool: Pool<Postgres>,
16+
/// Set when the latest read on this node ended in a connection error;
/// cleared by the next successful read.
last_read_failed: bool,
17+
/// Set when the latest write on this node ended in a connection error;
/// cleared by the next successful write.
last_write_failed: bool,
1118
}
1219

20+
#[derive(Debug)]
1321
pub struct DbOrchestartor {
1422
nodes: Vec<DbNode>,
1523
retry_config: RetryConfig,
@@ -34,7 +42,11 @@ impl DbOrchestartor {
3442
.map(|url| {
3543
let pool = PgPoolOptions::new().max_connections(5).connect_lazy(&url)?;
3644

37-
Ok(DbNode { pool })
45+
Ok(DbNode {
46+
pool,
47+
last_read_failed: false,
48+
last_write_failed: false,
49+
})
3850
})
3951
.collect::<Result<Vec<_>, sqlx::Error>>()
4052
.map_err(|e| DbOrchestartorError::Sqlx(e))?;
@@ -45,47 +57,124 @@ impl DbOrchestartor {
4557
})
4658
}
4759

48-
fn backoff_builder(&self) -> ExponentialBuilder {
49-
ExponentialBuilder::default()
50-
.with_min_delay(Duration::from_millis(self.retry_config.min_delay_millis))
51-
.with_max_times(self.retry_config.max_times)
52-
.with_factor(self.retry_config.factor)
53-
.with_max_delay(Duration::from_secs(self.retry_config.max_delay_seconds))
60+
pub async fn write<T, Q, Fut>(&mut self, query: Q) -> Result<T, sqlx::Error>
61+
where
62+
Q: Fn(Pool<Postgres>) -> Fut,
63+
Fut: Future<Output = Result<T, sqlx::Error>>,
64+
{
65+
self.query::<T, Q, Fut>(query, Operation::Write).await
5466
}
5567

56-
pub async fn query<T, E, Q, Fut>(&self, query_fn: Q) -> Result<T, sqlx::Error>
68+
/// Runs `query` as a read operation.
///
/// The closure receives a clone of a node's pool and may be invoked several
/// times (once per node tried, and again on every retry). Nodes whose last
/// read did not fail with a connection error are tried first.
///
/// # Errors
///
/// Returns the `sqlx::Error` produced by the shared retry loop in `query`.
pub async fn read<T, Q, Fut>(&mut self, query: Q) -> Result<T, sqlx::Error>
5769
where
5870
Q: Fn(Pool<Postgres>) -> Fut,
5971
Fut: Future<Output = Result<T, sqlx::Error>>,
6072
{
61-
let func = async || {
62-
let mut last_error = None;
63-
64-
for idx in 0..self.nodes.len() {
65-
let pool = self.nodes[idx].pool.clone();
66-
match query_fn(pool).await {
67-
Ok(res) => return Ok(res),
68-
Err(err) => {
69-
if Self::is_connection_error(&err) {
70-
tracing::warn!(node_index = idx, error = ?err, "database query failed; retrying");
71-
last_error = Some(err);
72-
} else {
73-
return Err(RetryError::Permanent(err));
74-
}
73+
// Delegate to the shared retry loop, tagged as a read so the read-failure
// flags drive node ordering.
self.query::<T, Q, Fut>(query, Operation::Read).await
74+
}
75+
76+
async fn query<T, Q, Fut>(
77+
&mut self,
78+
query_fn: Q,
79+
operation: Operation,
80+
) -> Result<T, sqlx::Error>
81+
where
82+
Q: Fn(Pool<Postgres>) -> Fut,
83+
Fut: Future<Output = Result<T, sqlx::Error>>,
84+
{
85+
let mut attempts = 0;
86+
let mut delay = Duration::from_millis(self.retry_config.min_delay_millis);
87+
88+
loop {
89+
match self.execute_once(&query_fn, operation).await {
90+
Ok(value) => return Ok(value),
91+
Err(RetryError::Permanent(err)) => return Err(err),
92+
Err(RetryError::Transient(err)) => {
93+
if attempts >= self.retry_config.max_delay_seconds {
94+
return Err(err);
7595
}
76-
};
96+
97+
tracing::warn!(attempt = attempts, delay_milis = delay.as_millis(), error = ?err, "retrying after backoff");
98+
tokio::time::sleep(delay).await;
99+
delay = self.backoff_delay(delay);
100+
attempts += 1;
101+
}
77102
}
103+
}
104+
}
105+
106+
fn backoff_delay(&self, current: Duration) -> Duration {
107+
let max = Duration::from_secs(self.retry_config.max_delay_seconds);
108+
let scaled_secs = current.as_secs_f64() * f64::from(self.retry_config.factor);
109+
let scaled = Duration::from_secs_f64(scaled_secs);
110+
if scaled > max {
111+
max
112+
} else {
113+
scaled
114+
}
115+
}
78116

79-
Err(RetryError::Transient(
80-
last_error.expect("write_op attempted without database nodes"),
81-
))
82-
};
117+
async fn execute_once<T, Q, Fut>(
118+
&mut self,
119+
query_fn: &Q,
120+
operation: Operation,
121+
) -> Result<T, RetryError<sqlx::Error>>
122+
where
123+
Q: Fn(Pool<Postgres>) -> Fut,
124+
Fut: Future<Output = Result<T, sqlx::Error>>,
125+
{
126+
let mut last_error = None;
127+
128+
for idx in self.preferred_order(operation) {
129+
let pool = self.nodes[idx].pool.clone();
130+
131+
match query_fn(pool).await {
132+
Ok(res) => {
133+
match operation {
134+
Operation::Read => self.nodes[idx].last_read_failed = false,
135+
Operation::Write => self.nodes[idx].last_write_failed = false,
136+
};
137+
return Ok(res);
138+
}
139+
Err(err) => {
140+
if Self::is_connection_error(&err) {
141+
tracing::warn!(node_index = idx, error = ?err, "database query failed");
142+
match operation {
143+
Operation::Read => self.nodes[idx].last_read_failed = true,
144+
Operation::Write => self.nodes[idx].last_write_failed = true,
145+
};
146+
last_error = Some(err);
147+
} else {
148+
return Err(RetryError::Permanent(err));
149+
}
150+
}
151+
};
152+
}
153+
154+
Err(RetryError::Transient(
155+
last_error.expect("write_op attempted without database nodes"),
156+
))
157+
}
158+
159+
fn preferred_order(&self, operation: Operation) -> Vec<usize> {
160+
let mut preferred = Vec::with_capacity(self.nodes.len());
161+
let mut fallback = Vec::new();
162+
163+
for (idx, node) in self.nodes.iter().enumerate() {
164+
let failed = match operation {
165+
Operation::Read => node.last_read_failed,
166+
Operation::Write => node.last_write_failed,
167+
};
168+
169+
if failed {
170+
fallback.push(idx);
171+
} else {
172+
preferred.push(idx);
173+
}
174+
}
83175

84-
func.retry(self.backoff_builder())
85-
.sleep(tokio::time::sleep)
86-
.when(|e| matches!(e, RetryError::Transient(_)))
87-
.await
88-
.map_err(|e| e.inner())
176+
preferred.extend(fallback);
177+
preferred
89178
}
90179

91180
fn is_connection_error(error: &sqlx::Error) -> bool {

aggregation_mode/db/src/retry.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,6 @@ impl<E: std::fmt::Display> std::fmt::Display for RetryError<E> {
1313
}
1414
}
1515

16-
impl<E> RetryError<E> {
17-
pub fn inner(self) -> E {
18-
match self {
19-
RetryError::Transient(e) => e,
20-
RetryError::Permanent(e) => e,
21-
}
22-
}
23-
}
24-
2516
impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {}
2617

2718
#[derive(Debug)]

0 commit comments

Comments
 (0)