Skip to content

Commit 689126e

Browse files
authored
Fix idleBetweenPolls max value calculation
- Wrong default used when `max.poll.interval.ms` is not specified - 30s Vs 300s - Value set in `@KafkaListener.properties` ignored for this calculatio and default used instead - Also use actual default for `ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG` **I will back port - expecting conflicts** * Remove unused constant
1 parent 7da8092 commit 689126e

File tree

2 files changed

+30
-18
lines changed

2 files changed

+30
-18
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
140140

141141
private static final int DEFAULT_ACK_TIME = 5000;
142142

143+
private static final Map<String, Object> CONSUMER_CONFIG_DEFAULTS = ConsumerConfig.configDef().defaultValues();
144+
143145
private final AbstractMessageListenerContainer<K, V> thisOrParentContainer;
144146

145147
private final TopicPartitionOffset[] topicPartitions;
@@ -433,8 +435,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
433435

434436
private static final String ERROR_HANDLER_THREW_AN_EXCEPTION = "Error handler threw an exception";
435437

436-
private static final int SIXTY = 60;
437-
438438
private static final String UNCHECKED = "unchecked";
439439

440440
private static final String RAWTYPES = "rawtypes";
@@ -678,12 +678,13 @@ else if (listener instanceof MessageListener) {
678678

679679
private Properties propertiesFromProperties() {
680680
Properties propertyOverrides = this.containerProperties.getKafkaConsumerProperties();
681-
Properties props = new Properties(propertyOverrides);
682-
Set<String> stringPropertyNames = props.stringPropertyNames();
683-
// Copy non-string-valued properties from the default hash table to the new properties object
684-
propertyOverrides.forEach((key, value) -> {
685-
if (key instanceof String && !stringPropertyNames.contains(key)) {
686-
props.put(key, value);
681+
Properties props = new Properties();
682+
props.putAll(propertyOverrides);
683+
Set<String> stringPropertyNames = propertyOverrides.stringPropertyNames();
684+
// User might have provided properties as defaults
685+
stringPropertyNames.forEach((name) -> {
686+
if (!props.contains(name)) {
687+
props.setProperty(name, propertyOverrides.getProperty(name));
687688
}
688689
});
689690
return props;
@@ -783,9 +784,9 @@ else if (timeout instanceof String) {
783784
this.logger.warn(() -> "Unexpected type: " + timeoutToLog.getClass().getName()
784785
+ " in property '"
785786
+ ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
786-
+ "'; defaulting to 30 seconds.");
787+
+ "'; using Kafka default.");
787788
}
788-
return Duration.ofSeconds(SIXTY / 2).toMillis(); // Default 'max.poll.interval.ms' is 30 seconds
789+
return (int) CONSUMER_CONFIG_DEFAULTS.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
789790
}
790791
}
791792

@@ -868,12 +869,12 @@ else if (timeout instanceof String) {
868869
this.logger.warn(() -> "Unexpected type: " + timeoutToLog.getClass().getName()
869870
+ " in property '"
870871
+ ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG
871-
+ "'; defaulting to 60 seconds for sync commit timeouts");
872+
+ "'; defaulting to Kafka default for sync commit timeouts");
872873
}
873-
return Duration.ofSeconds(SIXTY);
874+
return Duration
875+
.ofMillis((int) CONSUMER_CONFIG_DEFAULTS.get(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
874876
}
875877
}
876-
877878
}
878879

879880
private Object findDeserializerClass(Map<String, Object> props, boolean isValue) {

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import org.springframework.kafka.listener.ContainerProperties;
9999
import org.springframework.kafka.listener.ContainerProperties.AckMode;
100100
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
101+
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
101102
import org.springframework.kafka.listener.ListenerExecutionFailedException;
102103
import org.springframework.kafka.listener.MessageListenerContainer;
103104
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
@@ -235,6 +236,10 @@ public void testAnonymous() {
235236
List<?> containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class);
236237
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumerGroupId"))
237238
.isEqualTo(DEFAULT_TEST_GROUP_ID);
239+
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.maxPollInterval"))
240+
.isEqualTo(300000L);
241+
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.syncCommitTimeout"))
242+
.isEqualTo(Duration.ofSeconds(60));
238243
container.stop();
239244
}
240245

@@ -370,20 +375,24 @@ public void testAutoStartup() {
370375
assertThat(listenerContainer.getContainerProperties().getSyncCommitTimeout()).isNull();
371376
this.registry.start();
372377
assertThat(listenerContainer.isRunning()).isTrue();
373-
assertThat(((ConcurrentMessageListenerContainer<?, ?>) listenerContainer)
378+
KafkaMessageListenerContainer<?, ?> kafkaMessageListenerContainer =
379+
((ConcurrentMessageListenerContainer<?, ?>) listenerContainer)
374380
.getContainers()
375-
.get(0)
381+
.get(0);
382+
assertThat(kafkaMessageListenerContainer
376383
.getContainerProperties().getSyncCommitTimeout())
377-
.isEqualTo(Duration.ofSeconds(60));
384+
.isEqualTo(Duration.ofSeconds(59));
378385
assertThat(listenerContainer.getContainerProperties().getSyncCommitTimeout())
379-
.isEqualTo(Duration.ofSeconds(60));
386+
.isEqualTo(Duration.ofSeconds(59));
380387
listenerContainer.stop();
381388
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.syncCommits", Boolean.class))
382389
.isFalse();
383390
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.commitCallback"))
384391
.isNotNull();
385392
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.consumerRebalanceListener"))
386393
.isNotNull();
394+
assertThat(KafkaTestUtils.getPropertyValue(kafkaMessageListenerContainer, "listenerConsumer.maxPollInterval"))
395+
.isEqualTo(301000L);
387396
}
388397

389398
@Test
@@ -1683,7 +1692,9 @@ static class Listener implements ConsumerSeekAware {
16831692
volatile CustomMethodArgument customMethodArgument;
16841693

16851694
@KafkaListener(id = "manualStart", topics = "manualStart",
1686-
containerFactory = "kafkaAutoStartFalseListenerContainerFactory")
1695+
containerFactory = "kafkaAutoStartFalseListenerContainerFactory",
1696+
properties = { ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ":301000",
1697+
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG + ":59000" })
16871698
public void manualStart(String foo) {
16881699
}
16891700

0 commit comments

Comments
 (0)