@@ -140,6 +140,8 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
140140
141141 private static final int DEFAULT_ACK_TIME = 5000 ;
142142
143+ private static final Map <String , Object > CONSUMER_CONFIG_DEFAULTS = ConsumerConfig .configDef ().defaultValues ();
144+
143145 private final AbstractMessageListenerContainer <K , V > thisOrParentContainer ;
144146
145147 private final TopicPartitionOffset [] topicPartitions ;
@@ -433,8 +435,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
433435
434436 private static final String ERROR_HANDLER_THREW_AN_EXCEPTION = "Error handler threw an exception" ;
435437
436- private static final int SIXTY = 60 ;
437-
438438 private static final String UNCHECKED = "unchecked" ;
439439
440440 private static final String RAWTYPES = "rawtypes" ;
@@ -592,7 +592,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
592592
593593 @ SuppressWarnings (UNCHECKED )
594594 ListenerConsumer (GenericMessageListener <?> listener , ListenerType listenerType ) {
595- Properties consumerProperties = new Properties ( this . containerProperties . getKafkaConsumerProperties () );
595+ Properties consumerProperties = propertiesFromProperties ( );
596596 checkGroupInstance (consumerProperties , KafkaMessageListenerContainer .this .consumerFactory );
597597 this .autoCommit = determineAutoCommit (consumerProperties );
598598 this .consumer =
@@ -676,6 +676,20 @@ else if (listener instanceof MessageListener) {
676676 this .subBatchPerPartition = setupSubBatchPerPartition ();
677677 }
678678
679+ private Properties propertiesFromProperties () {
680+ Properties propertyOverrides = this .containerProperties .getKafkaConsumerProperties ();
681+ Properties props = new Properties ();
682+ props .putAll (propertyOverrides );
683+ Set <String > stringPropertyNames = propertyOverrides .stringPropertyNames ();
684+ // User might have provided properties as defaults
685+ stringPropertyNames .forEach ((name ) -> {
686+ if (!props .contains (name )) {
687+ props .setProperty (name , propertyOverrides .getProperty (name ));
688+ }
689+ });
690+ return props ;
691+ }
692+
679693 String getClientId () {
680694 return this .clientId ;
681695 }
@@ -770,9 +784,9 @@ else if (timeout instanceof String) {
770784 this .logger .warn (() -> "Unexpected type: " + timeoutToLog .getClass ().getName ()
771785 + " in property '"
772786 + ConsumerConfig .MAX_POLL_INTERVAL_MS_CONFIG
773- + "'; defaulting to 30 seconds ." );
787+ + "'; using Kafka default ." );
774788 }
775- return Duration . ofSeconds ( SIXTY / 2 ). toMillis (); // Default 'max.poll.interval.ms' is 30 seconds
789+ return ( int ) CONSUMER_CONFIG_DEFAULTS . get ( ConsumerConfig . MAX_POLL_INTERVAL_MS_CONFIG );
776790 }
777791 }
778792
@@ -855,12 +869,12 @@ else if (timeout instanceof String) {
855869 this .logger .warn (() -> "Unexpected type: " + timeoutToLog .getClass ().getName ()
856870 + " in property '"
857871 + ConsumerConfig .DEFAULT_API_TIMEOUT_MS_CONFIG
858- + "'; defaulting to 60 seconds for sync commit timeouts" );
872+ + "'; defaulting to Kafka default for sync commit timeouts" );
859873 }
860- return Duration .ofSeconds (SIXTY );
874+ return Duration
875+ .ofMillis ((int ) CONSUMER_CONFIG_DEFAULTS .get (ConsumerConfig .DEFAULT_API_TIMEOUT_MS_CONFIG ));
861876 }
862877 }
863-
864878 }
865879
866880 private Object findDeserializerClass (Map <String , Object > props , boolean isValue ) {
0 commit comments