Skip to content

Commit c89e9b2

Browse files
adding the pipeline policy changes
1 parent ff9bbf4 commit c89e9b2

File tree

7 files changed

+634
-5
lines changed

7 files changed

+634
-5
lines changed

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BuilderHelper.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.azure.storage.common.policy.ResponseValidationPolicyBuilder;
4040
import com.azure.storage.common.policy.ScrubEtagPolicy;
4141
import com.azure.storage.common.policy.StorageBearerTokenChallengeAuthorizationPolicy;
42+
import com.azure.storage.common.policy.StorageContentValidationDecoderPolicy;
4243
import com.azure.storage.common.policy.StorageSharedKeyCredentialPolicy;
4344

4445
import java.net.MalformedURLException;
@@ -140,6 +141,8 @@ public static HttpPipeline buildPipeline(StorageSharedKeyCredential storageShare
140141

141142
HttpPolicyProviders.addAfterRetryPolicies(policies);
142143

144+
policies.add(new StorageContentValidationDecoderPolicy());
145+
143146
policies.add(getResponseValidationPolicy());
144147

145148
policies.add(new HttpLoggingPolicy(logOptions));

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,10 @@
7979
import com.azure.storage.blob.options.BlobSetAccessTierOptions;
8080
import com.azure.storage.blob.options.BlobSetTagsOptions;
8181
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
82+
import com.azure.storage.common.DownloadContentValidationOptions;
8283
import com.azure.storage.common.StorageSharedKeyCredential;
8384
import com.azure.storage.common.Utility;
85+
import com.azure.storage.common.implementation.Constants;
8486
import com.azure.storage.common.implementation.SasImplUtils;
8587
import com.azure.storage.common.implementation.StorageImplUtils;
8688
import reactor.core.publisher.Flux;
@@ -1173,6 +1175,52 @@ public Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange rang
11731175
}
11741176
}
11751177

