Skip to content

Commit a9abd2e

Browse files
smart retry changes
1 parent 5597101 commit a9abd2e

File tree

4 files changed

+224
-14
lines changed

4 files changed

+224
-14
lines changed

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1340,8 +1340,26 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
13401340
}
13411341

13421342
try {
1343+
// For retry context, preserve decoder state if structured message validation is enabled
1344+
Context retryContext = firstRangeContext;
1345+
1346+
// If structured message decoding is enabled, we need to include the decoder state
1347+
// so the retry can continue from where we left off
1348+
if (contentValidationOptions != null
1349+
&& contentValidationOptions.isStructuredMessageValidationEnabled()) {
1350+
// The decoder state will be set by the policy during processing
1351+
// We preserve it in the context for the retry request
1352+
Object decoderState
1353+
= firstRangeContext.getData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY)
1354+
.orElse(null);
1355+
if (decoderState != null) {
1356+
retryContext = retryContext
1357+
.addData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY, decoderState);
1358+
}
1359+
}
1360+
13431361
return downloadRange(new BlobRange(initialOffset + offset, newCount), finalRequestConditions,
1344-
eTag, finalGetMD5, firstRangeContext);
1362+
eTag, finalGetMD5, retryContext);
13451363
} catch (Exception e) {
13461364
return Mono.error(e);
13471365
}

sdk/storage/azure-storage-blob/src/test/java/com/azure/storage/blob/BlobMessageDecoderDownloadTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import com.azure.core.test.utils.TestUtils;
77
import com.azure.core.util.FluxUtil;
88
import com.azure.storage.blob.models.BlobRange;
9+
import com.azure.storage.blob.models.BlobRequestConditions;
10+
import com.azure.storage.blob.models.DownloadRetryOptions;
911
import com.azure.storage.common.DownloadContentValidationOptions;
1012
import com.azure.storage.common.implementation.Constants;
1113
import com.azure.storage.common.implementation.structuredmessage.StructuredMessageEncoder;
@@ -18,6 +20,7 @@
1820
import java.io.IOException;
1921
import java.nio.ByteBuffer;
2022

23+
import static org.junit.jupiter.api.Assertions.assertEquals;
2124
import static org.junit.jupiter.api.Assertions.assertNotNull;
2225
import static org.junit.jupiter.api.Assertions.assertTrue;
2326

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/Constants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ public final class Constants {
105105
public static final String STRUCTURED_MESSAGE_VALIDATION_OPTIONS_CONTEXT_KEY
106106
= "azure-storage-structured-message-validation-options";
107107

108+
/**
109+
* Context key used to pass stateful decoder state across retry requests.
110+
*/
111+
public static final String STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY
112+
= "azure-storage-structured-message-decoder-state";
113+
108114
private Constants() {
109115
}
110116

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 196 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,25 @@
1414
import com.azure.core.util.logging.ClientLogger;
1515
import com.azure.storage.common.DownloadContentValidationOptions;
1616
import com.azure.storage.common.implementation.Constants;
17-
import com.azure.storage.common.implementation.structuredmessage.StructuredMessageDecodingStream;
17+
import com.azure.storage.common.implementation.structuredmessage.StructuredMessageDecoder;
1818
import reactor.core.publisher.Flux;
1919
import reactor.core.publisher.Mono;
2020

2121
import java.nio.ByteBuffer;
2222
import java.nio.charset.Charset;
23+
import java.util.concurrent.atomic.AtomicLong;
2324

2425
/**
2526
* This is a decoding policy in an {@link com.azure.core.http.HttpPipeline} to decode structured messages in
2627
* storage download requests. The policy checks for a context value to determine when to apply structured message decoding.
28+
*
29+
* <p>The policy supports smart retries by maintaining decoder state across network interruptions, ensuring:
30+
* <ul>
31+
* <li>All received segment checksums are validated before retry</li>
32+
* <li>Exact encoded and decoded byte positions are tracked</li>
33+
* <li>Decoder state is preserved across retry requests</li>
34+
* <li>Retries continue from the correct offset after network faults</li>
35+
* </ul>
2736
*/
2837
public class StorageContentValidationDecoderPolicy implements HttpPipelinePolicy {
2938
private static final ClientLogger LOGGER = new ClientLogger(StorageContentValidationDecoderPolicy.class);
@@ -51,15 +60,75 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
5160
Long contentLength = getContentLength(httpResponse.getHeaders());
5261

5362
if (contentLength != null && contentLength > 0 && validationOptions != null) {
54-
Flux<ByteBuffer> decodedStream = StructuredMessageDecodingStream
55-
.wrapStreamIfNeeded(httpResponse.getBody(), contentLength, validationOptions);
56-
return new DecodedResponse(httpResponse, decodedStream);
63+
// Get or create decoder with state tracking
64+
DecoderState decoderState = getOrCreateDecoderState(context, contentLength);
65+
66+
// Decode using the stateful decoder
67+
Flux<ByteBuffer> decodedStream = decodeStream(httpResponse.getBody(), decoderState);
68+
69+
// Update context with decoder state for potential retries
70+
context.setData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY, decoderState);
71+
72+
return new DecodedResponse(httpResponse, decodedStream, decoderState);
5773
}
5874

5975
return httpResponse;
6076
});
6177
}
6278

