Skip to content

Commit 431a5b3

Browse files
Implement smart retry with proper duplicate byte skipping for structured messages
Enhanced StorageContentValidationDecoderPolicy to properly handle retries from arbitrary mid-segment offsets by detecting and skipping duplicate bytes.

Changes to StorageContentValidationDecoderPolicy:
- Added getResponseStartOffset() to parse the Content-Range header and determine the retry offset
- Enhanced DecoderState to track currentResponseStartOffset and currentResponseBytesRead
- Modified decodeStream() to calculate absolute buffer offsets and skip duplicate bytes
- When a retry occurs, the policy now:
  1. Detects the starting offset from the Content-Range header
  2. Calculates which bytes in each buffer are duplicates
  3. Skips duplicate bytes before feeding them to the decoder
  4. Only processes new bytes, maintaining continuous decoder state

This implementation allows the decoder to receive a continuous stream of non-duplicate encoded bytes, even when retries occur from arbitrary offsets. The decoder doesn't need to be aware of retries - the policy handles all offset calculations and duplicate filtering.

Compatible with range downloads and works for arbitrary mid-segment offset resumption.

Co-authored-by: gunjansingh-msft <[email protected]>
1 parent 85e81f6 commit 431a5b3

File tree

1 file changed

+72
-5
lines changed

1 file changed

+72
-5
lines changed

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
6363
// Get or create decoder with state tracking
6464
DecoderState decoderState = getOrCreateDecoderState(context, contentLength);
6565

66+
// Determine the starting offset of this response (for retry handling)
67+
long responseStartOffset = getResponseStartOffset(httpResponse);
68+
decoderState.setCurrentResponseStartOffset(responseStartOffset);
69+
6670
// Decode using the stateful decoder
6771
Flux<ByteBuffer> decodedStream = decodeStream(httpResponse.getBody(), decoderState);
6872

@@ -86,15 +90,42 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
8690
private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState state) {
8791
return encodedFlux.concatMap(encodedBuffer -> {
8892
try {
89-
// Combine with pending data if any
90-
ByteBuffer dataToProcess = state.combineWithPending(encodedBuffer);
93+
// Calculate the absolute offset of this buffer in the encoded stream
94+
long bufferAbsoluteOffset = state.currentResponseStartOffset + state.currentResponseBytesRead;
95+
int bufferSize = encodedBuffer.remaining();
96+
state.currentResponseBytesRead += bufferSize;
97+
98+
// Determine how many bytes to skip (if this is overlapping with already-processed data)
99+
long bytesToSkip = state.totalEncodedBytesProcessed.get() - bufferAbsoluteOffset;
100+
101+
ByteBuffer bufferToProcess;
102+
if (bytesToSkip > 0) {
103+
// Skip bytes that have already been processed
104+
if (bytesToSkip >= bufferSize) {
105+
// Entire buffer is duplicate, skip it
106+
return Flux.empty();
107+
}
108+
// Skip partial buffer
109+
ByteBuffer skippedBuffer = encodedBuffer.duplicate();
110+
skippedBuffer.position(skippedBuffer.position() + (int) bytesToSkip);
111+
bufferToProcess = skippedBuffer;
112+
// Only count the new bytes
113+
state.totalEncodedBytesProcessed.addAndGet(bufferSize - (int) bytesToSkip);
114+
} else {
115+
// All bytes are new
116+
bufferToProcess = encodedBuffer;
117+
state.totalEncodedBytesProcessed.addAndGet(bufferSize);
118+
}
91119

92-
// Track encoded bytes
93-
int encodedBytesInBuffer = encodedBuffer.remaining();
94-
state.totalEncodedBytesProcessed.addAndGet(encodedBytesInBuffer);
120+
// Combine with pending data if any
121+
ByteBuffer dataToProcess = state.combineWithPending(bufferToProcess);
95122

96123
// Try to decode what we have - decoder handles partial data
97124
int availableSize = dataToProcess.remaining();
125+
if (availableSize == 0) {
126+
return Flux.empty();
127+
}
128+
98129
ByteBuffer decodedData = state.decoder.decode(dataToProcess.duplicate(), availableSize);
99130

100131
// Track decoded bytes
@@ -186,6 +217,30 @@ private DecoderState getOrCreateDecoderState(HttpPipelineCallContext context, lo
186217
.orElseGet(() -> new DecoderState(contentLength));
187218
}
188219

220+
/**
221+
* Gets the starting offset of the response by parsing the start value of its Content-Range header.
222+
*
223+
* @param httpResponse The HTTP response whose Content-Range header is read.
224+
* @return The parsed start offset, or 0 if the header is missing, does not begin with "bytes ", has no start value before a dash, or fails to parse.
225+
*/
226+
private long getResponseStartOffset(HttpResponse httpResponse) {
227+
String contentRange = httpResponse.getHeaderValue(HttpHeaderName.CONTENT_RANGE);
228+
if (contentRange != null && contentRange.startsWith("bytes ")) {
229+
try {
230+
// Expected format: "bytes start-end/total" or "bytes start-end/*"
231+
String range = contentRange.substring(6); // Skip the 6-character "bytes " prefix
232+
int dashIndex = range.indexOf('-');
233+
if (dashIndex > 0) { // Require at least one character before the dash
234+
String startStr = range.substring(0, dashIndex);
235+
return Long.parseLong(startStr);
236+
}
237+
} catch (Exception e) { // A failure here is logged and falls through to the default of 0
238+
LOGGER.warning("Failed to parse Content-Range header: " + contentRange, e);
239+
}
240+
}
241+
return 0; // Default when no usable start offset was found
242+
}
243+
189244
/**
190245
* Checks if the response is a download response.
191246
*
@@ -207,6 +262,8 @@ public static class DecoderState {
207262
private final AtomicLong totalBytesDecoded;
208263
private final AtomicLong totalEncodedBytesProcessed;
209264
private ByteBuffer pendingBuffer;
265+
private long currentResponseStartOffset = 0;
266+
private long currentResponseBytesRead = 0;
210267

211268
/**
212269
* Creates a new decoder state.
@@ -221,6 +278,16 @@ public DecoderState(long expectedContentLength) {
221278
this.pendingBuffer = null;
222279
}
223280

281+
/**
282+
* Sets the starting offset of the current response (for retry handling) and resets the count of bytes read from it to zero.
283+
*
284+
* @param offset The starting offset of the current response.
285+
*/
286+
public void setCurrentResponseStartOffset(long offset) {
287+
this.currentResponseStartOffset = offset;
288+
this.currentResponseBytesRead = 0; // Restart the per-response byte count alongside the new offset
289+
}
290+
224291
/**
225292
* Combines pending buffer with new data.
226293
*

0 commit comments

Comments
 (0)