Commit 87a425e

Merge: 2 parents fc93d60 + e345a37

18 files changed: +511 −48 lines

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -12,6 +12,7 @@ crates/cli/batch_inclusion_responses/*
 **/aligned_verification_data
 **/broadcast
 volume
+volume2
 config-files/*.last_processed_batch.json
 config-files/*.last_aggregated_block.json

aggregation_mode/src/backend/fetcher.rs

Lines changed: 9 additions & 8 deletions

@@ -9,7 +9,7 @@ use crate::{
         risc0_aggregator::Risc0ProofReceiptAndImageId, sp1_aggregator::SP1ProofWithPubValuesAndElf,
         AlignedProof, ZKVMEngine,
     },
-    backend::s3::get_aligned_batch_from_s3,
+    backend::s3::get_aligned_batch_from_s3_with_multiple_urls,
 };
 use aligned_sdk::common::types::ProvingSystemId;
 use alloy::{
@@ -97,13 +97,14 @@ impl ProofsFetcher {
             );

             // Download batch proofs from s3
-            let data = match get_aligned_batch_from_s3(batch.batchDataPointer).await {
-                Ok(data) => data,
-                Err(err) => {
-                    error!("Error while downloading proofs from s3. Err {:?}", err);
-                    continue;
-                }
-            };
+            let data =
+                match get_aligned_batch_from_s3_with_multiple_urls(batch.batchDataPointer).await {
+                    Ok(data) => data,
+                    Err(err) => {
+                        error!("Error while downloading proofs from s3. Err {:?}", err);
+                        continue;
+                    }
+                };

             info!("Data downloaded from S3, number of proofs {}", data.len());

aggregation_mode/src/backend/s3.rs

Lines changed: 52 additions & 0 deletions

@@ -1,4 +1,5 @@
 use aligned_sdk::common::types::VerificationData;
+use tracing::{info, warn};

 #[derive(Debug)]
 #[allow(dead_code)]
@@ -12,10 +13,61 @@ pub enum GetBatchProofsError {

 // needed to make S3 bucket work
 const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer";
+const MAX_BATCH_URLS: usize = 5;
+
+// get_aligned_batch_from_s3_with_multiple_urls tries multiple comma-separated URLs until the first successful response
+pub async fn get_aligned_batch_from_s3_with_multiple_urls(
+    urls: String,
+) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+    // Parse comma-separated URLs and limit to max 5
+    let parsed_urls = parse_batch_urls(&urls);
+    info!(
+        "Getting batch from data service with {} URLs: {:?}",
+        parsed_urls.len(),
+        parsed_urls
+    );
+
+    let mut errors = Vec::new();
+
+    // Try each URL until the first successful response
+    for url in parsed_urls.iter() {
+        match get_aligned_batch_from_s3(url.clone()).await {
+            Ok(data) => {
+                return Ok(data);
+            }
+            Err(err) => {
+                warn!("Failed to fetch batch from URL {}: {:?}", url, err);
+                errors.push(format!("URL {}: {:?}", url, err));
+            }
+        }
+    }
+
+    // All URLs failed
+    Err(GetBatchProofsError::FetchingS3Batch(format!(
+        "Failed to get batch from all URLs, errors: {}",
+        errors.join("; ")
+    )))
+}
+
+// parse_batch_urls parses comma-separated URLs and limits to max 5
+fn parse_batch_urls(batch_urls: &str) -> Vec<String> {
+    let mut urls = Vec::new();
+    for url in batch_urls.split(',') {
+        let trimmed_url = url.trim();
+        if !trimmed_url.is_empty() {
+            urls.push(trimmed_url.to_string());
+            if urls.len() >= MAX_BATCH_URLS {
+                break;
+            }
+        }
+    }
+    urls
+}

 pub async fn get_aligned_batch_from_s3(
     url: String,
 ) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+    info!("Fetching batch from S3 URL: {}", url);
     let client = reqwest::Client::builder()
         .user_agent(DEFAULT_USER_AGENT)
         .build()
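
A minimal test sketch for parse_batch_urls, assuming the module layout above; the cap assertion reflects MAX_BATCH_URLS = 5:

#[cfg(test)]
mod tests {
    use super::parse_batch_urls;

    #[test]
    fn parse_batch_urls_trims_skips_empty_and_caps() {
        // Whitespace is trimmed and empty segments between commas are skipped
        let urls = parse_batch_urls(" https://a/x.json , ,https://b/x.json");
        assert_eq!(urls, vec!["https://a/x.json", "https://b/x.json"]);

        // At most MAX_BATCH_URLS entries are kept; extras are dropped
        let many: String = (0..8)
            .map(|i| format!("https://host{i}/x.json"))
            .collect::<Vec<_>>()
            .join(",");
        assert_eq!(parse_batch_urls(&many).len(), 5);
    }
}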

crates/batcher/.env.dev

Lines changed: 10 additions & 0 deletions

@@ -1,8 +1,18 @@
+# Main S3 bucket configuration
 AWS_SECRET_ACCESS_KEY=test
 AWS_REGION=us-east-2
 AWS_ACCESS_KEY_ID=test
 AWS_BUCKET_NAME=aligned.storage
 UPLOAD_ENDPOINT=http://localhost:4566
 DOWNLOAD_ENDPOINT=http://localhost:4566/aligned.storage
+
+# Secondary S3 bucket configuration
+AWS_SECRET_ACCESS_KEY_SECONDARY=test2
+AWS_REGION_SECONDARY=us-west-1
+AWS_ACCESS_KEY_ID_SECONDARY=test2
+AWS_BUCKET_NAME_SECONDARY=aligned.storage
+UPLOAD_ENDPOINT_SECONDARY=http://localhost:4567
+DOWNLOAD_ENDPOINT_SECONDARY=http://localhost:4567/aligned.storage
+
 RUST_LOG=info
 RUST_BACKTRACE=1

crates/batcher/.env.docker

Lines changed: 10 additions & 0 deletions

@@ -1,8 +1,18 @@
+# Main S3 bucket configuration
 AWS_SECRET_ACCESS_KEY=test
 AWS_REGION=us-east-2
 AWS_ACCESS_KEY_ID=test
 AWS_BUCKET_NAME=aligned.storage
 UPLOAD_ENDPOINT=http://localstack:4566
 DOWNLOAD_ENDPOINT=http://localstack:4566/aligned.storage
+
+# Secondary S3 bucket configuration
+AWS_SECRET_ACCESS_KEY_SECONDARY=test2
+AWS_REGION_SECONDARY=us-west-1
+AWS_ACCESS_KEY_ID_SECONDARY=test2
+AWS_BUCKET_NAME_SECONDARY=aligned.storage
+UPLOAD_ENDPOINT_SECONDARY=http://localstack2:4567
+DOWNLOAD_ENDPOINT_SECONDARY=http://localstack2:4567/aligned.storage
+
 RUST_LOG=info
 RUST_BACKTRACE=1

crates/batcher/.env.example

Lines changed: 10 additions & 0 deletions

@@ -1,7 +1,17 @@
+# Main S3 bucket configuration
 AWS_SECRET_ACCESS_KEY=<secret_access_key>
 AWS_REGION=<region>
 AWS_ACCESS_KEY_ID=<access_key_id>
 AWS_BUCKET_NAME=<bucket_name>
 UPLOAD_ENDPOINT=<upload_endpoint. For non-aws storage>
 DOWNLOAD_ENDPOINT=<download_endpoint>
+
+# Secondary S3 bucket configuration
+AWS_SECRET_ACCESS_KEY_SECONDARY=<secondary_secret_access_key>
+AWS_REGION_SECONDARY=<secondary_region>
+AWS_ACCESS_KEY_ID_SECONDARY=<secondary_access_key_id>
+AWS_BUCKET_NAME_SECONDARY=<secondary_bucket_name>
+UPLOAD_ENDPOINT_SECONDARY=<secondary_upload_endpoint. For non-aws storage>
+DOWNLOAD_ENDPOINT_SECONDARY=<secondary_download_endpoint>
+
 RUST_LOG=<log_level> # info, debug, error, warn, etc.
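
All *_SECONDARY variables are optional: as the batcher changes below show, the secondary client is only constructed when both AWS_BUCKET_NAME_SECONDARY and DOWNLOAD_ENDPOINT_SECONDARY are present. A standalone sketch of that rule (not the repo's code):

use std::env;

// The secondary bucket is enabled only when both variables are set;
// otherwise the batcher runs with the primary bucket alone.
fn secondary_s3_configured() -> bool {
    env::var("AWS_BUCKET_NAME_SECONDARY").is_ok()
        && env::var("DOWNLOAD_ENDPOINT_SECONDARY").is_ok()
}

fn main() {
    if secondary_s3_configured() {
        println!("secondary S3 enabled");
    } else {
        println!("operating with primary S3 only");
    }
}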

crates/batcher/src/lib.rs

Lines changed: 124 additions & 15 deletions

@@ -77,6 +77,9 @@ pub struct Batcher {
     s3_client: S3Client,
     s3_bucket_name: String,
     download_endpoint: String,
+    s3_client_secondary: Option<S3Client>,
+    s3_bucket_name_secondary: Option<String>,
+    download_endpoint_secondary: Option<String>,
     eth_ws_url: String,
     eth_ws_url_fallback: String,
     batcher_signer: Arc<SignerMiddlewareT>,
@@ -129,15 +132,40 @@ impl Batcher {
         dotenv().ok();

         // https://docs.aws.amazon.com/sdk-for-rust/latest/dg/localstack.html
-        let upload_endpoint = env::var("UPLOAD_ENDPOINT").ok();
+        // Primary S3 configuration
+        let s3_config_primary = s3::S3Config {
+            access_key_id: env::var("AWS_ACCESS_KEY_ID").ok(),
+            secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").ok(),
+            region: env::var("AWS_REGION").ok(),
+            endpoint_url: env::var("UPLOAD_ENDPOINT").ok(),
+        };

         let s3_bucket_name =
             env::var("AWS_BUCKET_NAME").expect("AWS_BUCKET_NAME not found in environment");

         let download_endpoint =
             env::var("DOWNLOAD_ENDPOINT").expect("DOWNLOAD_ENDPOINT not found in environment");

-        let s3_client = s3::create_client(upload_endpoint).await;
+        let s3_client = s3::create_client(s3_config_primary).await;
+
+        // Secondary S3 configuration (optional)
+        let s3_bucket_name_secondary = env::var("AWS_BUCKET_NAME_SECONDARY").ok();
+        let download_endpoint_secondary = env::var("DOWNLOAD_ENDPOINT_SECONDARY").ok();
+
+        let s3_client_secondary = if s3_bucket_name_secondary.is_some()
+            && download_endpoint_secondary.is_some()
+        {
+            let s3_config_secondary = s3::S3Config {
+                access_key_id: env::var("AWS_ACCESS_KEY_ID_SECONDARY").ok(),
+                secret_access_key: env::var("AWS_SECRET_ACCESS_KEY_SECONDARY").ok(),
+                region: env::var("AWS_REGION_SECONDARY").ok(),
+                endpoint_url: env::var("UPLOAD_ENDPOINT_SECONDARY").ok(),
+            };
+            Some(s3::create_client(s3_config_secondary).await)
+        } else {
+            info!("Secondary S3 configuration not found or incomplete. Operating with primary S3 only.");
+            None
+        };

         let config = ConfigFromYaml::new(config_file);
         // Ensure max_batch_bytes_size can at least hold one proof of max_proof_size,
@@ -273,6 +301,9 @@ impl Batcher {
             s3_client,
             s3_bucket_name,
             download_endpoint,
+            s3_client_secondary,
+            s3_bucket_name_secondary,
+            download_endpoint_secondary,
             eth_ws_url: config.eth_ws_url,
             eth_ws_url_fallback: config.eth_ws_url_fallback,
             batcher_signer,
@@ -1872,7 +1903,18 @@ impl Batcher {
         let batch_merkle_root_hex = hex::encode(batch_merkle_root);
         info!("Batch merkle root: 0x{}", batch_merkle_root_hex);
         let file_name = batch_merkle_root_hex.clone() + ".json";
-        let batch_data_pointer: String = "".to_owned() + &self.download_endpoint + "/" + &file_name;
+
+        let batch_data_pointer = self
+            .upload_batch_to_multiple_s3(batch_bytes, &file_name)
+            .await?;
+        if let Err(e) = self
+            .telemetry
+            .task_uploaded_to_s3(&batch_merkle_root_hex)
+            .await
+        {
+            warn!("Failed to send task status to telemetry: {:?}", e);
+        };
+        info!("Batch uploaded to: {}", batch_data_pointer);

         let num_proofs_in_batch = leaves.len();
         let gas_per_proof = (self.constant_gas_cost()
@@ -1908,16 +1950,6 @@ impl Batcher {
             .gas_price_used_on_latest_batch
             .set(gas_price.as_u64() as i64);

-        info!("Uploading batch to S3...");
-        self.upload_batch_to_s3(batch_bytes, &file_name).await?;
-        if let Err(e) = self
-            .telemetry
-            .task_uploaded_to_s3(&batch_merkle_root_hex)
-            .await
-        {
-            warn!("Failed to send task status to telemetry: {:?}", e);
-        };
-        info!("Batch sent to S3 with name: {}", file_name);
         if let Err(e) = self
             .telemetry
             .task_created(
@@ -2188,22 +2220,99 @@ impl Batcher {
         unlocked
     }

+    /// Uploads the batch to both S3 buckets and returns the comma-separated URLs of the
+    /// successful uploads. Returns an error only if all uploads fail.
+    async fn upload_batch_to_multiple_s3(
+        &self,
+        batch_bytes: &[u8],
+        file_name: &str,
+    ) -> Result<String, BatcherError> {
+        // Upload to both S3 buckets and collect the successful URLs
+        let mut successful_urls = Vec::new();
+
+        // Try primary S3 upload
+        if self
+            .upload_batch_to_s3(
+                &self.s3_client,
+                batch_bytes,
+                file_name,
+                &self.s3_bucket_name,
+            )
+            .await
+            .is_ok()
+        {
+            let primary_url = format!("{}/{}", self.download_endpoint, file_name);
+            successful_urls.push(primary_url.clone());
+            info!("Successfully uploaded batch to primary S3: {}", primary_url);
+        } else {
+            warn!("Failed to upload batch to primary S3");
+        }
+
+        // Try secondary S3 upload (if configured)
+        if let (
+            Some(s3_client_secondary),
+            Some(s3_bucket_name_secondary),
+            Some(download_endpoint_secondary),
+        ) = (
+            &self.s3_client_secondary,
+            &self.s3_bucket_name_secondary,
+            &self.download_endpoint_secondary,
+        ) {
+            if self
+                .upload_batch_to_s3(
+                    s3_client_secondary,
+                    batch_bytes,
+                    file_name,
+                    s3_bucket_name_secondary,
+                )
+                .await
+                .is_ok()
+            {
+                let secondary_url = format!("{}/{}", download_endpoint_secondary, file_name);
+                successful_urls.push(secondary_url.clone());
+                info!(
+                    "Successfully uploaded batch to secondary S3: {}",
+                    secondary_url
+                );
+            } else {
+                warn!("Failed to upload batch to secondary S3");
+            }
+        }
+
+        // Update metrics with the number of available data services
+        self.metrics
+            .available_data_services
+            .set(successful_urls.len() as i64);
+
+        // If no uploads succeeded, return an error
+        if successful_urls.is_empty() {
+            error!("Failed to upload batch to both S3 buckets");
+            return Err(BatcherError::BatchUploadError(
+                "Failed to upload to any S3 bucket".to_string(),
+            ));
+        }
+
+        Ok(successful_urls.join(","))
+    }
+
     /// Uploads the batch to s3.
     /// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
     /// (0.5 secs - 1 sec - 2 secs - 4 secs - 8 secs).
     async fn upload_batch_to_s3(
         &self,
+        s3_client: &S3Client,
         batch_bytes: &[u8],
         file_name: &str,
+        bucket_name: &str,
     ) -> Result<(), BatcherError> {
         let start = Instant::now();
         let result = retry_function(
             || {
                 Self::upload_batch_to_s3_retryable(
                     batch_bytes,
                     file_name,
-                    self.s3_client.clone(),
-                    &self.s3_bucket_name,
+                    s3_client.clone(),
+                    bucket_name,
                 )
             },
             ETHEREUM_CALL_MIN_RETRY_DELAY,
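
The upload path is a best-effort fan-out: each bucket is attempted independently, the available_data_services metric records how many succeeded, and the call errors only when every upload failed. A miniature of that collect-successes-or-error shape, with simplified stand-in types rather than the batcher's own:

/// Keep the URLs of successful uploads and fail only if none succeeded,
/// mirroring upload_batch_to_multiple_s3 above.
fn collect_successes(attempts: Vec<Result<String, String>>) -> Result<String, String> {
    let successful: Vec<String> = attempts.into_iter().filter_map(Result::ok).collect();
    if successful.is_empty() {
        return Err("Failed to upload to any S3 bucket".to_string());
    }
    // The comma-separated join is what later becomes batchDataPointer on-chain
    Ok(successful.join(","))
}

fn main() {
    // Primary succeeded, secondary failed: the pointer still carries one URL
    let pointer = collect_successes(vec![
        Ok("http://localhost:4566/aligned.storage/abc123.json".into()),
        Err("secondary endpoint unreachable".into()),
    ]);
    assert_eq!(
        pointer.unwrap(),
        "http://localhost:4566/aligned.storage/abc123.json"
    );
}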
