Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/walrus-e2e-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ anyhow.workspace = true
prometheus.workspace = true
rand.workspace = true
# explicitly import reqwest in test to disable its system proxy cache. It causes indeterminism in simtest.
bytes.workspace = true
futures.workspace = true
hex.workspace = true
indicatif.workspace = true
Expand Down
80 changes: 79 additions & 1 deletion crates/walrus-e2e-tests/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use std::{
time::Duration,
};

use bytes::Bytes;
use rand::{Rng, random, seq::SliceRandom, thread_rng};
use reqwest::Url;
#[cfg(msim)]
use sui_macros::{clear_fail_point, register_fail_point_if};
use sui_types::base_types::{SUI_ADDRESS_LENGTH, SuiAddress};
use tokio::sync::Mutex;
use tokio::{sync::Mutex, time::Instant};
use tokio_stream::StreamExt;
use walrus_core::{
BlobId,
Expand Down Expand Up @@ -58,6 +59,7 @@ use walrus_sdk::{
client_types::WalrusStoreBlob,
quilt_client::QuiltClientConfig,
responses::{BlobStoreResult, QuiltStoreResult},
streaming::start_streaming_blob,
upload_relay_client::UploadRelayClient,
},
config::ClientConfig,
Expand Down Expand Up @@ -2990,3 +2992,79 @@ async fn test_byte_range_read_size_too_large() -> TestResult {
}
Ok(())
}

/// Tests that streaming a blob returns the correct data.
#[ignore = "ignore E2E tests by default"]
#[walrus_simtest]
async fn test_streaming_blob() -> TestResult {
walrus_test_utils::init_tracing();

// Setup test cluster
let (_sui_cluster_handle, _cluster, client, _) =
test_cluster::E2eTestSetupBuilder::new().build().await?;

// Generate and store a test blob (~100KB to span multiple slivers)
let blob_size = 100_000;
let original_data = walrus_test_utils::random_data(blob_size);

let store_args = StoreArgs::default_with_epochs(5).with_encoding_type(DEFAULT_ENCODING);

let store_results = client
.inner
.reserve_and_store_blobs(vec![original_data.clone()], &store_args)
.await?;

let blob_id = store_results
.into_iter()
.next()
.expect("should have one blob store result")
.blob_id()
.expect("blob ID should be present");

// Create a read-only client for streaming (SuiReadClient implements Clone)
let sui_read_client = client.inner.sui_client().read_client().clone();
let config = client.inner.config().clone();
let streaming_config = config.streaming_config.clone();

let read_client =
WalrusNodeClient::new_read_client_with_refresher(config, sui_read_client).await?;
let arc_client = Arc::new(read_client);

// Call start_streaming_blob directly
let start = Instant::now();
let (stream, returned_size) =
start_streaming_blob(arc_client, streaming_config, blob_id).await?;

// Verify returned blob size matches
assert_eq!(
returned_size, blob_size as u64,
"returned blob size should match"
);

// Collect stream chunks
let collected: Vec<Bytes> = stream
.map(|result| result.expect("stream chunk should succeed"))
.collect()
.await;

tracing::info!(
"Collected {} chunks in {:?}",
collected.len(),
start.elapsed()
);
// Concatenate all chunks
let streamed_data: Vec<u8> = collected.into_iter().flat_map(|b| b.to_vec()).collect();

// Verify data matches original
assert_eq!(
streamed_data.len(),
original_data.len(),
"streamed data length should match original"
);
assert_eq!(
streamed_data, original_data,
"streamed data should match original blob"
);

Ok(())
}
4 changes: 4 additions & 0 deletions crates/walrus-sdk/client_config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,7 @@ quilt_client_config:
byte_range_read_client_config:
max_retrieve_slivers_attempts: 2
timeout_secs: 10
streaming_config:
max_sliver_retry_attempts: 5
sliver_timeout_secs: 30
prefetch_count: 4
1 change: 1 addition & 0 deletions crates/walrus-sdk/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ use crate::{
pub mod byte_range_read_client;
pub mod client_types;
pub mod communication;
pub mod streaming;
pub use communication::NodeCommunicationFactory;
pub mod metrics;
pub mod quilt_client;
Expand Down
Loading
Loading