diff --git a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobMessageDecoderDownloadTests.java b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobMessageDecoderDownloadTests.java index 47e9e9023f0f..11786dc6f166 100644 --- a/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobMessageDecoderDownloadTests.java +++ b/sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobMessageDecoderDownloadTests.java @@ -467,4 +467,234 @@ public void downloadStreamWithResponseContentValidationSmartRetryLargeBlob() thr "Request " + i + " should have a valid range header, but was: " + rangeHeader); } } + + @Test + public void uninterruptedStreamWithStructuredMessageDecoding() throws IOException { + // Test: Verify that structured message decoding works correctly without any interruptions + // This mirrors the .NET test: UninterruptedStream + byte[] randomData = getRandomByteArray(4 * Constants.KB); + StructuredMessageEncoder encoder + = new StructuredMessageEncoder(randomData.length, Constants.KB, StructuredMessageFlags.STORAGE_CRC64); + ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData)); + + Flux input = Flux.just(encodedData); + + // Upload the encoded data + bc.upload(input, null, true).block(); + + // Create a download client with decoder policy but NO mock interruption policy + StorageContentValidationDecoderPolicy decoderPolicy = new StorageContentValidationDecoderPolicy(); + BlobAsyncClient downloadClient + = getBlobAsyncClient(ENVIRONMENT.getPrimaryAccount().getCredential(), bc.getBlobUrl(), decoderPolicy); + + DownloadContentValidationOptions validationOptions + = new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true); + + // Download with validation - should succeed without any interruptions + StepVerifier + .create( + downloadClient + .downloadStreamWithResponse((BlobRange) null, null, (BlobRequestConditions) null, false, + validationOptions) + .flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))) + .assertNext(result -> { + // Verify the decoded data matches the original + TestUtils.assertArraysEqual(result, randomData); + }) + .verifyComplete(); + } + + @Test + public void interruptWithDataIntact() throws IOException { + // Test: Verify that data remains intact after a single interruption and retry + // This mirrors the .NET test: Interrupt_DataIntact with single interrupt + final int segmentSize = Constants.KB; + byte[] randomData = getRandomByteArray(4 * segmentSize); + StructuredMessageEncoder encoder + = new StructuredMessageEncoder(randomData.length, segmentSize, StructuredMessageFlags.STORAGE_CRC64); + ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData)); + + Flux input = Flux.just(encodedData); + + // Create a policy that will simulate 1 network interruption at a specific position + // Interrupt after first segment completes to test smart retry from segment boundary + // Use 1200 bytes to ensure at least one 1KB segment completes + MockPartialResponsePolicy mockPolicy = new MockPartialResponsePolicy(1, 1200); + + // Upload the encoded data + bc.upload(input, null, true).block(); + + // Create download client with mock interruption and decoder policies + StorageContentValidationDecoderPolicy decoderPolicy = new StorageContentValidationDecoderPolicy(); + BlobAsyncClient downloadClient = getBlobAsyncClient(ENVIRONMENT.getPrimaryAccount().getCredential(), + bc.getBlobUrl(), mockPolicy, decoderPolicy); + + DownloadContentValidationOptions validationOptions + = new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true); + DownloadRetryOptions retryOptions = new DownloadRetryOptions().setMaxRetryRequests(5); + + // Download with validation - should succeed despite the interruption + StepVerifier.create(downloadClient + .downloadStreamWithResponse((BlobRange) null, retryOptions, (BlobRequestConditions) null, false, + validationOptions) + .flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))).assertNext(result -> { + // Verify the decoded data matches the original exactly + TestUtils.assertArraysEqual(result, randomData); + }).verifyComplete(); + + // Verify that exactly 1 interruption was used (retry occurred) + assertEquals(0, mockPolicy.getTriesRemaining()); + + // Verify that retry used appropriate range header + List rangeHeaders = mockPolicy.getRangeHeaders(); + assertTrue(rangeHeaders.size() >= 2, "Expected at least 2 requests (initial + 1 retry)"); + } + + @Test + public void interruptMultipleTimesWithDataIntact() throws IOException { + // Test: Verify that data remains intact after multiple interruptions and retries + // This mirrors the .NET test: Interrupt_DataIntact with multiple interrupts + final int segmentSize = Constants.KB; + byte[] randomData = getRandomByteArray(4 * segmentSize); + StructuredMessageEncoder encoder + = new StructuredMessageEncoder(randomData.length, segmentSize, StructuredMessageFlags.STORAGE_CRC64); + ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData)); + + Flux input = Flux.just(encodedData); + + // Create a policy that will simulate 3 network interruptions + // Use 800 bytes for subsequent interruptions to get 3 interrupts with 4KB data + 1KB segments + MockPartialResponsePolicy mockPolicy = new MockPartialResponsePolicy(3, 800); + + // Upload the encoded data + bc.upload(input, null, true).block(); + + // Create download client with mock interruption and decoder policies + StorageContentValidationDecoderPolicy decoderPolicy = new StorageContentValidationDecoderPolicy(); + BlobAsyncClient downloadClient = getBlobAsyncClient(ENVIRONMENT.getPrimaryAccount().getCredential(), + bc.getBlobUrl(), mockPolicy, decoderPolicy); + + DownloadContentValidationOptions validationOptions + = new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true); + DownloadRetryOptions retryOptions = new DownloadRetryOptions().setMaxRetryRequests(10); + + // Download with validation - should succeed despite multiple interruptions + StepVerifier.create(downloadClient + .downloadStreamWithResponse((BlobRange) null, retryOptions, (BlobRequestConditions) null, false, + validationOptions) + .flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))).assertNext(result -> { + // Verify the decoded data matches the original exactly + TestUtils.assertArraysEqual(result, randomData); + }).verifyComplete(); + + // Verify that all 3 interruptions were used + assertEquals(0, mockPolicy.getTriesRemaining()); + + // Verify that retries used appropriate range headers (initial + 3 retries = 4 requests) + List rangeHeaders = mockPolicy.getRangeHeaders(); + assertTrue(rangeHeaders.size() >= 4, "Expected at least 4 requests (initial + 3 retries)"); + } + + @Test + public void interruptAndVerifyProperRewind() throws IOException { + // Test: Verify that interruption causes proper rewind to last complete segment boundary + // This mirrors the .NET test: Interrupt_AppropriateRewind + final int segmentSize = 512; + byte[] randomData = getRandomByteArray(Constants.KB); + StructuredMessageEncoder encoder + = new StructuredMessageEncoder(randomData.length, segmentSize, StructuredMessageFlags.STORAGE_CRC64); + ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData)); + + Flux input = Flux.just(encodedData); + + // Create a policy that will simulate 1 interruption + MockPartialResponsePolicy mockPolicy = new MockPartialResponsePolicy(1); + + // Upload the encoded data + bc.upload(input, null, true).block(); + + // Create download client with mock interruption and decoder policies + StorageContentValidationDecoderPolicy decoderPolicy = new StorageContentValidationDecoderPolicy(); + BlobAsyncClient downloadClient = getBlobAsyncClient(ENVIRONMENT.getPrimaryAccount().getCredential(), + bc.getBlobUrl(), mockPolicy, decoderPolicy); + + DownloadContentValidationOptions validationOptions + = new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true); + DownloadRetryOptions retryOptions = new DownloadRetryOptions().setMaxRetryRequests(5); + + // Download with validation + StepVerifier.create(downloadClient + .downloadStreamWithResponse((BlobRange) null, retryOptions, (BlobRequestConditions) null, false, + validationOptions) + .flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))).assertNext(result -> { + // Verify the decoded data matches the original + TestUtils.assertArraysEqual(result, randomData); + }).verifyComplete(); + + // Verify retry occurred + assertEquals(0, mockPolicy.getTriesRemaining()); + + // Verify the retry started from a segment boundary (not from 0) + List rangeHeaders = mockPolicy.getRangeHeaders(); + assertTrue(rangeHeaders.size() >= 2, "Expected at least 2 requests"); + + // First request should start from 0 + assertTrue(rangeHeaders.get(0).startsWith("bytes=0-") || rangeHeaders.get(0).startsWith("bytes=0"), + "First request should start from offset 0"); + + // Second request (retry) should start from a non-zero offset (segment boundary) + // With 512-byte segments and proper encoding, first segment completes at ~543 bytes + if (rangeHeaders.size() > 1) { + String retryRange = rangeHeaders.get(1); + assertTrue(retryRange.startsWith("bytes="), "Retry request should have a range header"); + // Extract the start offset from "bytes=X-Y" or "bytes=X" + String offsetStr = retryRange.substring(6).split("-")[0]; + long retryOffset = Long.parseLong(offsetStr); + assertTrue(retryOffset > 0, + "Retry should start from non-zero offset (segment boundary), but was: " + retryOffset); + } + } + + @Test + public void interruptAndVerifyProperDecode() throws IOException { + // Test: Verify that after interruption and retry, decoding continues correctly + // This mirrors the .NET test: Interrupt_ProperDecode + final int segmentSize = Constants.KB; + final int dataSize = 4 * segmentSize; + byte[] randomData = getRandomByteArray(dataSize); + StructuredMessageEncoder encoder + = new StructuredMessageEncoder(randomData.length, segmentSize, StructuredMessageFlags.STORAGE_CRC64); + ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData)); + + Flux input = Flux.just(encodedData); + + // Create a policy with 2 interruptions to test multi-step decode after retries + // Use 1000 bytes to get 2 interruptions with 4KB data + 1KB segments + MockPartialResponsePolicy mockPolicy = new MockPartialResponsePolicy(2, 1000); + + // Upload the encoded data + bc.upload(input, null, true).block(); + + // Create download client with mock interruption and decoder policies + StorageContentValidationDecoderPolicy decoderPolicy = new StorageContentValidationDecoderPolicy(); + BlobAsyncClient downloadClient = getBlobAsyncClient(ENVIRONMENT.getPrimaryAccount().getCredential(), + bc.getBlobUrl(), mockPolicy, decoderPolicy); + + DownloadContentValidationOptions validationOptions + = new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true); + DownloadRetryOptions retryOptions = new DownloadRetryOptions().setMaxRetryRequests(10); + + // Download with validation - decoder must properly handle state across retries + StepVerifier.create(downloadClient + .downloadStreamWithResponse((BlobRange) null, retryOptions, (BlobRequestConditions) null, false, + validationOptions) + .flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))).assertNext(result -> { + // Verify every byte is correctly decoded despite multiple interruptions + assertEquals(dataSize, result.length, "Decoded data should have exactly " + dataSize + " bytes"); + TestUtils.assertArraysEqual(result, randomData); + }).verifyComplete(); + + // Verify both interruptions were used + assertEquals(0, mockPolicy.getTriesRemaining()); + } } diff --git a/sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/policy/MockPartialResponsePolicy.java b/sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/policy/MockPartialResponsePolicy.java index 9dff1fcc2be5..fa6125d027f2 100644 --- a/sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/policy/MockPartialResponsePolicy.java +++ b/sdk/storage/azure-storage-common/src/test-shared/java/com/azure/storage/common/test/shared/policy/MockPartialResponsePolicy.java @@ -22,10 +22,28 @@ public class MockPartialResponsePolicy implements HttpPipelinePolicy { static final HttpHeaderName X_MS_RANGE_HEADER = HttpHeaderName.fromString("x-ms-range"); static final HttpHeaderName RANGE_HEADER = HttpHeaderName.RANGE; private int tries; + private int triesUsed = 0; // Track how many interruptions have occurred private final List rangeHeaders = new ArrayList<>(); + private final int maxBytesPerResponse; // Maximum bytes to return before simulating timeout + /** + * Creates a MockPartialResponsePolicy that simulates network interruptions. + * + * @param tries Number of times to simulate interruptions (0 = no interruptions) + */ public MockPartialResponsePolicy(int tries) { + this(tries, 200); // Default: return 200 bytes for subsequent interruptions (enables 3 interrupts with 1KB data) + } + + /** + * Creates a MockPartialResponsePolicy with configurable interruption behavior. + * + * @param tries Number of times to simulate interruptions (0 = no interruptions) + * @param maxBytesPerResponse Maximum bytes to return in each interrupted response + */ + public MockPartialResponsePolicy(int tries, int maxBytesPerResponse) { this.tries = tries; + this.maxBytesPerResponse = maxBytesPerResponse; } @Override @@ -51,26 +69,76 @@ public Mono process(HttpPipelineCallContext context, HttpPipelineN return Mono.just(response); } else { this.tries -= 1; - return response.getBody().collectList().flatMap(bodyBuffers -> { - if (bodyBuffers.isEmpty()) { - // If no body was returned, don't attempt to slice a partial response. Simply propagate - // the original response to avoid test failures when the service unexpectedly returns an - // empty body (for example, after a retry on the underlying transport). - return Mono.just(response); - } - ByteBuffer firstBuffer = bodyBuffers.get(0); - byte firstByte = firstBuffer.get(); - - // Simulate partial response by returning the first byte only from the requested range and timeout - return Mono.just(new MockDownloadHttpResponse(response, 206, - Flux.just(ByteBuffer.wrap(new byte[] { firstByte })) - .concatWith(Flux.error(new IOException("Simulated timeout"))) - )); - }); + this.triesUsed++; + + // Use variable byte limits per interruption to properly test smart retry: + // - First interruption: Return enough to complete at least one segment (for smart retry testing) + // - Subsequent interruptions: Return smaller amounts to exercise multiple retries + int byteLimitForThisRequest; + if (triesUsed == 1) { + // First interruption: ensure first segment completes (543 bytes total for segment 1) + // Return 560 bytes to be safe + byteLimitForThisRequest = Math.max(maxBytesPerResponse, 560); + } else { + // Subsequent interruptions: use configured limit (default 270 bytes) + byteLimitForThisRequest = maxBytesPerResponse; + } + + // Don't use collectList() as it would consume the entire stream. + // Instead, manipulate the Flux directly to limit bytes before throwing error. + // This works correctly whether the body is encoded or decoded. + Flux limitedBody = limitStreamToBytes(response.getBody(), byteLimitForThisRequest); + return Mono.just(new MockDownloadHttpResponse(response, 206, limitedBody)); } }); } + /** + * Limits a Flux of ByteBuffers to emit at most maxBytes before throwing an IOException. + * This works on the stream directly without collecting all buffers, allowing it to work + * correctly whether the stream contains encoded or decoded data. + */ + private Flux limitStreamToBytes(Flux body, int maxBytes) { + return Flux.defer(() -> { + final long[] bytesEmitted = new long[]{0}; + return body.concatMap(buffer -> { + if (buffer == null || !buffer.hasRemaining()) { + return Flux.just(buffer); + } + + long remaining = maxBytes - bytesEmitted[0]; + if (remaining <= 0) { + // Already emitted enough, throw error immediately + return Flux.error(new IOException("Simulated timeout")); + } + + int bufferSize = buffer.remaining(); + if (bufferSize <= remaining) { + // Emit the entire buffer + bytesEmitted[0] += bufferSize; + if (bytesEmitted[0] >= maxBytes) { + // Hit the limit, emit this buffer then error + return Flux.just(buffer).concatWith(Flux.error(new IOException("Simulated timeout"))); + } + return Flux.just(buffer); + } else { + // Buffer is larger than remaining, need to slice it + int bytesToEmit = (int) remaining; + ByteBuffer slice = buffer.duplicate(); + slice.limit(slice.position() + bytesToEmit); + + ByteBuffer limited = ByteBuffer.allocate(bytesToEmit); + limited.put(slice); + limited.flip(); + + bytesEmitted[0] += bytesToEmit; + // Emit the limited buffer then error + return Flux.just(limited).concatWith(Flux.error(new IOException("Simulated timeout"))); + } + }); + }); + } + public int getTriesRemaining() { return tries; }