Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdk/storage/azure-storage-blob/assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"AssetsRepo": "Azure/azure-sdk-assets",
"AssetsRepoPrefixPath": "java",
"TagPrefix": "java/storage/azure-storage-blob",
"Tag": "java/storage/azure-storage-blob_80c07fe827"
"Tag": "java/storage/azure-storage-blob_c976afa88e"
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import com.azure.storage.common.policy.ResponseValidationPolicyBuilder;
import com.azure.storage.common.policy.ScrubEtagPolicy;
import com.azure.storage.common.policy.StorageBearerTokenChallengeAuthorizationPolicy;
import com.azure.storage.common.policy.StorageContentValidationDecoderPolicy;
import com.azure.storage.common.policy.StorageSharedKeyCredentialPolicy;

import java.net.MalformedURLException;
Expand Down Expand Up @@ -140,6 +141,8 @@ public static HttpPipeline buildPipeline(StorageSharedKeyCredential storageShare

HttpPolicyProviders.addAfterRetryPolicies(policies);

policies.add(new StorageContentValidationDecoderPolicy());

policies.add(getResponseValidationPolicy());

policies.add(new HttpLoggingPolicy(logOptions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@
import com.azure.storage.blob.options.BlobSetAccessTierOptions;
import com.azure.storage.blob.options.BlobSetTagsOptions;
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
import com.azure.storage.common.DownloadContentValidationOptions;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.Utility;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.SasImplUtils;
import com.azure.storage.common.implementation.StorageImplUtils;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -1173,6 +1175,52 @@ public Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange rang
}
}

/**
* Reads a range of bytes from a blob with content validation options. Uploading data must be done from the {@link BlockBlobClient}, {@link
* PageBlobClient}, or {@link AppendBlobClient}.
*
* <p><strong>Code Samples</strong></p>
*
* <pre>{@code
* BlobRange range = new BlobRange(1024, 2048L);
* DownloadRetryOptions options = new DownloadRetryOptions().setMaxRetryRequests(5);
* DownloadContentValidationOptions validationOptions = new DownloadContentValidationOptions()
* .setStructuredMessageValidationEnabled(true);
*
* client.downloadStreamWithResponse(range, options, null, false, validationOptions).subscribe(response -> {
* ByteArrayOutputStream downloadData = new ByteArrayOutputStream();
* response.getValue().subscribe(piece -> {
* try {
* downloadData.write(piece.array());
* } catch (IOException ex) {
* throw new UncheckedIOException(ex);
* }
* });
* });
* }</pre>
*
* <p>For more information, see the
* <a href="https://docs.microsoft.com/rest/api/storageservices/get-blob">Azure Docs</a></p>
*
* @param range {@link BlobRange}
* @param options {@link DownloadRetryOptions}
* @param requestConditions {@link BlobRequestConditions}
* @param getRangeContentMd5 Whether the contentMD5 for the specified blob range should be returned.
* @param contentValidationOptions {@link DownloadContentValidationOptions} options for content validation
* @return A reactive response containing the blob data.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
BlobRequestConditions requestConditions, boolean getRangeContentMd5,
DownloadContentValidationOptions contentValidationOptions) {
try {
return withContext(context -> downloadStreamWithResponse(range, options, requestConditions,
getRangeContentMd5, contentValidationOptions, context));
} catch (RuntimeException ex) {
return monoError(LOGGER, ex);
}
}

/**
* Reads a range of bytes from a blob. Uploading data must be done from the {@link BlockBlobClient}, {@link
* PageBlobClient}, or {@link AppendBlobClient}.
Expand Down Expand Up @@ -1215,19 +1263,41 @@ public Mono<BlobDownloadContentAsyncResponse> downloadContentWithResponse(Downlo
}

Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) {
BlobRequestConditions requestConditions, boolean getRangeContentMd5,
DownloadContentValidationOptions contentValidationOptions, Context context) {
BlobRange finalRange = range == null ? new BlobRange(0) : range;
Boolean getMD5 = getRangeContentMd5 ? getRangeContentMd5 : null;

// Determine MD5 validation: properly consider both getRangeContentMd5 parameter and validation options
// MD5 validation is enabled if:
// 1. getRangeContentMd5 is explicitly true, OR
// 2. contentValidationOptions.isMd5ValidationEnabled() is true
final Boolean finalGetMD5;
if (getRangeContentMd5
|| (contentValidationOptions != null && contentValidationOptions.isMd5ValidationEnabled())) {
finalGetMD5 = true;
} else {
finalGetMD5 = null;
}

BlobRequestConditions finalRequestConditions
= requestConditions == null ? new BlobRequestConditions() : requestConditions;
DownloadRetryOptions finalOptions = (options == null) ? new DownloadRetryOptions() : options;

// The first range should eagerly convert headers as they'll be used to create response types.
Context firstRangeContext = context == null
Context initialContext = context == null
? new Context("azure-eagerly-convert-headers", true)
: context.addData("azure-eagerly-convert-headers", true);

return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), getMD5,
// Add structured message decoding context if enabled
final Context firstRangeContext;
if (contentValidationOptions != null && contentValidationOptions.isStructuredMessageValidationEnabled()) {
firstRangeContext = initialContext.addData(Constants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true)
.addData(Constants.STRUCTURED_MESSAGE_VALIDATION_OPTIONS_CONTEXT_KEY, contentValidationOptions);
} else {
firstRangeContext = initialContext;
}

return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), finalGetMD5,
firstRangeContext).map(response -> {
BlobsDownloadHeaders blobsDownloadHeaders = new BlobsDownloadHeaders(response.getHeaders());
String eTag = blobsDownloadHeaders.getETag();
Expand Down Expand Up @@ -1271,16 +1341,22 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down

try {
return downloadRange(new BlobRange(initialOffset + offset, newCount), finalRequestConditions,
eTag, getMD5, context);
eTag, finalGetMD5, firstRangeContext);
} catch (Exception e) {
return Mono.error(e);
}
};

// Structured message decoding is now handled by StructuredMessageDecoderPolicy
return BlobDownloadAsyncResponseConstructorProxy.create(response, onDownloadErrorResume, finalOptions);
});
}

Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) {
return downloadStreamWithResponse(range, options, requestConditions, getRangeContentMd5, null, context);
}

private Mono<StreamResponse> downloadRange(BlobRange range, BlobRequestConditions requestConditions, String eTag,
Boolean getMD5, Context context) {
return azureBlobStorage.getBlobs()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.blob;

import com.azure.core.test.utils.TestUtils;
import com.azure.core.util.FluxUtil;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.common.DownloadContentValidationOptions;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.structuredmessage.StructuredMessageEncoder;
import com.azure.storage.common.implementation.structuredmessage.StructuredMessageFlags;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.io.IOException;
import java.nio.ByteBuffer;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Tests for structured message decoding during blob downloads using StorageContentValidationDecoderPolicy.
* These tests verify that the pipeline policy correctly decodes structured messages when content validation is enabled.
*/
public class BlobMessageDecoderDownloadTests extends BlobTestBase {

private BlobAsyncClient bc;

@BeforeEach
public void setup() {
String blobName = generateBlobName();
bc = ccAsync.getBlobAsyncClient(blobName);
bc.upload(Flux.just(ByteBuffer.wrap(new byte[0])), null).block();
}

@Test
public void downloadStreamWithResponseContentValidation() throws IOException {
byte[] randomData = getRandomByteArray(Constants.KB);
StructuredMessageEncoder encoder
= new StructuredMessageEncoder(randomData.length, 512, StructuredMessageFlags.STORAGE_CRC64);
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));

Flux<ByteBuffer> input = Flux.just(encodedData);

DownloadContentValidationOptions validationOptions
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);