79+
/**
80+
* Decodes a stream of byte buffers using the decoder state.
81+
*
82+
* @param encodedFlux The flux of encoded byte buffers.
83+
* @param state The decoder state.
84+
* @return A flux of decoded byte buffers.
85+
*/
86+
private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState state) {
87+
return encodedFlux.concatMap(encodedBuffer -> {
88+
try {
89+
// Combine with pending data if any
90+
ByteBuffer dataToProcess = state.combineWithPending(encodedBuffer);
91+
92+
// Track encoded bytes
93+
int encodedBytesInBuffer = encodedBuffer.remaining();
94+
state.totalEncodedBytesProcessed.addAndGet(encodedBytesInBuffer);
95+
96+
// Try to decode what we have - decoder handles partial data
97+
int availableSize = dataToProcess.remaining();
98+
ByteBuffer decodedData = state.decoder.decode(dataToProcess.duplicate(), availableSize);
99+
100+
// Track decoded bytes
101+
int decodedBytes = decodedData.remaining();
102+
state.totalBytesDecoded.addAndGet(decodedBytes);
103+
104+
// Store any remaining unprocessed data for next iteration
105+
if (dataToProcess.hasRemaining()) {
106+
state.updatePendingBuffer(dataToProcess);
107+
} else {
108+
state.pendingBuffer = null;
109+
}
110+
111+
// Return decoded data if any
112+
if (decodedBytes > 0) {
113+
return Flux.just(decodedData);
114+
} else {
115+
return Flux.empty();
116+
}
117+
} catch (Exception e) {
118+
LOGGER.error("Failed to decode structured message chunk: " + e.getMessage(), e);
119+
return Flux.error(e);
120+
}
121+
}).doOnComplete(() -> {
122+
// Finalize when stream completes
123+
try {
124+
state.decoder.finalizeDecoding();
125+
} catch (IllegalArgumentException e) {
126+
// Expected if we haven't received all data yet (e.g., interrupted download)
127+
LOGGER.verbose("Decoding not finalized - may resume on retry: " + e.getMessage());
128+
}
129+
});
130+
}
131+
63132
/**
64133
* Checks if structured message decoding should be applied based on context.
65134
*
@@ -104,26 +173,131 @@ private Long getContentLength(HttpHeaders headers) {
104173
}
105174

106175
/**
107-
* Checks if the response is a download response (GET request with body).
176+
* Gets or creates a decoder state from context.
177+
*
178+
* @param context The pipeline call context.
179+
* @param contentLength The content length.
180+
* @return The decoder state.
181+
*/
182+
private DecoderState getOrCreateDecoderState(HttpPipelineCallContext context, long contentLength) {
183+
return context.getData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY)
184+
.filter(value -> value instanceof DecoderState)
185+
.map(value -> (DecoderState) value)
186+
.orElseGet(() -> new DecoderState(contentLength));
187+
}
188+
189+
/**
190+
* Checks if the response is a download response.
108191
*
109192
* @param httpResponse The HTTP response.
110193
* @return true if it's a download response, false otherwise.
111194
*/
112195
private boolean isDownloadResponse(HttpResponse httpResponse) {
113-
return httpResponse.getRequest().getHttpMethod() == HttpMethod.GET && httpResponse.getBody() != null;
196+
HttpMethod method = httpResponse.getRequest().getHttpMethod();
197+
return method == HttpMethod.GET && httpResponse.getStatusCode() / 100 == 2;
114198
}
115199