1178+
/**
1179+
* Reads a range of bytes from a blob with content validation options. Uploading data must be done from the {@link BlockBlobClient}, {@link
1180+
* PageBlobClient}, or {@link AppendBlobClient}.
1181+
*
1182+
* <p><strong>Code Samples</strong></p>
1183+
*
1184+
* <pre>{@code
1185+
* BlobRange range = new BlobRange(1024, 2048L);
1186+
* DownloadRetryOptions options = new DownloadRetryOptions().setMaxRetryRequests(5);
1187+
* DownloadContentValidationOptions validationOptions = new DownloadContentValidationOptions()
1188+
* .setStructuredMessageValidationEnabled(true);
1189+
*
1190+
* client.downloadStreamWithResponse(range, options, null, false, validationOptions).subscribe(response -> {
1191+
* ByteArrayOutputStream downloadData = new ByteArrayOutputStream();
1192+
* response.getValue().subscribe(piece -> {
1193+
* try {
1194+
* downloadData.write(piece.array());
1195+
* } catch (IOException ex) {
1196+
* throw new UncheckedIOException(ex);
1197+
* }
1198+
* });
1199+
* });
1200+
* }</pre>
1201+
*
1202+
* <p>For more information, see the
1203+
* <a href="https://docs.microsoft.com/rest/api/storageservices/get-blob">Azure Docs</a></p>
1204+
*
1205+
* @param range {@link BlobRange}
1206+
* @param options {@link DownloadRetryOptions}
1207+
* @param requestConditions {@link BlobRequestConditions}
1208+
* @param getRangeContentMd5 Whether the contentMD5 for the specified blob range should be returned.
1209+
* @param contentValidationOptions {@link DownloadContentValidationOptions} options for content validation
1210+
* @return A reactive response containing the blob data.
1211+
*/
1212+
@ServiceMethod(returns = ReturnType.SINGLE)
1213+
public Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
1214+
BlobRequestConditions requestConditions, boolean getRangeContentMd5,
1215+
DownloadContentValidationOptions contentValidationOptions) {
1216+
try {
1217+
return withContext(context -> downloadStreamWithResponse(range, options, requestConditions,
1218+
getRangeContentMd5, contentValidationOptions, context));
1219+
} catch (RuntimeException ex) {
1220+
return monoError(LOGGER, ex);
1221+
}
1222+
}
1223+
11761224
/**
11771225
* Reads a range of bytes from a blob. Uploading data must be done from the {@link BlockBlobClient}, {@link
11781226
* PageBlobClient}, or {@link AppendBlobClient}.
@@ -1215,19 +1263,41 @@ public Mono<BlobDownloadContentAsyncResponse> downloadContentWithResponse(Downlo
12151263
}
12161264

12171265
Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
1218-
BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) {
1266+
BlobRequestConditions requestConditions, boolean getRangeContentMd5,
1267+
DownloadContentValidationOptions contentValidationOptions, Context context) {
12191268
BlobRange finalRange = range == null ? new BlobRange(0) : range;
1220-
Boolean getMD5 = getRangeContentMd5 ? getRangeContentMd5 : null;
1269+
1270+
// Determine MD5 validation: properly consider both getRangeContentMd5 parameter and validation options
1271+
// MD5 validation is enabled if:
1272+
// 1. getRangeContentMd5 is explicitly true, OR
1273+
// 2. contentValidationOptions.isMd5ValidationEnabled() is true
1274+
final Boolean finalGetMD5;
1275+
if (getRangeContentMd5
1276+
|| (contentValidationOptions != null && contentValidationOptions.isMd5ValidationEnabled())) {
1277+
finalGetMD5 = true;
1278+
} else {
1279+
finalGetMD5 = null;
1280+
}
1281+
12211282
BlobRequestConditions finalRequestConditions
12221283
= requestConditions == null ? new BlobRequestConditions() : requestConditions;
12231284
DownloadRetryOptions finalOptions = (options == null) ? new DownloadRetryOptions() : options;
12241285

12251286
// The first range should eagerly convert headers as they'll be used to create response types.
1226-
Context firstRangeContext = context == null
1287+
Context initialContext = context == null
12271288
? new Context("azure-eagerly-convert-headers", true)
12281289
: context.addData("azure-eagerly-convert-headers", true);
12291290

1230-
return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), getMD5,
1291+
// Add structured message decoding context if enabled
1292+
final Context firstRangeContext;
1293+
if (contentValidationOptions != null && contentValidationOptions.isStructuredMessageValidationEnabled()) {
1294+
firstRangeContext = initialContext.addData(Constants.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true)
1295+
.addData(Constants.STRUCTURED_MESSAGE_VALIDATION_OPTIONS_CONTEXT_KEY, contentValidationOptions);
1296+
} else {
1297+
firstRangeContext = initialContext;
1298+
}
1299+
1300+
return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), finalGetMD5,
12311301
firstRangeContext).map(response -> {
12321302
BlobsDownloadHeaders blobsDownloadHeaders = new BlobsDownloadHeaders(response.getHeaders());
12331303
String eTag = blobsDownloadHeaders.getETag();
@@ -1271,16 +1341,22 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
12711341

12721342
try {
12731343
return downloadRange(new BlobRange(initialOffset + offset, newCount), finalRequestConditions,
1274-
eTag, getMD5, context);
1344+
eTag, finalGetMD5, firstRangeContext);
12751345
} catch (Exception e) {
12761346
return Mono.error(e);
12771347
}
12781348
};
12791349

1350+
// Structured message decoding is now handled by StructuredMessageDecoderPolicy
12801351
return BlobDownloadAsyncResponseConstructorProxy.create(response, onDownloadErrorResume, finalOptions);
12811352
});
12821353
}
12831354

1355+
Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, DownloadRetryOptions options,
1356+
BlobRequestConditions requestConditions, boolean getRangeContentMd5, Context context) {
1357+
return downloadStreamWithResponse(range, options, requestConditions, getRangeContentMd5, null, context);
1358+
}
1359+
12841360
private Mono<StreamResponse> downloadRange(BlobRange range, BlobRequestConditions requestConditions, String eTag,
12851361
Boolean getMD5, Context context) {
12861362
return azureBlobStorage.getBlobs()
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.storage.blob;
5+
6+
import com.azure.core.test.utils.TestUtils;
7+
import com.azure.core.util.FluxUtil;
8+
import com.azure.storage.blob.models.BlobRange;
9+
import com.azure.storage.common.DownloadContentValidationOptions;
10+
import com.azure.storage.common.implementation.Constants;
11+
import com.azure.storage.common.implementation.structuredmessage.StructuredMessageEncoder;
12+
import com.azure.storage.common.implementation.structuredmessage.StructuredMessageFlags;
13+
import org.junit.jupiter.api.BeforeEach;
14+
import org.junit.jupiter.api.Test;
15+
import reactor.core.publisher.Flux;
16+
import reactor.test.StepVerifier;
17+
18+
import java.io.IOException;
19+
import java.nio.ByteBuffer;
20+
21+
import static org.junit.jupiter.api.Assertions.assertNotNull;
22+
import static org.junit.jupiter.api.Assertions.assertTrue;
23+
24+
/**
25+
* Tests for structured message decoding during blob downloads using StorageContentValidationDecoderPolicy.
26+
* These tests verify that the pipeline policy correctly decodes structured messages when content validation is enabled.
27+
*/
28+
public class BlobMessageDecoderDownloadTests extends BlobTestBase {
29+
30+
private BlobAsyncClient bc;
31+
32+
@BeforeEach
33+
public void setup() {
34+
String blobName = generateBlobName();
35+
bc = ccAsync.getBlobAsyncClient(blobName);
36+
bc.upload(Flux.just(ByteBuffer.wrap(new byte[0])), null).block();
37+
}
38+
39+
@Test
40+
public void downloadStreamWithResponseContentValidation() throws IOException {
41+
byte[] randomData = getRandomByteArray(Constants.KB);
42+
StructuredMessageEncoder encoder
43+
= new StructuredMessageEncoder(randomData.length, 512, StructuredMessageFlags.STORAGE_CRC64);
44+
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));
45+
46+
Flux<ByteBuffer> input = Flux.just(encodedData);
47+
48+
DownloadContentValidationOptions validationOptions
49+
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);
50+
51+
StepVerifier
52+
.create(bc.upload(input, null, true)
53+
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
54+
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
55+
.assertNext(r -> TestUtils.assertArraysEqual(r, randomData))
56+
.verifyComplete();
57+
}
58+
59+
@Test
60+
public void downloadStreamWithResponseContentValidationRange() throws IOException {
61+
byte[] randomData = getRandomByteArray(Constants.KB);
62+
StructuredMessageEncoder encoder
63+
= new StructuredMessageEncoder(randomData.length, 512, StructuredMessageFlags.STORAGE_CRC64);
64+
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));
65+
66+
Flux<ByteBuffer> input = Flux.just(encodedData);
67+
68+
DownloadContentValidationOptions validationOptions
69+
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);
70+
71+
BlobRange range = new BlobRange(0, 512L);
72+
73+
StepVerifier.create(bc.upload(input, null, true)
74+
.then(bc.downloadStreamWithResponse(range, null, null, false, validationOptions))
75+
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))).assertNext(r -> {
76+
assertNotNull(r);
77+
assertTrue(r.length > 0);
78+
}).verifyComplete();
79+
}
80+
81+
@Test
82+
public void downloadStreamWithResponseContentValidationLargeBlob() throws IOException {
83+
// Test with larger data to verify chunking works correctly
84+
byte[] randomData = getRandomByteArray(5 * Constants.KB);
85+
StructuredMessageEncoder encoder
86+
= new StructuredMessageEncoder(randomData.length, 1024, StructuredMessageFlags.STORAGE_CRC64);
87+
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));
88+
89+
Flux<ByteBuffer> input = Flux.just(encodedData);
90+
91+
DownloadContentValidationOptions validationOptions
92+
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);
93+
94+
StepVerifier
95+
.create(bc.upload(input, null, true)
96+
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
97+
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
98+
.assertNext(r -> TestUtils.assertArraysEqual(r, randomData))
99+
.verifyComplete();
100+
}
101+
102+
@Test
103+
public void downloadStreamWithResponseContentValidationMultipleSegments() throws IOException {
104+
// Test with multiple segments to ensure all segments are decoded correctly
105+
byte[] randomData = getRandomByteArray(2 * Constants.KB);
106+
StructuredMessageEncoder encoder
107+
= new StructuredMessageEncoder(randomData.length, 512, StructuredMessageFlags.STORAGE_CRC64);
108+
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));
109+
110+
Flux<ByteBuffer> input = Flux.just(encodedData);
111+
112+
DownloadContentValidationOptions validationOptions
113+
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);
114+
115+
StepVerifier
116+
.create(bc.upload(input, null, true)
117+
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
118+
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
119+
.assertNext(r -> TestUtils.assertArraysEqual(r, randomData))
120+
.verifyComplete();
121+
}
122+
123+
@Test
124+
public void downloadStreamWithResponseNoValidation() throws IOException {
125+
// Test that download works normally when validation is not enabled
126+
byte[] randomData = getRandomByteArray(Constants.KB);
127+
StructuredMessageEncoder encoder
128+
= new StructuredMessageEncoder(randomData.length, 512, StructuredMessageFlags.STORAGE_CRC64);
129+
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));
130+
131+
Flux<ByteBuffer> input = Flux.just(encodedData);
132+
133+
// No validation options - should download encoded data as-is
134+
StepVerifier.create(bc.upload(input, null, true)
135+
.then(bc.downloadStreamWithResponse(null, null, null, false))
136+
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))).assertNext(r -> {
137+
assertNotNull(r);
138+
// Should get encoded data, not decoded
139+
assertTrue(r.length > randomData.length); // Encoded data is larger
140+
}).verifyComplete();
141+
}
142+
143+
@Test
144+
public void downloadStreamWithResponseValidationDisabled() throws IOException {
145+
// Test with validation options but validation disabled
146+
byte[] randomData = getRandomByteArray(Constants.KB);
147+
StructuredMessageEncoder encoder
148+
= new StructuredMessageEncoder(randomData.length, 512, StructuredMessageFlags.STORAGE_CRC64);
149+
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));
150+
151+
Flux<ByteBuffer> input = Flux.just(encodedData);
152+
153+
DownloadContentValidationOptions validationOptions
154+
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(false);
155+
156+
StepVerifier.create(bc.upload(input, null, true)
157+
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
158+
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue()))).assertNext(r -> {
159+
assertNotNull(r);
160+
// Should get encoded data, not decoded
161+
assertTrue(r.length > randomData.length); // Encoded data is larger
162+
}).verifyComplete();
163+
}
164+
165+
@Test
166+
public void downloadStreamWithResponseContentValidationSmallSegment() throws IOException {
167+
// Test with small segment size to ensure boundary conditions are handled
168+
byte[] randomData = getRandomByteArray(256);
169+
StructuredMessageEncoder encoder
170+
= new StructuredMessageEncoder(randomData.length, 128, StructuredMessageFlags.STORAGE_CRC64);
171+
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));
172+
173+
Flux<ByteBuffer> input = Flux.just(encodedData);
174+
175+
DownloadContentValidationOptions validationOptions
176+
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);
177+
178+
StepVerifier
179+
.create(bc.upload(input, null, true)
180+
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
181+
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
182+
.assertNext(r -> TestUtils.assertArraysEqual(r, randomData))
183+
.verifyComplete();
184+
}
185+
186+
@Test
187+
public void downloadStreamWithResponseContentValidationVeryLargeBlob() throws IOException {
188+
// Test with very large data to verify chunking and policy work correctly with large blobs
189+
byte[] randomData = getRandomByteArray(10 * Constants.KB);
190+
StructuredMessageEncoder encoder
191+
= new StructuredMessageEncoder(randomData.length, 2048, StructuredMessageFlags.STORAGE_CRC64);
192+
ByteBuffer encodedData = encoder.encode(ByteBuffer.wrap(randomData));
193+
194+
Flux<ByteBuffer> input = Flux.just(encodedData);
195+
196+
DownloadContentValidationOptions validationOptions
197+
= new DownloadContentValidationOptions().setStructuredMessageValidationEnabled(true);
198+
199+
StepVerifier
200+
.create(bc.upload(input, null, true)
201+
.then(bc.downloadStreamWithResponse(null, null, null, false, validationOptions))
202+
.flatMap(r -> FluxUtil.collectBytesInByteBufferStream(r.getValue())))
203+
.assertNext(r -> TestUtils.assertArraysEqual(r, randomData))
204+
.verifyComplete();
205+
}
206+
}

0 commit comments

Comments
 (0)