Skip to content

Commit 0e7a9c6

Browse files
committed
Fix BatchErrorHandler container argument
When passing in the container to batch error handlers, if there is a parent container, it should be passed in, e.g. if the error handler wants to stop the entire container
1 parent 491df2b commit 0e7a9c6

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -877,7 +877,7 @@ protected void handleConsumerException(Exception e) {
877877
}
878878
else if (this.isBatchListener && this.batchErrorHandler != null) {
879879
this.batchErrorHandler.handle(e, new ConsumerRecords<K, V>(Collections.emptyMap()), this.consumer,
880-
KafkaMessageListenerContainer.this);
880+
KafkaMessageListenerContainer.this.container);
881881
}
882882
else {
883883
this.logger.error("Consumer exception", e);

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.anyString;
2122
import static org.mockito.BDDMockito.given;
2223
import static org.mockito.BDDMockito.willAnswer;
2324
import static org.mockito.Mockito.inOrder;
@@ -34,6 +35,7 @@
3435
import java.util.concurrent.TimeUnit;
3536
import java.util.concurrent.atomic.AtomicBoolean;
3637
import java.util.concurrent.atomic.AtomicInteger;
38+
import java.util.concurrent.atomic.AtomicReference;
3739

3840
import org.apache.kafka.clients.consumer.Consumer;
3941
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -116,6 +118,45 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
116118
assertThat(((ListenerExecutionFailedException) this.config.ehException).getGroupId()).isEqualTo(CONTAINER_ID);
117119
}
118120

121+
@SuppressWarnings({ "unchecked", "rawtypes" })
122+
@Test
123+
public void verifyCorrectContainer() throws InterruptedException {
124+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
125+
final Consumer consumer = mock(Consumer.class);
126+
AtomicBoolean first = new AtomicBoolean(true);
127+
willAnswer(invocation -> {
128+
if (first.getAndSet(false)) {
129+
throw new IllegalStateException("intentional");
130+
}
131+
Thread.sleep(50);
132+
return new ConsumerRecords(Collections.emptyMap());
133+
}).given(consumer).poll(any());
134+
given(consumerFactory.createConsumer(anyString(), anyString(), anyString(), any()))
135+
.willReturn(consumer);
136+
ContainerProperties containerProperties = new ContainerProperties("foo");
137+
containerProperties.setGroupId("grp");
138+
containerProperties.setMessageListener((BatchMessageListener) record -> { });
139+
containerProperties.setMissingTopicsFatal(false);
140+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory,
141+
containerProperties);
142+
AtomicReference<MessageListenerContainer> parent = new AtomicReference<>();
143+
CountDownLatch latch = new CountDownLatch(1);
144+
container.setBatchErrorHandler(new ContainerAwareBatchErrorHandler() {
145+
146+
@Override
147+
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
148+
MessageListenerContainer container) {
149+
150+
parent.set(container);
151+
latch.countDown();
152+
}
153+
});
154+
container.start();
155+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
156+
container.stop();
157+
assertThat(parent.get()).isSameAs(container);
158+
}
159+
119160
@Configuration
120161
@EnableKafka
121162
public static class Config {

0 commit comments

Comments
 (0)