@@ -60,15 +60,8 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
6060 Long contentLength = getContentLength (httpResponse .getHeaders ());
6161
6262 if (contentLength != null && contentLength > 0 && validationOptions != null ) {
63- // Check if this is a retry - if so, get the number of decoded bytes to skip
64- long bytesToSkip = context .getData (Constants .STRUCTURED_MESSAGE_DECODED_BYTES_TO_SKIP_CONTEXT_KEY )
65- .filter (value -> value instanceof Long )
66- .map (value -> (Long ) value )
67- .orElse (0L );
68-
69- // Always create a fresh decoder for each request
70- // This is necessary because structured messages must be parsed from the beginning
71- DecoderState decoderState = new DecoderState (contentLength , bytesToSkip );
63+ // Get or create decoder with state tracking
64+ DecoderState decoderState = getOrCreateDecoderState (context , contentLength );
7265
7366 // Decode using the stateful decoder
7467 Flux <ByteBuffer > decodedStream = decodeStream (httpResponse .getBody (), decoderState );
@@ -92,13 +85,12 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
9285 */
9386 private Flux <ByteBuffer > decodeStream (Flux <ByteBuffer > encodedFlux , DecoderState state ) {
9487 return encodedFlux .concatMap (encodedBuffer -> {
88+ // Track how many bytes were pending before we process
89+ int previousPendingBytes = (state .pendingBuffer != null ) ? state .pendingBuffer .remaining () : 0 ;
90+
9591 // Combine with pending data if any
9692 ByteBuffer dataToProcess = state .combineWithPending (encodedBuffer );
9793
98- // Track encoded bytes
99- int encodedBytesInBuffer = encodedBuffer .remaining ();
100- state .totalEncodedBytesProcessed .addAndGet (encodedBytesInBuffer );
101-
10294 try {
10395 // Try to decode what we have - decoder handles partial data
10496 // Create duplicate for decoder - it will advance the duplicate's position as it reads
@@ -113,6 +105,14 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
113105 int bytesConsumed = duplicateForDecode .position () - initialPosition ;
114106 int bytesRemaining = availableSize - bytesConsumed ;
115107
108+ // Track the newly consumed encoded bytes (excluding previously pending bytes)
109+ // The consumed bytes include both old pending bytes and new bytes from this buffer
110+ // We only want to add the NEW bytes that were consumed
111+ int newBytesConsumed = bytesConsumed - previousPendingBytes ;
112+ if (newBytesConsumed > 0 ) {
113+ state .totalEncodedBytesProcessed .addAndGet (newBytesConsumed );
114+ }
115+
116116 // Save only unconsumed portion to pending
117117 if (bytesRemaining > 0 ) {
118118 // Position the original buffer to skip consumed bytes, then slice to get unconsumed
@@ -124,33 +124,13 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
124124 state .pendingBuffer = null ;
125125 }
126126
127- // Handle skipping bytes for retries and tracking decoded bytes
127+ // Track decoded bytes
128128 int decodedBytes = decodedData .remaining ();
129+ state .totalBytesDecoded .addAndGet (decodedBytes );
130+
131+ // Return decoded data if any
129132 if (decodedBytes > 0 ) {
130- // Track total decoded bytes
131- long totalDecoded = state .totalBytesDecoded .addAndGet (decodedBytes );
132-
133- // If we need to skip bytes (retry scenario), adjust the buffer
134- if (state .bytesToSkip > 0 ) {
135- long currentPosition = totalDecoded - decodedBytes ; // Where we were before adding these bytes
136-
137- if (currentPosition + decodedBytes <= state .bytesToSkip ) {
138- // All these bytes should be skipped
139- return Flux .empty ();
140- } else if (currentPosition < state .bytesToSkip ) {
141- // Some bytes should be skipped
142- int skipAmount = (int ) (state .bytesToSkip - currentPosition );
143- decodedData .position (decodedData .position () + skipAmount );
144- }
145- // else: no bytes need to be skipped, emit all
146- }
147-
148- // Return decoded data if any remains after skipping
149- if (decodedData .hasRemaining ()) {
150- return Flux .just (decodedData );
151- } else {
152- return Flux .empty ();
153- }
133+ return Flux .just (decodedData );
154134 } else {
155135 return Flux .empty ();
156136 }
@@ -226,6 +206,20 @@ private Long getContentLength(HttpHeaders headers) {
226206 return null ;
227207 }
228208
209+ /**
210+ * Gets or creates a decoder state from context.
211+ *
212+ * @param context The pipeline call context.
213+ * @param contentLength The content length.
214+ * @return The decoder state.
215+ */
216+ private DecoderState getOrCreateDecoderState (HttpPipelineCallContext context , long contentLength ) {
217+ return context .getData (Constants .STRUCTURED_MESSAGE_DECODER_STATE_CONTEXT_KEY )
218+ .filter (value -> value instanceof DecoderState )
219+ .map (value -> (DecoderState ) value )
220+ .orElseGet (() -> new DecoderState (contentLength ));
221+ }
222+
229223 /**
230224 * Checks if the response is a download response.
231225 *
@@ -246,21 +240,18 @@ public static class DecoderState {
246240 private final long expectedContentLength ;
247241 private final AtomicLong totalBytesDecoded ;
248242 private final AtomicLong totalEncodedBytesProcessed ;
249- private final long bytesToSkip ;
250243 private ByteBuffer pendingBuffer ;
251244
252245 /**
253246 * Creates a new decoder state.
254247 *
255248 * @param expectedContentLength The expected length of the encoded content.
256- * @param bytesToSkip The number of decoded bytes to skip (for retry scenarios).
257249 */
258- public DecoderState (long expectedContentLength , long bytesToSkip ) {
250+ public DecoderState (long expectedContentLength ) {
259251 this .expectedContentLength = expectedContentLength ;
260252 this .decoder = new StructuredMessageDecoder (expectedContentLength );
261253 this .totalBytesDecoded = new AtomicLong (0 );
262254 this .totalEncodedBytesProcessed = new AtomicLong (0 );
263- this .bytesToSkip = bytesToSkip ;
264255 this .pendingBuffer = null ;
265256 }
266257
0 commit comments