Skip to content

Commit a8fe5f1

Browse files
Add StructuredMessageDecoderPolicy and integrate with downloadStreamWithResponse
Co-authored-by: gunjansingh-msft <[email protected]>
1 parent 73b20d7 commit a8fe5f1

File tree

3 files changed

+192
-27
lines changed

3 files changed

+192
-27
lines changed

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/implementation/util/BuilderHelper.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ public static HttpPipeline buildPipeline(StorageSharedKeyCredential storageShare
140140

141141
HttpPolicyProviders.addAfterRetryPolicies(policies);
142142

143+
// Add structured message decoder policy to handle structured message decoding
144+
policies.add(new StructuredMessageDecoderPolicy());
145+
143146
policies.add(getResponseValidationPolicy());
144147

145148
policies.add(new HttpLoggingPolicy(logOptions));
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.storage.blob.implementation.util;
5+
6+
import com.azure.core.http.HttpHeaderName;
7+
import com.azure.core.http.HttpHeaders;
8+
import com.azure.core.http.HttpMethod;
9+
import com.azure.core.http.HttpPipelineCallContext;
10+
import com.azure.core.http.HttpPipelineNextPolicy;
11+
import com.azure.core.http.HttpResponse;
12+
import com.azure.core.http.policy.HttpPipelinePolicy;
13+
import com.azure.core.util.FluxUtil;
14+
import com.azure.core.util.logging.ClientLogger;
15+
import com.azure.storage.common.DownloadContentValidationOptions;
16+
import com.azure.storage.common.implementation.structuredmessage.StructuredMessageDecodingStream;
17+
import reactor.core.publisher.Flux;
18+
import reactor.core.publisher.Mono;
19+
20+
import java.nio.ByteBuffer;
21+
import java.nio.charset.Charset;
22+
23+
/**
24+
* This is a decoding policy in an {@link com.azure.core.http.HttpPipeline} to decode structured messages in blob
25+
* download requests. The policy checks for a context value to determine when to apply structured message decoding.
26+
*/
27+
public class StructuredMessageDecoderPolicy implements HttpPipelinePolicy {
28+
private static final ClientLogger LOGGER = new ClientLogger(StructuredMessageDecoderPolicy.class);
29+
30+
/**
31+
* Context key used to signal that structured message decoding should be applied.
32+
*/
33+
public static final String STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY = "azure-storage-structured-message-decoding";
34+
35+
/**
36+
* Context key used to pass DownloadContentValidationOptions to the policy.
37+
*/
38+
public static final String STRUCTURED_MESSAGE_VALIDATION_OPTIONS_CONTEXT_KEY =
39+
"azure-storage-structured-message-validation-options";
40+
41+
/**
42+
* Creates a new instance of {@link StructuredMessageDecoderPolicy}.
43+
*/
44+
public StructuredMessageDecoderPolicy() {
45+
}
46+
47+
@Override
48+
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
49+
// Check if structured message decoding is enabled for this request
50+
if (!shouldApplyDecoding(context)) {
51+
return next.process();
52+
}
53+
54+
return next.process().map(httpResponse -> {
55+
// Only apply decoding to download responses (GET requests with body)
56+
if (isDownloadResponse(httpResponse)) {
57+
DownloadContentValidationOptions validationOptions = getValidationOptions(context);
58+
Long contentLength = getContentLength(httpResponse.getHeaders());
59+
60+
if (contentLength != null && contentLength > 0 && validationOptions != null) {
61+
Flux<ByteBuffer> decodedStream = StructuredMessageDecodingStream.wrapStreamIfNeeded(
62+
httpResponse.getBody(), contentLength, validationOptions);
63+
64+
return new DecodedResponse(httpResponse, decodedStream);
65+
}
66+
}
67+
return httpResponse;
68+
});
69+
}
70+
71+
/**
72+
* Checks if structured message decoding should be applied based on context.
73+
*
74+
* @param context The pipeline call context.
75+
* @return true if decoding should be applied, false otherwise.
76+
*/
77+
private boolean shouldApplyDecoding(HttpPipelineCallContext context) {
78+
return context.getData(STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY)
79+
.map(value -> value instanceof Boolean && (Boolean) value)
80+
.orElse(false);
81+
}
82+
83+
/**
84+
* Gets the validation options from context.
85+
*
86+
* @param context The pipeline call context.
87+
* @return The validation options or null if not present.
88+
*/
89+
private DownloadContentValidationOptions getValidationOptions(HttpPipelineCallContext context) {
90+
return context.getData(STRUCTURED_MESSAGE_VALIDATION_OPTIONS_CONTEXT_KEY)
91+
.filter(value -> value instanceof DownloadContentValidationOptions)
92+
.map(value -> (DownloadContentValidationOptions) value)
93+
.orElse(null);
94+
}
95+
96+
/**
97+
* Gets the content length from response headers.
98+
*
99+
* @param headers The response headers.
100+
* @return The content length or null if not present.
101+
*/
102+
private Long getContentLength(HttpHeaders headers) {
103+
String contentLengthStr = headers.getValue(HttpHeaderName.CONTENT_LENGTH);
104+
if (contentLengthStr != null) {
105+
try {
106+
return Long.parseLong(contentLengthStr);
107+
} catch (NumberFormatException e) {
108+
LOGGER.warning("Invalid content length in response headers: " + contentLengthStr);
109+
}
110+
}
111+
return null;
112+
}
113+
114+
/**
115+
* Checks if the response is a download response (GET request with body).
116+
*
117+
* @param httpResponse The HTTP response.
118+
* @return true if it's a download response, false otherwise.
119+
*/
120+
private boolean isDownloadResponse(HttpResponse httpResponse) {
121+
return httpResponse.getRequest().getHttpMethod() == HttpMethod.GET && httpResponse.getBody() != null;
122+
}
123+
124+
/**
125+
* HTTP response wrapper that provides a decoded response body.
126+
*/
127+
static class DecodedResponse extends HttpResponse {
128+
private final Flux<ByteBuffer> decodedBody;
129+
private final HttpHeaders httpHeaders;
130+
private final int statusCode;
131+
132+
DecodedResponse(HttpResponse httpResponse, Flux<ByteBuffer> decodedBody) {
133+
super(httpResponse.getRequest());
134+
this.decodedBody = decodedBody;
135+
this.httpHeaders = httpResponse.getHeaders();
136+
this.statusCode = httpResponse.getStatusCode();
137+
}
138+
139+
@Override
140+
public int getStatusCode() {
141+
return statusCode;
142+
}
143+
144+
@Override
145+
public String getHeaderValue(String name) {
146+
return httpHeaders.getValue(name);
147+
}
148+
149+
@Override
150+
public HttpHeaders getHeaders() {
151+
return httpHeaders;
152+
}
153+
154+
@Override
155+
public Flux<ByteBuffer> getBody() {
156+
return decodedBody;
157+
}
158+
159+
@Override
160+
public Mono<byte[]> getBodyAsByteArray() {
161+
return FluxUtil.collectBytesInByteBufferStream(decodedBody);
162+
}
163+
164+
@Override
165+
public Mono<String> getBodyAsString() {
166+
return FluxUtil.collectBytesInByteBufferStream(decodedBody).map(String::new);
167+
}
168+
169+
@Override
170+
public Mono<String> getBodyAsString(Charset charset) {
171+
return FluxUtil.collectBytesInByteBufferStream(decodedBody).map(b -> new String(b, charset));
172+
}
173+
}
174+
}

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import com.azure.storage.blob.implementation.util.BlobSasImplUtil;
4747
import com.azure.storage.blob.implementation.util.ChunkedDownloadUtils;
4848
import com.azure.storage.blob.implementation.util.ModelHelper;
49+
import com.azure.storage.blob.implementation.util.StructuredMessageDecoderPolicy;
4950
import com.azure.storage.blob.models.AccessTier;
5051
import com.azure.storage.blob.models.BlobBeginCopySourceRequestConditions;
5152
import com.azure.storage.blob.models.BlobCopyInfo;
@@ -83,7 +84,6 @@
8384
import com.azure.storage.common.Utility;
8485
import com.azure.storage.common.implementation.SasImplUtils;
8586
import com.azure.storage.common.implementation.StorageImplUtils;
86-
import com.azure.storage.common.implementation.structuredmessage.StructuredMessageDecodingStream;
8787
import com.azure.storage.common.DownloadContentValidationOptions;
8888
import reactor.core.publisher.Flux;
8989
import reactor.core.publisher.Mono;
@@ -1337,6 +1337,16 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
13371337
? new Context("azure-eagerly-convert-headers", true)
13381338
: context.addData("azure-eagerly-convert-headers", true);
13391339

