Skip to content

Commit d3d9700

Browse files
Remove duplicate byte skipping logic - use simpler approach
Reverted to simpler implementation that doesn't try to skip duplicate bytes. The duplicate-skipping logic was causing "Unexpected segment number" errors because it was interfering with the decoder's state management. The correct approach: 1. BlobAsyncClientBase requests retry from correct encoded offset (via totalEncodedBytesProcessed) 2. Server sends bytes starting from that offset (no duplicates) 3. Policy feeds all received bytes to decoder with pending buffer handling 4. Decoder maintains state and processes bytes sequentially Removed: - getResponseStartOffset() method - currentResponseStartOffset and currentResponseBytesRead fields from DecoderState - Duplicate byte detection and skipping logic in decodeStream() This simpler approach relies on correct offset calculation in the retry logic (which we have) rather than trying to detect and skip duplicates at the policy level. Addresses comment #2499104452 - investigating test failures. Co-authored-by: gunjansingh-msft <[email protected]>
1 parent 92c1602 commit d3d9700

File tree

1 file changed

+5
-72
lines changed

1 file changed

+5
-72
lines changed

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 5 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,6 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
6363
// Get or create decoder with state tracking
6464
DecoderState decoderState = getOrCreateDecoderState(context, contentLength);
6565

66-
// Determine the starting offset of this response (for retry handling)
67-
long responseStartOffset = getResponseStartOffset(httpResponse);
68-
decoderState.setCurrentResponseStartOffset(responseStartOffset);
69-
7066
// Decode using the stateful decoder
7167
Flux<ByteBuffer> decodedStream = decodeStream(httpResponse.getBody(), decoderState);
7268

@@ -90,42 +86,15 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
9086
private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState state) {
9187
return encodedFlux.concatMap(encodedBuffer -> {
9288
try {
93-
// Calculate the absolute offset of this buffer in the encoded stream
94-
long bufferAbsoluteOffset = state.currentResponseStartOffset + state.currentResponseBytesRead;
95-
int bufferSize = encodedBuffer.remaining();
96-
state.currentResponseBytesRead += bufferSize;
97-
98-
// Determine how many bytes to skip (if this is overlapping with already-processed data)
99-
long bytesToSkip = state.totalEncodedBytesProcessed.get() - bufferAbsoluteOffset;
100-
101-
ByteBuffer bufferToProcess;
102-
if (bytesToSkip > 0) {
103-
// Skip bytes that have already been processed
104-
if (bytesToSkip >= bufferSize) {
105-
// Entire buffer is duplicate, skip it
106-
return Flux.empty();
107-
}
108-
// Skip partial buffer
109-
ByteBuffer skippedBuffer = encodedBuffer.duplicate();
110-
skippedBuffer.position(skippedBuffer.position() + (int) bytesToSkip);
111-
bufferToProcess = skippedBuffer;
112-
// Only count the new bytes
113-
state.totalEncodedBytesProcessed.addAndGet(bufferSize - (int) bytesToSkip);
114-
} else {
115-
// All bytes are new
116-
bufferToProcess = encodedBuffer;
117-
state.totalEncodedBytesProcessed.addAndGet(bufferSize);
118-
}
119-
12089
// Combine with pending data if any
121-
ByteBuffer dataToProcess = state.combineWithPending(bufferToProcess);
90+
ByteBuffer dataToProcess = state.combineWithPending(encodedBuffer);
91+
92+
// Track encoded bytes
93+
int encodedBytesInBuffer = encodedBuffer.remaining();
94+
state.totalEncodedBytesProcessed.addAndGet(encodedBytesInBuffer);
12295

12396
// Try to decode what we have - decoder handles partial data
12497
int availableSize = dataToProcess.remaining();
125-
if (availableSize == 0) {
126-
return Flux.empty();
127-
}
128-
12998
ByteBuffer decodedData = state.decoder.decode(dataToProcess.duplicate(), availableSize);
13099

131100
// Track decoded bytes
@@ -217,30 +186,6 @@ private DecoderState getOrCreateDecoderState(HttpPipelineCallContext context, lo
217186
.orElseGet(() -> new DecoderState(contentLength));
218187
}
219188

220-
/**
221-
* Gets the starting offset of the response by parsing the Content-Range header.
222-
*
223-
* @param httpResponse The HTTP response.
224-
* @return The starting offset, or 0 if not a range response.
225-
*/
226-
private long getResponseStartOffset(HttpResponse httpResponse) {
227-
String contentRange = httpResponse.getHeaderValue(HttpHeaderName.CONTENT_RANGE);
228-
if (contentRange != null && contentRange.startsWith("bytes ")) {
229-
try {
230-
// Format: "bytes start-end/total" or "bytes start-end/*"
231-
String range = contentRange.substring(6); // Skip "bytes "
232-
int dashIndex = range.indexOf('-');
233-
if (dashIndex > 0) {
234-
String startStr = range.substring(0, dashIndex);
235-
return Long.parseLong(startStr);
236-
}
237-
} catch (Exception e) {
238-
LOGGER.warning("Failed to parse Content-Range header: " + contentRange, e);
239-
}
240-
}
241-
return 0;
242-
}
243-
244189
/**
245190
* Checks if the response is a download response.
246191
*
@@ -262,8 +207,6 @@ public static class DecoderState {
262207
private final AtomicLong totalBytesDecoded;
263208
private final AtomicLong totalEncodedBytesProcessed;
264209
private ByteBuffer pendingBuffer;
265-
private long currentResponseStartOffset = 0;
266-
private long currentResponseBytesRead = 0;
267210

268211
/**
269212
* Creates a new decoder state.
@@ -278,16 +221,6 @@ public DecoderState(long expectedContentLength) {
278221
this.pendingBuffer = null;
279222
}
280223

281-
/**
282-
* Sets the starting offset of the current response (for retry handling).
283-
*
284-
* @param offset The starting offset.
285-
*/
286-
public void setCurrentResponseStartOffset(long offset) {
287-
this.currentResponseStartOffset = offset;
288-
this.currentResponseBytesRead = 0;
289-
}
290-
291224
/**
292225
* Combines pending buffer with new data.
293226
*

0 commit comments

Comments
 (0)