Skip to content

Commit d47d178

Browse files
garyrussellartembilan
authored andcommitted
GH-1778: Properly Apply Client Post Processors
Resolves #1778 `Consumer`/`Producer` post processors are `Function`s and the result should be used; it was previously discarded. **cherry-pick to 2.6.x, 2.5.x**
1 parent a0b88dc commit d47d178

File tree

4 files changed

+25
-14
lines changed

4 files changed

+25
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
324324
}
325325
}
326326
for (ConsumerPostProcessor<K, V> pp : this.postProcessors) {
327-
pp.apply(kafkaConsumer);
327+
kafkaConsumer = pp.apply(kafkaConsumer);
328328
}
329329
return kafkaConsumer;
330330
}

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -738,9 +738,11 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
738738
}
739739

740740
protected Producer<K, V> createRawProducer(Map<String, Object> rawConfigs) {
741-
KafkaProducer<K, V> kafkaProducer =
741+
Producer<K, V> kafkaProducer =
742742
new KafkaProducer<>(rawConfigs, this.keySerializerSupplier.get(), this.valueSerializerSupplier.get());
743-
this.postProcessors.forEach(pp -> pp.apply(kafkaProducer));
743+
for (ProducerPostProcessor<K, V> pp : this.postProcessors) {
744+
kafkaProducer = pp.apply(kafkaProducer);
745+
}
744746
return kafkaProducer;
745747
}
746748

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaConsumerFactoryTests.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import java.util.Properties;
3030
import java.util.concurrent.CountDownLatch;
3131
import java.util.concurrent.TimeUnit;
32-
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.concurrent.atomic.AtomicReference;
3333
import java.util.stream.Collectors;
3434
import java.util.stream.Stream;
3535

@@ -42,6 +42,7 @@
4242
import org.apache.kafka.common.serialization.StringDeserializer;
4343
import org.junit.jupiter.api.Test;
4444

45+
import org.springframework.aop.framework.ProxyFactory;
4546
import org.springframework.aop.support.AopUtils;
4647
import org.springframework.beans.factory.annotation.Autowired;
4748
import org.springframework.context.annotation.Configuration;
@@ -335,10 +336,14 @@ public void testNestedTxProducerIsCached() throws Exception {
335336
KafkaTemplate<Integer, String> templateTx = new KafkaTemplate<>(pfTx);
336337
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
337338
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
338-
AtomicBoolean ppCalled = new AtomicBoolean();
339+
AtomicReference<Consumer<Integer, String>> wrapped = new AtomicReference<>();
339340
cf.addPostProcessor(consumer -> {
340-
ppCalled.set(true);
341-
return consumer;
341+
ProxyFactory prox = new ProxyFactory();
342+
prox.setTarget(consumer);
343+
@SuppressWarnings("unchecked")
344+
Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
345+
wrapped.set(proxy);
346+
return proxy;
342347
});
343348
ContainerProperties containerProps = new ContainerProperties("txCache1");
344349
CountDownLatch latch = new CountDownLatch(1);
@@ -360,13 +365,13 @@ public void testNestedTxProducerIsCached() throws Exception {
360365
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
361366
assertThat(KafkaTestUtils.getPropertyValue(pfTx, "cache", Map.class)).hasSize(1);
362367
assertThat(pfTx.getCache()).hasSize(1);
368+
assertThat(KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer")).isSameAs(wrapped.get());
363369
}
364370
finally {
365371
container.stop();
366372
pf.destroy();
367373
pfTx.destroy();
368374
}
369-
assertThat(ppCalled.get()).isTrue();
370375
}
371376

372377
@SuppressWarnings("unchecked")

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,7 +38,6 @@
3838
import java.util.UUID;
3939
import java.util.concurrent.CountDownLatch;
4040
import java.util.concurrent.TimeUnit;
41-
import java.util.concurrent.atomic.AtomicBoolean;
4241
import java.util.concurrent.atomic.AtomicInteger;
4342
import java.util.concurrent.atomic.AtomicReference;
4443
import java.util.function.Supplier;
@@ -63,6 +62,7 @@
6362
import org.junit.jupiter.api.BeforeAll;
6463
import org.junit.jupiter.api.Test;
6564

65+
import org.springframework.aop.framework.ProxyFactory;
6666
import org.springframework.kafka.KafkaException;
6767
import org.springframework.kafka.support.Acknowledgment;
6868
import org.springframework.kafka.support.CompositeProducerListener;
@@ -119,10 +119,14 @@ public static void tearDown() {
119119
void testTemplate() {
120120
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
121121
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
122-
AtomicBoolean ppCalled = new AtomicBoolean();
122+
AtomicReference<Producer<Integer, String>> wrapped = new AtomicReference<>();
123123
pf.addPostProcessor(prod -> {
124-
ppCalled.set(true);
125-
return prod;
124+
ProxyFactory prox = new ProxyFactory();
125+
prox.setTarget(prod);
126+
@SuppressWarnings("unchecked")
127+
Producer<Integer, String> proxy = (Producer<Integer, String>) prox.getProxy();
128+
wrapped.set(proxy);
129+
return proxy;
126130
});
127131
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
128132

@@ -165,8 +169,8 @@ void testTemplate() {
165169
List<PartitionInfo> partitions = template.partitionsFor(INT_KEY_TOPIC);
166170
assertThat(partitions).isNotNull();
167171
assertThat(partitions).hasSize(2);
172+
assertThat(KafkaTestUtils.getPropertyValue(pf.createProducer(), "delegate")).isSameAs(wrapped.get());
168173
pf.destroy();
169-
assertThat(ppCalled.get()).isTrue();
170174
}
171175

172176
@Test

0 commit comments

Comments
 (0)