@@ -2824,6 +2824,93 @@ public void rePausePartitionAfterRebalance() throws Exception {
28242824 container .stop ();
28252825 }
28262826
2827+ @ SuppressWarnings ({ "unchecked" })
2828+ @ Test
2829+ public void resumePartitionAfterRevokeAndReAssign () throws Exception {
2830+ ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
2831+ Consumer <Integer , String > consumer = mock (Consumer .class );
2832+ given (cf .createConsumer (eq ("grp" ), eq ("clientId" ), isNull (), any ())).willReturn (consumer );
2833+ AtomicBoolean first = new AtomicBoolean (true );
2834+ TopicPartition tp0 = new TopicPartition ("foo" , 0 );
2835+ TopicPartition tp1 = new TopicPartition ("foo" , 1 );
2836+ given (consumer .assignment ()).willReturn (Set .of (tp0 , tp1 ));
2837+ final CountDownLatch pauseLatch1 = new CountDownLatch (1 );
2838+ final CountDownLatch suspendConsumerThread = new CountDownLatch (1 );
2839+ Set <TopicPartition > pausedParts = ConcurrentHashMap .newKeySet ();
2840+ Thread testThread = Thread .currentThread ();
2841+ AtomicBoolean paused = new AtomicBoolean ();
2842+ willAnswer (i -> {
2843+ pausedParts .clear ();
2844+ pausedParts .addAll (i .getArgument (0 ));
2845+ if (!Thread .currentThread ().equals (testThread )) {
2846+ paused .set (true );
2847+ }
2848+ return null ;
2849+ }).given (consumer ).pause (any ());
2850+ given (consumer .paused ()).willReturn (pausedParts );
2851+ given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
2852+ if (paused .get ()) {
2853+ pauseLatch1 .countDown ();
2854+ // hold up the consumer thread while we revoke/assign partitions on the test thread
2855+ suspendConsumerThread .await (10 , TimeUnit .SECONDS );
2856+ }
2857+ Thread .sleep (50 );
2858+ return ConsumerRecords .empty ();
2859+ });
2860+ AtomicReference <ConsumerRebalanceListener > rebal = new AtomicReference <>();
2861+ Collection <String > foos = new ArrayList <>();
2862+ foos .add ("foo" );
2863+ willAnswer (inv -> {
2864+ rebal .set (inv .getArgument (1 ));
2865+ rebal .get ().onPartitionsAssigned (Set .of (tp0 , tp1 ));
2866+ return null ;
2867+ }).given (consumer ).subscribe (eq (foos ), any (ConsumerRebalanceListener .class ));
2868+ final CountDownLatch resumeLatch = new CountDownLatch (1 );
2869+ willAnswer (i -> {
2870+ pausedParts .removeAll (i .getArgument (0 ));
2871+ resumeLatch .countDown ();
2872+ return null ;
2873+ }).given (consumer ).resume (any ());
2874+ ContainerProperties containerProps = new ContainerProperties ("foo" );
2875+ containerProps .setGroupId ("grp" );
2876+ containerProps .setAckMode (AckMode .RECORD );
2877+ containerProps .setClientId ("clientId" );
2878+ containerProps .setIdleEventInterval (100L );
2879+ containerProps .setMessageListener ((MessageListener ) rec -> { });
2880+ containerProps .setMissingTopicsFatal (false );
2881+ KafkaMessageListenerContainer <Integer , String > container =
2882+ new KafkaMessageListenerContainer <>(cf , containerProps );
2883+ container .start ();
2884+ container .pausePartition (tp0 );
2885+ container .pausePartition (tp1 );
2886+ assertThat (pauseLatch1 .await (10 , TimeUnit .SECONDS )).isTrue ();
2887+ assertThat (pausedParts ).hasSize (2 )
2888+ .contains (tp0 , tp1 );
2889+ rebal .get ().onPartitionsRevoked (Set .of (tp0 , tp1 ));
2890+ rebal .get ().onPartitionsAssigned (Collections .singleton (tp0 ));
2891+ rebal .get ().onPartitionsRevoked (Set .of (tp0 ));
2892+ rebal .get ().onPartitionsAssigned (Set .of (tp0 , tp1 ));
2893+ assertThat (pausedParts ).hasSize (2 )
2894+ .contains (tp0 , tp1 );
2895+ assertThat (container ).extracting ("listenerConsumer" )
2896+ .extracting ("pausedPartitions" )
2897+ .asInstanceOf (InstanceOfAssertFactories .collection (TopicPartition .class ))
2898+ .hasSize (2 )
2899+ .contains (tp0 , tp1 );
2900+ assertThat (container )
2901+ .extracting ("pauseRequestedPartitions" )
2902+ .asInstanceOf (InstanceOfAssertFactories .collection (TopicPartition .class ))
2903+ .hasSize (2 )
2904+ .contains (tp0 , tp1 );
2905+ container .resumePartition (tp0 );
2906+ container .resumePartition (tp1 );
2907+ suspendConsumerThread .countDown ();
2908+ assertThat (resumeLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
2909+ assertThat (pausedParts ).hasSize (0 );
2910+ verify (consumer ).resume (List .of (tp0 , tp1 ));
2911+ container .stop ();
2912+ }
2913+
28272914 @ SuppressWarnings ({ "unchecked" , "rawtypes" })
28282915 @ Test
28292916 public void testInitialSeek () throws Exception {
0 commit comments