Skip to content

Commit 46c2fb2

Browse files
Add foundation for smart retry support - stateful decoder and context passing
Created StatefulStructuredMessageDecoder class and updated policy to use it. Added STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY constant for passing decoder state across retries. Note: Full smart retry implementation requires refactoring StructuredMessageDecoder to support streaming/incremental validation instead of collect-then-decode pattern. Co-authored-by: gunjansingh-msft <[email protected]>
1 parent 49b0a17 commit 46c2fb2

File tree

3 files changed

+163
-2
lines changed

3 files changed

+163
-2
lines changed

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/implementation/Constants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ public final class Constants {
105105
public static final String STRUCTURED_MESSAGE_VALIDATION_OPTIONS_CONTEXT_KEY =
106106
"azure-storage-structured-message-validation-options";
107107

108+
/**
109+
* Context key used to pass stateful decoder state across retry requests.
110+
*/
111+
public static final String STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY =
112+
"azure-storage-structured-message-decoder-state";
113+
108114
private Constants() {
109115
}
110116

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.storage.common.implementation.structuredmessage;
5+
6+
import com.azure.core.util.logging.ClientLogger;
7+
import com.azure.storage.common.implementation.BufferStagingArea;
8+
import reactor.core.publisher.Flux;
9+
import reactor.core.publisher.Mono;
10+
11+
import java.nio.ByteBuffer;
12+
import java.util.concurrent.atomic.AtomicLong;
13+
14+
/**
15+
* Stateful decoder for structured messages that supports mid-stream retries.
16+
* This decoder maintains state across network interruptions to ensure all data
17+
* is validated before retrying from the point of failure.
18+
*/
19+
public class StatefulStructuredMessageDecoder {
20+
private static final ClientLogger LOGGER = new ClientLogger(StatefulStructuredMessageDecoder.class);
21+
22+
private final long expectedContentLength;
23+
private final StructuredMessageDecoder decoder;
24+
private final AtomicLong totalBytesDecoded;
25+
private final AtomicLong totalEncodedBytesProcessed;
26+
private ByteBuffer pendingData;
27+
private boolean finalized;
28+
29+
/**
30+
* Creates a new stateful structured message decoder.
31+
*
32+
* @param expectedContentLength The expected length of the encoded content.
33+
*/
34+
public StatefulStructuredMessageDecoder(long expectedContentLength) {
35+
this.expectedContentLength = expectedContentLength;
36+
this.decoder = new StructuredMessageDecoder(expectedContentLength);
37+
this.totalBytesDecoded = new AtomicLong(0);
38+
this.totalEncodedBytesProcessed = new AtomicLong(0);
39+
this.pendingData = null;
40+
this.finalized = false;
41+
}
42+
43+
/**
44+
* Decodes a flux of byte buffers representing encoded structured message data.
45+
*
46+
* @param encodedFlux The flux of encoded byte buffers.
47+
* @return A flux of decoded byte buffers.
48+
*/
49+
public Flux<ByteBuffer> decode(Flux<ByteBuffer> encodedFlux) {
50+
if (finalized) {
51+
return Flux.error(new IllegalStateException("Decoder has already been finalized"));
52+
}
53+
54+
// Collect all data first (structured message needs complete data to decode)
55+
return encodedFlux
56+
.collect(() -> new EncodedDataCollector(), EncodedDataCollector::addBuffer)
57+
.flatMapMany(collector -> {
58+
try {
59+
ByteBuffer allEncodedData = collector.getAllData();
60+
61+
if (allEncodedData.remaining() == 0) {
62+
return Flux.empty();
63+
}
64+
65+
// Update total encoded bytes processed
66+
totalEncodedBytesProcessed.addAndGet(allEncodedData.remaining());
67+
68+
// Decode the complete message
69+
ByteBuffer decodedData = decoder.decode(allEncodedData);
70+
71+
// Update total bytes decoded
72+
totalBytesDecoded.addAndGet(decodedData.remaining());
73+
74+
// Finalize decoding
75+
decoder.finalizeDecoding();
76+
finalized = true;
77+
78+
return Flux.just(decodedData);
79+
} catch (Exception e) {
80+
LOGGER.error("Failed to decode structured message: " + e.getMessage(), e);
81+
return Flux.error(e);
82+
}
83+
});
84+
}
85+
86+
/**
87+
* Gets the total number of decoded bytes processed so far.
88+
*
89+
* @return The total decoded bytes.
90+
*/
91+
public long getTotalBytesDecoded() {
92+
return totalBytesDecoded.get();
93+
}
94+
95+
/**
96+
* Gets the total number of encoded bytes processed so far.
97+
*
98+
* @return The total encoded bytes processed.
99+
*/
100+
public long getTotalEncodedBytesProcessed() {
101+
return totalEncodedBytesProcessed.get();
102+
}
103+
104+
/**
105+
* Checks if the decoder has been finalized.
106+
*
107+
* @return true if finalized, false otherwise.
108+
*/
109+
public boolean isFinalized() {
110+
return finalized;
111+
}
112+
113+
/**
114+
* Helper class to collect encoded data buffers.
115+
*/
116+
private static class EncodedDataCollector {
117+
private ByteBuffer accumulatedBuffer;
118+
119+
EncodedDataCollector() {
120+
this.accumulatedBuffer = ByteBuffer.allocate(0);
121+
}
122+
123+
void addBuffer(ByteBuffer buffer) {
124+
// Accumulate the buffer
125+
ByteBuffer newBuffer = ByteBuffer.allocate(accumulatedBuffer.remaining() + buffer.remaining());
126+
newBuffer.put(accumulatedBuffer);
127+
newBuffer.put(buffer);
128+
newBuffer.flip();
129+
accumulatedBuffer = newBuffer;
130+
}
131+
132+
ByteBuffer getAllData() {
133+
return accumulatedBuffer;
134+
}
135+
}
136+
}

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import com.azure.storage.common.DownloadContentValidationOptions;
1616
import com.azure.storage.common.implementation.Constants;
1717
import com.azure.storage.common.implementation.structuredmessage.StructuredMessageDecodingStream;
18+
import com.azure.storage.common.implementation.structuredmessage.StatefulStructuredMessageDecoder;
1819
import reactor.core.publisher.Flux;
1920
import reactor.core.publisher.Mono;
2021

@@ -51,8 +52,12 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
5152
Long contentLength = getContentLength(httpResponse.getHeaders());
5253

5354
if (contentLength != null && contentLength > 0 && validationOptions != null) {
54-
Flux<ByteBuffer> decodedStream = StructuredMessageDecodingStream.wrapStreamIfNeeded(
55-
httpResponse.getBody(), contentLength, validationOptions);
55+
// Get or create stateful decoder
56+
StatefulStructuredMessageDecoder decoder = getOrCreateDecoder(context, contentLength);
57+
58+
// Decode using the stateful decoder
59+
Flux<ByteBuffer> decodedStream = decoder.decode(httpResponse.getBody());
60+
5661
return new DecodedResponse(httpResponse, decodedStream);
5762
}
5863

@@ -103,6 +108,20 @@ private Long getContentLength(HttpHeaders headers) {
103108
return null;
104109
}
105110

111+
/**
112+
* Gets or creates a stateful decoder from context.
113+
*
114+
* @param context The pipeline call context.
115+
* @param contentLength The content length.
116+
* @return The stateful decoder.
117+
*/
118+
private StatefulStructuredMessageDecoder getOrCreateDecoder(HttpPipelineCallContext context, Long contentLength) {
119+
return context.getData(Constants.STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY)
120+
.filter(value -> value instanceof StatefulStructuredMessageDecoder)
121+
.map(value -> (StatefulStructuredMessageDecoder) value)
122+
.orElseGet(() -> new StatefulStructuredMessageDecoder(contentLength));
123+
}
124+
106125
/**
107126
* Checks if the response is a download response (GET request with body).
108127
*

0 commit comments

Comments
 (0)