
Commit 1f4accb

feat(aggregation mode): add retry logic to batches download (#2046)

1 parent e345a37 commit 1f4accb

File tree

6 files changed: +170 -25 lines changed

Makefile

Lines changed: 1 addition & 1 deletion
@@ -298,7 +298,7 @@ verify_aggregated_proof_risc0:
 		--rpc_url $(RPC_URL)
 
 proof_aggregator_install: ## Install the aggregation mode with proving enabled
-	cargo install --path aggregation_mode --features prove,gpu --bin proof_aggregator --locked
+	cargo install --path aggregation_mode --features prove,gpu --bin proof_aggregator_gpu --locked
 
 proof_aggregator_write_program_ids: ## Write proof aggregator zkvm programs ids
 	@cd aggregation_mode && ./scripts/build_programs.sh

aggregation_mode/Cargo.lock

Lines changed: 26 additions & 2 deletions
Some generated files are not rendered by default.

aggregation_mode/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -18,6 +18,7 @@ reqwest = { version = "0.12" }
 ciborium = "=0.2.2"
 lambdaworks-crypto = { git = "https://github.com/lambdaclass/lambdaworks.git", rev = "5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b", features = ["serde"]}
 rayon = "1.10.0"
+backon = "1.2.0"
 # Necessary for the VerificationData type
 aligned-sdk = { path = "../crates/sdk/" }
 # zkvms

aggregation_mode/src/backend/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 pub mod config;
 pub mod fetcher;
 mod merkle_tree;
+mod retry;
 mod s3;
 mod types;
 

aggregation_mode/src/backend/retry.rs

Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
+use backon::ExponentialBuilder;
+use backon::Retryable;
+use std::{future::Future, time::Duration};
+
+#[derive(Debug)]
+pub enum RetryError<E> {
+    Transient(E),
+    Permanent(E),
+}
+
+impl<E: std::fmt::Display> std::fmt::Display for RetryError<E> {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        match self {
+            RetryError::Transient(e) => write!(f, "{}", e),
+            RetryError::Permanent(e) => write!(f, "{}", e),
+        }
+    }
+}
+
+impl<E> RetryError<E> {
+    pub fn inner(self) -> E {
+        match self {
+            RetryError::Transient(e) => e,
+            RetryError::Permanent(e) => e,
+        }
+    }
+}
+
+impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {}
+
+/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function
+/// Runs with `jitter: false`.
+///
+/// # Parameters
+/// * `function` - The async function to retry
+/// * `min_delay_millis` - Initial delay before first retry attempt (in milliseconds)
+/// * `factor` - Exponential backoff multiplier for retry delays
+/// * `max_times` - Maximum number of retry attempts
+/// * `max_delay_seconds` - Maximum delay between retry attempts (in seconds)
+pub async fn retry_function<FutureFn, Fut, T, E>(
+    function: FutureFn,
+    min_delay_millis: u64,
+    factor: f32,
+    max_times: usize,
+    max_delay_seconds: u64,
+) -> Result<T, RetryError<E>>
+where
+    Fut: Future<Output = Result<T, RetryError<E>>>,
+    FutureFn: FnMut() -> Fut,
+{
+    let backoff = ExponentialBuilder::default()
+        .with_min_delay(Duration::from_millis(min_delay_millis))
+        .with_max_times(max_times)
+        .with_factor(factor)
+        .with_max_delay(Duration::from_secs(max_delay_seconds));
+
+    function
+        .retry(backoff)
+        .sleep(tokio::time::sleep)
+        .when(|e| matches!(e, RetryError::Transient(_)))
+        .await
+}
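
For context (not part of the commit): a minimal sketch of how a caller might use the new `retry_function` helper. The `fetch_once` function and its `String` error type are hypothetical, chosen only to show that the call site decides which failures are `Transient` (retried) and which are `Permanent` (returned immediately).

use crate::backend::retry::{retry_function, RetryError};

// Hypothetical async operation; the name and error type are illustrative only.
// Failures are classified at the call site: Transient errors are retried,
// Permanent errors abort the retry loop immediately.
async fn fetch_once() -> Result<Vec<u8>, RetryError<String>> {
    Err(RetryError::Transient("connection reset".to_string()))
}

async fn fetch_with_retries() -> Result<Vec<u8>, String> {
    retry_function(
        fetch_once, // FnMut() -> impl Future<Output = Result<_, RetryError<_>>>
        500,        // min_delay_millis: first retry after 0.5 s
        2.0,        // factor: delay doubles on each attempt
        5,          // max_times: at most 5 retries
        10,         // max_delay_seconds: each delay capped at 10 s
    )
    .await
    .map_err(|e| e.inner()) // unwrap the Transient/Permanent wrapper
}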

aggregation_mode/src/backend/s3.rs

Lines changed: 79 additions & 22 deletions
@@ -1,4 +1,6 @@
+use crate::backend::retry::{retry_function, RetryError};
 use aligned_sdk::common::types::VerificationData;
+use std::time::Duration;
 use tracing::{info, warn};
 
 #[derive(Debug)]
@@ -15,6 +17,22 @@ pub enum GetBatchProofsError {
 const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer";
 const MAX_BATCH_URLS: usize = 5;
 
+// Retry parameters for S3 requests
+/// Initial delay before first retry attempt (in milliseconds)
+const RETRY_MIN_DELAY_MILLIS: u64 = 500;
+/// Exponential backoff multiplier for retry delays
+const RETRY_FACTOR: f32 = 2.0;
+/// Maximum number of retry attempts
+const RETRY_MAX_TIMES: usize = 5;
+/// Maximum delay between retry attempts (in seconds)
+const RETRY_MAX_DELAY_SECONDS: u64 = 10;
+
+/// Timeout for establishing a connection to S3
+const CONNECT_TIMEOUT_SECONDS: Duration = Duration::from_secs(10);
+/// Timeout for Batch Download Requests
+const BATCH_DOWNLOAD_TIMEOUT_SECONDS: Duration = Duration::from_secs(5 * 60);
+
+
 // get_aligned_batch_from_s3_with_multiple_urls tries multiple comma-separated URLs until first successful response
 pub async fn get_aligned_batch_from_s3_with_multiple_urls(
     urls: String,
@@ -64,39 +82,78 @@ fn parse_batch_urls(batch_urls: &str) -> Vec<String> {
     urls
 }
 
-pub async fn get_aligned_batch_from_s3(
+async fn get_aligned_batch_from_s3_retryable(
     url: String,
-) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+) -> Result<Vec<VerificationData>, RetryError<GetBatchProofsError>> {
     info!("Fetching batch from S3 URL: {}", url);
     let client = reqwest::Client::builder()
         .user_agent(DEFAULT_USER_AGENT)
+        .connect_timeout(CONNECT_TIMEOUT_SECONDS)
+        .timeout(BATCH_DOWNLOAD_TIMEOUT_SECONDS)
         .build()
-        .map_err(|e| GetBatchProofsError::ReqwestClientFailed(e.to_string()))?;
+        .map_err(|e| {
+            RetryError::Permanent(GetBatchProofsError::ReqwestClientFailed(e.to_string()))
+        })?;
+
+    let response = client.get(&url).send().await.map_err(|e| {
+        warn!("Failed to send request to {}: {}", url, e);
+        RetryError::Transient(GetBatchProofsError::FetchingS3Batch(e.to_string()))
+    })?;
 
-    let response = client
-        .get(url)
-        .send()
-        .await
-        .map_err(|e| GetBatchProofsError::FetchingS3Batch(e.to_string()))?;
     if !response.status().is_success() {
-        return Err(GetBatchProofsError::StatusFailed((
-            response.status().as_u16(),
-            response
-                .status()
-                .canonical_reason()
-                .unwrap_or("")
-                .to_string(),
-        )));
+        let status_code = response.status().as_u16();
+        let reason = response
+            .status()
+            .canonical_reason()
+            .unwrap_or("")
+            .to_string();
+
+        // Determine if the error is retryable based on status code
+        let error = GetBatchProofsError::StatusFailed((status_code, reason));
+        return match status_code {
+            // Client errors (4xx) are generally permanent, except for specific cases
+            400..=499 => match status_code {
+                408 | 429 => Err(RetryError::Transient(error)), // Request Timeout, Too Many Requests
+                _ => Err(RetryError::Permanent(error)),
+            },
+            // Server errors (5xx) are generally transient
+            500..=599 => Err(RetryError::Transient(error)),
+            _ => Err(RetryError::Permanent(error)),
+        };
     }
 
-    let bytes = response
-        .bytes()
-        .await
-        .map_err(|e| GetBatchProofsError::EmptyBody(e.to_string()))?;
+    let bytes = response.bytes().await.map_err(|e| {
+        warn!("Failed to read response body from {}: {}", url, e);
+        RetryError::Transient(GetBatchProofsError::EmptyBody(e.to_string()))
+    })?;
     let bytes: &[u8] = bytes.iter().as_slice();
 
-    let data: Vec<VerificationData> = ciborium::from_reader(bytes)
-        .map_err(|e| GetBatchProofsError::Deserialization(e.to_string()))?;
+    let data: Vec<VerificationData> = ciborium::from_reader(bytes).map_err(|e| {
+        warn!("Failed to deserialize batch data from {}: {}", url, e);
+        RetryError::Permanent(GetBatchProofsError::Deserialization(e.to_string()))
+    })?;
 
     Ok(data)
 }
+
+/// Download batch from Storage Service using the provided URL.
+///
+/// Retries on recoverable errors using exponential backoff up to `RETRY_MAX_TIMES` times:
+/// (0.5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
+pub async fn get_aligned_batch_from_s3(
+    url: String,
+) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+    let url_clone = url.clone();
+    retry_function(
+        move || {
+            let url = url_clone.clone();
+            get_aligned_batch_from_s3_retryable(url)
+        },
+        RETRY_MIN_DELAY_MILLIS,
+        RETRY_FACTOR,
+        RETRY_MAX_TIMES,
+        RETRY_MAX_DELAY_SECONDS,
+    )
+    .await
+    .map_err(|retry_err| retry_err.inner())
+}
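
With these constants, the retry schedule works out to 0.5 s, 1 s, 2 s, 4 s and 8 s, as the doc comment on `get_aligned_batch_from_s3` notes. Below is a rough standalone sketch (not part of the commit; it assumes backon 1.2 exposes the configured builder as an iterator of delays via `BackoffBuilder::build`) that prints the delays this configuration would produce:

use backon::{BackoffBuilder, ExponentialBuilder};
use std::time::Duration;

fn main() {
    // Mirror the constants added to s3.rs.
    let backoff = ExponentialBuilder::default()
        .with_min_delay(Duration::from_millis(500)) // RETRY_MIN_DELAY_MILLIS
        .with_factor(2.0)                           // RETRY_FACTOR
        .with_max_times(5)                          // RETRY_MAX_TIMES
        .with_max_delay(Duration::from_secs(10));   // RETRY_MAX_DELAY_SECONDS

    // Iterating the built backoff yields the sleep before each retry:
    // expected 500ms, 1s, 2s, 4s, 8s -- five retries, each delay capped at 10s.
    for (attempt, delay) in backoff.build().enumerate() {
        println!("retry #{}: sleep {:?}", attempt + 1, delay);
    }
}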
