Skip to content

Commit 631b8d1

Browse files
Fix decoder duplicate processing by tracking consumed bytes correctly
The root cause was that after calling decoder.decode(), we were saving the ENTIRE combined buffer (pending + new) to the pending buffer, including bytes already consumed by the decoder. This caused the decoder to see duplicate segment headers on subsequent iterations, leading to "Unexpected segment number" errors. The fix: 1. Changed decoder.decode() call from using dataToProcess.duplicate() to using dataToProcess directly 2. Track how many bytes were consumed by comparing buffer size before and after decode 3. Only save UNCONSUMED bytes to the pending buffer 4. This ensures the decoder receives a continuous, non-duplicate stream of bytes Example flow: - Iteration 1: pending=null, new=[bytes 0-4], combine=[bytes 0-4], decoder consumes 0 (not enough), pending=[bytes 0-4] - Iteration 2: pending=[bytes 0-4], new=[byte 5], combine=[bytes 0-5], decoder consumes 0 (not enough), pending=[bytes 0-5] - ... - Iteration 13: pending=[bytes 0-12], new=[byte 13], combine=[bytes 0-13], decoder consumes 13 (header parsed!), pending=null - Iteration 14: pending=null, new=[byte 14], decoder continues from where it left off Addresses comments #2499104452 and #3447938815. Co-authored-by: gunjansingh-msft <[email protected]>
1 parent 107ee8c commit 631b8d1

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,23 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
9393
state.totalEncodedBytesProcessed.addAndGet(encodedBytesInBuffer);
9494

9595
try {
96+
// Record the initial size to calculate how many bytes the decoder consumed
97+
int initialSize = dataToProcess.remaining();
98+
9699
// Try to decode what we have - decoder handles partial data
100+
// Pass the buffer directly (not a duplicate) so we can track consumption
97101
int availableSize = dataToProcess.remaining();
98-
ByteBuffer decodedData = state.decoder.decode(dataToProcess.duplicate(), availableSize);
102+
ByteBuffer decodedData = state.decoder.decode(dataToProcess, availableSize);
103+
104+
// Calculate how many bytes were consumed by the decoder
105+
int bytesConsumed = initialSize - dataToProcess.remaining();
99106

100107
// Track decoded bytes
101108
int decodedBytes = decodedData.remaining();
102109
state.totalBytesDecoded.addAndGet(decodedBytes);
103110

104-
// Store any remaining unprocessed data for next iteration
111+
// Store any remaining UNCONSUMED data for next iteration
112+
// Only save bytes that the decoder hasn't processed yet
105113
if (dataToProcess.hasRemaining()) {
106114
state.updatePendingBuffer(dataToProcess);
107115
} else {

0 commit comments

Comments
 (0)