Skip to content

Commit 261e73e

Browse files
garyrussellartembilan
authored andcommitted
GH-1519: Fix @sendto on @KafkaHandler
Resolves #1519 An empty `@SendTo` on a `@KafkaListener` method means send the reply to the `KafkaHeaders.REPLY_TOPIC` header. This default was not applied for class-level `@KafkaListener`s. **backport to 2.4.x, 2.3.x, 2.2.x** (I will do the back ports, because I expect conflicts).
1 parent ac7ab70 commit 261e73e

File tree

4 files changed

+70
-17
lines changed

4 files changed

+70
-17
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import org.apache.kafka.clients.producer.RecordMetadata;
2121
import org.apache.kafka.common.TopicPartition;
2222

23+
import org.springframework.expression.ParserContext;
24+
import org.springframework.expression.common.TemplateParserContext;
25+
import org.springframework.kafka.support.KafkaHeaders;
2326
import org.springframework.lang.Nullable;
2427

2528
/**
@@ -31,6 +34,12 @@
3134
*/
3235
public final class AdapterUtils {
3336

37+
/**
38+
* Parser context for runtime SpEL using ! as the template prefix.
39+
* @since 2.2.15
40+
*/
41+
public static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");
42+
3443
private AdapterUtils() {
3544
}
3645

@@ -67,4 +76,14 @@ public static ConsumerRecordMetadata buildConsumerRecordMetadata(Object data) {
6776
record.serializedValueSize()), record.timestampType());
6877
}
6978

