Skip to content

Commit 18378a8

Browse files
jonas-grgtonobc
authored andcommitted
Add support for SpEL topics on @PulsarMessage
This commit adds support for expressions (property placeholders and SpEL) on the topic attribute of @PulsarMessage. Resolves #568
1 parent d8524d1 commit 18378a8

File tree

9 files changed

+340
-2
lines changed

9 files changed

+340
-2
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.inttest.app;
18+
19+
import org.springframework.pulsar.annotation.PulsarMessage;
20+
21+
@PulsarMessage(topic = "${inttest.topic}")
22+
record TopicPropertyDefinedSampleMessage(Integer id, String content) {
23+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.inttest.app;
18+
19+
import org.springframework.pulsar.annotation.PulsarMessage;
20+
21+
@PulsarMessage(topic = "#{spelTopic}")
22+
record TopicSpELDefinedSampleMessage(Integer id, String content) {
23+
}

spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/topic-resolution.adoc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,18 @@ record Foo(String value) {
4848
}
4949
----
5050

51+
Property placeholders and SpEL expressions are supported in the `@PulsarMessage` annotation,
52+
for example:
53+
[source,java,indent=0,subs="verbatim"]
54+
----
55+
@PulsarMessage(topic = "${app.topics.foo}")
56+
record Foo(String value) {
57+
}
58+
59+
@PulsarMessage(topic = "#{someBean.getTopic()}")
60+
record Bar(String value) {
61+
}
62+
----
5163
=== Custom topic resolver
5264
The preferred method of adding mappings is via the property mentioned above.
5365
However, if more control is needed you can replace the default resolver by proving your own implementation, for example:
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.core;
18+
19+
import org.springframework.beans.factory.config.BeanExpressionContext;
20+
import org.springframework.beans.factory.config.BeanExpressionResolver;
21+
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
22+
23+
/**
24+
* Default implementation of {@link ExpressionResolver} that relies on the
25+
* {@link ConfigurableBeanFactory} capabilities to resolve expressions.
26+
*
27+
* @author Jonas Geiregat
28+
*/
29+
public class DefaultExpressionResolver implements ExpressionResolver {
30+
31+
private final BeanExpressionResolver beanExpressionResolver;
32+
33+
private final BeanExpressionContext beanExpressionContext;
34+
35+
private final ConfigurableBeanFactory configurableBeanFactory;
36+
37+
public DefaultExpressionResolver(ConfigurableBeanFactory configurableBeanFactory) {
38+
this.beanExpressionResolver = configurableBeanFactory.getBeanExpressionResolver();
39+
this.beanExpressionContext = new BeanExpressionContext(configurableBeanFactory, null);
40+
this.configurableBeanFactory = configurableBeanFactory;
41+
}
42+
43+
@Override
44+
public Resolved<String> resolveToString(String expression) {
45+
String placeholdersResolved = this.configurableBeanFactory.resolveEmbeddedValue(expression);
46+
Object resolvedObj = this.beanExpressionResolver.evaluate(placeholdersResolved, this.beanExpressionContext);
47+
if (resolvedObj instanceof String resolvedString) {
48+
return Resolved.of(resolvedString);
49+
}
50+
if (resolvedObj == null) {
51+
return Resolved.of(null);
52+
}
53+
return Resolved
54+
.failed("The expression '%s' must resolve to a string but was: %s".formatted(expression, resolvedObj));
55+
}
56+
57+
}

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultTopicResolver.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@
2121
import java.util.Map;
2222
import java.util.function.Supplier;
2323

24+
import org.springframework.beans.BeansException;
25+
import org.springframework.beans.factory.BeanFactory;
26+
import org.springframework.beans.factory.BeanFactoryAware;
27+
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
28+
import org.springframework.core.log.LogAccessor;
2429
import org.springframework.lang.Nullable;
2530
import org.springframework.pulsar.annotation.PulsarMessage;
2631
import org.springframework.util.StringUtils;
@@ -34,15 +39,35 @@
3439
*
3540
* @author Chris Bono
3641
* @author Aleksei Arsenev
42+
* @author Jonas Geiregat
3743
*/
38-
public class DefaultTopicResolver implements TopicResolver {
44+
public class DefaultTopicResolver implements TopicResolver, BeanFactoryAware {
45+
46+
private final LogAccessor logger = new LogAccessor(this.getClass());
3947

4048
private final Map<Class<?>, String> customTopicMappings = new LinkedHashMap<>();
4149

4250
private final PulsarMessageAnnotationRegistry pulsarMessageAnnotationRegistry = new PulsarMessageAnnotationRegistry();
4351

4452
private boolean usePulsarMessageAnnotations = true;
4553

54+
@Nullable
55+
private ExpressionResolver expressionResolver;
56+
57+
/**
58+
* Constructs a new DefaultTopicResolver with the given expression resolver.
59+
* @param expressionResolver the expression resolver to use for resolving topic
60+
*/
61+
public DefaultTopicResolver(ExpressionResolver expressionResolver) {
62+
this.expressionResolver = expressionResolver;
63+
}
64+
65+
/**
66+
* Constructs a new DefaultTopicResolver.
67+
*/
68+
public DefaultTopicResolver() {
69+
}
70+
4671
/**
4772
* Sets whether to inspect message classes for the
4873
* {@link PulsarMessage @PulsarMessage} annotation during topic resolution.
@@ -135,11 +160,31 @@ protected Resolved<String> doResolveTopic(@Nullable String userSpecifiedTopic, @
135160
}
136161

137162
// VisibleForTesting
163+
@Nullable
138164
String getAnnotatedTopicInfo(Class<?> messageType) {
139165
return this.pulsarMessageAnnotationRegistry.getAnnotationFor(messageType)
140166
.map(PulsarMessage::topic)
141167
.filter(StringUtils::hasText)
168+
.map(this::resolveExpression)
142169
.orElse(null);
143170
}
144171

172+
private String resolveExpression(String v) {
173+
return this.expressionResolver == null ? v : this.expressionResolver.resolveToString(v)
174+
.orElseThrow(() -> "Failed to resolve topic expression: %s".formatted(v));
175+
}
176+
177+
@Override
178+
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
179+
if (beanFactory instanceof ConfigurableBeanFactory configurableBeanFactory) {
180+
this.expressionResolver = new DefaultExpressionResolver(configurableBeanFactory);
181+
}
182+
else {
183+
this.logger.warn(
184+
() -> "Topic expressions on @PulsarMessage will not be resolved: bean factory must be %s but was %s"
185+
.formatted(ConfigurableBeanFactory.class.getSimpleName(),
186+
beanFactory.getClass().getSimpleName()));
187+
}
188+
}
189+
145190
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.core;
18+
19+
/**
20+
* Resolves expressions.
21+
*
22+
* @author Jonas Geiregat
23+
*/
24+
public interface ExpressionResolver {
25+
26+
/**
27+
* Resolve the given expression.
28+
* @param expression the expression to resolve
29+
* @return the resolved value as a {@code Resolved} of {@link String}
30+
*/
31+
Resolved<String> resolveToString(String expression);
32+
33+
}

spring-pulsar/src/main/java/org/springframework/pulsar/core/Resolved.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package org.springframework.pulsar.core;
1818

19+
import java.util.Objects;
1920
import java.util.Optional;
21+
import java.util.StringJoiner;
2022
import java.util.function.Consumer;
2123
import java.util.function.Supplier;
2224

@@ -28,6 +30,7 @@
2830
* @param <T> the resolved type
2931
* @author Christophe Bornet
3032
* @author Chris Bono
33+
* @author Jonas Geiregat
3134
*/
3235
public final class Resolved<T> {
3336

@@ -48,7 +51,7 @@ private Resolved(@Nullable T value, @Nullable RuntimeException exception) {
4851
* @param <T> the type of the value
4952
* @return a {@code Resolved} containing the resolved value
5053
*/
51-
public static <T> Resolved<T> of(T value) {
54+
public static <T> Resolved<T> of(@Nullable T value) {
5255
return new Resolved<>(value, null);
5356
}
5457

@@ -155,4 +158,28 @@ public T orElseThrow(Supplier<String> wrappingErrorMessage) {
155158
return this.value;
156159
}
157160

161+
@Override
162+
public boolean equals(Object o) {
163+
if (this == o) {
164+
return true;
165+
}
166+
if (o == null || getClass() != o.getClass()) {
167+
return false;
168+
}
169+
Resolved<?> resolved = (Resolved<?>) o;
170+
return Objects.equals(this.value, resolved.value) && Objects.equals(this.exception, resolved.exception);
171+
}
172+
173+
@Override
174+
public int hashCode() {
175+
return Objects.hash(this.value, this.exception);
176+
}
177+
178+
@Override
179+
public String toString() {
180+
return new StringJoiner(", ", Resolved.class.getSimpleName() + "[", "]").add("value=" + this.value)
181+
.add("exception=" + this.exception)
182+
.toString();
183+
}
184+
158185
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.core;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.ArgumentMatchers.eq;
22+
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
25+
import org.junit.jupiter.api.Test;
26+
27+
import org.springframework.beans.factory.config.BeanExpressionContext;
28+
import org.springframework.beans.factory.config.BeanExpressionResolver;
29+
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
30+
31+
/**
32+
* Unit tests for {@link DefaultExpressionResolver}.
33+
*
34+
* @author Jonas Geiregat
35+
*/
36+
class DefaultExpressionResolverTest {
37+
38+
@Test
39+
void resolveEvaluatedStringResult() {
40+
var configurableBeanFactory = mock(ConfigurableBeanFactory.class);
41+
var beanExpressionResolver = mock(BeanExpressionResolver.class);
42+
when(configurableBeanFactory.getBeanExpressionResolver()).thenReturn(beanExpressionResolver);
43+
when(configurableBeanFactory.resolveEmbeddedValue("${topic.name}")).thenReturn("resolved-topic-name");
44+
when(beanExpressionResolver.evaluate(eq("resolved-topic-name"), any(BeanExpressionContext.class)))
45+
.thenReturn("resolved-topic-name");
46+
47+
ExpressionResolver expressionResolver = new DefaultExpressionResolver(configurableBeanFactory);
48+
Resolved<String> resolved = expressionResolver.resolveToString("${topic.name}");
49+
50+
assertThat(resolved).isEqualTo(Resolved.of("resolved-topic-name"));
51+
}
52+
53+
@Test
54+
void resolveEvaluatedNullResult() {
55+
var configurableBeanFactory = mock(ConfigurableBeanFactory.class);
56+
var beanExpressionResolver = mock(BeanExpressionResolver.class);
57+
when(configurableBeanFactory.getBeanExpressionResolver()).thenReturn(beanExpressionResolver);
58+
when(configurableBeanFactory.resolveEmbeddedValue("#{null")).thenReturn("#{null}");
59+
when(beanExpressionResolver.evaluate(eq("#{null}"), any(BeanExpressionContext.class))).thenReturn(null);
60+
61+
ExpressionResolver expressionResolver = new DefaultExpressionResolver(configurableBeanFactory);
62+
Resolved<String> resolved = expressionResolver.resolveToString("#{null}");
63+
64+
assertThat(resolved).isEqualTo(Resolved.of(null));
65+
}
66+
67+
@Test
68+
void failToResolveEvaluatedNoneString() {
69+
var configurableBeanFactory = mock(ConfigurableBeanFactory.class);
70+
var beanExpressionResolver = mock(BeanExpressionResolver.class);
71+
when(configurableBeanFactory.getBeanExpressionResolver()).thenReturn(beanExpressionResolver);
72+
when(configurableBeanFactory.resolveEmbeddedValue("#{someBean.someProperty}"))
73+
.thenReturn("#{someBean.someProperty}");
74+
when(beanExpressionResolver.evaluate(eq("#{someBean.someProperty}"), any(BeanExpressionContext.class)))
75+
.thenReturn(new Object() {
76+
});
77+
78+
ExpressionResolver expressionResolver = new DefaultExpressionResolver(configurableBeanFactory);
79+
Resolved<String> resolved = expressionResolver.resolveToString("#{someBean.someProperty}");
80+
81+
assertThat(resolved.exception())
82+
.hasValueSatisfying((ex) -> assertThat(ex).isInstanceOf(IllegalArgumentException.class)
83+
.hasMessageContaining("The expression '#{someBean.someProperty}' must resolve to a string but was: "));
84+
85+
}
86+
87+
}

0 commit comments

Comments
 (0)