@@ -458,13 +458,13 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
458458 // mark all consumed messages as ready to be acknowledged
459459 CheckpointMark checkpointMark = reader .getCheckpointMark ();
460460
461- // consume 1 more message. This will call #ackMsg() on messages that were ready to be acked.
461+ // consume 1 more message.
462462 reader .advance ();
463- assertEquals (4 , countAckMessages .get ());
463+ assertEquals (0 , countAckMessages .get ());
464464
465465 // consume 1 more message. No change in the acknowledged messages.
466466 reader .advance ();
467- assertEquals (4 , countAckMessages .get ());
467+ assertEquals (0 , countAckMessages .get ());
468468
469469 // acknowledge from the first checkpoint
470470 checkpointMark .finalizeCheckpoint ();
@@ -473,6 +473,73 @@ public void testCheckpointMarkAndFinalizeSeparately() throws Exception {
473473 assertEquals (4 , countAckMessages .get ());
474474 }
475475
476+ @ Test
477+ public void testLateCheckpointOverlappingFlushingOfNextBundle () throws Exception {
478+ AtomicInteger countConsumedMessages = new AtomicInteger (0 );
479+ AtomicInteger countAckMessages = new AtomicInteger (0 );
480+
481+ // Broker that creates input data
482+ SerializableFunction <Integer , BytesXMLMessage > recordFn =
483+ index -> {
484+ List <BytesXMLMessage > messages = new ArrayList <>();
485+ for (int i = 0 ; i < 10 ; i ++) {
486+ messages .add (
487+ SolaceDataUtils .getBytesXmlMessage (
488+ "payload_test" + i , "45" + i , (num ) -> countAckMessages .incrementAndGet ()));
489+ }
490+ countConsumedMessages .incrementAndGet ();
491+ return getOrNull (index , messages );
492+ };
493+
494+ SessionServiceFactory fakeSessionServiceFactory =
495+ MockSessionServiceFactory .builder ().recordFn (recordFn ).minMessagesReceived (10 ).build ();
496+
497+ Read <Record > spec =
498+ getDefaultRead ()
499+ .withSessionServiceFactory (fakeSessionServiceFactory )
500+ .withMaxNumConnections (4 );
501+
502+ UnboundedSolaceSource <Record > initialSource = getSource (spec , pipeline );
503+
504+ UnboundedReader <Record > reader =
505+ initialSource .createReader (PipelineOptionsFactory .create (), null );
506+
507+ // start the reader and move to the first record
508+ assertTrue (reader .start ());
509+
510+ // consume 3 messages (NB: #start() already consumed the first message)
511+ for (int i = 0 ; i < 3 ; i ++) {
512+ assertTrue (String .format ("Failed at %d-th message" , i ), reader .advance ());
513+ }
514+
515+ // #advance() was called, but the messages were not ready to be acknowledged.
516+ assertEquals (0 , countAckMessages .get ());
517+
518+ // mark all consumed messages as ready to be acknowledged
519+ CheckpointMark checkpointMark = reader .getCheckpointMark ();
520+
521+ // data is flushed
522+
523+ // consume 1 more message.
524+ reader .advance ();
525+ assertEquals (0 , countAckMessages .get ());
526+
527+ // consume 1 more message. No change in the acknowledged messages.
528+ reader .advance ();
529+ assertEquals (0 , countAckMessages .get ());
530+
531+ CheckpointMark checkpointMark2 = reader .getCheckpointMark ();
532+ // data is prepared for flushing that will be rejected
533+
534+ // acknowledge from the first checkpoint may arrive late
535+ checkpointMark .finalizeCheckpoint ();
536+
537+ assertEquals (4 , countAckMessages .get ());
538+
539+ checkpointMark2 .finalizeCheckpoint ();
540+ assertEquals (6 , countAckMessages .get ());
541+ }
542+
476543 @ Test
477544 public void testCheckpointMarkSafety () throws Exception {
478545
0 commit comments