Skip to content

Commit 003432a

Browse files
garyrussellartembilan
authored andcommitted
Fix Request/Reply with ConsumerRecord<?, ?>
Message conversion is bypassed when consuming the raw `ConsumerRecord`. However, the request message is needed when returning a result (for reply topic determination, correlation, etc). **I will do the backports; I expect conflicts in the test.**
1 parent 9f56b5d commit 003432a

File tree

2 files changed

+31
-2
lines changed

2 files changed

+31
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ else if (methodParameter.hasParameterAnnotation(Header.class)) {
605605
}
606606
}
607607

608-
if (notConvertibleParameters == method.getParameterCount()) {
608+
if (notConvertibleParameters == method.getParameterCount() && method.getReturnType().equals(void.class)) {
609609
this.conversionNeeded = false;
610610
}
611611
boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters;

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@
109109
ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST,
110110
ReplyingKafkaTemplateTests.H_REPLY, ReplyingKafkaTemplateTests.H_REQUEST,
111111
ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST,
112-
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST })
112+
ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST,
113+
ReplyingKafkaTemplateTests.K_REPLY, ReplyingKafkaTemplateTests.K_REQUEST })
113114
public class ReplyingKafkaTemplateTests {
114115

115116
public static final String A_REPLY = "aReply";
@@ -152,6 +153,10 @@ public class ReplyingKafkaTemplateTests {
152153

153154
public static final String J_REQUEST = "jRequest";
154155

156+
public static final String K_REPLY = "kReply";
157+
158+
public static final String K_REQUEST = "kRequest";
159+
155160
@Autowired
156161
private EmbeddedKafkaBroker embeddedKafka;
157162

@@ -198,6 +203,24 @@ public void testGood() throws Exception {
198203
}
199204
}
200205

206+
@Test
207+
void testConsumerRecord() throws Exception {
208+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(K_REPLY);
209+
try {
210+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
211+
Headers headers = new RecordHeaders();
212+
ProducerRecord<Integer, String> record = new ProducerRecord<>(K_REQUEST, null, null, null, "foo", headers);
213+
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
214+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
215+
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
216+
assertThat(consumerRecord.value()).isEqualTo("FOO");
217+
}
218+
finally {
219+
template.stop();
220+
template.destroy();
221+
}
222+
}
223+
201224
@Test
202225
public void testBadDeserialize() throws Exception {
203226
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(J_REPLY, true);
@@ -730,6 +753,12 @@ public String handleJ(String in) throws InterruptedException {
730753
return in.toUpperCase();
731754
}
732755

756+
@KafkaListener(id = K_REQUEST, topics = { K_REQUEST })
757+
@SendTo
758+
public String handleK(ConsumerRecord<String, String> in) {
759+
return in.value().toUpperCase();
760+
}
761+
733762
}
734763

735764
@KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)

0 commit comments

Comments
 (0)