Skip to content

Commit d70c205

Browse files
committed
GH-1331: KMLC - treat auth. exceptions as fatal
Resolves #1331 Authorization errors create a log storm; stop the container for such exceptions in the same way we do for `NoOffsetForPartitionException`. **cherry-pick to 2.2.x**
1 parent 1be02a3 commit d70c205

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.kafka.common.Metric;
5454
import org.apache.kafka.common.MetricName;
5555
import org.apache.kafka.common.TopicPartition;
56+
import org.apache.kafka.common.errors.AuthorizationException;
5657
import org.apache.kafka.common.errors.WakeupException;
5758
import org.apache.kafka.common.header.Header;
5859
import org.apache.kafka.common.header.Headers;
@@ -711,6 +712,11 @@ public void run() {
711712
ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
712713
break;
713714
}
715+
catch (AuthorizationException ae) {
716+
this.fatalError = true;
717+
ListenerConsumer.this.logger.error("Authorization Exception", ae);
718+
break;
719+
}
714720
catch (Exception e) {
715721
handleConsumerException(e);
716722
}
@@ -848,7 +854,7 @@ public void wrapUp() {
848854
}
849855
}
850856
else {
851-
this.logger.error("No offset and no reset policy; stopping container");
857+
this.logger.error("Fatal consumer exception; stopping container");
852858
KafkaMessageListenerContainer.this.stop();
853859
}
854860
this.monitorTask.cancel(true);

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.eq;
2122
import static org.mockito.BDDMockito.given;
2223
import static org.mockito.BDDMockito.willAnswer;
2324
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.verify;
2426

2527
import java.util.Collections;
2628
import java.util.concurrent.CountDownLatch;
@@ -30,6 +32,7 @@
3032

3133
import org.apache.kafka.clients.consumer.Consumer;
3234
import org.apache.kafka.clients.consumer.ConsumerRecords;
35+
import org.apache.kafka.common.errors.GroupAuthorizationException;
3336
import org.junit.jupiter.api.Test;
3437

3538
import org.springframework.kafka.core.ConsumerFactory;
@@ -72,4 +75,33 @@ public void testCorrectContainerForConsumerError() throws InterruptedException {
7275
container.stop();
7376
}
7477

78+
@SuppressWarnings({ "rawtypes", "unchecked" })
79+
@Test
80+
void testConsumerExitWhenNotAuthorized() throws InterruptedException {
81+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
82+
final Consumer consumer = mock(Consumer.class);
83+
willAnswer(invocation -> {
84+
Thread.sleep(100);
85+
throw new GroupAuthorizationException("grp");
86+
}).given(consumer).poll(any());
87+
CountDownLatch latch = new CountDownLatch(1);
88+
willAnswer(invocation -> {
89+
latch.countDown();
90+
return null;
91+
}).given(consumer).close();
92+
given(consumerFactory.createConsumer(eq("grp"), eq(""), eq("-0"), any()))
93+
.willReturn(consumer);
94+
ContainerProperties containerProperties = new ContainerProperties("foo");
95+
containerProperties.setGroupId("grp");
96+
containerProperties.setMessageListener((MessageListener) record -> { });
97+
containerProperties.setMissingTopicsFatal(false);
98+
containerProperties.setShutdownTimeout(10);
99+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory,
100+
containerProperties);
101+
container.start();
102+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
103+
verify(consumer).close();
104+
container.stop();
105+
}
106+
75107
}

0 commit comments

Comments
 (0)