@@ -60,8 +60,15 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
6060 Long contentLength = getContentLength (httpResponse .getHeaders ());
6161
6262 if (contentLength != null && contentLength > 0 && validationOptions != null ) {
63- // Get or create decoder with state tracking
64- DecoderState decoderState = getOrCreateDecoderState (context , contentLength );
63+ // Check if this is a retry - if so, get the number of decoded bytes to skip
64+ long bytesToSkip = context .getData (Constants .STRUCTURED_MESSAGE_DECODED_BYTES_TO_SKIP_CONTEXT_KEY )
65+ .filter (value -> value instanceof Long )
66+ .map (value -> (Long ) value )
67+ .orElse (0L );
68+
69+ // Always create a fresh decoder for each request
70+ // This is necessary because structured messages must be parsed from the beginning
71+ DecoderState decoderState = new DecoderState (contentLength , bytesToSkip );
6572
6673 // Decode using the stateful decoder
6774 Flux <ByteBuffer > decodedStream = decodeStream (httpResponse .getBody (), decoderState );
@@ -102,10 +109,6 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
102109 // Decode - this advances duplicateForDecode's position
103110 ByteBuffer decodedData = state .decoder .decode (duplicateForDecode , availableSize );
104111
105- // Track decoded bytes
106- int decodedBytes = decodedData .remaining ();
107- state .totalBytesDecoded .addAndGet (decodedBytes );
108-
109112 // Calculate how much of the input buffer was consumed by checking the duplicate's position
110113 int bytesConsumed = duplicateForDecode .position () - initialPosition ;
111114 int bytesRemaining = availableSize - bytesConsumed ;
@@ -121,9 +124,33 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
121124 state .pendingBuffer = null ;
122125 }
123126
124- // Return decoded data if any
127+ // Handle skipping bytes for retries and tracking decoded bytes
128+ int decodedBytes = decodedData .remaining ();
125129 if (decodedBytes > 0 ) {
126- return Flux .just (decodedData );
130+ // Track total decoded bytes
131+ long totalDecoded = state .totalBytesDecoded .addAndGet (decodedBytes );
132+
133+ // If we need to skip bytes (retry scenario), adjust the buffer
134+ if (state .bytesToSkip > 0 ) {
135+ long currentPosition = totalDecoded - decodedBytes ; // Where we were before adding these bytes
136+
137+ if (currentPosition + decodedBytes <= state .bytesToSkip ) {
138+ // All these bytes should be skipped
139+ return Flux .empty ();
140+ } else if (currentPosition < state .bytesToSkip ) {
141+ // Some bytes should be skipped
142+ int skipAmount = (int ) (state .bytesToSkip - currentPosition );
143+ decodedData .position (decodedData .position () + skipAmount );
144+ }
145+ // else: no bytes need to be skipped, emit all
146+ }
147+
148+ // Return decoded data if any remains after skipping
149+ if (decodedData .hasRemaining ()) {
150+ return Flux .just (decodedData );
151+ } else {
152+ return Flux .empty ();
153+ }
127154 } else {
128155 return Flux .empty ();
129156 }
@@ -199,20 +226,6 @@ private Long getContentLength(HttpHeaders headers) {
199226 return null ;
200227 }
201228
202- /**
203- * Gets or creates a decoder state from context.
204- *
205- * @param context The pipeline call context.
206- * @param contentLength The content length.
207- * @return The decoder state.
208- */
209- private DecoderState getOrCreateDecoderState (HttpPipelineCallContext context , long contentLength ) {
210- return context .getData (Constants .STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY )
211- .filter (value -> value instanceof DecoderState )
212- .map (value -> (DecoderState ) value )
213- .orElseGet (() -> new DecoderState (contentLength ));
214- }
215-
216229 /**
217230 * Checks if the response is a download response.
218231 *
@@ -233,18 +246,21 @@ public static class DecoderState {
233246 private final long expectedContentLength ;
234247 private final AtomicLong totalBytesDecoded ;
235248 private final AtomicLong totalEncodedBytesProcessed ;
249+ private final long bytesToSkip ;
236250 private ByteBuffer pendingBuffer ;
237251
238252 /**
239253 * Creates a new decoder state.
240254 *
241255 * @param expectedContentLength The expected length of the encoded content.
256+ * @param bytesToSkip The number of decoded bytes to skip (for retry scenarios).
242257 */
243- public DecoderState (long expectedContentLength ) {
258+ public DecoderState (long expectedContentLength , long bytesToSkip ) {
244259 this .expectedContentLength = expectedContentLength ;
245260 this .decoder = new StructuredMessageDecoder (expectedContentLength );
246261 this .totalBytesDecoded = new AtomicLong (0 );
247262 this .totalEncodedBytesProcessed = new AtomicLong (0 );
263+ this .bytesToSkip = bytesToSkip ;
248264 this .pendingBuffer = null ;
249265 }
250266
0 commit comments