@@ -184,12 +184,15 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
184184 encodedSlice = encodedSlice .slice ();
185185 encodedSlice .order (ByteOrder .LITTLE_ENDIAN );
186186
187+ // Track the last complete segment before decoding
188+ long lastCompleteSegmentBefore = state .decoder .getLastCompleteSegmentStart ();
189+
187190 // Decode the segment
188191 ByteBuffer decoded = state .decoder .decode (encodedSlice );
189192
190193 // Track decoded bytes - update counter regardless of whether bytes were produced
191194 int decodedProduced = (decoded != null ) ? decoded .remaining () : 0 ;
192-
195+
193196 LOGGER .atInfo ()
194197 .addKeyValue ("relativeIndex" , relativeIndex )
195198 .addKeyValue ("encodedSegmentSize" , encodedSegmentSize )
@@ -200,7 +203,7 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
200203
201204 // Update tracked bytes
202205 state .totalEncodedBytesProcessed .addAndGet (encodedSegmentSize );
203-
206+
204207 // Always update decoded byte counter (even if zero bytes produced)
205208 if (decodedProduced > 0 ) {
206209 state .totalBytesDecoded .addAndGet (decodedProduced );
@@ -216,6 +219,16 @@ private Flux<ByteBuffer> decodeStream(Flux<ByteBuffer> encodedFlux, DecoderState
216219 .log ("No decoded bytes produced from segment" );
217220 }
218221
222+ // Check if a segment was completed - if so, snapshot decoded bytes count
223+ long lastCompleteSegmentAfter = state .decoder .getLastCompleteSegmentStart ();
224+ if (lastCompleteSegmentAfter > lastCompleteSegmentBefore ) {
225+ state .decodedBytesAtLastCompleteSegment = state .totalBytesDecoded .get ();
226+ LOGGER .atInfo ()
227+ .addKeyValue ("lastCompleteSegment" , lastCompleteSegmentAfter )
228+ .addKeyValue ("decodedBytesSnapshot" , state .decodedBytesAtLastCompleteSegment )
229+ .log ("Segment completed, snapshotting decoded bytes" );
230+ }
231+
219232 // Check if we've completed the message
220233 if (state .decoder .getMessageOffset () >= state .expectedContentLength ) {
221234 state .pendingBuffer = null ;
@@ -335,6 +348,7 @@ public static class DecoderState {
335348 private final AtomicLong totalBytesDecoded ;
336349 private final AtomicLong totalEncodedBytesProcessed ;
337350 private ByteBuffer pendingBuffer ;
351+ private long decodedBytesAtLastCompleteSegment ;
338352
339353 /**
340354 * Creates a new decoder state.
@@ -347,6 +361,7 @@ public DecoderState(long expectedContentLength) {
347361 this .totalBytesDecoded = new AtomicLong (0 );
348362 this .totalEncodedBytesProcessed = new AtomicLong (0 );
349363 this .pendingBuffer = null ;
364+ this .decodedBytesAtLastCompleteSegment = 0 ;
350365 }
351366
352367 /**
@@ -426,18 +441,27 @@ public long getRetryOffset() {
426441 long retryOffset = decoder .getLastCompleteSegmentStart ();
427442 long decoderOffsetBefore = decoder .getMessageOffset ();
428443 int pendingSize = (pendingBuffer != null ) ? pendingBuffer .remaining () : 0 ;
444+ long totalProcessedBefore = totalEncodedBytesProcessed .get ();
429445
430446 LOGGER .atInfo ()
431447 .addKeyValue ("retryOffset" , retryOffset )
432448 .addKeyValue ("decoderOffsetBefore" , decoderOffsetBefore )
433449 .addKeyValue ("pendingBytes" , pendingSize )
434- .addKeyValue ("totalProcessed " , totalEncodedBytesProcessed . get () )
450+ .addKeyValue ("totalProcessedBefore " , totalProcessedBefore )
435451 .log ("Computing retry offset" );
436452
437453 // Reset decoder to the last complete segment boundary
438454 // This ensures messageOffset and segment state match the retry offset
439455 decoder .resetToLastCompleteSegment ();
440456
457+ // Reset totalEncodedBytesProcessed to match the retry offset
458+ // This ensures absoluteStartOfCombined calculation is correct for retry data
459+ totalEncodedBytesProcessed .set (retryOffset );
460+
461+ // Reset totalBytesDecoded to the snapshot at last complete segment
462+ // This ensures decoded byte counting is correct for retry
463+ totalBytesDecoded .set (decodedBytesAtLastCompleteSegment );
464+
441465 // Clear pending buffer since we're restarting from the segment boundary
442466 // Any bytes in pending are from after this boundary and will be re-fetched
443467 if (pendingBuffer != null && pendingBuffer .hasRemaining ()) {
@@ -450,6 +474,8 @@ public long getRetryOffset() {
450474
451475 LOGGER .atInfo ()
452476 .addKeyValue ("retryOffset" , retryOffset )
477+ .addKeyValue ("totalProcessedAfter" , totalEncodedBytesProcessed .get ())
478+ .addKeyValue ("totalDecodedAfter" , totalBytesDecoded .get ())
453479 .log ("Retry offset calculated (last complete segment boundary)" );
454480 return retryOffset ;
455481 }
0 commit comments