Skip to content

Commit c3d5eb2

Browse files
committed
GH-3526: Configure deserializers against modified configs
Fixes: #3526
1 parent 367f40f commit c3d5eb2

File tree

2 files changed

+33
-44
lines changed

2 files changed

+33
-44
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 31 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -169,42 +169,8 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,
169169

170170
this.configs = new ConcurrentHashMap<>(configs);
171171
this.configureDeserializers = configureDeserializers;
172-
this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier);
173-
this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
174-
}
175-
176-
private Supplier<Deserializer<K>> keyDeserializerSupplier(
177-
@Nullable Supplier<Deserializer<K>> keyDeserializerSupplier) {
178-
179-
if (!this.configureDeserializers) {
180-
return keyDeserializerSupplier;
181-
}
182-
return keyDeserializerSupplier == null
183-
? () -> null
184-
: () -> {
185-
Deserializer<K> deserializer = keyDeserializerSupplier.get();
186-
if (deserializer != null) {
187-
deserializer.configure(this.configs, true);
188-
}
189-
return deserializer;
190-
};
191-
}
192-
193-
private Supplier<Deserializer<V>> valueDeserializerSupplier(
194-
@Nullable Supplier<Deserializer<V>> valueDeserializerSupplier) {
195-
196-
if (!this.configureDeserializers) {
197-
return valueDeserializerSupplier;
198-
}
199-
return valueDeserializerSupplier == null
200-
? () -> null
201-
: () -> {
202-
Deserializer<V> deserializer = valueDeserializerSupplier.get();
203-
if (deserializer != null) {
204-
deserializer.configure(this.configs, false);
205-
}
206-
return deserializer;
207-
};
172+
this.keyDeserializerSupplier = keyDeserializerSupplier;
173+
this.valueDeserializerSupplier = valueDeserializerSupplier;
208174
}
209175

210176
@Override
@@ -219,7 +185,7 @@ public void setBeanName(String name) {
219185
* @param keyDeserializer the deserializer.
220186
*/
221187
public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
222-
this.keyDeserializerSupplier = keyDeserializerSupplier(() -> keyDeserializer);
188+
this.keyDeserializerSupplier = () -> keyDeserializer;
223189
}
224190

225191
/**
@@ -229,7 +195,7 @@ public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
229195
* @param valueDeserializer the value deserializer.
230196
*/
231197
public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
232-
this.valueDeserializerSupplier = valueDeserializerSupplier(() -> valueDeserializer);
198+
this.valueDeserializerSupplier = () -> valueDeserializer;
233199
}
234200

235201
/**
@@ -240,7 +206,7 @@ public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
240206
* @since 2.8
241207
*/
242208
public void setKeyDeserializerSupplier(Supplier<Deserializer<K>> keyDeserializerSupplier) {
243-
this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier);
209+
this.keyDeserializerSupplier = keyDeserializerSupplier;
244210
}
245211

246212
/**
@@ -251,7 +217,7 @@ public void setKeyDeserializerSupplier(Supplier<Deserializer<K>> keyDeserializer
251217
* @since 2.8
252218
*/
253219
public void setValueDeserializerSupplier(Supplier<Deserializer<V>> valueDeserializerSupplier) {
254-
this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
220+
this.valueDeserializerSupplier = valueDeserializerSupplier;
255221
}
256222

257223
/**
@@ -499,14 +465,36 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
499465
this.applicationContext = applicationContext;
500466
}
501467

468+
@Nullable
469+
private Deserializer<K> keyDeserializer(Map<String, Object> configs) {
470+
Deserializer<K> deserializer =
471+
this.keyDeserializerSupplier != null
472+
? this.keyDeserializerSupplier.get()
473+
: null;
474+
if (deserializer != null && this.configureDeserializers) {
475+
deserializer.configure(configs, true);
476+
}
477+
return deserializer;
478+
}
479+
480+
@Nullable
481+
private Deserializer<V> valueDeserializer(Map<String, Object> configs) {
482+
Deserializer<V> deserializer =
483+
this.valueDeserializerSupplier != null
484+
? this.valueDeserializerSupplier.get()
485+
: null;
486+
if (deserializer != null && this.configureDeserializers) {
487+
deserializer.configure(configs, false);
488+
}
489+
return deserializer;
490+
}
491+
502492
protected class ExtendedKafkaConsumer extends KafkaConsumer<K, V> {
503493

504494
private String idForListeners;
505495

506496
protected ExtendedKafkaConsumer(Map<String, Object> configProps) {
507-
super(configProps,
508-
DefaultKafkaConsumerFactory.this.keyDeserializerSupplier.get(),
509-
DefaultKafkaConsumerFactory.this.valueDeserializerSupplier.get());
497+
super(configProps, keyDeserializer(configProps), valueDeserializer(configProps));
510498

511499
if (!DefaultKafkaConsumerFactory.this.listeners.isEmpty()) {
512500
Iterator<MetricName> metricIterator = metrics().keySet().iterator();

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,10 +503,11 @@ public void consumerRemoved(String id, Consumer consumer) {
503503
void configDeserializer() {
504504
Deserializer key = mock(Deserializer.class);
505505
Deserializer value = mock(Deserializer.class);
506-
Map<String, Object> config = new HashMap<>();
506+
Map<String, Object> config = KafkaTestUtils.consumerProps("mockGroup", "false", this.embeddedKafka);
507507
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(config, key, value);
508508
Deserializer keyDeserializer = cf.getKeyDeserializer();
509509
assertThat(keyDeserializer).isSameAs(key);
510+
cf.createKafkaConsumer(config);
510511
verify(key).configure(config, true);
511512
Deserializer valueDeserializer = cf.getValueDeserializer();
512513
assertThat(valueDeserializer).isSameAs(value);

0 commit comments

Comments
 (0)