Skip to content

Commit 3b0fa7b

Browse files
Fix smart retry offset calculation to exclude pending buffer bytes
The root cause of "Unexpected segment number" errors was incorrect retry offset calculation. The pending buffer contains bytes that have already been counted in totalEncodedBytesProcessed but haven't been successfully processed by the decoder yet. When retrying, we were requesting bytes AFTER the pending buffer, causing gaps in the data stream. Key issue: - totalEncodedBytesProcessed tracks ALL bytes received (including those in pending) - Pending buffer holds bytes waiting for more data to complete a structure (header/segment/footer) - Retry offset was set to totalEncodedBytesProcessed, skipping pending bytes - This caused decoder to receive segment N when expecting segment N-1 Solution: 1. Added getRetryOffset() method to DecoderState that returns: totalEncodedBytesProcessed - pendingBufferSize 2. Updated BlobAsyncClientBase to use getRetryOffset() instead of getTotalEncodedBytesProcessed() 3. Added import for DecoderState inner class Example flow: - Receive bytes 0-4, add to totalEncodedBytesProcessed (=5), insufficient for 13-byte header, store in pending - IOException occurs - Retry requests from offset 0 (5 - 5 pending bytes = 0) ✓ - Get byte 5, combine with pending [0-4] = [0-5], still insufficient, store in pending - totalEncodedBytesProcessed now = 6, pending = 6 bytes - Retry requests from offset 0 (6 - 6 = 0) ✓ - Continue until enough bytes accumulated This ensures continuous byte stream to decoder with no gaps or duplicates. Co-authored-by: gunjansingh-msft <[email protected]>
1 parent 899bfb4 commit 3b0fa7b

File tree

2 files changed

+18
-4
lines changed

2 files changed

+18
-4
lines changed

sdk/storage/azure-storage-blob/src/main/java/com/azure/storage/blob/specialized/BlobAsyncClientBase.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
import com.azure.storage.common.implementation.StorageImplUtils;
8787
import com.azure.storage.common.DownloadContentValidationOptions;
8888
import com.azure.storage.common.policy.StorageContentValidationDecoderPolicy;
89+
import com.azure.storage.common.policy.StorageContentValidationDecoderPolicy.DecoderState;
8990
import reactor.core.publisher.Flux;
9091
import reactor.core.publisher.Mono;
9192
import reactor.core.publisher.SignalType;
@@ -1404,11 +1405,11 @@ Mono<BlobDownloadAsyncResponse> downloadStreamWithResponse(BlobRange range, Down
14041405
.orElse(null);
14051406

14061407
if (decoderStateObj instanceof StorageContentValidationDecoderPolicy.DecoderState) {
1407-
StorageContentValidationDecoderPolicy.DecoderState decoderState
1408-
= (StorageContentValidationDecoderPolicy.DecoderState) decoderStateObj;
1408+
DecoderState decoderState = (DecoderState) decoderStateObj;
14091409

1410-
// Use the encoded offset for retry (number of encoded bytes processed)
1411-
long encodedOffset = decoderState.getTotalEncodedBytesProcessed();
1410+
// Use the retry offset (encoded bytes processed minus pending buffer)
1411+
// The pending buffer contains bytes that have been counted but not yet successfully processed
1412+
long encodedOffset = decoderState.getRetryOffset();
14121413
long remainingCount = finalCount - encodedOffset;
14131414
retryRange = new BlobRange(initialOffset + encodedOffset, remainingCount);
14141415

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,19 @@ public long getTotalEncodedBytesProcessed() {
282282
return totalEncodedBytesProcessed.get();
283283
}
284284

285+
/**
286+
* Gets the offset to use for retry requests.
287+
* This is the total encoded bytes processed minus any bytes in the pending buffer,
288+
* since pending bytes have already been counted but haven't been successfully processed yet.
289+
*
290+
* @return The offset for retry requests.
291+
*/
292+
public long getRetryOffset() {
293+
long processed = totalEncodedBytesProcessed.get();
294+
int pending = (pendingBuffer != null) ? pendingBuffer.remaining() : 0;
295+
return processed - pending;
296+
}
297+
285298
/**
286299
* Checks if the decoder has finalized.
287300
*

0 commit comments

Comments
 (0)