Skip to content

Commit e3a5ab6

Browse files
authored
feat: implement blob streaming in the aggregator by slivers (#2801)
* feat: implement blob streaming in the aggregator by slivers
1 parent 0a2c97a commit e3a5ab6

File tree

12 files changed

+837
-7
lines changed

12 files changed

+837
-7
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/walrus-e2e-tests/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ anyhow.workspace = true
1111
prometheus.workspace = true
1212
rand.workspace = true
1313
# explicitly import reqwest in test to disable its system proxy cache. It causes indeterminism in simtest.
14+
bytes.workspace = true
1415
futures.workspace = true
1516
hex.workspace = true
1617
indicatif.workspace = true

crates/walrus-e2e-tests/tests/test_client.rs

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ use std::{
2121
time::Duration,
2222
};
2323

24+
use bytes::Bytes;
2425
use rand::{Rng, random, seq::SliceRandom, thread_rng};
2526
use reqwest::Url;
2627
#[cfg(msim)]
2728
use sui_macros::{clear_fail_point, register_fail_point_if};
2829
use sui_types::base_types::{SUI_ADDRESS_LENGTH, SuiAddress};
29-
use tokio::sync::Mutex;
30+
use tokio::{sync::Mutex, time::Instant};
3031
use tokio_stream::StreamExt;
3132
use walrus_core::{
3233
BlobId,
@@ -58,6 +59,7 @@ use walrus_sdk::{
5859
client_types::WalrusStoreBlob,
5960
quilt_client::QuiltClientConfig,
6061
responses::{BlobStoreResult, QuiltStoreResult},
62+
streaming::start_streaming_blob,
6163
upload_relay_client::UploadRelayClient,
6264
},
6365
config::ClientConfig,
@@ -2990,3 +2992,79 @@ async fn test_byte_range_read_size_too_large() -> TestResult {
29902992
}
29912993
Ok(())
29922994
}
2995+
2996+
/// Tests that streaming a blob returns the correct data.
2997+
#[ignore = "ignore E2E tests by default"]
2998+
#[walrus_simtest]
2999+
async fn test_streaming_blob() -> TestResult {
3000+
walrus_test_utils::init_tracing();
3001+
3002+
// Setup test cluster
3003+
let (_sui_cluster_handle, _cluster, client, _) =
3004+
test_cluster::E2eTestSetupBuilder::new().build().await?;
3005+
3006+
// Generate and store a test blob (~100KB to span multiple slivers)
3007+
let blob_size = 100_000;
3008+
let original_data = walrus_test_utils::random_data(blob_size);
3009+
3010+
let store_args = StoreArgs::default_with_epochs(5).with_encoding_type(DEFAULT_ENCODING);
3011+
3012+
let store_results = client
3013+
.inner
3014+
.reserve_and_store_blobs(vec![original_data.clone()], &store_args)
3015+
.await?;
3016+
3017+
let blob_id = store_results
3018+
.into_iter()
3019+
.next()
3020+
.expect("should have one blob store result")
3021+
.blob_id()
3022+
.expect("blob ID should be present");
3023+
3024+
// Create a read-only client for streaming (SuiReadClient implements Clone)
3025+
let sui_read_client = client.inner.sui_client().read_client().clone();
3026+
let config = client.inner.config().clone();
3027+
let streaming_config = config.streaming_config.clone();
3028+
3029+
let read_client =
3030+
WalrusNodeClient::new_read_client_with_refresher(config, sui_read_client).await?;
3031+
let arc_client = Arc::new(read_client);
3032+
3033+
// Call start_streaming_blob directly
3034+
let start = Instant::now();
3035+
let (stream, returned_size) =
3036+
start_streaming_blob(arc_client, streaming_config, blob_id).await?;
3037+
3038+
// Verify returned blob size matches
3039+
assert_eq!(
3040+
returned_size, blob_size as u64,
3041+
"returned blob size should match"
3042+
);
3043+
3044+
// Collect stream chunks
3045+
let collected: Vec<Bytes> = stream
3046+
.map(|result| result.expect("stream chunk should succeed"))
3047+
.collect()
3048+
.await;
3049+
3050+
tracing::info!(
3051+
"Collected {} chunks in {:?}",
3052+
collected.len(),
3053+
start.elapsed()
3054+
);
3055+
// Concatenate all chunks
3056+
let streamed_data: Vec<u8> = collected.into_iter().flat_map(|b| b.to_vec()).collect();
3057+
3058+
// Verify data matches original
3059+
assert_eq!(
3060+
streamed_data.len(),
3061+
original_data.len(),
3062+
"streamed data length should match original"
3063+
);
3064+
assert_eq!(
3065+
streamed_data, original_data,
3066+
"streamed data should match original blob"
3067+
);
3068+
3069+
Ok(())
3070+
}

crates/walrus-sdk/client_config_example.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,3 +66,7 @@ quilt_client_config:
6666
byte_range_read_client_config:
6767
max_retrieve_slivers_attempts: 2
6868
timeout_secs: 10
69+
streaming_config:
70+
max_sliver_retry_attempts: 5
71+
sliver_timeout_secs: 30
72+
prefetch_count: 4

crates/walrus-sdk/src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ use crate::{
113113
pub mod byte_range_read_client;
114114
pub mod client_types;
115115
pub mod communication;
116+
pub mod streaming;
116117
pub use communication::NodeCommunicationFactory;
117118
pub mod metrics;
118119
pub mod quilt_client;

0 commit comments

Comments
 (0)