Skip to content

Commit ec7de83

Browse files
Fix smart retry by preserving partial data in pending buffer on decoder exceptions
The root cause of "Unexpected segment number" errors was that when the decoder threw exceptions due to insufficient data (e.g., "Content not long enough"), the partial data was not saved to the pending buffer. This caused retries to lose accumulated bytes and start over. Key changes: 1. Moved buffer combination and byte tracking outside try block to execute regardless of decoder success/failure 2. Added specific handling for IllegalArgumentException with "not long enough" message 3. When insufficient data error occurs, save all accumulated data to pending buffer and return empty (don't fail) 4. This allows the stream to continue accumulating bytes across retries until enough data is available for decoding How it works now: - Request bytes=0-: Get byte 0, insufficient for header, save to pending, return empty - Stream error triggers retry - Request bytes=1-: Get byte 1, combine with pending [byte 0], still insufficient, save [0,1] to pending - Continue until 13+ bytes accumulated in pending - Eventually enough bytes available, decoder succeeds and processes the header - Smart retry resumes from correct encoded offset This fix enables true smart retry where partial data is preserved across network interruptions. Co-authored-by: gunjansingh-msft <[email protected]>
1 parent d3d9700 commit ec7de83

File tree

1 file changed

+19
-6
lines changed

1 file changed

+19
-6
lines changed

sdk/storage/azure-storage-common/src/main/java/com/azure/storage/common/policy/StorageContentValidationDecoderPolicy.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,14 +85,14 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
8585
*/
8686
private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState state) {
8787
return encodedFlux.concatMap(encodedBuffer -> {
88-
try {
89-
// Combine with pending data if any
90-
ByteBuffer dataToProcess = state.combineWithPending(encodedBuffer);
88+
// Combine with pending data if any
89+
ByteBuffer dataToProcess = state.combineWithPending(encodedBuffer);
9190

92-
// Track encoded bytes
93-
int encodedBytesInBuffer = encodedBuffer.remaining();
94-
state.totalEncodedBytesProcessed.addAndGet(encodedBytesInBuffer);
91+
// Track encoded bytes
92+
int encodedBytesInBuffer = encodedBuffer.remaining();
93+
state.totalEncodedBytesProcessed.addAndGet(encodedBytesInBuffer);
9594

95+
try {
9696
// Try to decode what we have - decoder handles partial data
9797
int availableSize = dataToProcess.remaining();
9898
ByteBuffer decodedData = state.decoder.decode(dataToProcess.duplicate(), availableSize);
@@ -114,6 +114,19 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
114114
} else {
115115
return Flux.empty();
116116
}
117+
} catch (IllegalArgumentException e) {
118+
// Handle decoder exceptions - check if it's due to incomplete data
119+
if (e.getMessage() != null && e.getMessage().contains("not long enough")) {
120+
// Not enough data to decode yet - preserve all data in pending buffer
121+
state.updatePendingBuffer(dataToProcess);
122+
123+
// Don't fail - just return empty and wait for more data
124+
return Flux.empty();
125+
} else {
126+
// Other errors should propagate
127+
LOGGER.error("Failed to decode structured message chunk: " + e.getMessage(), e);
128+
return Flux.error(e);
129+
}
117130
} catch (Exception e) {
118131
LOGGER.error("Failed to decode structured message chunk: " + e.getMessage(), e);
119132
return Flux.error(e);

0 commit comments

Comments
 (0)