
Commit 34b2ece

refactor: comment backoff delays, remove stale todos, and set initial retry params
1 parent: 2d4b876

File tree

6 files changed: +58 −18 lines

aggregation_mode/Cargo.lock

Lines changed: 0 additions & 1 deletion (generated lock file; diff not rendered by default)

aggregation_mode/db/Cargo.toml

Lines changed: 1 addition & 2 deletions

@@ -6,8 +6,7 @@ edition = "2021"
 [dependencies]
 serde = { workspace = true }
 tokio = { version = "1"}
-sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" ] }
-backon = "1.2.0"
+sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" , "uuid", "bigdecimal"] }
 tracing = { version = "0.1", features = ["log"] }
 
 [[bin]]
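
The two new sqlx features matter for the code below: `uuid` and `bigdecimal` are what expose `sqlx::types::Uuid` and `sqlx::types::BigDecimal` (imported in `gateway/src/db.rs`) as Postgres-encodable types, while `backon` appears to be dropped because retries are now handled by the in-house orchestrator. A rough sketch of the kind of query those features enable; the table and column names here are invented for illustration:

use sqlx::types::{BigDecimal, Uuid};
use sqlx::{Pool, Postgres};

// Hypothetical helper: "receipts", "id" and "amount" are made-up names.
// With the "uuid" and "bigdecimal" features enabled, both values can be
// bound directly as Postgres parameters.
async fn insert_receipt(
    pool: &Pool<Postgres>,
    id: Uuid,
    amount: BigDecimal,
) -> Result<(), sqlx::Error> {
    sqlx::query("INSERT INTO receipts (id, amount) VALUES ($1, $2)")
        .bind(id)
        .bind(amount)
        .execute(pool)
        .await?;
    Ok(())
}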

aggregation_mode/db/src/orchestrator.rs

Lines changed: 29 additions & 5 deletions

@@ -18,7 +18,6 @@ enum Operation {
 }
 
 /// A single DB node: connection pool plus shared health flags (used to prioritize nodes).
-
 #[derive(Debug)]
 struct DbNode {
     pool: Pool<Postgres>,
@@ -128,16 +127,41 @@ impl DbOrchestartor {
 
                     tracing::warn!(attempt = attempts, delay_milis = delay.as_millis(), error = ?err, "retrying after backoff");
                     tokio::time::sleep(delay).await;
-                    delay = self.backoff_delay(delay);
+                    delay = self.next_backoff_delay(delay);
                     attempts += 1;
                 }
             }
         }
     }
 
-    fn backoff_delay(&self, current: Duration) -> Duration {
-        let max = Duration::from_secs(self.retry_config.max_delay_seconds);
-        let scaled_secs = current.as_secs_f64() * f64::from(self.retry_config.factor);
+    // Exponential backoff with a hard cap.
+    //
+    // Each retry multiplies the previous delay by `retry_config.factor`,
+    // then clamps it to `max_delay_seconds`. This yields:
+    //
+    //   d_{n+1} = min(max, d_n * factor)  =>  d_n = min(max, d_initial * factor^n)
+    //
+    // Example starting at 500ms with factor = 2.0 (no jitter):
+    //   retry 0: 0.5s
+    //   retry 1: 1.0s
+    //   retry 2: 2.0s
+    //   retry 3: 4.0s
+    //   retry 4: 8.0s
+    //   ...
+    // until the delay reaches `max_delay_seconds`, after which it stays at that max.
+    // see reference: https://en.wikipedia.org/wiki/Exponential_backoff
+    fn next_backoff_delay(&self, current_delay: Duration) -> Duration {
+        let max: Duration = Duration::from_secs(self.retry_config.max_delay_seconds);
+        // Defensive: factor should be >= 1.0 for backoff, we clamp it to avoid shrinking/NaN.
+        let factor = f64::from(self.retry_config.factor).max(1.0);
+
+        let scaled_secs = current_delay.as_secs_f64() * factor;
+        let scaled_secs = if scaled_secs.is_finite() {
+            scaled_secs
+        } else {
+            max.as_secs_f64()
+        };
+
         let scaled = Duration::from_secs_f64(scaled_secs);
         if scaled > max {
             max
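
Since `next_backoff_delay` only scales and caps an existing delay (the retry loop above owns the starting value and the attempt counter), the resulting schedule is easiest to see in isolation. Below is a minimal standalone sketch of the same clamped doubling rule, assuming the delay starts at `min_delay_millis`, and using a deliberately long loop so the cap becomes visible:

use std::time::Duration;

// Mirrors the fields used by the orchestrator's retry logic (sketch only).
struct RetryConfig {
    min_delay_millis: u64,
    factor: f32,
    max_times: usize,
    max_delay_seconds: u64,
}

// Same rule as `next_backoff_delay` above: scale by `factor`, clamp to the cap.
fn next_backoff_delay(cfg: &RetryConfig, current: Duration) -> Duration {
    let max = Duration::from_secs(cfg.max_delay_seconds);
    let factor = f64::from(cfg.factor).max(1.0); // never let the delay shrink
    let scaled_secs = current.as_secs_f64() * factor;
    let scaled_secs = if scaled_secs.is_finite() { scaled_secs } else { max.as_secs_f64() };
    let scaled = Duration::from_secs_f64(scaled_secs);
    if scaled > max { max } else { scaled }
}

fn main() {
    let cfg = RetryConfig {
        min_delay_millis: 500,
        factor: 2.0,
        max_times: 4,
        max_delay_seconds: 10,
    };
    let mut delay = Duration::from_millis(cfg.min_delay_millis);
    // Iterate past max_times here purely to show the cap being hit:
    // prints 500ms, 1s, 2s, 4s, 8s, 10s, 10s, 10s.
    for attempt in 0..cfg.max_times + 4 {
        println!("retry {attempt}: {delay:?}");
        delay = next_backoff_delay(&cfg, delay);
    }
}

Clamping `factor` to at least 1.0 and falling back to the cap on a non-finite product means a misconfigured factor degrades into a constant (capped) delay rather than a shrinking or NaN one.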

aggregation_mode/gateway/src/db.rs

Lines changed: 14 additions & 5 deletions

@@ -1,7 +1,16 @@
+use crate::types::Receipt;
 use db::{orchestrator::DbOrchestartor, retry::RetryConfig};
 use sqlx::types::{BigDecimal, Uuid};
 
-use crate::types::Receipt;
+// Retry parameters for Db queries
+/// Initial delay before first retry attempt (in milliseconds)
+const RETRY_MIN_DELAY_MILLIS: u64 = 500;
+/// Exponential backoff multiplier for retry delays
+const RETRY_FACTOR: f32 = 2.0;
+/// Maximum number of retry attempts
+const RETRY_MAX_TIMES: usize = 4;
+/// Maximum delay between retry attempts (in seconds)
+const RETRY_MAX_DELAY_SECONDS: u64 = 10;
 
 #[derive(Clone, Debug)]
 pub struct Db {
@@ -18,10 +27,10 @@ impl Db {
         let orchestrator = DbOrchestartor::try_new(
             connection_urls,
             RetryConfig {
-                factor: 0.0,
-                max_delay_seconds: 0,
-                max_times: 0,
-                min_delay_millis: 0,
+                min_delay_millis: RETRY_MIN_DELAY_MILLIS,
+                factor: RETRY_FACTOR,
+                max_times: RETRY_MAX_TIMES,
+                max_delay_seconds: RETRY_MAX_DELAY_SECONDS,
             },
         )
         .map_err(|e| DbError::ConnectError(e.to_string()))?;
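
Assuming the orchestrator applies the doubling rule above starting from `RETRY_MIN_DELAY_MILLIS`, the gateway's worst case is four retries waiting roughly 0.5 s, 1 s, 2 s and 4 s (about 7.5 s of back-off in total); the 10 s cap is never reached, since 0.5 s × 2³ = 4 s.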

aggregation_mode/gateway/src/http.rs

Lines changed: 0 additions & 1 deletion

@@ -84,7 +84,6 @@ impl GatewayServer {
                 .json(AppResponse::new_unsucessfull("Internal server error", 500));
         };
 
-        // TODO: how to fix the mutable thing
         let state = state.get_ref();
         match state.db.count_tasks_by_address(&address).await {
             Ok(count) => HttpResponse::Ok().json(AppResponse::new_sucessfull(serde_json::json!(

aggregation_mode/payments_poller/src/db.rs

Lines changed: 14 additions & 4 deletions

@@ -1,6 +1,16 @@
 use db::{orchestrator::DbOrchestartor, retry::RetryConfig};
 use sqlx::types::BigDecimal;
 
+// Retry parameters for Db queries
+/// Initial delay before first retry attempt (in milliseconds)
+const RETRY_MIN_DELAY_MILLIS: u64 = 500;
+/// Exponential backoff multiplier for retry delays
+const RETRY_FACTOR: f32 = 2.0;
+/// Maximum number of retry attempts
+const RETRY_MAX_TIMES: usize = 5;
+/// Maximum delay between retry attempts (in seconds)
+const RETRY_MAX_DELAY_SECONDS: u64 = 30;
+
 #[derive(Clone, Debug)]
 pub struct Db {
     orchestartor: DbOrchestartor,
@@ -16,10 +26,10 @@ impl Db {
         let orchestartor = DbOrchestartor::try_new(
             connection_urls,
             RetryConfig {
-                factor: 0.0,
-                max_delay_seconds: 0,
-                max_times: 0,
-                min_delay_millis: 0,
+                min_delay_millis: RETRY_MIN_DELAY_MILLIS,
+                factor: RETRY_FACTOR,
+                max_times: RETRY_MAX_TIMES,
+                max_delay_seconds: RETRY_MAX_DELAY_SECONDS,
             },
         )
         .map_err(|e| DbError::ConnectError(e.to_string()))?;
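
Under the same assumption, the poller's five retries would wait roughly 0.5 s, 1 s, 2 s, 4 s and 8 s (about 15.5 s in total), so its 30 s cap likewise stays out of reach; compared to the gateway it simply tolerates one more attempt and a higher ceiling.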