StepVerifier
.create(bc.upload(input, null, true)
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
.assertNext(r -> TestUtils.assertArraysEqual(r, randomData))
.verifyComplete();
}

@Test
public void downloadStreamWithResponseContentValidationRange() throws IOException {
byte[] randomData = getRandomByteArray(Constants.KB);
StructuredMessageEncoder encoder
= new StructuredMessageEncoder(randomData.length, 512, StructuredMessageFlags.STORAGE_CRC64);
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));

Flux<ByteBuffer> input = Flux.just(encodedData);

DownloadContentValidationOptions validationOptions
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);

BlobRange range = new BlobRange(0, 512L);

StepVerifier.create(bc.upload(input, null, true)
.then(bc.downloadStreamWithResponse(range, null, null, false, validationOptions))
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))).assertNext(r -> {
assertNotNull(r);
assertTrue(r.length > 0);
}).verifyComplete();
}

@Test
public void downloadStreamWithResponseContentValidationLargeBlob() throws IOException {
// Test with larger data to verify chunking works correctly
byte[] randomData = getRandomByteArray(5 * Constants.KB);
StructuredMessageEncoder encoder
= new StructuredMessageEncoder(randomData.length, 1024, StructuredMessageFlags.STORAGE_CRC64);
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));

Flux<ByteBuffer> input = Flux.just(encodedData);

