Skip to content

Commit e8cafc1

Browse files
garyrussellartembilan
authored andcommitted
GH-847: Support placeholders in @sendto
Resolves #847 **cherry-pick to 2.1.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java
1 parent f32a1b8 commit e8cafc1

File tree

5 files changed

+63
-9
lines changed

5 files changed

+63
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
1919
import java.lang.reflect.Method;
2020
import java.util.Arrays;
2121

22+
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
2223
import org.springframework.core.annotation.AnnotationUtils;
2324
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
2425
import org.springframework.kafka.listener.MessageListenerContainer;
@@ -110,7 +111,14 @@ private String getReplyTopic() {
110111
throw new IllegalStateException("Invalid @" + SendTo.class.getSimpleName() + " annotation on '"
111112
+ method + "' one destination must be set (got " + Arrays.toString(destinations) + ")");
112113
}
113-
return destinations.length == 1 ? resolve(destinations[0]) : "";
114+
String topic = destinations.length == 1 ? destinations[0] : "";
115+
if (getBeanFactory() instanceof ConfigurableListableBeanFactory) {
116+
topic = ((ConfigurableListableBeanFactory) getBeanFactory()).resolveEmbeddedValue(topic);
117+
if (topic != null) {
118+
topic = resolve(topic);
119+
}
120+
}
121+
return topic;
114122
}
115123
}
116124
return null;

spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapte
7979
}
8080
}
8181
DelegatingInvocableHandler delegatingHandler = new DelegatingInvocableHandler(invocableHandlerMethods,
82-
defaultHandler, getBean(), getResolver(), getBeanExpressionContext());
82+
defaultHandler, getBean(), getResolver(), getBeanExpressionContext(), getBeanFactory());
8383
return new HandlerAdapter(delegatingHandler);
8484
}
8585

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.ConcurrentMap;
2828

29+
import org.springframework.beans.factory.BeanFactory;
2930
import org.springframework.beans.factory.config.BeanExpressionContext;
3031
import org.springframework.beans.factory.config.BeanExpressionResolver;
32+
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
3133
import org.springframework.core.MethodParameter;
3234
import org.springframework.core.annotation.AnnotationUtils;
3335
import org.springframework.expression.Expression;
@@ -72,6 +74,8 @@ public class DelegatingInvocableHandler {
7274

7375
private final BeanExpressionContext beanExpressionContext;
7476

77+
private final ConfigurableListableBeanFactory beanFactory;
78+
7579
/**
7680
* Construct an instance with the supplied handlers for the bean.
7781
* @param handlers the handlers.
@@ -81,6 +85,7 @@ public class DelegatingInvocableHandler {
8185
*/
8286
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers, Object bean,
8387
BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) {
88+
8489
this(handlers, null, bean, beanExpressionResolver, beanExpressionContext);
8590
}
8691

@@ -96,11 +101,33 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers, Object
96101
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
97102
@Nullable InvocableHandlerMethod defaultHandler,
98103
Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) {
104+
105+
this(handlers, defaultHandler, bean, beanExpressionResolver, beanExpressionContext, null);
106+
}
107+
108+
/**
109+
* Construct an instance with the supplied handlers for the bean.
110+
* @param handlers the handlers.
111+
* @param defaultHandler the default handler.
112+
* @param bean the bean.
113+
* @param beanExpressionResolver the resolver.
114+
* @param beanExpressionContext the context.
115+
* @param beanFactory the bean factory.
116+
* @since 2.1.11
117+
*/
118+
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
119+
@Nullable InvocableHandlerMethod defaultHandler,
120+
Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext,
121+
@Nullable BeanFactory beanFactory) {
122+
99123
this.handlers = new ArrayList<>(handlers);
100124
this.defaultHandler = defaultHandler;
101125
this.bean = bean;
102126
this.resolver = beanExpressionResolver;
103127
this.beanExpressionContext = beanExpressionContext;
128+
this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory
129+
? (ConfigurableListableBeanFactory) beanFactory
130+
: null;
104131
}
105132

