Skip to content

Commit 09e775f

Browse files
garyrussellartembilan
authored andcommitted
GH-1331: KMLC - treat auth. exceptions as fatal
Resolves #1331 Authorization errors create a log storm; stop the container for such exceptions in the same way we do for `NoOffsetForPartitionException`. **cherry-pick to 2.2.x**
1 parent ceb8a99 commit 09e775f

File tree

2 files changed

+37
-1
lines changed

2 files changed

+37
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.kafka.common.Metric;
5454
import org.apache.kafka.common.MetricName;
5555
import org.apache.kafka.common.TopicPartition;
56+
import org.apache.kafka.common.errors.AuthorizationException;
5657
import org.apache.kafka.common.errors.WakeupException;
5758

5859
import org.springframework.context.ApplicationContext;
@@ -908,6 +909,11 @@ public void run() {
908909
ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
909910
break;
910911
}
912+
catch (AuthorizationException ae) {
913+
this.fatalError = true;
914+
ListenerConsumer.this.logger.error(ae, "Authorization Exception");
915+
break;
916+
}
911917
catch (Exception e) {
912918
handleConsumerException(e);
913919
}
@@ -1091,7 +1097,7 @@ private void wrapUp() {
10911097
}
10921098
}
10931099
else {
1094-
this.logger.error("No offset and no reset policy; stopping container");
1100+
this.logger.error("Fatal consumer exception; stopping container");
10951101
KafkaMessageListenerContainer.this.stop();
10961102
}
10971103
this.monitorTask.cancel(true);

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
4949
import org.apache.kafka.clients.producer.Producer;
5050
import org.apache.kafka.common.TopicPartition;
51+
import org.apache.kafka.common.errors.GroupAuthorizationException;
5152
import org.junit.jupiter.api.DisplayName;
5253
import org.junit.jupiter.api.Test;
5354

@@ -149,6 +150,35 @@ void testCorrectContainerForConsumerError() throws InterruptedException {
149150
container.stop();
150151
}
151152

153+
@SuppressWarnings({ "rawtypes", "unchecked" })
154+
@Test
155+
void testConsumerExitWhenNotAuthorized() throws InterruptedException {
156+
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);
157+
final Consumer consumer = mock(Consumer.class);
158+
willAnswer(invocation -> {
159+
Thread.sleep(100);
160+
throw new GroupAuthorizationException("grp");
161+
}).given(consumer).poll(any());
162+
CountDownLatch latch = new CountDownLatch(1);
163+
willAnswer(invocation -> {
164+
latch.countDown();
165+
return null;
166+
}).given(consumer).close();
167+
given(consumerFactory.createConsumer("grp", "", "-0", KafkaTestUtils.defaultPropertyOverrides()))
168+
.willReturn(consumer);
169+
ContainerProperties containerProperties = new ContainerProperties("foo");
170+
containerProperties.setGroupId("grp");
171+
containerProperties.setMessageListener((MessageListener) record -> { });
172+
containerProperties.setMissingTopicsFatal(false);
173+
containerProperties.setShutdownTimeout(10);
174+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory,
175+
containerProperties);
176+
container.start();
177+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
178+
verify(consumer).close();
179+
container.stop();
180+
}
181+
152182
@SuppressWarnings({ "rawtypes", "unchecked" })
153183
@Test
154184
@DisplayName("Seek on TL callback when idle")

0 commit comments

Comments
 (0)