66import com .azure .core .test .utils .TestUtils ;
77import com .azure .core .util .FluxUtil ;
88import com .azure .storage .blob .BlobAsyncClient ;
9+ import com .azure .storage .blob .BlobClientBuilder ;
910import com .azure .storage .blob .BlobTestBase ;
1011import com .azure .storage .blob .models .BlobRange ;
1112import com .azure .storage .blob .models .BlobRequestConditions ;
1415import com .azure .storage .common .implementation .Constants ;
1516import com .azure .storage .common .implementation .structuredmessage .StructuredMessageEncoder ;
1617import com .azure .storage .common .implementation .structuredmessage .StructuredMessageFlags ;
18+ import com .azure .storage .common .test .shared .policy .MockPartialResponsePolicy ;
1719import org .junit .jupiter .api .BeforeEach ;
1820import org .junit .jupiter .api .Test ;
1921import reactor .core .publisher .Flux ;
2022import reactor .test .StepVerifier ;
2123
2224import java .io .IOException ;
2325import java .nio .ByteBuffer ;
26+ import java .util .List ;
2427
2528import static org .junit .jupiter .api .Assertions .assertEquals ;
2629import static org .junit .jupiter .api .Assertions .assertNotNull ;
@@ -78,8 +81,8 @@ public void downloadStreamWithResponseContentValidationRange() throws IOExceptio
7881 BlobRange range = new BlobRange (0 , 512L );
7982
8083 StepVerifier .create (bc .upload (input , null , true )
81- .then (bc . downloadStreamWithResponse ( range , ( DownloadRetryOptions ) null ,
82- (BlobRequestConditions ) null , false ))
84+ .then (
85+ bc . downloadStreamWithResponse ( range , ( DownloadRetryOptions ) null , (BlobRequestConditions ) null , false ))
8386 .flatMap (r -> FluxUtil .collectBytesInByteBufferStream (r .getValue ()))).assertNext (r -> {
8487 assertNotNull (r );
8588 // Should get exactly 512 bytes of encoded data
@@ -142,17 +145,14 @@ public void downloadStreamWithResponseNoValidation() throws IOException {
142145 Flux <ByteBuffer > input = Flux .just (encodedData );
143146
144147 // No validation options - should download encoded data as-is
145- StepVerifier
146- .create (bc .upload (input , null , true )
147- .then (bc .downloadStreamWithResponse ((BlobRange ) null , (DownloadRetryOptions ) null ,
148- (BlobRequestConditions ) null , false ))
149- .flatMap (r -> FluxUtil .collectBytesInByteBufferStream (r .getValue ())))
150- .assertNext (r -> {
148+ StepVerifier .create (bc .upload (input , null , true )
149+ .then (bc .downloadStreamWithResponse ((BlobRange ) null , (DownloadRetryOptions ) null ,
150+ (BlobRequestConditions ) null , false ))
151+ .flatMap (r -> FluxUtil .collectBytesInByteBufferStream (r .getValue ()))).assertNext (r -> {
151152 assertNotNull (r );
152153 // Should get encoded data, not decoded
153154 assertTrue (r .length > randomData .length ); // Encoded data is larger
154- })
155- .verifyComplete ();
155+ }).verifyComplete ();
156156 }
157157
158158 @ Test
@@ -168,17 +168,14 @@ public void downloadStreamWithResponseValidationDisabled() throws IOException {
168168 DownloadContentValidationOptions validationOptions
169169 = new DownloadContentValidationOptions ().setStructuredMessageValidationEnabled (false );
170170
171- StepVerifier
172- .create (bc .upload (input , null , true )
173- .then (bc .downloadStreamWithResponse ((BlobRange ) null , (DownloadRetryOptions ) null ,
174- (BlobRequestConditions ) null , false , validationOptions ))
175- .flatMap (r -> FluxUtil .collectBytesInByteBufferStream (r .getValue ())))
176- .assertNext (r -> {
171+ StepVerifier .create (bc .upload (input , null , true )
172+ .then (bc .downloadStreamWithResponse ((BlobRange ) null , (DownloadRetryOptions ) null ,
173+ (BlobRequestConditions ) null , false , validationOptions ))
174+ .flatMap (r -> FluxUtil .collectBytesInByteBufferStream (r .getValue ()))).assertNext (r -> {
177175 assertNotNull (r );
178176 // Should get encoded data, not decoded
179177 assertTrue (r .length > randomData .length ); // Encoded data is larger
180- })
181- .verifyComplete ();
178+ }).verifyComplete ();
182179 }
183180
184181 @ Test
@@ -224,4 +221,147 @@ public void downloadStreamWithResponseContentValidationVeryLargeBlob() throws IO
224221 .assertNext (r -> TestUtils .assertArraysEqual (r , randomData ))
225222 .verifyComplete ();
226223 }
224+
225+ @ Test
226+ public void downloadStreamWithResponseContentValidationSmartRetry () throws IOException {
227+ // Test smart retry functionality with structured message validation
228+ // This test simulates network interruptions and verifies that:
229+ // 1. The decoder validates checksums for all received data before retry
230+ // 2. The decoder state is preserved across retries
231+ // 3. The SDK continues from the correct offset after interruption
232+
233+ byte [] randomData = getRandomByteArray (Constants .KB );
234+ StructuredMessageEncoder encoder
235+ = new StructuredMessageEncoder (randomData .length , 512 , StructuredMessageFlags .STORAGE_CRC64 );
236+ ByteBuffer encodedData = encoder .encode (ByteBuffer .wrap (randomData ));
237+
238+ Flux <ByteBuffer > input = Flux .just (encodedData );
239+
240+ // Create a policy that will simulate 3 network interruptions
241+ MockPartialResponsePolicy mockPolicy = new MockPartialResponsePolicy (3 );
242+
243+ // Upload the encoded data using the regular client
244+ bc .upload (input , null , true ).block ();
245+
246+ // Create a download client with the mock policy to simulate interruptions
247+ BlobAsyncClient downloadClient
248+ = getBlobAsyncClient (ENVIRONMENT .getPrimaryAccount ().getCredential (), bc .getBlobUrl (), mockPolicy );
249+
250+ DownloadContentValidationOptions validationOptions
251+ = new DownloadContentValidationOptions ().setStructuredMessageValidationEnabled (true );
252+
253+ // Configure retry options to allow retries
254+ DownloadRetryOptions retryOptions = new DownloadRetryOptions ().setMaxRetryRequests (5 );
255+
256+ // Download with validation - should succeed despite interruptions
257+ StepVerifier .create (downloadClient
258+ .downloadStreamWithResponse ((BlobRange ) null , retryOptions , (BlobRequestConditions ) null , false ,
259+ validationOptions )
260+ .flatMap (r -> FluxUtil .collectBytesInByteBufferStream (r .getValue ()))).assertNext (r -> {
261+ // Verify the data is correctly decoded
262+ TestUtils .assertArraysEqual (r , randomData );
263+ }).verifyComplete ();
264+
265+ // Verify that retries occurred (3 interruptions means we should have 0 tries remaining)
266+ assertEquals (0 , mockPolicy .getTriesRemaining ());
267+
268+ // Verify that range headers were sent for retries
269+ List <String > rangeHeaders = mockPolicy .getRangeHeaders ();
270+ assertTrue (rangeHeaders .size () > 0 , "Expected range headers for retries" );
271+
272+ // The first request should be for the entire blob
273+ assertTrue (rangeHeaders .get (0 ).startsWith ("bytes=0-" ), "First request should start from offset 0" );
274+
275+ // Subsequent requests should continue from where we left off (after the first byte)
276+ for (int i = 1 ; i < rangeHeaders .size (); i ++) {
277+ String rangeHeader = rangeHeaders .get (i );
278+ assertTrue (rangeHeader .startsWith ("bytes=" + i + "-" ),
279+ "Retry request " + i + " should start from offset " + i + " but was: " + rangeHeader );
280+ }
281+ }
282+
283+ @ Test
284+ public void downloadStreamWithResponseContentValidationSmartRetryMultipleSegments () throws IOException {
285+ // Test smart retry with multiple segments to ensure checksum validation
286+ // works correctly across segment boundaries during network interruptions
287+
288+ byte [] randomData = getRandomByteArray (2 * Constants .KB );
289+ StructuredMessageEncoder encoder
290+ = new StructuredMessageEncoder (randomData .length , 512 , StructuredMessageFlags .STORAGE_CRC64 );
291+ ByteBuffer encodedData = encoder .encode (ByteBuffer .wrap (randomData ));
292+
293+ Flux <ByteBuffer > input = Flux .just (encodedData );
294+
295+ // Create a policy that will simulate 4 network interruptions
296+ MockPartialResponsePolicy mockPolicy = new MockPartialResponsePolicy (4 );
297+
298+ // Upload the encoded data
299+ bc .upload (input , null , true ).block ();
300+
301+ // Create a download client with the mock policy
302+ BlobAsyncClient downloadClient
303+ = getBlobAsyncClient (ENVIRONMENT .getPrimaryAccount ().getCredential (), bc .getBlobUrl (), mockPolicy );
304+
305+ DownloadContentValidationOptions validationOptions
306+ = new DownloadContentValidationOptions ().setStructuredMessageValidationEnabled (true );
307+
308+ DownloadRetryOptions retryOptions = new DownloadRetryOptions ().setMaxRetryRequests (5 );
309+
310+ // Download with validation - should succeed and validate all segment checksums
311+ StepVerifier .create (downloadClient
312+ .downloadStreamWithResponse ((BlobRange ) null , retryOptions , (BlobRequestConditions ) null , false ,
313+ validationOptions )
314+ .flatMap (r -> FluxUtil .collectBytesInByteBufferStream (r .getValue ()))).assertNext (r -> {
315+ // Verify the data is correctly decoded
316+ TestUtils .assertArraysEqual (r , randomData );
317+ }).verifyComplete ();
318+
319+ // Verify that retries occurred
320+ assertEquals (0 , mockPolicy .getTriesRemaining ());
321+
322+ // Verify multiple retry requests were made
323+ List <String > rangeHeaders = mockPolicy .getRangeHeaders ();
324+ assertTrue (rangeHeaders .size () >= 4 ,
325+ "Expected at least 4 range headers for retries, got: " + rangeHeaders .size ());
326+ }
327+
328+ @ Test
329+ public void downloadStreamWithResponseContentValidationSmartRetryLargeBlob () throws IOException {
330+ // Test smart retry with a larger blob to ensure decoder state
331+ // is correctly maintained across retries with more data
332+
333+ byte [] randomData = getRandomByteArray (5 * Constants .KB );
334+ StructuredMessageEncoder encoder
335+ = new StructuredMessageEncoder (randomData .length , 1024 , StructuredMessageFlags .STORAGE_CRC64 );
336+ ByteBuffer encodedData = encoder .encode (ByteBuffer .wrap (randomData ));
337+
338+ Flux <ByteBuffer > input = Flux .just (encodedData );
339+
340+ // Create a policy that will simulate 2 network interruptions
341+ MockPartialResponsePolicy mockPolicy = new MockPartialResponsePolicy (2 );
342+
343+ // Upload the encoded data
344+ bc .upload (input , null , true ).block ();
345+
346+ // Create a download client with the mock policy
347+ BlobAsyncClient downloadClient
348+ = getBlobAsyncClient (ENVIRONMENT .getPrimaryAccount ().getCredential (), bc .getBlobUrl (), mockPolicy );
349+
350+ DownloadContentValidationOptions validationOptions
351+ = new DownloadContentValidationOptions ().setStructuredMessageValidationEnabled (true );
352+
353+ DownloadRetryOptions retryOptions = new DownloadRetryOptions ().setMaxRetryRequests (5 );
354+
355+ // Download with validation - decoder should validate checksums before each retry
356+ StepVerifier .create (downloadClient
357+ .downloadStreamWithResponse ((BlobRange ) null , retryOptions , (BlobRequestConditions ) null , false ,
358+ validationOptions )
359+ .flatMap (r -> FluxUtil .collectBytesInByteBufferStream (r .getValue ()))).assertNext (r -> {
360+ // Verify the data is correctly decoded
361+ TestUtils .assertArraysEqual (r , randomData );
362+ }).verifyComplete ();
363+
364+ // Verify that retries occurred
365+ assertEquals (0 , mockPolicy .getTriesRemaining ());
366+ }
227367}
0 commit comments