@@ -610,6 +610,100 @@ void testCorrectHandlingOfOutOfOrderResponsesWhenSecondSucceeds() throws Excepti
610610 assertThat (future1 .get ()).isNull ();
611611 }
612612
613+ /**
614+ * Tests that when a batch's response is lost (e.g., due to request timeout) but the batch was
615+ * successfully written on the server, and subsequent batches with higher sequence numbers are
616+ * acknowledged, the client should treat the retried batch as already committed instead of
617+ * entering an infinite retry loop with {@link
618+ * org.apache.fluss.exception.OutOfOrderSequenceException}.
619+ *
620+ * <p>Detailed scenario:
621+ *
622+ * <ol>
623+ * <li>Send batch1(seq=0) ~ batch5(seq=4). All 5 batches are successfully written on the
624+ * server (server {@code lastBatchSeq=4}).
625+ * <li>batch2~5 (seq=1~4) responses return normally. Client {@code lastAckedBatchSequence=4}.
626+ * <li>Send batch6(seq=5) and ack successfully. Server {@code lastBatchSeq=5}.
627+ * <li>batch1(seq=0) response is lost due to {@code REQUEST_TIME_OUT}. batch1 is re-enqueued
628+ * for retry.
629+ * <li>Client retries batch1(seq=0). Since server {@code lastBatchSeq=5} and {@code 0 != 5+1},
630+ * server returns {@code OUT_OF_ORDER_SEQUENCE_EXCEPTION}.
631+ * <li>Client detects {@code batch1.seq(0) <= lastAckedBatchSequence(5)}: batch1 is already
632+ * committed. Client completes batch1 successfully without further retries.
633+ * </ol>
634+ */
635+ @ Test
636+ void testCorrectHandlingOfOutOfOrderResponsesWhenResponseLostButSubsequentBatchesSucceeded ()
637+ throws Exception {
638+ IdempotenceManager idempotenceManager = createIdempotenceManager (true );
639+ Sender sender1 = setupWithIdempotenceState (idempotenceManager );
640+ sender1 .runOnce ();
641+ assertThat (idempotenceManager .isWriterIdValid ()).isTrue ();
642+ assertThat (idempotenceManager .nextSequence (tb1 )).isEqualTo (0 );
643+
644+ // Send batch1 (seq=0): its response will be lost later.
645+ CompletableFuture <Exception > future1 = new CompletableFuture <>();
646+ appendToAccumulator (tb1 , row (1 , "a" ), (tb , leo , e ) -> future1 .complete (e ));
647+ sender1 .runOnce ();
648+ assertThat (future1 .isDone ()).isFalse ();
649+ assertThat (idempotenceManager .nextSequence (tb1 )).isEqualTo (1 );
650+ assertThat (idempotenceManager .lastAckedBatchSequence (tb1 )).isNotPresent ();
651+
652+ // Send batch2~5 (seq=1~4) and collect their futures.
653+ int numFollowingBatches = 4 ;
654+ List <CompletableFuture <Exception >> followingFutures = new ArrayList <>();
655+ for (int i = 0 ; i < numFollowingBatches ; i ++) {
656+ CompletableFuture <Exception > future = new CompletableFuture <>();
657+ followingFutures .add (future );
658+ appendToAccumulator (tb1 , row (i + 2 , "b" ), (tb , leo , e ) -> future .complete (e ));
659+ sender1 .runOnce ();
660+ assertThat (future .isDone ()).isFalse ();
661+ }
662+ assertThat (idempotenceManager .nextSequence (tb1 )).isEqualTo (5 );
663+
664+ // batch2~5 (seq=1~4) responses return normally.
665+ for (int seq = 1 ; seq <= numFollowingBatches ; seq ++) {
666+ finishIdempotentProduceLogRequest (
667+ seq , tb1 , 1 , createProduceLogResponse (tb1 , seq , seq + 1L ));
668+ assertThat (idempotenceManager .lastAckedBatchSequence (tb1 )).isEqualTo (Optional .of (seq ));
669+ assertThat (followingFutures .get (seq - 1 ).isDone ()).isTrue ();
670+ assertThat (followingFutures .get (seq - 1 ).get ()).isNull ();
671+ }
672+
673+ // Send batch6 (seq=5) and ack successfully.
674+ // Now server lastBatchSeq=5. batch1 (seq=0) is still waiting response.
675+ CompletableFuture <Exception > future6 = new CompletableFuture <>();
676+ appendToAccumulator (tb1 , row (6 , "f" ), (tb , leo , e ) -> future6 .complete (e ));
677+ sender1 .runOnce (); // drain and send batch6 (seq=5)
678+ finishIdempotentProduceLogRequest (5 , tb1 , 1 , createProduceLogResponse (tb1 , 5L , 6L ));
679+ assertThat (idempotenceManager .lastAckedBatchSequence (tb1 )).isEqualTo (Optional .of (5 ));
680+ assertThat (future6 .isDone ()).isTrue ();
681+ assertThat (future6 .get ()).isNull ();
682+
683+ // All 6 batches are written successfully on the server (server lastBatchSeq=5).
684+ // batch1 (seq=0) response is lost, simulated by REQUEST_TIME_OUT.
685+ finishIdempotentProduceLogRequest (
686+ 0 , tb1 , 0 , createProduceLogResponse (tb1 , Errors .REQUEST_TIME_OUT ));
687+ assertThat (future1 .isDone ()).isFalse ();
688+
689+ // Now retry batch1 (seq=0). Server lastBatchSeq=5, so 0 != 5+1,
690+ // server returns OUT_OF_ORDER_SEQUENCE_EXCEPTION.
691+ sender1 .runOnce (); // send retried batch1
692+ finishIdempotentProduceLogRequest (
693+ 0 , tb1 , 0 , createProduceLogResponse (tb1 , Errors .OUT_OF_ORDER_SEQUENCE_EXCEPTION ));
694+
695+ // The client should detect that batch1.seq(0) <= lastAckedBatchSequence(5),
696+ // meaning batch1 was already committed on the server (its response was just lost).
697+ // It should complete batch1 successfully instead of entering an infinite retry loop.
698+ assertThat (future1 .isDone ()).isTrue ();
699+ assertThat (future1 .get ()).isNull ();
700+ // lastAckedBatchSequence should remain at 5 (not changed by completing already-committed
701+ // batch1)
702+ assertThat (idempotenceManager .lastAckedBatchSequence (tb1 )).isEqualTo (Optional .of (5 ));
703+ // No more inflight batches
704+ assertThat (sender1 .numOfInFlightBatches (tb1 )).isEqualTo (0 );
705+ }
706+
613707 @ Test
614708 void testCorrectHandlingOfDuplicateSequenceError () throws Exception {
615709 IdempotenceManager idempotenceManager = createIdempotenceManager (true );
0 commit comments