Commit d26dba2

lukasz-kaminski authored and garyrussell committed
GH-1336: Introduce authExceptionRetryInterval
Resolves #1336 - to retry on auth errors
1 parent 5f5ac21 · commit d26dba2

6 files changed: 99 additions, 9 deletions

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 22 additions & 1 deletion
@@ -97,6 +97,8 @@ public class ConsumerProperties {
 
 	private Properties kafkaConsumerProperties = new Properties();
 
+	private Duration authorizationExceptionRetryInterval;
+
 	/**
 	 * Create properties for a container that will subscribe to the specified topics.
 	 * @param topics the topics.
@@ -304,6 +306,24 @@ public void setKafkaConsumerProperties(Properties kafkaConsumerProperties) {
 		this.kafkaConsumerProperties = kafkaConsumerProperties;
 	}
 
+	public Duration getAuthorizationExceptionRetryInterval() {
+		return this.authorizationExceptionRetryInterval;
+	}
+
+	/**
+	 * Set the interval between retries after {@code AuthorizationException} is thrown
+	 * by {@code KafkaConsumer}. By default the field is null and retries are disabled.
+	 * In such case the container will be stopped.
+	 *
+	 * The interval must be less than {@code max.poll.interval.ms} consumer property.
+	 *
+	 * @param authorizationExceptionRetryInterval the duration between retries
+	 * @since 2.3.5
+	 */
+	public void setAuthorizationExceptionRetryInterval(Duration authorizationExceptionRetryInterval) {
+		this.authorizationExceptionRetryInterval = authorizationExceptionRetryInterval;
+	}
+
 	@Override
 	public String toString() {
 		return "ConsumerProperties ["
@@ -322,7 +342,8 @@ protected final String renderProperties() {
 				+ (this.commitCallback != null ? ", commitCallback=" + this.commitCallback : "")
 				+ ", syncCommits=" + this.syncCommits
 				+ (this.syncCommitTimeout != null ? ", syncCommitTimeout=" + this.syncCommitTimeout : "")
-				+ (this.kafkaConsumerProperties.size() > 0 ? ", properties=" + this.kafkaConsumerProperties : "");
+				+ (this.kafkaConsumerProperties.size() > 0 ? ", properties=" + this.kafkaConsumerProperties : ""
+				+ ", authorizationExceptionRetryInterval=" + this.authorizationExceptionRetryInterval);
 	}
 
 	private String renderTopics() {
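
For illustration only (not part of the commit), a minimal sketch of opting in to the new retry behavior on a container's properties; the topic name and the 30-second interval are assumed values:

import java.time.Duration;

import org.springframework.kafka.listener.ContainerProperties;

public class AuthRetryExample {

	// Sketch only: "someTopic" and the 30s interval are illustrative, not from the commit.
	static ContainerProperties containerProperties() {
		ContainerProperties props = new ContainerProperties("someTopic");
		// Must be shorter than the consumer's max.poll.interval.ms, per the JavaDoc added above.
		props.setAuthorizationExceptionRetryInterval(Duration.ofSeconds(30));
		return props;
	}
}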

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 2 additions & 1 deletion
@@ -38,6 +38,7 @@
  * @author Artem Bilan
  * @author Artem Yakshin
  * @author Johnny Lim
+ * @author Lukasz Kaminski
  */
 public class ContainerProperties extends ConsumerProperties {
 
@@ -600,7 +601,7 @@ public boolean isSubBatchPerPartition() {
 	 * When using a batch message listener whether to dispatch records by partition (with
 	 * a transaction for each sub batch if transactions are in use) or the complete batch
 	 * received by the {@code poll()}. Useful when using transactions to enable zombie
-	 * fencing, by using a {code transactional.id} that is unique for each
+	 * fencing, by using a {@code transactional.id} that is unique for each
 	 * group/topic/partition.
 	 * @param subBatchPerPartition true for a separate transaction for each partition.
 	 * @since 2.3.2

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 29 additions & 7 deletions
@@ -122,6 +122,7 @@
  * @author Chen Binbin
  * @author Yang Qiju
  * @author Tom van den Berge
+ * @author Lukasz Kaminski
  */
 public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
 		extends AbstractMessageListenerContainer<K, V> {
@@ -568,6 +569,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 
 		private Map<TopicPartition, OffsetMetadata> definedPartitions;
 
+		private Duration authorizationExceptionRetryInterval = this.containerProperties.getAuthorizationExceptionRetryInterval();
+
 		private int count;
 
 		private long last = System.currentTimeMillis();
@@ -910,9 +913,19 @@ public void run() {
 					break;
 				}
 				catch (AuthorizationException ae) {
-					this.fatalError = true;
-					ListenerConsumer.this.logger.error(ae, "Authorization Exception");
-					break;
+					if (this.authorizationExceptionRetryInterval == null) {
+						ListenerConsumer.this.logger.error(ae, "Authorization Exception and no authorizationExceptionRetryInterval set");
+						this.fatalError = true;
+						break;
+					}
+					else {
+						ListenerConsumer.this.logger.error(ae, "Authorization Exception, retrying in " + this.authorizationExceptionRetryInterval.toMillis() + " ms");
+						// We can't pause/resume here, as KafkaConsumer doesn't take pausing
+						// into account when committing, hence risk of being flooded with
+						// GroupAuthorizationExceptions.
+						// see: https://github.com/spring-projects/spring-kafka/pull/1337
+						sleepFor(this.authorizationExceptionRetryInterval);
+					}
 				}
 				catch (Exception e) {
 					handleConsumerException(e);
@@ -949,7 +962,7 @@ protected void pollAndInvoke() {
 			if (this.seeks.size() > 0) {
 				processSeeks();
 			}
-			checkPaused();
+			pauseConsumerIfNecessary();
 			this.lastPoll = System.currentTimeMillis();
 			this.polling.set(true);
 			ConsumerRecords<K, V> records = doPoll();
@@ -963,7 +976,7 @@ protected void pollAndInvoke() {
 				}
 				return;
 			}
-			checkResumed();
+			resumeConsumerIfNeccessary();
 			debugRecords(records);
 			if (records != null && records.count() > 0) {
 				if (this.containerProperties.getIdleEventInterval() != null) {
@@ -1020,7 +1033,16 @@ private void debugRecords(ConsumerRecords<K, V> records) {
 			}
 		}
 
-		private void checkPaused() {
+		private void sleepFor(Duration duration) {
+			try {
+				TimeUnit.MILLISECONDS.sleep(duration.toMillis());
+			}
+			catch (InterruptedException e) {
+				this.logger.error(e, "Interrupted while sleeping");
+			}
+		}
+
+		private void pauseConsumerIfNecessary() {
 			if (!this.consumerPaused && isPaused()) {
 				this.consumer.pause(this.consumer.assignment());
 				this.consumerPaused = true;
@@ -1029,7 +1051,7 @@ private void checkPaused() {
 			}
 		}
 
-		private void checkResumed() {
+		private void resumeConsumerIfNeccessary() {
 			if (this.consumerPaused && !isPaused()) {
 				this.logger.debug(() -> "Resuming consumption from: " + this.consumer.paused());
 				Set<TopicPartition> paused = this.consumer.paused();

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 34 additions & 0 deletions
@@ -25,6 +25,7 @@
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.BDDMockito.given;
 import static org.mockito.BDDMockito.willAnswer;
+import static org.mockito.BDDMockito.willThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -66,6 +67,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.junit.jupiter.api.BeforeAll;
@@ -110,6 +112,7 @@
  * @author Martin Dam
  * @author Artem Bilan
 * @author Loic Talhouarne
+ * @author Lukasz Kaminski
 */
 @EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
 		KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
@@ -2655,6 +2658,37 @@ public void testCommitErrorHandlerCalled() throws Exception {
 		container.stop();
 	}
 
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	void testFatalErrorOnAuthorizationException() throws Exception {
+		ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
+		Consumer<Integer, String> consumer = mock(Consumer.class);
+		given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
+		given(cf.getConfigurationProperties()).willReturn(new HashMap<>());
+
+		willThrow(AuthorizationException.class)
+				.given(consumer).poll(any());
+
+		ContainerProperties containerProps = new ContainerProperties(topic1);
+		containerProps.setGroupId("grp");
+		containerProps.setClientId("clientId");
+		containerProps.setMessageListener((MessageListener) r -> { });
+		KafkaMessageListenerContainer<Integer, String> container =
+				new KafkaMessageListenerContainer<>(cf, containerProps);
+
+		CountDownLatch stopping = new CountDownLatch(1);
+
+		container.setApplicationEventPublisher(e -> {
+			if (e instanceof ConsumerStoppingEvent) {
+				stopping.countDown();
+			}
+		});
+
+		container.start();
+		assertThat(stopping.await(10, TimeUnit.SECONDS)).isTrue();
+		container.stop();
+	}
+
 	private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
 		Consumer<?, ?> consumer = spy(
 				KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));

src/reference/asciidoc/kafka.adoc

Lines changed: 7 additions & 0 deletions
@@ -757,6 +757,13 @@ It does not apply if the container is configured to listen to a topic pattern (r
 Previously, the container threads looped within the `consumer.poll()` method waiting for the topic to appear while logging many messages.
 Aside from the logs, there was no indication that there was a problem.
 
+As of version 2.3.5, a new container property called `authorizationExceptionRetryInterval` has been introduced.
+This causes the container to retry fetching messages after getting any `AuthorizationException` from `KafkaConsumer`.
+This can happen when, for example, the configured user is denied access to read certain topic.
+Defining `authorizationExceptionRetryInterval` should help the application to recover as soon as proper permissions are granted.
+
+NOTE: By default, no interval is configured - authorization errors are considered fatal, which causes the container to stop.
+
 [[using-ConcurrentMessageListenerContainer]]
 ====== Using `ConcurrentMessageListenerContainer`
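
As a usage sketch (not part of this commit), the same property can also be applied to the container factory that backs `@KafkaListener` endpoints; the configuration class name, the injected `ConsumerFactory` bean, and the 30-second interval below are assumptions for illustration:

import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaListenerConfig {

	// Sketch only: assumes a ConsumerFactory<Integer, String> bean is defined elsewhere.
	@Bean
	public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
			ConsumerFactory<Integer, String> consumerFactory) {

		ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory);
		// Retry polling every 30 seconds (illustrative value) instead of stopping the
		// container when the broker throws an AuthorizationException.
		factory.getContainerProperties().setAuthorizationExceptionRetryInterval(Duration.ofSeconds(30));
		return factory;
	}
}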

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
@@ -33,6 +33,11 @@ It is now called after a timeout (as well as when records arrive); the second pa
 
 See <<aggregating-request-reply>> for more information.
 
+==== Listener Container
+
+The `ContainerProperties` provides an `authorizationExceptionRetryInterval` option to let the listener container to retry after any `AuthorizationException` is thrown by the `KafkaConsumer`.
+See its JavaDocs and <<kafka-container>> for more information.
+
 === Migration Guide
 
 * This release is essentially the same as the 2.3.x line, except it has been compiled against the 2.4 `kafka-clients` jar, due to a binary incompatibility.
