diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 6d74a1f765f6..95113f0dbf95 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -59,6 +59,7 @@ * @author Tomaz Fernandes * @author Andy Wilkinson * @author Scott Frederick + * @author Yanming Zhou * @since 1.5.0 */ @ConfigurationProperties("spring.kafka") @@ -337,6 +338,12 @@ public static class Consumer { */ private Integer maxPollRecords; + /** + * Maximum delay between invocations of poll() when using consumer group + * management. + */ + private Duration maxPollInterval; + /** * Additional consumer-specific properties used to configure the client. */ @@ -454,6 +461,14 @@ public void setMaxPollRecords(Integer maxPollRecords) { this.maxPollRecords = maxPollRecords; } + public Duration getMaxPollInterval() { + return this.maxPollInterval; + } + + public void setMaxPollInterval(Duration maxPollInterval) { + this.maxPollInterval = maxPollInterval; + } + public Map getProperties() { return this.properties; } @@ -483,6 +498,9 @@ public Map buildProperties(SslBundles sslBundles) { map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); + map.from(this::getMaxPollInterval) + .asInt(Duration::toMillis) + .to(properties.in(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)); return properties.with(this.ssl, this.security, this.properties, sslBundles); } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 18336ef30bd4..49e32c02369c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -115,6 +115,7 @@ * @author Andy Wilkinson * @author Phillip Webb * @author Scott Frederick + * @author Yanming Zhou */ class KafkaAutoConfigurationTests { @@ -132,8 +133,9 @@ void consumerProperties() { "spring.kafka.ssl.key-store-type=PKCS12", "spring.kafka.ssl.trust-store-location=classpath:tsLoc", "spring.kafka.ssl.trust-store-password=p3", "spring.kafka.ssl.trust-store-type=PKCS12", "spring.kafka.ssl.protocol=TLSv1.2", "spring.kafka.consumer.auto-commit-interval=123", - "spring.kafka.consumer.max-poll-records=42", "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.client-id=ccid", // test override common + "spring.kafka.consumer.max-poll-records=42", "spring.kafka.consumer.max-poll-interval=30s", + "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.client-id=ccid", + // test override common "spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.fetch-max-wait=456", "spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB", "spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234", @@ -172,6 +174,7 @@ void consumerProperties() { assertThat(configs).containsEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); assertThat(configs).containsEntry(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 42); + assertThat(configs).containsEntry(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000); assertThat(configs).containsEntry("foo", "bar"); assertThat(configs).containsEntry("baz", "qux"); assertThat(configs).containsEntry("foo.bar.baz", "qux.fiz.buz");