Skip to content

Commit 899469a

Browse files
committed
GH-3124 Propagate Kafka container setting to binder container
Resolves #3124
1 parent 2916acf commit 899469a

File tree

1 file changed

+48
-0
lines changed

1 file changed

+48
-0
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454

5555
import org.springframework.beans.factory.DisposableBean;
5656
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
57+
import org.springframework.boot.kafka.autoconfigure.KafkaProperties;
5758
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
5859
import org.springframework.cloud.stream.binder.BinderHeaders;
5960
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
@@ -634,6 +635,9 @@ private <K, V> MessageProducer createConsumerEndpointCaptureHelper(
634635
: new ContainerProperties(topics)
635636
: new ContainerProperties(topicPartitionOffsets);
636637

638+
// See GH-3124 - Container setting needs to be propagated
639+
this.propagateContainerProperties(containerProperties);
640+
// End GH-3124
637641
containerProperties.setObservationEnabled(this.configurationProperties.isEnableObservation());
638642

639643
KafkaAwareTransactionManager<byte[], byte[]> transMan = transactionManager(
@@ -803,6 +807,50 @@ else if (customizer instanceof KafkaListenerContainerCustomizer c) {
803807
return kafkaMessageDrivenChannelAdapter;
804808
}
805809

810+
private void propagateContainerProperties(ContainerProperties containerProperties) {
811+
KafkaProperties.Listener listener = this.configurationProperties.getKafkaProperties().getListener();
812+
if (listener.getAckMode() != null) {
813+
containerProperties.setAckMode(listener.getAckMode());
814+
}
815+
if (listener.getAsyncAcks() != null) {
816+
containerProperties.setAsyncAcks(listener.getAsyncAcks());
817+
}
818+
819+
if (listener.getAckCount() != null) {
820+
containerProperties.setAckCount(listener.getAckCount());
821+
}
822+
if (listener.getAckTime() != null) {
823+
containerProperties.setAckTime(listener.getAckTime().toMillis());
824+
}
825+
826+
if (listener.getPollTimeout() != null) {
827+
containerProperties.setPollTimeout(listener.getPollTimeout().toMillis());
828+
}
829+
830+
if (listener.getNoPollThreshold() != null) {
831+
containerProperties.setNoPollThreshold(listener.getNoPollThreshold());
832+
}
833+
834+
if (listener.getIdleBetweenPolls() != null) {
835+
containerProperties.setIdleBetweenPolls(listener.getIdleBetweenPolls().toMillis());
836+
}
837+
838+
if (listener.getIdleEventInterval() != null) {
839+
containerProperties.setIdleEventInterval(listener.getIdleEventInterval().toMillis());
840+
}
841+
842+
if (listener.getMonitorInterval() != null) {
843+
containerProperties.setMonitorInterval(Math.toIntExact(listener.getMonitorInterval().toSeconds()));
844+
}
845+
846+
if (listener.getLogContainerConfig() != null) {
847+
containerProperties.setLogContainerConfig(listener.getLogContainerConfig());
848+
}
849+
850+
containerProperties.setMissingTopicsFatal(listener.isMissingTopicsFatal());
851+
containerProperties.setStopImmediate(listener.isImmediateStop());
852+
}
853+
806854
private BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> createDestResolver(
807855
KafkaConsumerProperties extension) {
808856

0 commit comments

Comments
 (0)