Skip to content

Commit 77ab1b5

Browse files
authored
Merge pull request #118 from seanwevans/codex/replace-multiplication-with-clamped-calculation
Clamp messaging retry backoff delay
2 parents 90cc4ac + 53b533b commit 77ab1b5

File tree

3 files changed

+36
-5
lines changed

3 files changed

+36
-5
lines changed

src/an_node.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ pub async fn run() -> Result<(), Box<dyn Error>> {
121121
.unwrap_or_else(|_| "500".into())
122122
.parse()
123123
.unwrap_or(500);
124+
let max_backoff_ms: u64 = std::env::var("AMQP_RECONNECT_MAX_BACKOFF_MS")
125+
.unwrap_or_else(|_| "5000".into())
126+
.parse()
127+
.unwrap_or(5_000);
124128

125129
let queue_name = "an_task_queue";
126130
let consumer_tag = "an_consumer";
@@ -132,6 +136,7 @@ pub async fn run() -> Result<(), Box<dyn Error>> {
132136
consumer_tag,
133137
max_retries,
134138
backoff_ms,
139+
max_backoff_ms,
135140
)
136141
.await?;
137142

@@ -212,8 +217,14 @@ mod tests {
212217
#[cfg(feature = "integration-tests")]
213218
#[tokio::test]
214219
async fn test_setup_consumer_workflow() {
215-
let (channel, mut consumer) =
216-
messaging::connect_with_retries(AMQP_ADDR, "test_queue", "test_consumer", 1, 10)
220+
let (channel, mut consumer) = messaging::connect_with_retries(
221+
AMQP_ADDR,
222+
"test_queue",
223+
"test_consumer",
224+
1,
225+
10,
226+
10_000,
227+
)
217228
.await
218229
.expect("setup");
219230

src/ki_node.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ pub async fn run() -> Result<(), Box<dyn Error>> {
6565
.unwrap_or_else(|_| "500".into())
6666
.parse()
6767
.unwrap_or(500);
68+
let max_backoff_ms: u64 = std::env::var("AMQP_RECONNECT_MAX_BACKOFF_MS")
69+
.unwrap_or_else(|_| "5000".into())
70+
.parse()
71+
.unwrap_or(5_000);
6872

6973
let queue_name = "ki_task_queue";
7074
let consumer_tag = "ki_consumer";
@@ -80,6 +84,7 @@ pub async fn run() -> Result<(), Box<dyn Error>> {
8084
consumer_tag,
8185
max_retries,
8286
backoff_ms,
87+
max_backoff_ms,
8388
)
8489
.await?;
8590

@@ -89,6 +94,7 @@ pub async fn run() -> Result<(), Box<dyn Error>> {
8994
model_consumer_tag,
9095
max_retries,
9196
backoff_ms,
97+
max_backoff_ms,
9298
)
9399
.await?;
94100

src/messaging.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ pub async fn connect_with_retries(
8282
consumer_tag: &str,
8383
max_retries: u32,
8484
backoff_ms: u64,
85+
max_delay_ms: u64,
8586
) -> Result<(Channel, lapin::Consumer), Box<dyn Error>> {
8687
let mut attempts = 0u32;
8788
loop {
@@ -105,7 +106,11 @@ pub async fn connect_with_retries(
105106
error!("Exceeded maximum reconnection attempts.");
106107
return Err(e);
107108
}
108-
let delay = backoff_ms * 2u64.pow(attempts - 1);
109+
let attempt_factor = attempts.saturating_sub(1);
110+
let multiplier = 2u64.saturating_pow(attempt_factor);
111+
let delay = backoff_ms
112+
.saturating_mul(multiplier)
113+
.min(max_delay_ms);
109114
sleep(Duration::from_millis(delay)).await;
110115
}
111116
}
@@ -165,14 +170,23 @@ mod tests {
165170

166171
#[tokio::test]
167172
async fn test_connect_with_retries_failure() {
168-
let result = connect_with_retries("amqp://invalid:5672/%2f", "queue", "tag", 3, 10).await;
173+
let result = connect_with_retries("amqp://invalid:5672/%2f", "queue", "tag", 3, 10, 20)
174+
.await;
175+
assert!(result.is_err());
176+
}
177+
178+
#[tokio::test]
179+
async fn test_connect_with_retries_hits_max_delay_without_panic() {
180+
let result = connect_with_retries("amqp://invalid:5672/%2f", "queue", "tag", 5, 4, 8)
181+
.await;
169182
assert!(result.is_err());
170183
}
171184

172185
#[cfg(feature = "integration-tests")]
173186
#[tokio::test]
174187
async fn test_connect_with_retries_success() {
175-
let res = connect_with_retries(AMQP_ADDR, "test_queue_retry", "test_tag", 1, 10).await;
188+
let res =
189+
connect_with_retries(AMQP_ADDR, "test_queue_retry", "test_tag", 1, 10, 10_000).await;
176190
match res {
177191
Ok((channel, consumer)) => {
178192
// drop consumer to close

0 commit comments

Comments
 (0)