@@ -4019,7 +4019,6 @@ public ConsumerRecords<Integer, String> intercept(ConsumerRecords<Integer, Strin
40194019 inOrder .verify (interceptor ).setupThreadState (eq (consumer ));
40204020 inOrder .verify (consumer ).poll (Duration .ofMillis (ContainerProperties .DEFAULT_POLL_TIMEOUT ));
40214021 inOrder .verify (interceptor ).intercept (any (), eq (consumer ));
4022- inOrder .verify (interceptor ).success (any (), eq (consumer ));
40234022 inOrder .verify (consumer ).commitSync (eq (Map .of (new TopicPartition ("foo" , 0 ), new OffsetAndMetadata (2L ))),
40244023 any (Duration .class ));
40254024 container .stop ();
@@ -4240,6 +4239,80 @@ public void clearThreadState(Consumer<?, ?> consumer) {
42404239 container .stop ();
42414240 }
42424241
4242+ @ Test
4243+ @ SuppressWarnings ("unchecked" )
4244+ public void invokeBatchInterceptorSuccessFailureOnRetry () throws Exception {
4245+ ConsumerFactory <Integer , String > cf = mock (ConsumerFactory .class );
4246+ Consumer <Integer , String > consumer = mock (Consumer .class );
4247+ given (cf .createConsumer (eq ("grp" ), eq ("clientId" ), isNull (), any ())).willReturn (consumer );
4248+ ConsumerRecord <Integer , String > firstRecord = new ConsumerRecord <>("test-topic" , 0 , 0L , 1 , "data-1" );
4249+ ConsumerRecord <Integer , String > secondRecord = new ConsumerRecord <>("test-topic" , 0 , 1L , 1 , "data-2" );
4250+ Map <TopicPartition , List <ConsumerRecord <Integer , String >>> records = new HashMap <>();
4251+ records .put (new TopicPartition ("test-topic" , 0 ), List .of (firstRecord , secondRecord ));
4252+ ConsumerRecords <Integer , String > consumerRecords = new ConsumerRecords <>(records );
4253+ AtomicInteger invocation = new AtomicInteger (0 );
4254+ given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
4255+ if (invocation .getAndIncrement () == 0 ) {
4256+ return consumerRecords ;
4257+ }
4258+ else {
4259+ // Subsequent polls after the first one returns empty records.
4260+ return new ConsumerRecords <Integer , String >(Map .of ());
4261+ }
4262+ });
4263+ TopicPartitionOffset [] topicPartition = new TopicPartitionOffset [] {
4264+ new TopicPartitionOffset ("test-topic" , 0 ) };
4265+
4266+ CountDownLatch latch = new CountDownLatch (4 ); // 3 failures, 1 success
4267+ BatchMessageListener <Integer , String > batchMessageListener = spy (
4268+ new BatchMessageListener <Integer , String >() { // Cannot be lambda: Mockito doesn't mock final classes
4269+
4270+ @ Override
4271+ public void onMessage (List <ConsumerRecord <Integer , String >> data ) {
4272+ latch .countDown ();
4273+ if (latch .getCount () > 0 ) {
4274+ throw new IllegalArgumentException ("Failed record" );
4275+ }
4276+ }
4277+
4278+ });
4279+
4280+ ContainerProperties containerProps = new ContainerProperties (topicPartition );
4281+ containerProps .setGroupId ("grp" );
4282+ containerProps .setAckMode (ContainerProperties .AckMode .BATCH );
4283+ containerProps .setMissingTopicsFatal (false );
4284+ containerProps .setMessageListener (batchMessageListener );
4285+ containerProps .setClientId ("clientId" );
4286+
4287+ BatchInterceptor <Integer , String > batchInterceptor = spy (new BatchInterceptor <Integer , String >() {
4288+
4289+ @ Override
4290+ public ConsumerRecords <Integer , String > intercept (ConsumerRecords <Integer , String > records ,
4291+ Consumer <Integer , String > consumer ) {
4292+ return records ;
4293+ }
4294+
4295+ });
4296+
4297+ KafkaMessageListenerContainer <Integer , String > container =
4298+ new KafkaMessageListenerContainer <>(cf , containerProps );
4299+ container .setCommonErrorHandler (new DefaultErrorHandler (new FixedBackOff (0 , 3 )));
4300+ container .setBatchInterceptor (batchInterceptor );
4301+ container .start ();
4302+ assertThat (latch .await (10 , TimeUnit .SECONDS )).isTrue ();
4303+
4304+ InOrder inOrder = inOrder (batchInterceptor , batchMessageListener , consumer );
4305+ for (int i = 0 ; i < 3 ; i ++) {
4306+ inOrder .verify (batchInterceptor ).intercept (eq (consumerRecords ), eq (consumer ));
4307+ inOrder .verify (batchMessageListener ).onMessage (eq (List .of (firstRecord , secondRecord )));
4308+ inOrder .verify (batchInterceptor ).failure (eq (consumerRecords ), any (), eq (consumer ));
4309+ }
4310+ inOrder .verify (batchInterceptor ).intercept (eq (consumerRecords ), eq (consumer ));
4311+ inOrder .verify (batchMessageListener ).onMessage (eq (List .of (firstRecord , secondRecord )));
4312+ inOrder .verify (batchInterceptor ).success (eq (consumerRecords ), eq (consumer ));
4313+ container .stop ();
4314+ }
4315+
42434316 @ Test
42444317 public void testOffsetAndMetadataWithoutProvider () throws InterruptedException {
42454318 testOffsetAndMetadata (null , new OffsetAndMetadata (1 ));
0 commit comments