@@ -2769,7 +2769,6 @@ public void rePausePartitionAfterRebalance() throws Exception {
27692769 rebal .get ().onPartitionsAssigned (Set .of (tp0 , tp1 ));
27702770 return null ;
27712771 }).given (consumer ).subscribe (eq (foos ), any (ConsumerRebalanceListener .class ));
2772- final CountDownLatch resumeLatch = new CountDownLatch (1 );
27732772 ContainerProperties containerProps = new ContainerProperties ("foo" );
27742773 containerProps .setGroupId ("grp" );
27752774 containerProps .setAckMode (AckMode .RECORD );
@@ -2780,7 +2779,6 @@ public void rePausePartitionAfterRebalance() throws Exception {
27802779 KafkaMessageListenerContainer <Integer , String > container =
27812780 new KafkaMessageListenerContainer <>(cf , containerProps );
27822781 container .start ();
2783- InOrder inOrder = inOrder (consumer );
27842782 assertThat (firstPoll .await (10 , TimeUnit .SECONDS )).isNotNull ();
27852783 container .pausePartition (tp0 );
27862784 container .pausePartition (tp1 );
@@ -2811,7 +2809,6 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception {
28112809 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
28122810 Consumer <Integer , String > consumer = mock (Consumer .class );
28132811 given (cf .createConsumer (eq ("grp" ), eq ("clientId" ), isNull (), any ())).willReturn (consumer );
2814- AtomicBoolean first = new AtomicBoolean (true );
28152812 TopicPartition tp0 = new TopicPartition ("foo" , 0 );
28162813 TopicPartition tp1 = new TopicPartition ("foo" , 1 );
28172814 given (consumer .assignment ()).willReturn (Set .of (tp0 , tp1 ));
@@ -3462,7 +3459,6 @@ public void testCooperativeRebalance() throws Exception {
34623459 containerProps .setGroupId ("grp" );
34633460 containerProps .setClientId ("clientId" );
34643461 containerProps .setMessageListener ((MessageListener <?, ?>) msg -> { });
3465- Properties consumerProps = new Properties ();
34663462 KafkaMessageListenerContainer <Integer , String > container =
34673463 new KafkaMessageListenerContainer <>(cf , containerProps );
34683464 container .start ();
@@ -3606,7 +3602,6 @@ else if (call == 1) {
36063602 }).given (consumer ).subscribe (any (Collection .class ), any (ConsumerRebalanceListener .class ));
36073603 List <Map <TopicPartition , OffsetAndMetadata >> commits = new ArrayList <>();
36083604 AtomicBoolean firstCommit = new AtomicBoolean (true );
3609- AtomicInteger commitCount = new AtomicInteger ();
36103605 willAnswer (invoc -> {
36113606 commits .add (invoc .getArgument (0 , Map .class ));
36123607 if (!firstCommit .getAndSet (false )) {
@@ -3888,6 +3883,11 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
38883883 latch .countDown ();
38893884 return null ;
38903885 }).given (consumer ).commitSync (any (), any ());
3886+ CountDownLatch closeLatch = new CountDownLatch (1 );
3887+ willAnswer (inv -> {
3888+ closeLatch .countDown ();
3889+ return null ;
3890+ }).given (consumer ).close ();
38913891 TopicPartitionOffset [] topicPartition = new TopicPartitionOffset [] {
38923892 new TopicPartitionOffset ("foo" , 0 ) };
38933893
@@ -3902,6 +3902,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early
39023902 containerProps .setKafkaAwareTransactionManager (mock (KafkaAwareTransactionManager .class ));
39033903 }
39043904
3905+ CountDownLatch afterRecordLatch = new CountDownLatch (2 );
39053906 RecordInterceptor <Integer , String > recordInterceptor = spy (new RecordInterceptor <Integer , String >() {
39063907
39073908 @ Override
@@ -3912,6 +3913,10 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
39123913 return null ;
39133914 }
39143915
3916+ public void afterRecord (ConsumerRecord <Integer , String > record , Consumer <Integer , String > consumer ) {
3917+ afterRecordLatch .countDown ();
3918+ }
3919+
39153920 });
39163921
39173922 KafkaMessageListenerContainer <Integer , String > container =
@@ -3920,6 +3925,9 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
39203925 container .setInterceptBeforeTx (early );
39213926 container .start ();
39223927 assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
3928+ assertThat (afterRecordLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
3929+ container .stop ();
3930+ assertThat (closeLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
39233931
39243932 InOrder inOrder = inOrder (recordInterceptor , consumer );
39253933 inOrder .verify (recordInterceptor ).setupThreadState (eq (consumer ));
@@ -3946,12 +3954,12 @@ public ConsumerRecord<Integer, String> intercept(ConsumerRecord<Integer, String>
39463954 inOrder .verify (consumer ).commitSync (eq (Map .of (new TopicPartition ("foo" , 0 ), new OffsetAndMetadata (2L ))),
39473955 any (Duration .class ));
39483956 }
3949- container . stop ();
3957+ inOrder . verify ( consumer ). close ();
39503958 }
39513959
39523960 @ ParameterizedTest (name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}" )
39533961 @ ValueSource (booleans = { true , false })
3954- @ SuppressWarnings ({ "unchecked" , "deprecation" } )
3962+ @ SuppressWarnings ("unchecked" )
39553963 public void testInvokeBatchInterceptorAllSkipped (boolean early ) throws Exception {
39563964 ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
39573965 Consumer <Integer , String > consumer = mock (Consumer .class );
0 commit comments