4747import java .util .Map ;
4848import java .util .Map .Entry ;
4949import java .util .Properties ;
50+ import java .util .Set ;
5051import java .util .concurrent .CountDownLatch ;
5152import java .util .concurrent .Executors ;
5253import java .util .concurrent .TimeUnit ;
@@ -2553,14 +2554,6 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
25532554 AtomicBoolean first = new AtomicBoolean (true );
25542555 AtomicBoolean rebalance = new AtomicBoolean (true );
25552556 AtomicReference <ConsumerRebalanceListener > rebal = new AtomicReference <>();
2556- given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
2557- Thread .sleep (50 );
2558- if (rebalance .getAndSet (false )) {
2559- rebal .get ().onPartitionsRevoked (Collections .emptyList ());
2560- rebal .get ().onPartitionsAssigned (records .keySet ());
2561- }
2562- return first .getAndSet (false ) ? consumerRecords : emptyRecords ;
2563- });
25642557 final CountDownLatch seekLatch = new CountDownLatch (7 );
25652558 willAnswer (i -> {
25662559 seekLatch .countDown ();
@@ -2569,17 +2562,32 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
25692562 given (consumer .assignment ()).willReturn (records .keySet ());
25702563 final CountDownLatch pauseLatch1 = new CountDownLatch (2 ); // consumer, event publisher
25712564 final CountDownLatch pauseLatch2 = new CountDownLatch (2 ); // consumer, consumer
2565+ Set <TopicPartition > pausedParts = new HashSet <>();
25722566 willAnswer (i -> {
25732567 pauseLatch1 .countDown ();
25742568 pauseLatch2 .countDown ();
2569+ pausedParts .addAll (i .getArgument (0 ));
25752570 return null ;
25762571 }).given (consumer ).pause (records .keySet ());
2577- given (consumer .paused ()).willReturn (records .keySet ());
2572+ given (consumer .paused ()).willReturn (pausedParts );
2573+ CountDownLatch pollWhilePausedLatch = new CountDownLatch (2 );
2574+ given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
2575+ Thread .sleep (50 );
2576+ if (pauseLatch1 .getCount () == 0 ) {
2577+ pollWhilePausedLatch .countDown ();
2578+ }
2579+ if (rebalance .getAndSet (false )) {
2580+ rebal .get ().onPartitionsRevoked (Collections .emptyList ());
2581+ rebal .get ().onPartitionsAssigned (records .keySet ());
2582+ }
2583+ return first .getAndSet (false ) ? consumerRecords : emptyRecords ;
2584+ });
25782585 final CountDownLatch resumeLatch = new CountDownLatch (2 );
25792586 willAnswer (i -> {
25802587 resumeLatch .countDown ();
2588+ pausedParts .removeAll (i .getArgument (0 ));
25812589 return null ;
2582- }).given (consumer ).resume (records . keySet ());
2590+ }).given (consumer ).resume (any ());
25832591 willAnswer (invoc -> {
25842592 rebal .set (invoc .getArgument (1 ));
25852593 return null ;
@@ -2671,6 +2679,8 @@ else if (e instanceof ConsumerStoppedEvent) {
26712679 assertThat (container .isPaused ()).isTrue ();
26722680 assertThat (pauseLatch1 .await (10 , TimeUnit .SECONDS )).isTrue ();
26732681 assertThat (container .isContainerPaused ()).isTrue ();
2682+ assertThat (pollWhilePausedLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
2683+ verify (consumer , never ()).resume (any ());
26742684 rebalance .set (true ); // force a re-pause
26752685 assertThat (pauseLatch2 .await (10 , TimeUnit .SECONDS )).isTrue ();
26762686 container .resume ();
@@ -2680,6 +2690,59 @@ else if (e instanceof ConsumerStoppedEvent) {
26802690 verify (consumer , times (6 )).commitSync (anyMap (), eq (Duration .ofSeconds (41 )));
26812691 }
26822692
2693+ @ SuppressWarnings ({ "unchecked" })
2694+ @ Test
2695+ public void dontResumePausedPartition () throws Exception {
2696+ ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
2697+ Consumer <Integer , String > consumer = mock (Consumer .class );
2698+ given (cf .createConsumer (eq ("grp" ), eq ("clientId" ), isNull (), any ())).willReturn (consumer );
2699+ ConsumerRecords <Integer , String > emptyRecords = new ConsumerRecords <>(Collections .emptyMap ());
2700+ AtomicBoolean first = new AtomicBoolean (true );
2701+ given (consumer .assignment ()).willReturn (Set .of (new TopicPartition ("foo" , 0 ), new TopicPartition ("foo" , 1 )));
2702+ final CountDownLatch pauseLatch1 = new CountDownLatch (1 );
2703+ final CountDownLatch pauseLatch2 = new CountDownLatch (2 );
2704+ Set <TopicPartition > pausedParts = new HashSet <>();
2705+ willAnswer (i -> {
2706+ pausedParts .addAll (i .getArgument (0 ));
2707+ pauseLatch1 .countDown ();
2708+ pauseLatch2 .countDown ();
2709+ return null ;
2710+ }).given (consumer ).pause (any ());
2711+ given (consumer .paused ()).willReturn (pausedParts );
2712+ given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
2713+ Thread .sleep (50 );
2714+ return emptyRecords ;
2715+ });
2716+ final CountDownLatch resumeLatch = new CountDownLatch (1 );
2717+ willAnswer (i -> {
2718+ resumeLatch .countDown ();
2719+ pausedParts .removeAll (i .getArgument (0 ));
2720+ return null ;
2721+ }).given (consumer ).resume (any ());
2722+ ContainerProperties containerProps = new ContainerProperties (new TopicPartitionOffset ("foo" , 0 ),
2723+ new TopicPartitionOffset ("foo" , 1 ));
2724+ containerProps .setGroupId ("grp" );
2725+ containerProps .setAckMode (AckMode .RECORD );
2726+ containerProps .setClientId ("clientId" );
2727+ containerProps .setIdleEventInterval (100L );
2728+ containerProps .setMessageListener ((MessageListener ) rec -> { });
2729+ containerProps .setMissingTopicsFatal (false );
2730+ KafkaMessageListenerContainer <Integer , String > container =
2731+ new KafkaMessageListenerContainer <>(cf , containerProps );
2732+ container .start ();
2733+ InOrder inOrder = inOrder (consumer );
2734+ container .pausePartition (new TopicPartition ("foo" , 1 ));
2735+ assertThat (pauseLatch1 .await (10 , TimeUnit .SECONDS )).isTrue ();
2736+ assertThat (pausedParts ).hasSize (1 );
2737+ container .pause ();
2738+ assertThat (pauseLatch2 .await (10 , TimeUnit .SECONDS )).isTrue ();
2739+ assertThat (pausedParts ).hasSize (2 );
2740+ container .resume ();
2741+ assertThat (resumeLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
2742+ assertThat (pausedParts ).hasSize (1 );
2743+ container .stop ();
2744+ }
2745+
26832746 @ SuppressWarnings ({ "unchecked" , "rawtypes" })
26842747 @ Test
26852748 public void testInitialSeek () throws Exception {
0 commit comments