DownloadContentValidationOptions validationOptions
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);

StepVerifier
.create(bc.upload(input, null, true)
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
.assertNext(r -> TestUtils.assertArraysEqual(r, randomData))
.verifyComplete();
}

@Test
public void downloadStreamWithResponseContentValidationMultipleSegments() throws IOException {
// Test with multiple segments to ensure all segments are decoded correctly
byte[] randomData = getRandomByteArray(2 * Constants.KB);
StructuredMessageEncoder encoder
= new StructuredMessageEncoder(randomData.length, 512, StructuredMessageFlags.STORAGE_CRC64);
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));

Flux<ByteBuffer> input = Flux.just(encodedData);

DownloadContentValidationOptions validationOptions
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);

StepVerifier
.create(bc.upload(input, null, true)
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
.assertNext(r -> TestUtils.assertArraysEqual(r, randomData))
.verifyComplete();
}

@Test
public void downloadStreamWithResponseNoValidation() throws IOException {
// Test that download works normally when validation is not enabled
byte[] randomData = getRandomByteArray(Constants.KB);
StructuredMessageEncoder encoder
= new StructuredMessageEncoder(randomData.length, 512, StructuredMessageFlags.STORAGE_CRC64);
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));

Flux<ByteBuffer> input = Flux.just(encodedData);

// No validation options - should download encoded data as-is
StepVerifier.create(bc.upload(input, null, true)
.then(bc.downloadStreamWithResponse(null, null, null, false))
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))).assertNext(r -> {
assertNotNull(r);
// Should get encoded data, not decoded
assertTrue(r.length > randomData.length); // Encoded data is larger
}).verifyComplete();
}

@Test
public void downloadStreamWithResponseValidationDisabled() throws IOException {
// Test with validation options but validation disabled
byte[] randomData = getRandomByteArray(Constants.KB);
StructuredMessageEncoder encoder
= new StructuredMessageEncoder(randomData.length, 512, StructuredMessageFlags.STORAGE_CRC64);
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));

Flux<ByteBuffer> input = Flux.just(encodedData);

DownloadContentValidationOptions validationOptions
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(false);

StepVerifier.create(bc.upload(input, null, true)
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))).assertNext(r -> {
assertNotNull(r);
// Should get encoded data, not decoded
assertTrue(r.length > randomData.length); // Encoded data is larger
}).verifyComplete();
}

@Test
public void downloadStreamWithResponseContentValidationSmallSegment() throws IOException {
// Test with small segment size to ensure boundary conditions are handled
byte[] randomData = getRandomByteArray(256);
StructuredMessageEncoder encoder
= new StructuredMessageEncoder(randomData.length, 128, StructuredMessageFlags.STORAGE_CRC64);
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));

Flux<ByteBuffer> input = Flux.just(encodedData);

DownloadContentValidationOptions validationOptions
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);

StepVerifier
.create(bc.upload(input, null, true)
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
.assertNext(r -> TestUtils.assertArraysEqual(r, randomData))
.verifyComplete();
}

@Test
public void downloadStreamWithResponseContentValidationVeryLargeBlob() throws IOException {
// Test with very large data to verify chunking and policy work correctly with large blobs
byte[] randomData = getRandomByteArray(10 * Constants.KB);
StructuredMessageEncoder encoder
= new StructuredMessageEncoder(randomData.length, 2048, StructuredMessageFlags.STORAGE_CRC64);
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));

Flux<ByteBuffer> input = Flux.just(encodedData);

DownloadContentValidationOptions validationOptions
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);

StepVerifier
.create(bc.upload(input, null, true)
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
.assertNext(r -> TestUtils.assertArraysEqual(r, randomData))
.verifyComplete();
}
}
Loading
Loading