1340+
// Add structured message decoding context if enabled
1341+
if (contentValidationOptions != null
1342+
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
1343+
firstRangeContext = firstRangeContext.addData(
1344+
StructuredMessageDecoderPolicy.STRUCTURED_MESSAGE_DECODING_CONTEXT_KEY, true);
1345+
firstRangeContext = firstRangeContext.addData(
1346+
StructuredMessageDecoderPolicy.STRUCTURED_MESSAGE_VALIDATION_OPTIONS_CONTEXT_KEY,
1347+
contentValidationOptions);
1348+
}
1349+
13401350
return downloadRange(finalRange, finalRequestConditions, finalRequestConditions.getIfMatch(), finalGetMD5,
13411351
firstRangeContext).map(response -> {
13421352
BlobsDownloadHeaders blobsDownloadHeaders = new BlobsDownloadHeaders(response.getHeaders());
@@ -1357,16 +1367,6 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
13571367
finalCount = finalRange.getCount();
13581368
}
13591369

1360-
// Apply structured message decoding if enabled - this allows both MD5 and structured message to coexist
1361-
Flux<ByteBuffer> processedStream = response.getValue();
1362-
if (contentValidationOptions != null
1363-
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
1364-
// Use the content length from headers to determine expected length for structured message decoding
1365-
Long contentLength = blobDownloadHeaders.getContentLength();
1366-
processedStream = StructuredMessageDecodingStream.wrapStreamIfNeeded(response.getValue(),
1367-
contentLength, contentValidationOptions);
1368-
}
1369-
13701370
// The resume function takes throwable and offset at the destination.
13711371
// I.e. offset is relative to the starting point.
13721372
BiFunction<Throwable, Long, Mono<StreamResponse>> onDownloadErrorResume = (throwable, offset) -> {
@@ -1391,27 +1391,15 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
13911391

13921392
try {
13931393
return downloadRange(new BlobRange(initialOffset + offset, newCount), finalRequestConditions,
1394-
eTag, finalGetMD5, context);
1394+
eTag, finalGetMD5, firstRangeContext);
13951395
} catch (Exception e) {
13961396
return Mono.error(e);
13971397
}
13981398
};
13991399

1400-
// If structured message decoding was applied, we need to create a new StreamResponse with the processed stream
1401-
if (contentValidationOptions != null
1402-
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
1403-
// Create a new StreamResponse using the deprecated but available constructor
1404-
@SuppressWarnings("deprecation")
1405-
StreamResponse processedResponse = new StreamResponse(response.getRequest(),
1406-
response.getStatusCode(), response.getHeaders(), processedStream);
1407-
1408-
return BlobDownloadAsyncResponseConstructorProxy.create(processedResponse, onDownloadErrorResume,
1409-
finalOptions);
1410-
} else {
1411-
// No structured message processing needed, use original response
1412-
return BlobDownloadAsyncResponseConstructorProxy.create(response, onDownloadErrorResume,
1413-
finalOptions);
1414-
}
1400+
// Structured message decoding is now handled by StructuredMessageDecoderPolicy
1401+
return BlobDownloadAsyncResponseConstructorProxy.create(response, onDownloadErrorResume,
1402+
finalOptions);
14151403
});
14161404
}
14171405

0 commit comments

Comments
 (0)