Skip to content

Commit b250408

Browse files
author
oneby-wang
committed
feat: support relative topicPattern without requiring full tenant/namespace prefix #1240
Signed-off-by: oneby-wang <[email protected]>
1 parent 4d61cb0 commit b250408

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import java.util.Collections;
2020
import java.util.List;
21+
import java.util.regex.Pattern;
2122

23+
import org.apache.commons.lang3.StringUtils;
2224
import org.apache.pulsar.client.api.Schema;
2325
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
2426
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder;
@@ -98,6 +100,12 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder<T>
98100
var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
99101
mutableSpec.setTopicNames(fullyQualifiedTopics);
100102
}
103+
var topicsPattern = mutableSpec.getTopicsPattern();
104+
if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) {
105+
var topicsPatternStr = topicsPattern.pattern();
106+
var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
107+
mutableSpec.setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr));
108+
}
101109
}
102110

103111
}

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import java.util.Map;
2424
import java.util.Objects;
2525
import java.util.TreeMap;
26+
import java.util.regex.Pattern;
2627

28+
import org.apache.commons.lang3.StringUtils;
2729
import org.apache.pulsar.client.api.Consumer;
2830
import org.apache.pulsar.client.api.ConsumerBuilder;
2931
import org.apache.pulsar.client.api.PulsarClient;
@@ -146,6 +148,12 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder<T> builder) {
146148
var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
147149
builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics));
148150
}
151+
var topicsPattern = builderImpl.getConf().getTopicsPattern();
152+
if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) {
153+
var topicsPatternStr = topicsPattern.pattern();
154+
var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
155+
builderImpl.getConf().setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr));
156+
}
149157
}
150158

151159
}

0 commit comments

Comments
 (0)