Commit e345a37

feat: upload batches to multiple storage services (#2045)

1 parent 58d5b99 commit e345a37

File tree

18 files changed: +511 -47 lines changed

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -12,6 +12,7 @@ crates/cli/batch_inclusion_responses/*
 **/aligned_verification_data
 **/broadcast
 volume
+volume2
 config-files/*.last_processed_batch.json
 config-files/*.last_aggregated_block.json

aggregation_mode/src/backend/fetcher.rs

Lines changed: 9 additions & 8 deletions

@@ -9,7 +9,7 @@ use crate::{
         risc0_aggregator::Risc0ProofReceiptAndImageId, sp1_aggregator::SP1ProofWithPubValuesAndElf,
         AlignedProof, ZKVMEngine,
     },
-    backend::s3::get_aligned_batch_from_s3,
+    backend::s3::get_aligned_batch_from_s3_with_multiple_urls,
 };
 use aligned_sdk::common::types::ProvingSystemId;
 use alloy::{
@@ -97,13 +97,14 @@ impl ProofsFetcher {
             );

             // Download batch proofs from s3
-            let data = match get_aligned_batch_from_s3(batch.batchDataPointer).await {
-                Ok(data) => data,
-                Err(err) => {
-                    error!("Error while downloading proofs from s3. Err {:?}", err);
-                    continue;
-                }
-            };
+            let data =
+                match get_aligned_batch_from_s3_with_multiple_urls(batch.batchDataPointer).await {
+                    Ok(data) => data,
+                    Err(err) => {
+                        error!("Error while downloading proofs from s3. Err {:?}", err);
+                        continue;
+                    }
+                };

             info!("Data downloaded from S3, number of proofs {}", data.len());
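Note that a `batchDataPointer` written before this change is just a single URL; the URL-list parsing in the next file returns it as a one-element list, so older batches remain fetchable without any migration.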

aggregation_mode/src/backend/s3.rs

Lines changed: 52 additions & 0 deletions

@@ -1,4 +1,5 @@
 use aligned_sdk::common::types::VerificationData;
+use tracing::{info, warn};

 #[derive(Debug)]
 #[allow(dead_code)]
@@ -12,10 +13,61 @@ pub enum GetBatchProofsError {

 // needed to make S3 bucket work
 const DEFAULT_USER_AGENT: &str = "proof-aggregator/aligned-layer";
+const MAX_BATCH_URLS: usize = 5;
+
+// get_aligned_batch_from_s3_with_multiple_urls tries each comma-separated URL until the first successful response
+pub async fn get_aligned_batch_from_s3_with_multiple_urls(
+    urls: String,
+) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+    // Parse the comma-separated URLs, keeping at most MAX_BATCH_URLS of them
+    let parsed_urls = parse_batch_urls(&urls);
+    info!(
+        "Getting batch from data service with {} URLs: {:?}",
+        parsed_urls.len(),
+        parsed_urls
+    );
+
+    let mut errors = Vec::new();
+
+    // Try each URL until the first successful response
+    for url in parsed_urls.iter() {
+        match get_aligned_batch_from_s3(url.clone()).await {
+            Ok(data) => {
+                return Ok(data);
+            }
+            Err(err) => {
+                warn!("Failed to fetch batch from URL {}: {:?}", url, err);
+                errors.push(format!("URL {}: {:?}", url, err));
+            }
+        }
+    }
+
+    // All URLs failed
+    Err(GetBatchProofsError::FetchingS3Batch(format!(
+        "Failed to get batch from all URLs, errors: {}",
+        errors.join("; ")
+    )))
+}
+
+// parse_batch_urls splits a comma-separated URL list, trimming entries and keeping at most MAX_BATCH_URLS
+fn parse_batch_urls(batch_urls: &str) -> Vec<String> {
+    let mut urls = Vec::new();
+    for url in batch_urls.split(',') {
+        let trimmed_url = url.trim();
+        if !trimmed_url.is_empty() {
+            urls.push(trimmed_url.to_string());
+            if urls.len() >= MAX_BATCH_URLS {
+                break;
+            }
+        }
+    }
+    urls
+}

 pub async fn get_aligned_batch_from_s3(
     url: String,
 ) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+    info!("Fetching batch from S3 URL: {}", url);
     let client = reqwest::Client::builder()
         .user_agent(DEFAULT_USER_AGENT)
         .build()
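To pin down the parsing contract, here is a hypothetical unit test, not part of this commit; the module name and URL values are illustrative. It exercises trimming, empty-segment skipping, and the five-URL cap of parse_batch_urls:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parse_batch_urls_trims_skips_and_caps() {
        // Surrounding whitespace is trimmed and empty segments are skipped.
        let urls = parse_batch_urls(" https://a/batch.json , ,https://b/batch.json,");
        assert_eq!(urls, vec!["https://a/batch.json", "https://b/batch.json"]);

        // At most MAX_BATCH_URLS (5) entries are kept.
        let many = (0..10)
            .map(|i| format!("https://host{i}/batch.json"))
            .collect::<Vec<_>>()
            .join(",");
        assert_eq!(parse_batch_urls(&many).len(), MAX_BATCH_URLS);
    }
}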

crates/batcher/.env.dev

Lines changed: 10 additions & 0 deletions

@@ -1,8 +1,18 @@
+# Main S3 bucket configuration
 AWS_SECRET_ACCESS_KEY=test
 AWS_REGION=us-east-2
 AWS_ACCESS_KEY_ID=test
 AWS_BUCKET_NAME=aligned.storage
 UPLOAD_ENDPOINT=http://localhost:4566
 DOWNLOAD_ENDPOINT=http://localhost:4566/aligned.storage
+
+# Secondary S3 bucket configuration
+AWS_SECRET_ACCESS_KEY_SECONDARY=test2
+AWS_REGION_SECONDARY=us-west-1
+AWS_ACCESS_KEY_ID_SECONDARY=test2
+AWS_BUCKET_NAME_SECONDARY=aligned.storage
+UPLOAD_ENDPOINT_SECONDARY=http://localhost:4567
+DOWNLOAD_ENDPOINT_SECONDARY=http://localhost:4567/aligned.storage
+
 RUST_LOG=info
 RUST_BACKTRACE=1
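These dev values assume a second LocalStack-style instance reachable on port 4567 next to the existing one on 4566; the new `volume2` entry in `.gitignore` above is presumably that instance's data directory. How the second instance is started is not part of this diff.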

crates/batcher/.env.docker

Lines changed: 10 additions & 0 deletions

@@ -1,8 +1,18 @@
+# Main S3 bucket configuration
 AWS_SECRET_ACCESS_KEY=test
 AWS_REGION=us-east-2
 AWS_ACCESS_KEY_ID=test
 AWS_BUCKET_NAME=aligned.storage
 UPLOAD_ENDPOINT=http://localstack:4566
 DOWNLOAD_ENDPOINT=http://localstack:4566/aligned.storage
+
+# Secondary S3 bucket configuration
+AWS_SECRET_ACCESS_KEY_SECONDARY=test2
+AWS_REGION_SECONDARY=us-west-1
+AWS_ACCESS_KEY_ID_SECONDARY=test2
+AWS_BUCKET_NAME_SECONDARY=aligned.storage
+UPLOAD_ENDPOINT_SECONDARY=http://localstack2:4567
+DOWNLOAD_ENDPOINT_SECONDARY=http://localstack2:4567/aligned.storage
+
 RUST_LOG=info
 RUST_BACKTRACE=1

crates/batcher/.env.example

Lines changed: 10 additions & 0 deletions

@@ -1,7 +1,17 @@
+# Main S3 bucket configuration
 AWS_SECRET_ACCESS_KEY=<secret_access_key>
 AWS_REGION=<region>
 AWS_ACCESS_KEY_ID=<access_key_id>
 AWS_BUCKET_NAME=<bucket_name>
 UPLOAD_ENDPOINT=<upload_endpoint. For non-aws storage>
 DOWNLOAD_ENDPOINT=<download_endpoint>
+
+# Secondary S3 bucket configuration
+AWS_SECRET_ACCESS_KEY_SECONDARY=<secondary_secret_access_key>
+AWS_REGION_SECONDARY=<secondary_region>
+AWS_ACCESS_KEY_ID_SECONDARY=<secondary_access_key_id>
+AWS_BUCKET_NAME_SECONDARY=<secondary_bucket_name>
+UPLOAD_ENDPOINT_SECONDARY=<secondary_upload_endpoint. For non-aws storage>
+DOWNLOAD_ENDPOINT_SECONDARY=<secondary_download_endpoint>
+
 RUST_LOG=<log_level> # info, debug, error, warn, etc.
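The `*_SECONDARY` block is optional: as the batcher setup code below shows, the secondary client is only created when both `AWS_BUCKET_NAME_SECONDARY` and `DOWNLOAD_ENDPOINT_SECONDARY` are set; otherwise the batcher logs a notice and operates with the primary bucket only.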

crates/batcher/src/lib.rs

Lines changed: 124 additions & 15 deletions

@@ -75,6 +75,9 @@ pub struct Batcher {
     s3_client: S3Client,
     s3_bucket_name: String,
     download_endpoint: String,
+    s3_client_secondary: Option<S3Client>,
+    s3_bucket_name_secondary: Option<String>,
+    download_endpoint_secondary: Option<String>,
     eth_ws_url: String,
     eth_ws_url_fallback: String,
     batcher_signer: Arc<SignerMiddlewareT>,
@@ -106,15 +109,40 @@ impl Batcher {
        dotenv().ok();

        // https://docs.aws.amazon.com/sdk-for-rust/latest/dg/localstack.html
-        let upload_endpoint = env::var("UPLOAD_ENDPOINT").ok();
+        // Primary S3 configuration
+        let s3_config_primary = s3::S3Config {
+            access_key_id: env::var("AWS_ACCESS_KEY_ID").ok(),
+            secret_access_key: env::var("AWS_SECRET_ACCESS_KEY").ok(),
+            region: env::var("AWS_REGION").ok(),
+            endpoint_url: env::var("UPLOAD_ENDPOINT").ok(),
+        };

        let s3_bucket_name =
            env::var("AWS_BUCKET_NAME").expect("AWS_BUCKET_NAME not found in environment");

        let download_endpoint =
            env::var("DOWNLOAD_ENDPOINT").expect("DOWNLOAD_ENDPOINT not found in environment");

-        let s3_client = s3::create_client(upload_endpoint).await;
+        let s3_client = s3::create_client(s3_config_primary).await;
+
+        // Secondary S3 configuration (optional)
+        let s3_bucket_name_secondary = env::var("AWS_BUCKET_NAME_SECONDARY").ok();
+        let download_endpoint_secondary = env::var("DOWNLOAD_ENDPOINT_SECONDARY").ok();
+
+        let s3_client_secondary = if s3_bucket_name_secondary.is_some()
+            && download_endpoint_secondary.is_some()
+        {
+            let s3_config_secondary = s3::S3Config {
+                access_key_id: env::var("AWS_ACCESS_KEY_ID_SECONDARY").ok(),
+                secret_access_key: env::var("AWS_SECRET_ACCESS_KEY_SECONDARY").ok(),
+                region: env::var("AWS_REGION_SECONDARY").ok(),
+                endpoint_url: env::var("UPLOAD_ENDPOINT_SECONDARY").ok(),
+            };
+            Some(s3::create_client(s3_config_secondary).await)
+        } else {
+            info!("Secondary S3 configuration not found or incomplete. Operating with primary S3 only.");
+            None
+        };

        let config = ConfigFromYaml::new(config_file);
        // Ensure max_batch_bytes_size can at least hold one proof of max_proof_size,
@@ -252,6 +280,9 @@ impl Batcher {
            s3_client,
            s3_bucket_name,
            download_endpoint,
+            s3_client_secondary,
+            s3_bucket_name_secondary,
+            download_endpoint_secondary,
            eth_ws_url: config.eth_ws_url,
            eth_ws_url_fallback: config.eth_ws_url_fallback,
            batcher_signer,
@@ -1541,7 +1572,18 @@ impl Batcher {
        let batch_merkle_root_hex = hex::encode(batch_merkle_root);
        info!("Batch merkle root: 0x{}", batch_merkle_root_hex);
        let file_name = batch_merkle_root_hex.clone() + ".json";
-        let batch_data_pointer: String = "".to_owned() + &self.download_endpoint + "/" + &file_name;
+
+        let batch_data_pointer = self
+            .upload_batch_to_multiple_s3(batch_bytes, &file_name)
+            .await?;
+        if let Err(e) = self
+            .telemetry
+            .task_uploaded_to_s3(&batch_merkle_root_hex)
+            .await
+        {
+            warn!("Failed to send task status to telemetry: {:?}", e);
+        };
+        info!("Batch uploaded to: {}", batch_data_pointer);

        let num_proofs_in_batch = leaves.len();
        let gas_per_proof = (self.constant_gas_cost()
@@ -1577,16 +1619,6 @@ impl Batcher {
            .gas_price_used_on_latest_batch
            .set(gas_price.as_u64() as i64);

-        info!("Uploading batch to S3...");
-        self.upload_batch_to_s3(batch_bytes, &file_name).await?;
-        if let Err(e) = self
-            .telemetry
-            .task_uploaded_to_s3(&batch_merkle_root_hex)
-            .await
-        {
-            warn!("Failed to send task status to telemetry: {:?}", e);
-        };
-        info!("Batch sent to S3 with name: {}", file_name);
        if let Err(e) = self
            .telemetry
            .task_created(
@@ -1857,22 +1889,99 @@ impl Batcher {
        unlocked
    }

+    /// Uploads the batch to both S3 buckets and returns the comma-separated URLs of the successful uploads.
+    /// Returns an error only if all uploads fail.
+    async fn upload_batch_to_multiple_s3(
+        &self,
+        batch_bytes: &[u8],
+        file_name: &str,
+    ) -> Result<String, BatcherError> {
+        // Upload to both S3 buckets and collect the successful URLs
+        let mut successful_urls = Vec::new();
+
+        // Try the primary S3 upload
+        if self
+            .upload_batch_to_s3(
+                &self.s3_client,
+                batch_bytes,
+                file_name,
+                &self.s3_bucket_name,
+            )
+            .await
+            .is_ok()
+        {
+            let primary_url = format!("{}/{}", self.download_endpoint, file_name);
+            successful_urls.push(primary_url.clone());
+            info!("Successfully uploaded batch to primary S3: {}", primary_url);
+        } else {
+            warn!("Failed to upload batch to primary S3");
+        }
+
+        // Try the secondary S3 upload (if configured)
+        if let (
+            Some(s3_client_secondary),
+            Some(s3_bucket_name_secondary),
+            Some(download_endpoint_secondary),
+        ) = (
+            &self.s3_client_secondary,
+            &self.s3_bucket_name_secondary,
+            &self.download_endpoint_secondary,
+        ) {
+            if self
+                .upload_batch_to_s3(
+                    s3_client_secondary,
+                    batch_bytes,
+                    file_name,
+                    s3_bucket_name_secondary,
+                )
+                .await
+                .is_ok()
+            {
+                let secondary_url = format!("{}/{}", download_endpoint_secondary, file_name);
+                successful_urls.push(secondary_url.clone());
+                info!(
+                    "Successfully uploaded batch to secondary S3: {}",
+                    secondary_url
+                );
+            } else {
+                warn!("Failed to upload batch to secondary S3");
+            }
+        }
+
+        // Update the metric with the number of available data services
+        self.metrics
+            .available_data_services
+            .set(successful_urls.len() as i64);
+
+        // If no upload succeeded, return an error
+        if successful_urls.is_empty() {
+            error!("Failed to upload batch to both S3 buckets");
+            return Err(BatcherError::BatchUploadError(
+                "Failed to upload to any S3 bucket".to_string(),
+            ));
+        }
+
+        Ok(successful_urls.join(","))
+    }
+
    /// Uploads the batch to s3.
    /// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
    /// (0.5 secs - 1 sec - 2 secs - 4 secs - 8 secs).
    async fn upload_batch_to_s3(
        &self,
+        s3_client: &S3Client,
        batch_bytes: &[u8],
        file_name: &str,
+        bucket_name: &str,
    ) -> Result<(), BatcherError> {
        let start = Instant::now();
        let result = retry_function(
            || {
                Self::upload_batch_to_s3_retryable(
                    batch_bytes,
                    file_name,
-                    self.s3_client.clone(),
-                    &self.s3_bucket_name,
+                    s3_client.clone(),
+                    bucket_name,
                )
            },
            ETHEREUM_CALL_MIN_RETRY_DELAY,
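Taken together, the batcher and the aggregation-mode fetcher agree on a simple wire format for `batchDataPointer`: a comma-joined list of download URLs. A minimal, self-contained sketch of the round trip follows; the URLs and the file name `abc123.json` are made up, the real file name being the batch Merkle root in hex plus `.json`:

fn main() {
    // Batcher side: join the download URLs of every successful upload,
    // mirroring `upload_batch_to_multiple_s3` above.
    let successful_urls = vec![
        "http://localhost:4566/aligned.storage/abc123.json".to_string(),
        "http://localhost:4567/aligned.storage/abc123.json".to_string(),
    ];
    let batch_data_pointer = successful_urls.join(",");
    assert_eq!(batch_data_pointer.split(',').count(), 2);

    // Fetcher side: split the pointer back into candidate URLs, mirroring
    // `parse_batch_urls`, and try them in order until one download succeeds.
    for url in batch_data_pointer.split(',').map(str::trim) {
        println!("candidate URL: {url}");
    }
}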

crates/batcher/src/metrics.rs

Lines changed: 7 additions & 0 deletions

@@ -27,6 +27,7 @@ pub struct BatcherMetrics {
     pub cancel_create_new_task_duration: IntGauge,
     pub batcher_gas_cost_create_task_total: GenericCounter<AtomicF64>,
     pub batcher_gas_cost_cancel_task_total: GenericCounter<AtomicF64>,
+    pub available_data_services: IntGauge,
 }

 impl BatcherMetrics {
@@ -79,6 +80,10 @@ impl BatcherMetrics {
             "batcher_gas_cost_cancel_task_total",
             "Batcher Gas Cost Cancel Task Total"
         ))?;
+        let available_data_services = register_int_gauge!(opts!(
+            "available_data_services",
+            "Number of available data services (0-2)"
+        ))?;

         registry.register(Box::new(open_connections.clone()))?;
         registry.register(Box::new(received_proofs.clone()))?;
@@ -96,6 +101,7 @@ impl BatcherMetrics {
         registry.register(Box::new(cancel_create_new_task_duration.clone()))?;
         registry.register(Box::new(batcher_gas_cost_create_task_total.clone()))?;
         registry.register(Box::new(batcher_gas_cost_cancel_task_total.clone()))?;
+        registry.register(Box::new(available_data_services.clone()))?;

         let metrics_route = warp::path!("metrics")
             .and(warp::any().map(move || registry.clone()))
@@ -124,6 +130,7 @@ impl BatcherMetrics {
             cancel_create_new_task_duration,
             batcher_gas_cost_create_task_total,
             batcher_gas_cost_cancel_task_total,
+            available_data_services,
         })
     }