79+
/**
80+
* Return the default expression when no SendTo value is present.
81+
* @return the expression.
82+
* @since 2.2.15
83+
*/
84+
public static String getDefaultReplyTopicExpression() {
85+
return PARSER_CONTEXT.getExpressionPrefix() + "source.headers['"
86+
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
87+
}
88+
7089
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import org.springframework.core.MethodParameter;
3535
import org.springframework.core.annotation.AnnotationUtils;
3636
import org.springframework.expression.Expression;
37-
import org.springframework.expression.ParserContext;
38-
import org.springframework.expression.common.TemplateParserContext;
3937
import org.springframework.expression.spel.standard.SpelExpressionParser;
4038
import org.springframework.kafka.KafkaException;
4139
import org.springframework.kafka.support.KafkaUtils;
@@ -61,8 +59,6 @@ public class DelegatingInvocableHandler {
6159

6260
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
6361

64-
private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");
65-
6662
private final List<InvocableHandlerMethod> handlers;
6763

6864
private final ConcurrentMap<Class<?>, InvocableHandlerMethod> cachedHandlers = new ConcurrentHashMap<>();
@@ -205,16 +201,20 @@ protected InvocableHandlerMethod getHandlerForPayload(Class<? extends Object> pa
205201
private void setupReplyTo(InvocableHandlerMethod handler) {
206202
String replyTo = null;
207203
Method method = handler.getMethod();
204+
SendTo ann = null;
208205
if (method != null) {
209-
SendTo ann = AnnotationUtils.getAnnotation(method, SendTo.class);
206+
ann = AnnotationUtils.getAnnotation(method, SendTo.class);
210207
replyTo = extractSendTo(method.toString(), ann);
211208
}
212-
if (replyTo == null) {
213-
SendTo ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
209+
if (ann == null) {
210+
ann = AnnotationUtils.getAnnotation(this.bean.getClass(), SendTo.class);
214211
replyTo = extractSendTo(this.getBean().getClass().getSimpleName(), ann);
215212
}
213+
if (ann != null && replyTo == null) {
214+
replyTo = AdapterUtils.getDefaultReplyTopicExpression();
215+
}
216216
if (replyTo != null) {
217-
this.handlerSendTo.put(handler, PARSER.parseExpression(replyTo, PARSER_CONTEXT));
217+
this.handlerSendTo.put(handler, PARSER.parseExpression(replyTo, AdapterUtils.PARSER_CONTEXT));
218218
}
219219
this.handlerReturnsMessage.put(handler, KafkaUtils.returnTypeMessageOrCollectionOf(method));
220220
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@
3939
import org.springframework.core.log.LogAccessor;
4040
import org.springframework.expression.BeanResolver;
4141
import org.springframework.expression.Expression;
42-
import org.springframework.expression.ParserContext;
4342
import org.springframework.expression.common.LiteralExpression;
44-
import org.springframework.expression.common.TemplateParserContext;
4543
import org.springframework.expression.spel.standard.SpelExpressionParser;
4644
import org.springframework.expression.spel.support.StandardEvaluationContext;
4745
import org.springframework.expression.spel.support.StandardTypeConverter;
@@ -84,8 +82,6 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
8482

8583
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
8684

87-
private static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");
88-
8985
/**
9086
* Message used when no conversion is needed.
9187
*/
@@ -201,11 +197,10 @@ public boolean isConversionNeeded() {
201197
public void setReplyTopic(String replyTopicParam) {
202198
String replyTopic = replyTopicParam;
203199
if (!StringUtils.hasText(replyTopic)) {
204-
replyTopic = PARSER_CONTEXT.getExpressionPrefix() + "source.headers['"
205-
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
200+
replyTopic = AdapterUtils.getDefaultReplyTopicExpression();
206201
}
207-
if (replyTopic.contains(PARSER_CONTEXT.getExpressionPrefix())) {
208-
this.replyTopicExpression = PARSER.parseExpression(replyTopic, PARSER_CONTEXT);
202+
if (replyTopic.contains(AdapterUtils.PARSER_CONTEXT.getExpressionPrefix())) {
203+
this.replyTopicExpression = PARSER.parseExpression(replyTopic, AdapterUtils.PARSER_CONTEXT);
209204
}
210205
else {
211206
this.replyTopicExpression = new LiteralExpression(replyTopic);

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@
103103
ReplyingKafkaTemplateTests.E_REPLY, ReplyingKafkaTemplateTests.E_REQUEST,
104104
ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST,
105105
ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST,
106-
ReplyingKafkaTemplateTests.H_REPLY, ReplyingKafkaTemplateTests.H_REQUEST })
106+
ReplyingKafkaTemplateTests.H_REPLY, ReplyingKafkaTemplateTests.H_REQUEST,
107+
ReplyingKafkaTemplateTests.I_REPLY, ReplyingKafkaTemplateTests.I_REQUEST })
107108
public class ReplyingKafkaTemplateTests {
108109

109110
public static final String A_REPLY = "aReply";
@@ -138,6 +139,10 @@ public class ReplyingKafkaTemplateTests {
138139

139140
public static final String H_REQUEST = "hRequest";
140141

142+
public static final String I_REPLY = "iReply";
143+
144+
public static final String I_REQUEST = "iRequest";
145+
141146
@Autowired
142147
private EmbeddedKafkaBroker embeddedKafka;
143148

@@ -202,6 +207,24 @@ public void testMultiListenerMessageReturn() throws Exception {
202207
}
203208
}
204209

210+
@Test
211+
public void testHandlerReturn() throws Exception {
212+
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(I_REPLY);
213+
try {
214+
template.setDefaultReplyTimeout(Duration.ofSeconds(30));
215+
ProducerRecord<Integer, String> record = new ProducerRecord<>(I_REQUEST, "foo");
216+
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, I_REPLY.getBytes()));
217+
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
218+
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
219+
ConsumerRecord<Integer, String> consumerRecord = future.get(30, TimeUnit.SECONDS);
220+
assertThat(consumerRecord.value()).isEqualTo("FOO");
221+
}
222+
finally {
223+
template.stop();
224+
template.destroy();
225+
}
226+
}
227+
205228
@Test
206229
public void testMessageReturnNoHeadersProvidedByListener() throws Exception {
207230
ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(H_REPLY);
@@ -628,6 +651,11 @@ public MultiMessageReturn mmr() {
628651
return new MultiMessageReturn();
629652
}
630653

654+
@Bean
655+
public HandlerReturn handlerReturn() {
656+
return new HandlerReturn();
657+
}
658+
631659
@KafkaListener(id = "def1", topics = { D_REQUEST, E_REQUEST, F_REQUEST })
632660
@SendTo // default REPLY_TOPIC header
633661
public String dListener1(String in) {
@@ -678,4 +706,15 @@ public Message<?> listen1(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] re
678706

679707
}
680708

709+
@KafkaListener(topics = I_REQUEST, groupId = I_REQUEST)
710+
public static class HandlerReturn {
711+
712+
@KafkaHandler
713+
@SendTo
714+
public String listen1(String in) {
715+
return in.toUpperCase();
716+
}
717+
718+
}
719+
681720
}

0 commit comments

Comments
 (0)