Skip to content

Commit b6fbe58

Browse files
authored
GH-1552: Support more consumer properties override
Resolves #1552 Previously, non-String-valued consumer property overrides were ignored (and logged). This is due to the unfortunate design of `Properties` being a subclass of `HashTable` in that only String-valued properties can be accessed in the nested `default` properties `HashTable`. - when creating the merged `Properties` to create the consumer, copy any non-String-valued properties from the default hash table to the new properties object - in the DKCF, iterate over the hash table to find any other properties not already found - log an error if we find non-String-valued default properties * Code Polishing.
1 parent 820a125 commit b6fbe58

File tree

4 files changed

+51
-14
lines changed

4 files changed

+51
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.Collections;
21+
import java.util.Enumeration;
2122
import java.util.HashMap;
2223
import java.util.Iterator;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.Properties;
27+
import java.util.Set;
2628
import java.util.function.Supplier;
2729

2830
import org.aopalliance.aop.Advice;
@@ -283,23 +285,39 @@ private Consumer<K, V> createConsumerWithAdjustedProperties(String groupId, Stri
283285
: modifiedConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG)) + clientIdSuffix);
284286
}
285287
if (properties != null) {
286-
checkForUnsupportedProps(properties);
287-
properties.stringPropertyNames()
288+
Set<String> stringPropertyNames = properties.stringPropertyNames(); // to get any nested default Properties
289+
stringPropertyNames
288290
.stream()
289291
.filter(name -> !name.equals(ConsumerConfig.CLIENT_ID_CONFIG)
290292
&& !name.equals(ConsumerConfig.GROUP_ID_CONFIG))
291293
.forEach(name -> modifiedConfigs.put(name, properties.getProperty(name)));
294+
properties.entrySet().stream()
295+
.filter(entry -> !entry.getKey().equals(ConsumerConfig.CLIENT_ID_CONFIG)
296+
&& !entry.getKey().equals(ConsumerConfig.GROUP_ID_CONFIG)
297+
&& !stringPropertyNames.contains(entry.getKey())
298+
&& entry.getKey() instanceof String)
299+
.forEach(entry -> modifiedConfigs.put((String) entry.getKey(), entry.getValue()));
300+
checkInaccessible(properties, modifiedConfigs);
292301
}
293302
return createKafkaConsumer(modifiedConfigs);
294303
}
295304

296-
private void checkForUnsupportedProps(Properties properties) {
297-
properties.forEach((key, value) -> {
298-
if (!(key instanceof String) || !(value instanceof String)) {
299-
LOGGER.warn(() -> "Property override for '" + key.toString()
300-
+ "' ignored, only <String, String> properties are supported; value is a(n) " + value.getClass());
305+
private void checkInaccessible(Properties properties, Map<String, Object> modifiedConfigs) {
306+
List<Object> inaccessible = null;
307+
for (Enumeration<?> propertyNames = properties.propertyNames(); propertyNames.hasMoreElements(); ) {
308+
Object nextElement = propertyNames.nextElement();
309+
if (!modifiedConfigs.containsKey(nextElement)) {
310+
if (inaccessible == null) {
311+
inaccessible = new ArrayList<>();
312+
}
313+
inaccessible.add(nextElement);
301314
}
302-
});
315+
}
316+
if (inaccessible != null) {
317+
LOGGER.error("Non-String-valued default properties are inaccessible; use String values or "
318+
+ "make them explicit properties instead of defaults: "
319+
+ inaccessible);
320+
}
303321
}
304322

305323
@SuppressWarnings("resource")

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,8 @@ public void setCommitLogLevel(LogIfLevelEnabled.Level commitLogLevel) {
308308
/**
309309
* Get the consumer properties that will be merged with the consumer properties
310310
* provided by the consumer factory; properties here will supersede any with the same
311-
* name(s) in the consumer factory.
311+
* name(s) in the consumer factory. You can add non-String-valued properties, but the
312+
* property name (hashtable key) must be String; all others will be ignored.
312313
* {@code group.id} and {@code client.id} are ignored.
313314
* @return the properties.
314315
* @see org.apache.kafka.clients.consumer.ConsumerConfig
@@ -324,8 +325,7 @@ public Properties getKafkaConsumerProperties() {
324325
* provided by the consumer factory; properties here will supersede any with the same
325326
* name(s) in the consumer factory.
326327
* {@code group.id} and {@code client.id} are ignored.
327-
* Property values must be {@link String}s; only properties returned by
328-
* {@link Properties#stringPropertyNames()} will be applied.
328+
* Property keys must be {@link String}s.
329329
* @param kafkaConsumerProperties the properties.
330330
* @see org.apache.kafka.clients.consumer.ConsumerConfig
331331
* @see #setGroupId(String)

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -592,7 +592,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
592592

593593
@SuppressWarnings(UNCHECKED)
594594
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
595-
Properties consumerProperties = new Properties(this.containerProperties.getKafkaConsumerProperties());
595+
Properties consumerProperties = propertiesFromProperties();
596596
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
597597
this.autoCommit = determineAutoCommit(consumerProperties);
598598
this.consumer =
@@ -676,6 +676,19 @@ else if (listener instanceof MessageListener) {
676676
this.subBatchPerPartition = setupSubBatchPerPartition();
677677
}
678678

679+
private Properties propertiesFromProperties() {
680+
Properties propertyOverrides = this.containerProperties.getKafkaConsumerProperties();
681+
Properties props = new Properties(propertyOverrides);
682+
Set<String> stringPropertyNames = props.stringPropertyNames();
683+
// Copy non-string-valued properties from the default hash table to the new properties object
684+
propertyOverrides.forEach((key, value) -> {
685+
if (key instanceof String && !stringPropertyNames.contains(key)) {
686+
props.put(key, value);
687+
}
688+
});
689+
return props;
690+
}
691+
679692
String getClientId() {
680693
return this.clientId;
681694
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,10 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
227227
String clientIdSuffixArg, Properties properties) {
228228

229229
overrides.set(properties);
230-
return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
230+
Consumer<Integer, String> created = super.createKafkaConsumer(groupId, clientIdPrefix,
231+
clientIdSuffixArg, properties);
232+
assertThat(KafkaTestUtils.getPropertyValue(created, "requestTimeoutMs", Long.class)).isEqualTo(23000L);
233+
return created;
231234
}
232235

233236
};
@@ -240,8 +243,11 @@ protected Consumer<Integer, String> createKafkaConsumer(String groupId, String c
240243
listenerThreadNames.add(Thread.currentThread().getName());
241244
latch.countDown();
242245
});
243-
Properties consumerProperties = new Properties();
246+
Properties nestedProps = new Properties();
247+
nestedProps.put("weCantAccessThisOne", 42);
248+
Properties consumerProperties = new Properties(nestedProps);
244249
consumerProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
250+
consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 23000);
245251
containerProps.setKafkaConsumerProperties(consumerProperties);
246252
final CountDownLatch rebalancePartitionsAssignedLatch = new CountDownLatch(2);
247253
containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {

0 commit comments

Comments
 (0)