116200
/**
117-
* HTTP response wrapper that provides a decoded response body.
201+
* State holder for the structured message decoder that tracks decoding progress
202+
* across network interruptions.
118203
*/
119-
static class DecodedResponse extends HttpResponse {
120-
private final Flux<ByteBuffer> decodedBody;
204+
public static class DecoderState {
205+
private final StructuredMessageDecoder decoder;
206+
private final long expectedContentLength;
207+
private final AtomicLong totalBytesDecoded;
208+
private final AtomicLong totalEncodedBytesProcessed;
209+
private ByteBuffer pendingBuffer;
210+
211+
/**
212+
* Creates a new decoder state.
213+
*
214+
* @param expectedContentLength The expected length of the encoded content.
215+
*/
216+
public DecoderState(long expectedContentLength) {
217+
this.expectedContentLength = expectedContentLength;
218+
this.decoder = new StructuredMessageDecoder(expectedContentLength);
219+
this.totalBytesDecoded = new AtomicLong(0);
220+
this.totalEncodedBytesProcessed = new AtomicLong(0);
221+
this.pendingBuffer = null;
222+
}
223+
224+
/**
225+
* Combines pending buffer with new data.
226+
*
227+
* @param newBuffer The new buffer to combine.
228+
* @return Combined buffer.
229+
*/
230+
private ByteBuffer combineWithPending(ByteBuffer newBuffer) {
231+
if (pendingBuffer == null || !pendingBuffer.hasRemaining()) {
232+
return newBuffer.duplicate();
233+
}
234+
235+
ByteBuffer combined = ByteBuffer.allocate(pendingBuffer.remaining() + newBuffer.remaining());
236+
combined.put(pendingBuffer.duplicate());
237+
combined.put(newBuffer.duplicate());
238+
combined.flip();
239+
return combined;
240+
}
241+
242+
/**
243+
* Updates the pending buffer with remaining data.
244+
*
245+
* @param dataToProcess The buffer with remaining data.
246+
*/
247+
private void updatePendingBuffer(ByteBuffer dataToProcess) {
248+
pendingBuffer = ByteBuffer.allocate(dataToProcess.remaining());
249+
pendingBuffer.put(dataToProcess);
250+
pendingBuffer.flip();
251+
}
252+
253+
/**
254+
* Gets the total number of decoded bytes processed so far.
255+
*
256+
* @return The total decoded bytes.
257+
*/
258+
public long getTotalBytesDecoded() {
259+
return totalBytesDecoded.get();
260+
}
261+
262+
/**
263+
* Gets the total number of encoded bytes processed so far.
264+
*
265+
* @return The total encoded bytes processed.
266+
*/
267+
public long getTotalEncodedBytesProcessed() {
268+
return totalEncodedBytesProcessed.get();
269+
}
270+
271+
/**
272+
* Checks if the decoder has finalized.
273+
*
274+
* @return true if finalized, false otherwise.
275+
*/
276+
public boolean isFinalized() {
277+
return totalEncodedBytesProcessed.get() >= expectedContentLength;
278+
}
279+
}
280+
281+
/**
282+
* Decoded HTTP response that wraps the original response with a decoded stream.
283+
*/
284+
private static class DecodedResponse extends HttpResponse {
121285
private final HttpResponse originalResponse;
286+
private final Flux<ByteBuffer> decodedBody;
287+
private final DecoderState decoderState;
122288

123-
DecodedResponse(HttpResponse httpResponse, Flux<ByteBuffer> decodedBody) {
124-
super(httpResponse.getRequest());
125-
this.originalResponse = httpResponse;
289+
/**
290+
* Creates a new decoded response.
291+
*
292+
* @param originalResponse The original HTTP response.
293+
* @param decodedBody The decoded body stream.
294+
* @param decoderState The decoder state.
295+
*/
296+
DecodedResponse(HttpResponse originalResponse, Flux<ByteBuffer> decodedBody, DecoderState decoderState) {
297+
super(originalResponse.getRequest());
298+
this.originalResponse = originalResponse;
126299
this.decodedBody = decodedBody;
300+
this.decoderState = decoderState;
127301
}
128302

129303
@Override
@@ -153,12 +327,21 @@ public Mono<byte[]> getBodyAsByteArray() {
153327

154328
@Override
155329
public Mono<String> getBodyAsString() {
156-
return getBodyAsByteArray().map(String::new);
330+
return getBodyAsByteArray().map(bytes -> new String(bytes, Charset.defaultCharset()));
157331
}
158332

159333
@Override
160334
public Mono<String> getBodyAsString(Charset charset) {
161335
return getBodyAsByteArray().map(bytes -> new String(bytes, charset));
162336
}
337+
338+
/**
339+
* Gets the decoder state.
340+
*
341+
* @return The decoder state.
342+
*/
343+
public DecoderState getDecoderState() {
344+
return decoderState;
345+
}
163346
}
164347
}

0 commit comments

Comments
 (0)