Skip to content

Commit 2414025

Browse files
committed
Reduce method complexity
1 parent ff174e0 commit 2414025

File tree

2 files changed

+41
-30
lines changed

2 files changed

+41
-30
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -442,19 +442,7 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
442442
if (StringUtils.hasText(autoStartup)) {
443443
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
444444
}
445-
String[] propertyStrings = kafkaListener.properties();
446-
if (propertyStrings.length > 0) {
447-
Properties properties = new Properties();
448-
for (String property : propertyStrings) {
449-
try {
450-
properties.load(new StringReader(resolveExpressionAsString(property, "property")));
451-
}
452-
catch (IOException e) {
453-
this.logger.error("Failed to load property " + property + ", continuing...", e);
454-
}
455-
}
456-
endpoint.setConsumerProperties(properties);
457-
}
445+
resolveKafkaProperties(endpoint, kafkaListener.properties());
458446

459447
KafkaListenerContainerFactory<?> factory = null;
460448
String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
@@ -481,6 +469,21 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
481469
}
482470
}
483471

472+
private void resolveKafkaProperties(MethodKafkaListenerEndpoint<?, ?> endpoint, String[] propertyStrings) {
473+
if (propertyStrings.length > 0) {
474+
Properties properties = new Properties();
475+
for (String property : propertyStrings) {
476+
try {
477+
properties.load(new StringReader(resolveExpressionAsString(property, "property")));
478+
}
479+
catch (IOException e) {
480+
this.logger.error("Failed to load property " + property + ", continuing...", e);
481+
}
482+
}
483+
endpoint.setConsumerProperties(properties);
484+
}
485+
}
486+
484487
private String getEndpointId(KafkaListener kafkaListener) {
485488
if (StringUtils.hasText(kafkaListener.id())) {
486489
return resolve(kafkaListener.id());

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -130,26 +130,34 @@ protected KafkaConsumer<K, V> createKafkaConsumer(@Nullable String groupId, @Nul
130130
return createKafkaConsumer(this.configs);
131131
}
132132
else {
133-
Map<String, Object> modifiedConfigs = new HashMap<>(this.configs);
134-
if (groupId != null) {
135-
modifiedConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
136-
}
137-
if (shouldModifyClientId) {
138-
modifiedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
139-
(overrideClientIdPrefix ? clientIdPrefix
140-
: modifiedConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG)) + clientIdSuffix);
141-
}
142-
if (properties != null) {
143-
properties.forEach((k, v) -> {
144-
if (!k.equals(ConsumerConfig.CLIENT_ID_CONFIG) && !k.equals(ConsumerConfig.GROUP_ID_CONFIG)) {
145-
modifiedConfigs.put((String) k, v);
146-
}
147-
});
148-
}
149-
return createKafkaConsumer(modifiedConfigs);
133+
return createConsumerWithAdjustedProperties(groupId, clientIdPrefix, properties, overrideClientIdPrefix,
134+
clientIdSuffix, shouldModifyClientId);
150135
}
151136
}
152137

138+
private KafkaConsumer<K, V> createConsumerWithAdjustedProperties(String groupId, String clientIdPrefix,
139+
Properties properties, boolean overrideClientIdPrefix, String clientIdSuffix,
140+
boolean shouldModifyClientId) {
141+
142+
Map<String, Object> modifiedConfigs = new HashMap<>(this.configs);
143+
if (groupId != null) {
144+
modifiedConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
145+
}
146+
if (shouldModifyClientId) {
147+
modifiedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
148+
(overrideClientIdPrefix ? clientIdPrefix
149+
: modifiedConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG)) + clientIdSuffix);
150+
}
151+
if (properties != null) {
152+
properties.forEach((k, v) -> {
153+
if (!k.equals(ConsumerConfig.CLIENT_ID_CONFIG) && !k.equals(ConsumerConfig.GROUP_ID_CONFIG)) {
154+
modifiedConfigs.put((String) k, v);
155+
}
156+
});
157+
}
158+
return createKafkaConsumer(modifiedConfigs);
159+
}
160+
153161
protected KafkaConsumer<K, V> createKafkaConsumer(Map<String, Object> configs) {
154162
return new KafkaConsumer<>(configs, this.keyDeserializer, this.valueDeserializer);
155163
}

0 commit comments

Comments
 (0)