
Commit cb8fb0a

feat(aggregation mode): add retry logic to batches download
1 parent 58d5b99 commit cb8fb0a

File tree

5 files changed (+162, -17 lines)


aggregation_mode/Cargo.lock

Lines changed: 26 additions & 2 deletions
Some generated files are not rendered by default.

aggregation_mode/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ reqwest = { version = "0.12" }
 ciborium = "=0.2.2"
 lambdaworks-crypto = { git = "https://github.com/lambdaclass/lambdaworks.git", rev = "5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b", features = ["serde"]}
 rayon = "1.10.0"
+backon = "1.2.0"
 # Necessary for the VerificationData type
 aligned-sdk = { path = "../crates/sdk/" }
 # zkvms

aggregation_mode/src/backend/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@ pub mod fetcher;
 mod merkle_tree;
 mod s3;
 mod types;
+mod retry;

 use crate::aggregators::{AlignedProof, ProofAggregationError, ZKVMEngine};

aggregation_mode/src/backend/retry.rs

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+use backon::ExponentialBuilder;
+use backon::Retryable;
+use std::{future::Future, time::Duration};
+
+#[derive(Debug)]
+pub enum RetryError<E> {
+    Transient(E),
+    Permanent(E),
+}
+
+impl<E: std::fmt::Display> std::fmt::Display for RetryError<E> {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        match self {
+            RetryError::Transient(e) => write!(f, "{}", e),
+            RetryError::Permanent(e) => write!(f, "{}", e),
+        }
+    }
+}
+
+impl<E> RetryError<E> {
+    pub fn inner(self) -> E {
+        match self {
+            RetryError::Transient(e) => e,
+            RetryError::Permanent(e) => e,
+        }
+    }
+}
+
+impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {}
+
+/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function
+/// Runs with `jitter: false`.
+///
+/// # Parameters
+/// * `function` - The async function to retry
+/// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds)
+/// * `factor` - Exponential backoff multiplier for retry delays
+/// * `max_times` - Maximum number of retry attempts
+/// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds)
+pub async fn retry_function<FutureFn, Fut, T, E>(
+    function: FutureFn,
+    min_delay_millis: u64,
+    factor: f32,
+    max_times: usize,
+    max_delay_seconds: u64,
+) -> Result<T, RetryError<E>>
+where
+    Fut: Future<Output = Result<T, RetryError<E>>>,
+    FutureFn: FnMut() -> Fut,
+{
+    let backoff = ExponentialBuilder::default()
+        .with_min_delay(Duration::from_millis(min_delay_millis))
+        .with_max_times(max_times)
+        .with_factor(factor)
+        .with_max_delay(Duration::from_secs(max_delay_seconds));
+
+    function
+        .retry(backoff)
+        .sleep(tokio::time::sleep)
+        .when(|e| matches!(e, RetryError::Transient(_)))
+        .await
+}
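
For context, a minimal sketch of how a caller might use the new retry_function, assuming the backon/tokio setup added in this commit; flaky_request and fetch_with_retries are hypothetical names and are not part of the commit:

// Hypothetical caller (not in this commit): errors are classified as Transient
// (retried) or Permanent (returned immediately), then unwrapped with inner().
use crate::backend::retry::{retry_function, RetryError};

async fn flaky_request(url: &str) -> Result<String, RetryError<String>> {
    reqwest::get(url)
        .await
        .map_err(|e| RetryError::Transient(e.to_string()))? // network errors: worth retrying
        .text()
        .await
        .map_err(|e| RetryError::Permanent(e.to_string())) // decode errors: fail fast
}

async fn fetch_with_retries(url: String) -> Result<String, String> {
    // 500 ms initial delay, factor 2.0, at most 5 retries, each delay capped at 10 s
    retry_function(|| flaky_request(&url), 500, 2.0, 5, 10)
        .await
        .map_err(|e| e.inner())
}

Only Transient errors trigger another attempt, which is why the S3 code below maps 4xx responses (other than 408 and 429) to Permanent.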

aggregation_mode/src/backend/s3.rs

Lines changed: 72 additions & 15 deletions
@@ -1,4 +1,7 @@
+use std::time::Duration;
 use aligned_sdk::common::types::VerificationData;
+use tracing::{info, warn};
+use crate::backend::retry::{retry_function, RetryError};

 #[derive(Debug)]
 #[allow(dead_code)]
@@ -13,38 +16,92 @@ pub enum GetBatchProofsError {
 // needed to make S3 bucket work
 const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer";

-pub async fn get_aligned_batch_from_s3(
+// Retry parameters for S3 requests
+/// Initial delay before first retry attempt (in milliseconds)
+const RETRY_MIN_DELAY_MILLIS: u64 = 500;
+/// Exponential backoff multiplier for retry delays
+const RETRY_FACTOR: f32 = 2.0;
+/// Maximum number of retry attempts
+const RETRY_MAX_TIMES: usize = 5;
+/// Maximum delay between retry attempts (in seconds)
+const RETRY_MAX_DELAY_SECONDS: u64 = 10;
+
+/// Timeout for Reqwest Client
+const REQWEST_TIMEOUT_SECONDS: Duration = Duration::from_secs(60);
+
+async fn get_aligned_batch_from_s3_retryable(
     url: String,
-) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+) -> Result<Vec<VerificationData>, RetryError<GetBatchProofsError>> {
+    info!("Fetching batch from S3 URL: {}", url);
     let client = reqwest::Client::builder()
         .user_agent(DEFAULT_USER_AGENT)
+        .timeout(REQWEST_TIMEOUT_SECONDS)
         .build()
-        .map_err(|e| GetBatchProofsError::ReqwestClientFailed(e.to_string()))?;
+        .map_err(|e| RetryError::Permanent(GetBatchProofsError::ReqwestClientFailed(e.to_string())))?;

     let response = client
-        .get(url)
+        .get(&url)
         .send()
         .await
-        .map_err(|e| GetBatchProofsError::FetchingS3Batch(e.to_string()))?;
+        .map_err(|e| {
+            warn!("Failed to send request to {}: {}", url, e);
+            RetryError::Transient(GetBatchProofsError::FetchingS3Batch(e.to_string()))
+        })?;
+
     if !response.status().is_success() {
-        return Err(GetBatchProofsError::StatusFailed((
-            response.status().as_u16(),
-            response
-                .status()
-                .canonical_reason()
-                .unwrap_or("")
-                .to_string(),
-        )));
+        let status_code = response.status().as_u16();
+        let reason = response.status().canonical_reason().unwrap_or("").to_string();
+
+        // Determine if the error is retryable based on status code
+        let error = GetBatchProofsError::StatusFailed((status_code, reason));
+        return match status_code {
+            // Client errors (4xx) are generally permanent, except for specific cases
+            400..=499 => match status_code {
+                408 | 429 => Err(RetryError::Transient(error)), // Request Timeout, Too Many Requests
+                _ => Err(RetryError::Permanent(error)),
+            },
+            // Server errors (5xx) are generally transient
+            500..=599 => Err(RetryError::Transient(error)),
+            _ => Err(RetryError::Permanent(error)),
+        };
     }

     let bytes = response
         .bytes()
         .await
-        .map_err(|e| GetBatchProofsError::EmptyBody(e.to_string()))?;
+        .map_err(|e| {
+            warn!("Failed to read response body from {}: {}", url, e);
+            RetryError::Transient(GetBatchProofsError::EmptyBody(e.to_string()))
+        })?;
     let bytes: &[u8] = bytes.iter().as_slice();

     let data: Vec<VerificationData> = ciborium::from_reader(bytes)
-        .map_err(|e| GetBatchProofsError::Deserialization(e.to_string()))?;
+        .map_err(|e| {
+            warn!("Failed to deserialize batch data from {}: {}", url, e);
+            RetryError::Permanent(GetBatchProofsError::Deserialization(e.to_string()))
+        })?;

     Ok(data)
 }
+
+/// Download batch from Storage Service using the provided URL.
+///
+/// Retries on recoverable errors using exponential backoff up to `RETRY_MAX_TIMES` times
+/// (0.5 s, 1 s, 2 s, 4 s, 8 s).
+pub async fn get_aligned_batch_from_s3(
+    url: String,
+) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+    let url_clone = url.clone();
+    retry_function(
+        move || {
+            let url = url_clone.clone();
+            get_aligned_batch_from_s3_retryable(url)
+        },
+        RETRY_MIN_DELAY_MILLIS,
+        RETRY_FACTOR,
+        RETRY_MAX_TIMES,
+        RETRY_MAX_DELAY_SECONDS,
+    )
+    .await
+    .map_err(|retry_err| retry_err.inner())
+}
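
As a quick sanity check on the documented schedule, the standalone snippet below (not part of the commit) reproduces the delay sequence implied by the constants above; it only mirrors the arithmetic of an exponential backoff without jitter, not backon's internals:

fn main() {
    // Mirrors RETRY_MIN_DELAY_MILLIS = 500, RETRY_FACTOR = 2.0,
    // RETRY_MAX_TIMES = 5 and RETRY_MAX_DELAY_SECONDS = 10 from s3.rs.
    let (min_delay_ms, factor, max_times, max_delay_ms) = (500u64, 2.0f32, 5usize, 10_000u64);
    let mut delay_ms = min_delay_ms as f32;
    for attempt in 1..=max_times {
        let capped_s = delay_ms.min(max_delay_ms as f32) / 1000.0;
        println!("retry #{attempt}: wait {capped_s} s");
        delay_ms *= factor;
    }
    // Prints 0.5, 1, 2, 4, 8: the sequence in the doc comment on
    // get_aligned_batch_from_s3. Every delay stays under the 10 s cap.
}

If a sixth retry existed, the next delay would be 16 s and the 10 s cap would kick in; with max_times = 5 the cap is never reached.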

0 commit comments
