Skip to content

Commit 8281d56

Browse files
authored
Merge pull request #41 from chxbca/feature/issue-40-SPEL-support
Feat SpEL support to set topic name in @PulsarConsumer annotation
2 parents 2e21bd9 + 7e51dcd commit 8281d56

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

src/main/java/io/github/majusko/pulsar/consumer/ConsumerBuilder.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import org.apache.pulsar.client.api.PulsarClient;
99
import org.apache.pulsar.client.api.PulsarClientException;
1010
import org.apache.pulsar.client.api.Schema;
11+
import org.springframework.context.EmbeddedValueResolverAware;
1112
import org.springframework.context.annotation.DependsOn;
1213
import org.springframework.stereotype.Component;
14+
import org.springframework.util.StringValueResolver;
1315
import reactor.core.Disposable;
1416
import reactor.core.publisher.EmitterProcessor;
1517

@@ -20,12 +22,13 @@
2022

2123
@Component
2224
@DependsOn({"pulsarClient", "consumerCollector"})
23-
public class ConsumerBuilder {
25+
public class ConsumerBuilder implements EmbeddedValueResolverAware {
2426

2527
private final EmitterProcessor<FailedMessage> exceptionEmitter = EmitterProcessor.create();
2628
private final ConsumerCollector consumerCollector;
2729
private final PulsarClient pulsarClient;
2830

31+
private StringValueResolver stringValueResolver;
2932
private List<Consumer> consumers;
3033

3134
public ConsumerBuilder(ConsumerCollector consumerCollector, PulsarClient pulsarClient) {
@@ -46,7 +49,7 @@ private Consumer<?> subscribe(String name, ConsumerHolder holder) {
4649
.newConsumer(Schema.JSON(holder.getAnnotation().clazz()))
4750
.consumerName("consumer-" + name)
4851
.subscriptionName("subscription-" + name)
49-
.topic(holder.getAnnotation().topic())
52+
.topic(stringValueResolver.resolveStringValue(holder.getAnnotation().topic()))
5053
.subscriptionType(holder.getAnnotation().subscriptionType())
5154
.messageListener((consumer, msg) -> {
5255
try {
@@ -73,4 +76,9 @@ public List<Consumer> getConsumers() {
7376
public Disposable onError(java.util.function.Consumer<? super FailedMessage> consumer) {
7477
return exceptionEmitter.subscribe(consumer);
7578
}
79+
80+
@Override
81+
public void setEmbeddedValueResolver(StringValueResolver stringValueResolver) {
82+
this.stringValueResolver = stringValueResolver;
83+
}
7684
}

0 commit comments

Comments
 (0)