106133
/**
@@ -172,7 +199,13 @@ private String extractSendTo(String element, SendTo ann) {
172199
throw new IllegalStateException("Invalid @" + SendTo.class.getSimpleName() + " annotation on '"
173200
+ element + "' one destination must be set (got " + Arrays.toString(destinations) + ")");
174201
}
175-
replyTo = destinations.length == 1 ? resolve(destinations[0]) : null;
202+
replyTo = destinations.length == 1 ? destinations[0] : null;
203+
if (replyTo != null && this.beanFactory != null) {
204+
replyTo = this.beanFactory.resolveEmbeddedValue(replyTo);
205+
if (replyTo != null) {
206+
replyTo = resolve(replyTo);
207+
}
208+
}
176209
}
177210
return replyTo;
178211
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ public class EnableKafkaIntegrationTests {
122122
@ClassRule
123123
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true,
124124
"annotated1", "annotated2", "annotated3",
125-
"annotated4", "annotated5", "annotated6", "annotated7", "annotated8", "annotated9", "annotated10",
125+
"annotated4", "annotated5", "annotated6", "annotated7", "annotated8", "annotated8reply",
126+
"annotated9", "annotated10",
126127
"annotated11", "annotated12", "annotated13", "annotated14", "annotated15", "annotated16", "annotated17",
127128
"annotated18", "annotated19", "annotated20", "annotated21", "annotated21reply", "annotated22",
128129
"annotated22reply", "annotated23", "annotated23reply", "annotated24", "annotated24reply",
@@ -306,11 +307,19 @@ public void testInterface() throws Exception {
306307

307308
@Test
308309
public void testMulti() throws Exception {
310+
Map<String, Object> consumerProps = new HashMap<>(this.consumerFactory.getConfigurationProperties());
311+
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "testReplying");
312+
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
313+
Consumer<Integer, String> consumer = cf.createConsumer();
314+
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "annotated8reply");
309315
template.send("annotated8", 0, 1, "foo");
310316
template.send("annotated8", 0, 1, null);
311317
template.flush();
312318
assertThat(this.multiListener.latch1.await(60, TimeUnit.SECONDS)).isTrue();
313319
assertThat(this.multiListener.latch2.await(60, TimeUnit.SECONDS)).isTrue();
320+
ConsumerRecord<Integer, String> reply = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply");
321+
assertThat(reply.value()).isEqualTo("OK");
322+
consumer.close();
314323
}
315324

316325
@Test
@@ -1361,7 +1370,7 @@ public Collection<String> replyingBatchListener(List<String> in) {
13611370

13621371
@KafkaListener(id = "replyingListenerWithErrorHandler", topics = "annotated23",
13631372
errorHandler = "replyErrorHandler")
1364-
@SendTo("annotated23reply")
1373+
@SendTo("${foo:annotated23reply}")
13651374
public String replyingListenerWithErrorHandler(String in) {
13661375
throw new RuntimeException("return this");
13671376
}
@@ -1476,8 +1485,10 @@ public void bar(@NonNull String bar) {
14761485
}
14771486

14781487
@KafkaHandler
1479-
public void bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
1488+
@SendTo("#{'${foo:annotated8reply}'}")
1489+
public String bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
14801490
this.latch2.countDown();
1491+
return "OK";
14811492
}
14821493

14831494
public void foo(String bar) {

src/reference/asciidoc/kafka.adoc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,7 +1101,9 @@ The `#root` object for the evaluation has 3 properties:
11011101
- request - the inbound `ConsumerRecord` (or `ConsumerRecords` object for a batch listener))
11021102
- source - the `org.springframework.messaging.Message<?>` converted from the `request`.
11031103
- result - the method return result.
1104-
- `@SendTo` (no properties) - this is treated as `!{source.headers['kafka_replyTopic']}` (since version _2.1.3_).
1104+
- `@SendTo` (no properties) - this is treated as `!{source.headers['kafka_replyTopic']}` (since version 2.1.3).
1105+
1106+
Starting with versions 2.1.11, 2.2.1, property placeholders are resolved within `@SendTo` values.
11051107

11061108
The result of the expression evaluation must be a `String` representing the topic name.
11071109

@@ -1113,7 +1115,7 @@ public String replyingListener(String in) {
11131115
...
11141116
}
11151117
1116-
@KafkaListener(topics = "annotated22")
1118+
@KafkaListener(topics = "${some.property:annotated22}")
11171119
@SendTo("#{myBean.replyTopic}") // config time SpEL
11181120
public Collection<String> replyingBatchListener(List<String> in) {
11191121
...

0 commit comments

Comments
 (0)