@@ -98,15 +98,17 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
9898 .addKeyValue ("lastCompleteSegment" , state .decoder .getLastCompleteSegmentStart ())
9999 .log ("Received buffer in decodeStream" );
100100
101- // Combine with pending data if any
101+ // Combine with pending data if any - always returns buffer with position=0 and LITTLE_ENDIAN
102102 ByteBuffer dataToProcess = state .combineWithPending (encodedBuffer );
103103
104104 try {
105- // Try to decode what we have - decoder handles partial data
106- // Create duplicate for decoder - it will advance the duplicate's position as it reads
105+ // Track initial position for consumption calculation
107106 int availableSize = dataToProcess .remaining ();
107+ int initialPosition = dataToProcess .position ();
108+
109+ // Create a duplicate for decoder - it will advance the duplicate's position as it reads
108110 ByteBuffer duplicateForDecode = dataToProcess .duplicate ();
109- int initialPosition = duplicateForDecode .position ( );
111+ duplicateForDecode .order ( java . nio . ByteOrder . LITTLE_ENDIAN );
110112
111113 // Decode - this advances duplicateForDecode's position
112114 ByteBuffer decodedData = state .decoder .decode (duplicateForDecode , availableSize );
@@ -115,25 +117,35 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
115117 int bytesConsumed = duplicateForDecode .position () - initialPosition ;
116118 int bytesRemaining = availableSize - bytesConsumed ;
117119
120+ LOGGER .atVerbose ()
121+ .addKeyValue ("bytesConsumed" , bytesConsumed )
122+ .addKeyValue ("bytesRemaining" , bytesRemaining )
123+ .addKeyValue ("decoderOffset" , state .decoder .getMessageOffset ())
124+ .log ("Decode iteration complete" );
125+
118126 // Save only unconsumed portion to pending
119127 if (bytesRemaining > 0 ) {
120128 // Position the original buffer to skip consumed bytes, then slice to get unconsumed
121- dataToProcess .position (bytesConsumed );
129+ dataToProcess .position (initialPosition + bytesConsumed );
122130 ByteBuffer unconsumed = dataToProcess .slice ();
131+ unconsumed .order (java .nio .ByteOrder .LITTLE_ENDIAN );
123132 state .updatePendingBuffer (unconsumed );
124133 } else {
125- // All data was consumed
134+ // All data was consumed - clear pending
126135 state .pendingBuffer = null ;
127136 }
128137
129- // Track decoded bytes
138+ // Track decoded bytes - ALWAYS track, even if zero (for bookkeeping)
130139 int decodedBytes = decodedData .remaining ();
131- state .totalBytesDecoded .addAndGet (decodedBytes );
140+ if (decodedBytes > 0 ) {
141+ state .totalBytesDecoded .addAndGet (decodedBytes );
142+ }
132143
133- // Return decoded data if any
144+ // Return decoded data if any, otherwise empty flux
134145 if (decodedBytes > 0 ) {
135146 return Flux .just (decodedData );
136147 } else {
148+ // Zero-length decoded payload - still successfully processed, just no output
137149 return Flux .empty ();
138150 }
139151 } catch (IllegalArgumentException e ) {
@@ -259,29 +271,43 @@ public DecoderState(long expectedContentLength) {
259271
260272 /**
261273 * Combines pending buffer with new data.
274+ * Always returns a buffer with position=0 and LITTLE_ENDIAN byte order.
262275 *
263276 * @param newBuffer The new buffer to combine.
264- * @return Combined buffer.
277+ * @return Combined buffer with LITTLE_ENDIAN byte order and position=0 .
265278 */
266279 private ByteBuffer combineWithPending (ByteBuffer newBuffer ) {
267280 if (pendingBuffer == null || !pendingBuffer .hasRemaining ()) {
268- return newBuffer .duplicate ();
281+ // Return a duplicate slice with LITTLE_ENDIAN and position=0
282+ ByteBuffer dup = newBuffer .duplicate ().slice ();
283+ dup .order (java .nio .ByteOrder .LITTLE_ENDIAN );
284+ return dup ;
269285 }
270286
271- ByteBuffer combined = ByteBuffer .allocate (pendingBuffer .remaining () + newBuffer .remaining ());
272- combined .put (pendingBuffer .duplicate ());
273- combined .put (newBuffer .duplicate ());
287+ // Create slices with LITTLE_ENDIAN order
288+ ByteBuffer pendingSlice = pendingBuffer .duplicate ().slice ();
289+ pendingSlice .order (java .nio .ByteOrder .LITTLE_ENDIAN );
290+ ByteBuffer newSlice = newBuffer .duplicate ().slice ();
291+ newSlice .order (java .nio .ByteOrder .LITTLE_ENDIAN );
292+
293+ // Allocate combined buffer with LITTLE_ENDIAN order
294+ ByteBuffer combined = ByteBuffer .allocate (pendingSlice .remaining () + newSlice .remaining ());
295+ combined .order (java .nio .ByteOrder .LITTLE_ENDIAN );
296+ combined .put (pendingSlice );
297+ combined .put (newSlice );
274298 combined .flip ();
275299 return combined ;
276300 }
277301
278302 /**
279303 * Updates the pending buffer with remaining data.
304+ * Allocates a new buffer with LITTLE_ENDIAN byte order.
280305 *
281306 * @param dataToProcess The buffer with remaining data.
282307 */
283308 private void updatePendingBuffer (ByteBuffer dataToProcess ) {
284309 pendingBuffer = ByteBuffer .allocate (dataToProcess .remaining ());
310+ pendingBuffer .order (java .nio .ByteOrder .LITTLE_ENDIAN );
285311 pendingBuffer .put (dataToProcess );
286312 pendingBuffer .flip ();
287313 }
0 commit comments