Skip to content

Commit d60df68

Browse files
committed
Fix Request/Reply with ConsumerRecord<?, ?>
Message conversion is bypassed when consuming the raw `ConsumerRecord`. However, the request message is needed when returning a result (for reply topic determination, correlation, etc). **I will do the backports; I expect conflicts in the test.**
1 parent fd41e50 commit d60df68

File tree

2 files changed

+31
-2
lines changed

2 files changed

+31
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,7 @@ else if (methodParameter.hasParameterAnnotation(Header.class)) {
580580
}
581581
}
582582

583-
if (notConvertibleParameters == method.getParameterCount()) {
583+
if (notConvertibleParameters == method.getParameterCount() && method.getReturnType().equals(void.class)) {
584584
this.conversionNeeded = false;
585585
}
586586
boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters;

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@
105105
ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST,
106106
ReplyingKafkaTemplateTests.E_REPLY, ReplyingKafkaTemplateTests.E_REQUEST,
107107
ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST,
108-
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST })
108+
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
109+
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST })
109110
public class ReplyingKafkaTemplateTests {
110111

111112
public static final String A_REPLY = "aReply";
@@ -140,6 +141,10 @@ public class ReplyingKafkaTemplateTests {
140141

141142
public static final String J_REQUEST = "jRequest";
142143

144+
public static final String K_REPLY = "kReply";
145+
146+
public static final String K_REQUEST = "kRequest";
147+
143148
@Autowired
144149
private EmbeddedKafkaBroker embeddedKafka;
145150

@@ -186,6 +191,24 @@ public void testGood() throws Exception {
186191
}
187192
}
188193

194+
@Test
195+
void testConsumerRecord() throws Exception {
196+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(K_REPLY);
197+
try {
198+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
199+
Headers headers = new RecordHeaders();
200+
ProducerRecord<Integer, String> record = new ProducerRecord<>(K_REQUEST, null, null, null, "foo", headers);
201+
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
202+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
203+
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
204+
assertThat(consumerRecord.value()).isEqualTo("FOO");
205+
}
206+
finally {
207+
template.stop();
208+
template.destroy();
209+
}
210+
}
211+
189212
@Test
190213
public void testBadDeserialize() throws Exception {
191214
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(J_REPLY, true);
@@ -668,6 +691,12 @@ public void gListener(Message<String> in) {
668691
public String handleJ(String in) throws InterruptedException {
669692
return in.toUpperCase();
670693
}
694+
@KafkaListener(id = K_REQUEST, topics = { K_REQUEST })
695+
696+
@SendTo
697+
public String handleK(ConsumerRecord<String, String> in) {
698+
return in.value().toUpperCase();
699+
}
671700

672701
}
673702

0 commit comments

Comments
 (0)