Skip to content

Commit 043cd79

Browse files
garyrussellartembilan
authored andcommitted
GH-1855: Fix Call to RetryingBatchErrorHandler
Resolves #1855 Unsupported op when a consumer exception occurs. **cherry-pick to all 2.x.x (down to and including 2.3.x)**
1 parent 6c7083c commit 043cd79

File tree

3 files changed

+41
-2
lines changed

3 files changed

+41
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1253,7 +1253,8 @@ protected void handleConsumerException(Exception e) {
12531253
}
12541254
else if (this.isBatchListener && this.batchErrorHandler != null) {
12551255
this.batchErrorHandler.handle(e, new ConsumerRecords<K, V>(Collections.emptyMap()), this.consumer,
1256-
KafkaMessageListenerContainer.this.thisOrParentContainer);
1256+
KafkaMessageListenerContainer.this.thisOrParentContainer, () -> {
1257+
});
12571258
}
12581259
else {
12591260
this.logger.error(e, "Consumer exception");

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ public RetryingBatchErrorHandler(BackOff backOff, @Nullable ConsumerRecordRecove
8383
public void handle(Exception thrownException, ConsumerRecords<?, ?> records,
8484
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
8585

86+
if (records.count() == 0) {
87+
LOGGER.error(thrownException, "Called with no records; consumer exception");
88+
return;
89+
}
8690
BackOffExecution execution = this.backOff.start();
8791
long nextBackOff = execution.nextBackOff();
8892
String failed = null;

spring-kafka/src/test/java/org/springframework/kafka/listener/RetryingBatchErrorHandlerIntegrationTests.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,10 @@
1717
package org.springframework.kafka.listener;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.isNull;
22+
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.Mockito.mock;
2024

2125
import java.util.List;
2226
import java.util.Map;
@@ -28,14 +32,17 @@
2832
import org.apache.kafka.clients.consumer.Consumer;
2933
import org.apache.kafka.clients.consumer.ConsumerConfig;
3034
import org.apache.kafka.clients.consumer.ConsumerRecord;
35+
import org.apache.kafka.clients.consumer.ConsumerRecords;
3136
import org.apache.kafka.common.TopicPartition;
3237
import org.junit.jupiter.api.BeforeAll;
3338
import org.junit.jupiter.api.Test;
3439

40+
import org.springframework.kafka.core.ConsumerFactory;
3541
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3642
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3743
import org.springframework.kafka.core.KafkaTemplate;
3844
import org.springframework.kafka.event.ConsumerStoppedEvent;
45+
import org.springframework.kafka.support.TopicPartitionOffset;
3946
import org.springframework.kafka.test.EmbeddedKafkaBroker;
4047
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
4148
import org.springframework.kafka.test.context.EmbeddedKafka;
@@ -209,4 +216,31 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
209216
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
210217
}
211218

219+
@SuppressWarnings("unchecked")
220+
@Test
221+
void consumerEx() throws InterruptedException {
222+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
223+
Consumer<Integer, String> consumer = mock(Consumer.class);
224+
given(consumer.poll(any())).willThrow(new RuntimeException("test"));
225+
given(cf.createConsumer(any(), any(), isNull(), any())).willReturn(consumer);
226+
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0));
227+
KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
228+
containerProps);
229+
CountDownLatch called = new CountDownLatch(1);
230+
container.setBatchErrorHandler(new RetryingBatchErrorHandler() {
231+
232+
@Override
233+
public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
234+
MessageListenerContainer container, Runnable invokeListener) {
235+
236+
called.countDown();
237+
super.handle(thrownException, records, consumer, container, invokeListener);
238+
}
239+
});
240+
container.setupMessageListener((BatchMessageListener<Integer, String>) (recs -> { }));
241+
container.start();
242+
assertThat(called.await(10, TimeUnit.SECONDS)).isTrue();
243+
container.stop();
244+
}
245+
212246
}

0 commit comments

Comments
 (0)