Skip to content

Commit 3d8de78

Browse files
garyrussellartembilan
authored andcommitted
Fix ErrorHandler for Consumer Errors
When a container-aware error handler is called from a `ConcurrentMessageListenerContainer`, when the consumer throws an exception (rather than the listener), the wrong container was passed in the container argument. The child container was passed in instead of the concurrent container.
1 parent 2414025 commit 3d8de78

File tree

2 files changed

+76
-1
lines changed

2 files changed

+76
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -851,7 +851,7 @@ protected void handleConsumerException(Exception e) {
851851
try {
852852
if (!this.isBatchListener && this.errorHandler != null) {
853853
this.errorHandler.handle(e, Collections.emptyList(), this.consumer,
854-
KafkaMessageListenerContainer.this);
854+
KafkaMessageListenerContainer.this.container);
855855
}
856856
else if (this.isBatchListener && this.batchErrorHandler != null) {
857857
this.batchErrorHandler.handle(e, new ConsumerRecords<K, V>(Collections.emptyMap()), this.consumer,
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.BDDMockito.given;
22+
import static org.mockito.BDDMockito.willAnswer;
23+
import static org.mockito.Mockito.mock;
24+
25+
import java.util.Collections;
26+
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
31+
import org.apache.kafka.clients.consumer.Consumer;
32+
import org.apache.kafka.clients.consumer.ConsumerRecords;
33+
import org.junit.jupiter.api.Test;
34+
35+
import org.springframework.kafka.core.ConsumerFactory;
36+
37+
/**
38+
* @author Gary Russell
39+
* @since 2.2.4
40+
*
41+
*/
42+
public class ConcurrentMessageListenerContainerMockTests {
43+
44+
@SuppressWarnings({ "rawtypes", "unchecked" })
45+
@Test
46+
public void testCorrectContainerForConsumerError() throws InterruptedException {
47+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
48+
final Consumer consumer = mock(Consumer.class);
49+
AtomicBoolean first = new AtomicBoolean(true);
50+
willAnswer(invocation -> {
51+
if (first.getAndSet(false)) {
52+
throw new RuntimeException("planned");
53+
}
54+
Thread.sleep(100);
55+
return new ConsumerRecords<>(Collections.emptyMap());
56+
}).given(consumer).poll(any());
57+
given(consumerFactory.createConsumer("grp", "", "-0", null)).willReturn(consumer);
58+
ContainerProperties containerProperties = new ContainerProperties("foo");
59+
containerProperties.setGroupId("grp");
60+
containerProperties.setMessageListener((MessageListener) record -> { });
61+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory,
62+
containerProperties);
63+
CountDownLatch latch = new CountDownLatch(1);
64+
AtomicReference<MessageListenerContainer> errorContainer = new AtomicReference<>();
65+
container.setErrorHandler((ContainerAwareErrorHandler) (thrownException, records, consumer1, ec) -> {
66+
errorContainer.set(ec);
67+
latch.countDown();
68+
});
69+
container.start();
70+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
71+
assertThat(errorContainer.get()).isSameAs(container);
72+
container.stop();
73+
}
74+
75+
}

0 commit comments

Comments
 (0)