Skip to content

Commit eb1ed8b

Browse files
committed
feat: upload batches to multiple storage services
1 parent 58d5b99 commit eb1ed8b

File tree

13 files changed

+328
-32
lines changed

13 files changed

+328
-32
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ crates/cli/batch_inclusion_responses/*
1212
**/aligned_verification_data
1313
**/broadcast
1414
volume
15+
volume2
1516
config-files/*.last_processed_batch.json
1617
config-files/*.last_aggregated_block.json
1718

aggregation_mode/src/backend/fetcher.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
risc0_aggregator::Risc0ProofReceiptAndImageId, sp1_aggregator::SP1ProofWithPubValuesAndElf,
1010
AlignedProof, ZKVMEngine,
1111
},
12-
backend::s3::get_aligned_batch_from_s3,
12+
backend::s3::get_aligned_batch_from_s3_with_multiple_urls,
1313
};
1414
use aligned_sdk::common::types::ProvingSystemId;
1515
use alloy::{
@@ -97,7 +97,7 @@ impl ProofsFetcher {
9797
);
9898

9999
// Download batch proofs from s3
100-
let data = match get_aligned_batch_from_s3(batch.batchDataPointer).await {
100+
let data = match get_aligned_batch_from_s3_with_multiple_urls(batch.batchDataPointer).await {
101101
Ok(data) => data,
102102
Err(err) => {
103103
error!("Error while downloading proofs from s3. Err {:?}", err);

aggregation_mode/src/backend/s3.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use tracing::{info, warn};
12
use aligned_sdk::common::types::VerificationData;
23

34
#[derive(Debug)]
@@ -12,10 +13,57 @@ pub enum GetBatchProofsError {
1213

1314
// needed to make S3 bucket work
1415
const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer";
16+
const MAX_BATCH_URLS: usize = 5;
17+
18+
// get_aligned_batch_from_s3_with_multiple_urls tries multiple comma-separated URLs until first successful response
19+
pub async fn get_aligned_batch_from_s3_with_multiple_urls(
20+
urls: String,
21+
) -> Result<Vec<VerificationData>, GetBatchProofsError> {
22+
// Parse comma-separated URLs and limit to max 5
23+
let parsed_urls = parse_batch_urls(&urls);
24+
info!("Getting batch from data service with {} URLs: {:?}", parsed_urls.len(), parsed_urls);
25+
26+
let mut errors = Vec::new();
27+
28+
// Try each URL until first successful response
29+
for (_i, url) in parsed_urls.iter().enumerate() {
30+
match get_aligned_batch_from_s3(url.clone()).await {
31+
Ok(data) => {
32+
return Ok(data);
33+
}
34+
Err(err) => {
35+
warn!("Failed to fetch batch from URL {}: {:?}", url, err);
36+
errors.push(format!("URL {}: {:?}", url, err));
37+
}
38+
}
39+
}
40+
41+
// All URLs failed
42+
Err(GetBatchProofsError::FetchingS3Batch(format!(
43+
"Failed to get batch from all URLs, errors: {}",
44+
errors.join("; ")
45+
)))
46+
}
47+
48+
// parse_batch_urls parses comma-separated URLs and limits to max 5
49+
fn parse_batch_urls(batch_urls: &str) -> Vec<String> {
50+
let mut urls = Vec::new();
51+
for url in batch_urls.split(',') {
52+
let trimmed_url = url.trim();
53+
if !trimmed_url.is_empty() {
54+
urls.push(trimmed_url.to_string());
55+
if urls.len() > MAX_BATCH_URLS {
56+
break;
57+
}
58+
}
59+
}
60+
urls
61+
}
1562

1663
pub async fn get_aligned_batch_from_s3(
1764
url: String,
1865
) -> Result<Vec<VerificationData>, GetBatchProofsError> {
66+
info!("Fetching batch from S3 URL: {}", url);
1967
let client = reqwest::Client::builder()
2068
.user_agent(DEFAULT_USER_AGENT)
2169
.build()

crates/batcher/.env.dev

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,18 @@
1+
# Main S3 bucket configuration
12
AWS_SECRET_ACCESS_KEY=test
23
AWS_REGION=us-east-2
34
AWS_ACCESS_KEY_ID=test
45
AWS_BUCKET_NAME=aligned.storage
56
UPLOAD_ENDPOINT=http://localhost:4566
67
DOWNLOAD_ENDPOINT=http://localhost:4566/aligned.storage
8+
9+
# Secondary S3 bucket configuration
10+
AWS_SECRET_ACCESS_KEY_SECONDARY=test2
11+
AWS_REGION_SECONDARY=us-west-1
12+
AWS_ACCESS_KEY_ID_SECONDARY=test2
13+
AWS_BUCKET_NAME_SECONDARY=aligned.storage
14+
UPLOAD_ENDPOINT_SECONDARY=http://localhost:4567
15+
DOWNLOAD_ENDPOINT_SECONDARY=http://localhost:4567/aligned.storage
16+
717
RUST_LOG=info
818
RUST_BACKTRACE=1

crates/batcher/.env.docker

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,18 @@
1+
# Main S3 bucket configuration
12
AWS_SECRET_ACCESS_KEY=test
23
AWS_REGION=us-east-2
34
AWS_ACCESS_KEY_ID=test
45
AWS_BUCKET_NAME=aligned.storage
56
UPLOAD_ENDPOINT=http://localstack:4566
67
DOWNLOAD_ENDPOINT=http://localstack:4566/aligned.storage
8+
9+
# Secondary S3 bucket configuration
10+
AWS_SECRET_ACCESS_KEY_SECONDARY=test2
11+
AWS_REGION_SECONDARY=us-west-1
12+
AWS_ACCESS_KEY_ID_SECONDARY=test2
13+
AWS_BUCKET_NAME_SECONDARY=aligned.storage
14+
UPLOAD_ENDPOINT_SECONDARY=http://localhost:4567
15+
DOWNLOAD_ENDPOINT_SECONDARY=http://localhost:4567/aligned.storage
16+
717
RUST_LOG=info
818
RUST_BACKTRACE=1

crates/batcher/.env.example

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
1+
# Main S3 bucket configuration
12
AWS_SECRET_ACCESS_KEY=<secret_access_key>
23
AWS_REGION=<region>
34
AWS_ACCESS_KEY_ID=<access_key_id>
45
AWS_BUCKET_NAME=<bucket_name>
56
UPLOAD_ENDPOINT=<upload_endpoint. For non-aws storage>
67
DOWNLOAD_ENDPOINT=<download_endpoint>
8+
9+
# Secondary S3 bucket configuration
10+
AWS_SECRET_ACCESS_KEY_SECONDARY=<secondary_secret_access_key>
11+
AWS_REGION_SECONDARY=<secondary_region>
12+
AWS_ACCESS_KEY_ID_SECONDARY=<secondary_access_key_id>
13+
AWS_BUCKET_NAME_SECONDARY=<secondary_bucket_name>
14+
UPLOAD_ENDPOINT_SECONDARY=<secondary_upload_endpoint. For non-aws storage>
15+
DOWNLOAD_ENDPOINT_SECONDARY=<secondary_download_endpoint>
16+
717
RUST_LOG=<log_level> # info, debug, error, warn, etc.

crates/batcher/src/lib.rs

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ pub struct Batcher {
7575
s3_client: S3Client,
7676
s3_bucket_name: String,
7777
download_endpoint: String,
78+
s3_client_secondary: S3Client,
79+
s3_bucket_name_secondary: String,
80+
download_endpoint_secondary: String,
7881
eth_ws_url: String,
7982
eth_ws_url_fallback: String,
8083
batcher_signer: Arc<SignerMiddlewareT>,
@@ -106,15 +109,36 @@ impl Batcher {
106109
dotenv().ok();
107110

108111
// https://docs.aws.amazon.com/sdk-for-rust/latest/dg/localstack.html
109-
let upload_endpoint = env::var("UPLOAD_ENDPOINT").ok();
112+
// Primary S3 configuration
113+
let s3_config_primary = s3::S3Config {
114+
access_key_id: env::var("AWS_ACCESS_KEY_ID").ok(),
115+
secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").ok(),
116+
region: env::var("AWS_REGION").ok(),
117+
endpoint_url: env::var("UPLOAD_ENDPOINT").ok(),
118+
};
110119

111120
let s3_bucket_name =
112121
env::var("AWS_BUCKET_NAME").expect("AWS_BUCKET_NAME not found in environment");
113122

114123
let download_endpoint =
115124
env::var("DOWNLOAD_ENDPOINT").expect("DOWNLOAD_ENDPOINT not found in environment");
116125

117-
let s3_client = s3::create_client(upload_endpoint).await;
126+
let s3_client = s3::create_client(s3_config_primary).await;
127+
128+
// Secondary S3 configuration
129+
let s3_config_secondary = s3::S3Config {
130+
access_key_id: env::var("AWS_ACCESS_KEY_ID_SECONDARY").ok(),
131+
secret_access_key: env::var("AWS_SECRET_ACCESS_KEY_SECONDARY").ok(),
132+
region: env::var("AWS_REGION_SECONDARY").ok(),
133+
endpoint_url: env::var("UPLOAD_ENDPOINT_SECONDARY").ok(),
134+
};
135+
136+
let s3_bucket_name_secondary = env::var("AWS_BUCKET_NAME_SECONDARY")
137+
.expect("AWS_BUCKET_NAME_SECONDARY not found in environment");
138+
let download_endpoint_secondary = env::var("DOWNLOAD_ENDPOINT_SECONDARY")
139+
.expect("DOWNLOAD_ENDPOINT_SECONDARY not found in environment");
140+
141+
let s3_client_secondary = s3::create_client(s3_config_secondary).await;
118142

119143
let config = ConfigFromYaml::new(config_file);
120144
// Ensure max_batch_bytes_size can at least hold one proof of max_proof_size,
@@ -252,6 +276,9 @@ impl Batcher {
252276
s3_client,
253277
s3_bucket_name,
254278
download_endpoint,
279+
s3_client_secondary,
280+
s3_bucket_name_secondary,
281+
download_endpoint_secondary,
255282
eth_ws_url: config.eth_ws_url,
256283
eth_ws_url_fallback: config.eth_ws_url_fallback,
257284
batcher_signer,
@@ -1541,7 +1568,16 @@ impl Batcher {
15411568
let batch_merkle_root_hex = hex::encode(batch_merkle_root);
15421569
info!("Batch merkle root: 0x{}", batch_merkle_root_hex);
15431570
let file_name = batch_merkle_root_hex.clone() + ".json";
1544-
let batch_data_pointer: String = "".to_owned() + &self.download_endpoint + "/" + &file_name;
1571+
1572+
let batch_data_pointer = self.upload_batch_to_multiple_s3(batch_bytes, &file_name).await?;
1573+
if let Err(e) = self
1574+
.telemetry
1575+
.task_uploaded_to_s3(&batch_merkle_root_hex)
1576+
.await
1577+
{
1578+
warn!("Failed to send task status to telemetry: {:?}", e);
1579+
};
1580+
info!("Batch upload to: {}", batch_data_pointer);
15451581

15461582
let num_proofs_in_batch = leaves.len();
15471583
let gas_per_proof = (self.constant_gas_cost()
@@ -1577,16 +1613,6 @@ impl Batcher {
15771613
.gas_price_used_on_latest_batch
15781614
.set(gas_price.as_u64() as i64);
15791615

1580-
info!("Uploading batch to S3...");
1581-
self.upload_batch_to_s3(batch_bytes, &file_name).await?;
1582-
if let Err(e) = self
1583-
.telemetry
1584-
.task_uploaded_to_s3(&batch_merkle_root_hex)
1585-
.await
1586-
{
1587-
warn!("Failed to send task status to telemetry: {:?}", e);
1588-
};
1589-
info!("Batch sent to S3 with name: {}", file_name);
15901616
if let Err(e) = self
15911617
.telemetry
15921618
.task_created(
@@ -1857,22 +1883,61 @@ impl Batcher {
18571883
unlocked
18581884
}
18591885

1886+
/// Uploads the batch to both S3 buckets and returns the comma-separated URLs of successful uploads.
1887+
/// Returns an error only if all uploads fail.
1888+
async fn upload_batch_to_multiple_s3(
1889+
&self,
1890+
batch_bytes: &[u8],
1891+
file_name: &str,
1892+
) -> Result<String, BatcherError> {
1893+
// Upload to both S3 buckets and collect successful URLs
1894+
let mut successful_urls = Vec::new();
1895+
1896+
// Try primary S3 upload
1897+
if let Ok(_) = self.upload_batch_to_s3(&self.s3_client, batch_bytes, file_name, &self.s3_bucket_name).await {
1898+
let primary_url = format!("{}/{}", self.download_endpoint, file_name);
1899+
successful_urls.push(primary_url.clone());
1900+
info!("Successfully uploaded batch to primary S3: {}", primary_url);
1901+
} else {
1902+
warn!("Failed to upload batch to primary S3");
1903+
}
1904+
1905+
// Try secondary S3 upload
1906+
if let Ok(_) = self.upload_batch_to_s3(&self.s3_client_secondary, batch_bytes, file_name, &self.s3_bucket_name_secondary).await {
1907+
let secondary_url = format!("{}/{}", self.download_endpoint_secondary, file_name);
1908+
successful_urls.push(secondary_url.clone());
1909+
info!("Successfully uploaded batch to secondary S3: {}", secondary_url);
1910+
} else {
1911+
warn!("Failed to upload batch to secondary S3");
1912+
}
1913+
1914+
// If no uploads succeeded, return error
1915+
if successful_urls.is_empty() {
1916+
error!("Failed to upload batch to both S3 buckets");
1917+
return Err(BatcherError::BatchUploadError("Failed to upload to any S3 bucket".to_string()));
1918+
}
1919+
1920+
Ok(successful_urls.join(","))
1921+
}
1922+
18601923
/// Uploads the batch to s3.
18611924
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
18621925
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
18631926
async fn upload_batch_to_s3(
18641927
&self,
1928+
s3_client: &S3Client,
18651929
batch_bytes: &[u8],
18661930
file_name: &str,
1931+
bucket_name: &str,
18671932
) -> Result<(), BatcherError> {
18681933
let start = Instant::now();
18691934
let result = retry_function(
18701935
|| {
18711936
Self::upload_batch_to_s3_retryable(
18721937
batch_bytes,
18731938
file_name,
1874-
self.s3_client.clone(),
1875-
&self.s3_bucket_name,
1939+
s3_client.clone(),
1940+
bucket_name,
18761941
)
18771942
},
18781943
ETHEREUM_CALL_MIN_RETRY_DELAY,

crates/batcher/src/s3/mod.rs

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,47 @@ use aws_sdk_s3::error::SdkError;
44
use aws_sdk_s3::operation::put_object::{PutObjectError, PutObjectOutput};
55
use aws_sdk_s3::primitives::ByteStream;
66
use aws_sdk_s3::Client;
7+
use aws_sdk_s3::config::Region;
78
use log::info;
89

9-
pub async fn create_client(endpoint_url: Option<String>) -> Client {
10-
let region_provider = RegionProviderChain::default_provider().or_else("us-east-2");
11-
let mut config = aws_config::defaults(BehaviorVersion::latest()).region(region_provider);
12-
if let Some(endpoint_url) = &endpoint_url {
10+
/// Configuration for building an S3 client in `create_client`.
///
/// All fields are optional: when credentials are absent (or only one half of
/// the pair is set) the AWS SDK's default provider chain is used; when
/// `region` is absent it falls back to the default provider with a
/// `us-east-2` fallback; a custom `endpoint_url` (e.g. localstack) also
/// forces path-style addressing.
///
/// `Debug` is intentionally NOT derived so the secret access key cannot be
/// leaked through `{:?}` logging.
#[derive(Clone)]
pub struct S3Config {
    /// AWS access key id; only used together with `secret_access_key`.
    pub access_key_id: Option<String>,
    /// AWS secret access key; only used together with `access_key_id`.
    pub secret_access_key: Option<String>,
    /// AWS region; defaults to `us-east-2` when not provided.
    pub region: Option<String>,
    /// Custom endpoint URL for non-AWS / local storage backends.
    pub endpoint_url: Option<String>,
}
16+
17+
pub async fn create_client(s3_config: S3Config) -> Client {
18+
let mut config = aws_config::defaults(BehaviorVersion::latest());
19+
20+
if let Some(region) = s3_config.region {
21+
let region_provider = RegionProviderChain::first_try(Region::new(region)).or_else("us-east-2");
22+
config = config.region(region_provider);
23+
} else {
24+
let region_provider = RegionProviderChain::default_provider().or_else("us-east-2");
25+
config = config.region(region_provider);
26+
}
27+
28+
if let (Some(access_key_id), Some(secret_access_key)) = (s3_config.access_key_id, s3_config.secret_access_key) {
29+
let credentials = aws_sdk_s3::config::Credentials::new(
30+
access_key_id,
31+
secret_access_key,
32+
None,
33+
None,
34+
"custom",
35+
);
36+
config = config.credentials_provider(credentials);
37+
}
38+
39+
if let Some(endpoint_url) = &s3_config.endpoint_url {
1340
info!("Using custom endpoint: {}", endpoint_url);
1441
config = config.endpoint_url(endpoint_url);
1542
}
43+
1644
let config = config.load().await;
1745

1846
let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&config);
19-
if endpoint_url.is_some() {
47+
if s3_config.endpoint_url.is_some() {
2048
info!("Forcing path style for custom endpoint");
2149
s3_config_builder = s3_config_builder.force_path_style(true);
2250
}

0 commit comments

Comments
 (0)