@@ -39,9 +39,11 @@ use walrus_core::{
3939 SliverPairIndex ,
4040 encoding:: {
4141 BLOB_TYPE_ATTRIBUTE_KEY ,
42+ EncodingAxis ,
4243 EncodingFactory as _,
4344 Primary ,
4445 QUILT_TYPE_VALUE ,
46+ Secondary ,
4547 encoded_blob_length_for_n_shards,
4648 quilt_encoding:: { QuiltApi , QuiltStoreBlob , QuiltVersionV1 } ,
4749 } ,
@@ -56,6 +58,7 @@ use walrus_sdk::{
5658 StoreArgs ,
5759 StoreBlobsApi as _,
5860 WalrusNodeClient ,
61+ byte_range_read_client:: { ByteRangeReadClient , ByteRangeReadClientConfig } ,
5962 client_types:: WalrusStoreBlob ,
6063 quilt_client:: QuiltClientConfig ,
6164 responses:: { BlobStoreResult , QuiltStoreResult } ,
@@ -79,6 +82,7 @@ use walrus_sdk::{
7982} ;
8083use walrus_service:: test_utils:: {
8184 StorageNodeHandleTrait ,
85+ TestCluster ,
8286 TestNodesConfig ,
8387 UnusedSocketAddress ,
8488 test_cluster:: { self , FROST_PER_NODE_WEIGHT } ,
@@ -1147,9 +1151,10 @@ async fn test_store_quilt(blobs_to_create: u32) -> TestResult {
11471151 . collect :: < Vec < _ > > ( ) ;
11481152
11491153 // Store the quilt.
1150- let quilt_client = client
1151- . quilt_client ( )
1152- . with_config ( QuiltClientConfig :: new ( 6 , Duration :: from_mins ( 1 ) ) ) ;
1154+ let quilt_client =
1155+ client
1156+ . quilt_client ( )
1157+ . with_config ( QuiltClientConfig :: new ( 6 , Duration :: from_mins ( 1 ) , 100 ) ) ;
11531158 let quilt = quilt_client
11541159 . construct_quilt :: < QuiltVersionV1 > ( & quilt_store_blobs, encoding_type)
11551160 . await ?;
@@ -2856,22 +2861,34 @@ async fn test_store_with_upload_relay_with_tip() {
28562861 . expect ( "shutdown upload relay" ) ;
28572862}
28582863
2859- /// Tests client byte range read functionality.
2860- #[ ignore = "ignore E2E tests by default" ]
2861- #[ walrus_simtest]
2862- async fn test_byte_range_read_client ( ) -> TestResult {
2864+ fn fail_random_node ( walrus_cluster : & TestCluster ) {
2865+ let node_to_stop = thread_rng ( ) . gen_range ( 0 ..5 ) ;
2866+ tracing:: info!( "failing node {node_to_stop}" ) ;
2867+ walrus_cluster. nodes [ node_to_stop] . cancel ( ) ;
2868+ }
2869+
2870+ // Tests client byte range read functionality, with different node failure scenarios.
2871+ async_param_test ! {
2872+ #[ ignore = "ignore E2E tests by default" ]
2873+ #[ walrus_simtest]
2874+ test_byte_range_read_client -> TestResult : [
2875+ with_node_failure: ( true ) ,
2876+ without_node_failure: ( false ) ,
2877+ ]
2878+ }
2879+ async fn test_byte_range_read_client ( with_node_failure : bool ) -> TestResult {
28632880 walrus_test_utils:: init_tracing ( ) ;
28642881
28652882 let test_nodes_config = TestNodesConfig :: builder ( )
28662883 . with_node_weights ( & [ 7 , 7 , 7 , 7 , 7 ] )
28672884 . build ( ) ;
28682885 let test_cluster_builder =
28692886 test_cluster:: E2eTestSetupBuilder :: new ( ) . with_test_nodes_config ( test_nodes_config) ;
2870- let ( _sui_cluster_handle, _cluster , client, _, _) = test_cluster_builder. build ( ) . await ?;
2887+ let ( _sui_cluster_handle, walrus_cluster , client, _, _) = test_cluster_builder. build ( ) . await ?;
28712888 let client = client. as_ref ( ) ;
28722889
28732890 // Generate a non zero blob size.
2874- let blob_size: u64 = thread_rng ( ) . gen_range ( 1 ..=1000000 ) ;
2891+ let blob_size: u64 = thread_rng ( ) . gen_range ( 100 ..=1000000 ) ;
28752892 tracing:: info!( "blob size: {blob_size}" ) ;
28762893 let blobs = walrus_test_utils:: random_data_list (
28772894 usize:: try_from ( blob_size) . expect ( "blob size should be valid" ) ,
@@ -2890,6 +2907,10 @@ async fn test_byte_range_read_client() -> TestResult {
28902907 . blob_id ( )
28912908 . expect ( "blob ID should be present" ) ;
28922909
2910+ if with_node_failure {
2911+ fail_random_node ( & walrus_cluster) ;
2912+ }
2913+
28932914 // Test reading byte range from the blob. We do 20 randomly generated byte range reads and
28942915 // verify that the data is correct.
28952916 for _ in 0 ..20 {
@@ -2995,15 +3016,30 @@ async fn test_byte_range_read_size_too_large() -> TestResult {
29953016 Ok ( ( ) )
29963017}
29973018
2998- /// Tests that streaming a blob returns the correct data.
2999- #[ ignore = "ignore E2E tests by default" ]
3000- #[ walrus_simtest]
3001- async fn test_streaming_blob ( ) -> TestResult {
3019+ // Tests that streaming a blob returns the correct data.
3020+ async_param_test ! {
3021+ #[ ignore = "ignore E2E tests by default" ]
3022+ #[ walrus_simtest]
3023+ test_streaming_blob -> TestResult : [
3024+ with_node_failure: ( true ) ,
3025+ without_node_failure: ( false ) ,
3026+ ]
3027+ }
3028+ async fn test_streaming_blob ( with_node_failure : bool ) -> TestResult {
30023029 walrus_test_utils:: init_tracing ( ) ;
30033030
3031+ // Create a test cluster with 5 nodes with the same weight. This makes sure that when losing
3032+ // any node, it still has a quorum.
3033+ let test_nodes_config = TestNodesConfig :: builder ( )
3034+ . with_node_weights ( & [ 7 , 7 , 7 , 7 , 7 ] )
3035+ . build ( ) ;
3036+
30043037 // Setup test cluster
3005- let ( _sui_cluster_handle, _cluster, client, _, _) =
3006- test_cluster:: E2eTestSetupBuilder :: new ( ) . build ( ) . await ?;
3038+ let ( _sui_cluster_handle, walrus_cluster, client, _, _) =
3039+ test_cluster:: E2eTestSetupBuilder :: new ( )
3040+ . with_test_nodes_config ( test_nodes_config)
3041+ . build ( )
3042+ . await ?;
30073043
30083044 // Generate and store a test blob (~100KB to span multiple slivers)
30093045 let blob_size = 100_000 ;
@@ -3023,6 +3059,10 @@ async fn test_streaming_blob() -> TestResult {
30233059 . blob_id ( )
30243060 . expect ( "blob ID should be present" ) ;
30253061
3062+ if with_node_failure {
3063+ fail_random_node ( & walrus_cluster) ;
3064+ }
3065+
30263066 // Create a read-only client for streaming (SuiReadClient implements Clone)
30273067 let sui_read_client = client. inner . sui_client ( ) . read_client ( ) . clone ( ) ;
30283068 let config = client. inner . config ( ) . clone ( ) ;
@@ -3417,5 +3457,178 @@ async fn test_aggregator_get_blob_by_blob_id() -> TestResult {
34173457 assert_eq ! ( content_type. to_str( ) . unwrap( ) , "application/json" ) ;
34183458
34193459 aggregator. shutdown ( ) . await ?;
3460+
3461+ Ok ( ( ) )
3462+ }
3463+
3464+ // Tests client side recovering slivers and reconstructing the blob.
3465+ // This test uploads a blob, recovers all slivers, and verifies the reconstructed blob
3466+ // matches the original.
3467+ async fn run_test_client_recover_slivers < E : EncodingAxis > (
3468+ client : & WalrusNodeClient < SuiContractClient > ,
3469+ ) -> TestResult {
3470+ // Generate a random blob.
3471+ let blob_size: u64 = thread_rng ( ) . gen_range ( 10000 ..=100000 ) ;
3472+ tracing:: info!( "blob size: {blob_size}" ) ;
3473+ let blobs = walrus_test_utils:: random_data_list (
3474+ usize:: try_from ( blob_size) . expect ( "blob size should be valid" ) ,
3475+ 1 ,
3476+ ) ;
3477+ let original_blob = & blobs[ 0 ] ;
3478+
3479+ // Store the blob on Walrus.
3480+ let blob_read_result = client
3481+ . reserve_and_store_blobs_retry_committees (
3482+ blobs. clone ( ) ,
3483+ vec ! [ ] ,
3484+ & StoreArgs :: default_with_epochs ( 5 ) . with_persistence ( BlobPersistence :: Permanent ) ,
3485+ )
3486+ . await ?;
3487+
3488+ let blob_id = blob_read_result[ 0 ]
3489+ . blob_id ( )
3490+ . expect ( "blob ID should be present" ) ;
3491+
3492+ tracing:: info!( "stored blob {blob_id}" ) ;
3493+ let current_epoch = client. sui_client ( ) . current_epoch ( ) . await ?;
3494+ let metadata = client. retrieve_metadata ( current_epoch, & blob_id) . await ?;
3495+
3496+ // Get encoding config and calculate the number of systematic slivers for the encoding axis.
3497+ let encoding_config = client
3498+ . encoding_config ( )
3499+ . get_for_type ( metadata. metadata ( ) . encoding_type ( ) ) ;
3500+ let n_systematic = encoding_config. n_systematic_slivers :: < E > ( ) ;
3501+
3502+ // Create a list of all systematic sliver indices.
3503+ let all_primary_sliver_indices: Vec < _ > = ( 0 ..n_systematic. get ( ) )
3504+ . map ( walrus_core:: SliverIndex :: new)
3505+ . collect ( ) ;
3506+
3507+ tracing:: info!( "recovering {} slivers" , n_systematic) ;
3508+
3509+ // Recover all primary slivers using the recover_slivers method.
3510+ let recovered_slivers = client
3511+ . recover_slivers :: < E > ( & metadata, & all_primary_sliver_indices, current_epoch)
3512+ . await ?;
3513+
3514+ tracing:: info!( "recovered {} slivers" , recovered_slivers. len( ) ) ;
3515+
3516+ // Verify we got all the slivers.
3517+ assert_eq ! (
3518+ recovered_slivers. len( ) ,
3519+ usize :: from( n_systematic. get( ) ) ,
3520+ "should recover all systematic slivers"
3521+ ) ;
3522+
3523+ // Sort slivers by index to ensure they're in order.
3524+ let mut sorted_slivers = recovered_slivers;
3525+ sorted_slivers. sort_by_key ( |s| s. index . get ( ) ) ;
3526+
3527+ // Reconstruct the blob from the recovered slivers.
3528+ let mut reconstructed_blob = Vec :: new ( ) ;
3529+ if E :: IS_PRIMARY {
3530+ for sliver in & sorted_slivers {
3531+ reconstructed_blob. extend_from_slice ( sliver. symbols . data ( ) ) ;
3532+ }
3533+ } else {
3534+ // For secondary slivers, slivers are in columns. So the reconstructed blob needs to
3535+ // first take the first bytes from each sliver to form a row, and then concatenate the rows
3536+ // to form the reconstructed blob.
3537+ let row_length = sorted_slivers[ 0 ] . len ( ) / sorted_slivers[ 0 ] . symbols . symbol_usize ( ) ;
3538+ for row in 0 ..row_length {
3539+ for sliver in sorted_slivers. iter ( ) {
3540+ reconstructed_blob. extend_from_slice (
3541+ & sliver. symbols . data ( ) [ row * sliver. symbols . symbol_usize ( )
3542+ ..( row + 1 ) * sliver. symbols . symbol_usize ( ) ] ,
3543+ ) ;
3544+ }
3545+ }
3546+ }
3547+
3548+ // Truncate to the original blob size (last sliver may have padding).
3549+ reconstructed_blob. truncate ( original_blob. len ( ) ) ;
3550+
3551+ // Verify the reconstructed blob matches the original.
3552+ assert_eq ! (
3553+ reconstructed_blob, * original_blob,
3554+ "reconstructed blob should match original"
3555+ ) ;
3556+
3557+ Ok ( ( ) )
3558+ }
3559+
3560+ /// Tests client side recovering sliver functionality.
3561+ #[ ignore = "ignore E2E tests by default" ]
3562+ #[ walrus_simtest]
3563+ async fn test_client_recover_slivers ( ) -> TestResult {
3564+ walrus_test_utils:: init_tracing ( ) ;
3565+
3566+ let test_nodes_config = TestNodesConfig :: builder ( )
3567+ . with_node_weights ( & [ 7 , 7 , 7 , 7 , 7 ] )
3568+ . build ( ) ;
3569+ let test_cluster_builder =
3570+ test_cluster:: E2eTestSetupBuilder :: new ( ) . with_test_nodes_config ( test_nodes_config) ;
3571+ let ( _sui_cluster_handle, _cluster, client, _, _) = test_cluster_builder. build ( ) . await ?;
3572+ let client = client. as_ref ( ) ;
3573+
3574+ for _ in 0 ..10 {
3575+ run_test_client_recover_slivers :: < Primary > ( client) . await ?;
3576+ run_test_client_recover_slivers :: < Secondary > ( client) . await ?;
3577+ }
3578+
3579+ Ok ( ( ) )
3580+ }
3581+
3582+ /// Test client side recovery with short timeout returns proper error.
3583+ #[ ignore = "ignore E2E tests by default" ]
3584+ #[ walrus_simtest]
3585+ async fn test_client_recover_slivers_timeout ( ) -> TestResult {
3586+ walrus_test_utils:: init_tracing ( ) ;
3587+
3588+ let test_nodes_config = TestNodesConfig :: builder ( )
3589+ . with_node_weights ( & [ 7 , 7 , 7 , 7 , 7 ] )
3590+ . build ( ) ;
3591+ let test_cluster_builder =
3592+ test_cluster:: E2eTestSetupBuilder :: new ( ) . with_test_nodes_config ( test_nodes_config) ;
3593+ let ( _sui_cluster_handle, _cluster, client, _, _) = test_cluster_builder. build ( ) . await ?;
3594+ let client = client. as_ref ( ) ;
3595+
3596+ // Generate a random blob.
3597+ let blob_size: u64 = 10000 ;
3598+ let blobs = walrus_test_utils:: random_data_list (
3599+ usize:: try_from ( blob_size) . expect ( "blob size should be valid" ) ,
3600+ 1 ,
3601+ ) ;
3602+
3603+ // Store the blob on Walrus.
3604+ let blob_read_result = client
3605+ . reserve_and_store_blobs_retry_committees (
3606+ blobs. clone ( ) ,
3607+ vec ! [ ] ,
3608+ & StoreArgs :: default_with_epochs ( 5 ) . with_persistence ( BlobPersistence :: Permanent ) ,
3609+ )
3610+ . await ?;
3611+
3612+ let blob_id = blob_read_result[ 0 ]
3613+ . blob_id ( )
3614+ . expect ( "blob ID should be present" ) ;
3615+
3616+ let byte_range_client = ByteRangeReadClient :: new (
3617+ client,
3618+ ByteRangeReadClientConfig {
3619+ timeout : Duration :: from_secs ( 0 ) ,
3620+ ..Default :: default ( )
3621+ } ,
3622+ ) ;
3623+
3624+ // Given that the timeout is 0, the read byte range should fail with error.
3625+ let result = byte_range_client. read_byte_range ( & blob_id, 0 , 100 ) . await ;
3626+ assert ! (
3627+ result. is_err( )
3628+ && result
3629+ . unwrap_err( )
3630+ . to_string( )
3631+ . contains( "failed to retrieve some slivers" )
3632+ ) ;
34203633 Ok ( ( ) )
34213634}
0 commit comments