Skip to content

Commit cab35a7

Browse files
oneby-wangonobc
authored andcommitted
Support relative topicPattern in PulsarListener
Allows specifying the topicsPattern on the PulsarListener without requiring the full tenant/namespace prefix. Resolves #1240 Signed-off-by: oneby-wang <[email protected]>
1 parent e7312ba commit cab35a7

File tree

4 files changed

+85
-14
lines changed

4 files changed

+85
-14
lines changed

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import java.util.Collections;
2020
import java.util.List;
21+
import java.util.regex.Pattern;
2122

23+
import org.apache.commons.lang3.StringUtils;
2224
import org.apache.pulsar.client.api.Schema;
2325
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
2426
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder;
@@ -85,6 +87,7 @@ public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
8587
customizers.forEach((c) -> c.customize(consumerBuilder));
8688
}
8789
this.ensureTopicNamesFullyQualified(consumerBuilder);
90+
this.ensureTopicsPatternFullyQualified(consumerBuilder);
8891
return consumerBuilder.build();
8992
}
9093

@@ -100,4 +103,17 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder<T>
100103
}
101104
}
102105

106+
protected void ensureTopicsPatternFullyQualified(ReactiveMessageConsumerBuilder<T> consumerBuilder) {
107+
if (this.topicBuilder == null) {
108+
return;
109+
}
110+
var mutableSpec = consumerBuilder.getMutableSpec();
111+
var topicsPattern = mutableSpec.getTopicsPattern();
112+
if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) {
113+
var topicsPatternStr = topicsPattern.pattern();
114+
var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
115+
mutableSpec.setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr));
116+
}
117+
}
118+
103119
}

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.Collections;
2424
import java.util.List;
25+
import java.util.regex.Pattern;
2526

2627
import org.apache.pulsar.client.api.PulsarClient;
2728
import org.apache.pulsar.client.api.Schema;
@@ -112,7 +113,7 @@ void createConsumerWithCustomizer() {
112113
class FactoryCreatedWithTopicBuilder {
113114

114115
@Test
115-
void createConsumer() {
116+
void createConsumerEnsureTopicNamesFullyQualified() {
116117
var topicBuilder = spy(new PulsarTopicBuilder());
117118
var consumerFactory = new DefaultReactivePulsarConsumerFactory<String>(
118119
AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null);
@@ -127,6 +128,23 @@ void createConsumer() {
127128
verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopic);
128129
}
129130

131+
@Test
132+
void createConsumerEnsureTopicsPatternFullyQualified() {
133+
var topicBuilder = spy(new PulsarTopicBuilder());
134+
var consumerFactory = new DefaultReactivePulsarConsumerFactory<String>(
135+
AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null);
136+
consumerFactory.setTopicBuilder(topicBuilder);
137+
var inputTopicsPattern = "my-topic-.*";
138+
var fullyQualifiedTopicsPattern = "persistent://public/default/my-topic-.*";
139+
var consumer = consumerFactory.createConsumer(SCHEMA,
140+
Collections.singletonList(builder -> builder.topicsPattern(Pattern.compile(inputTopicsPattern))));
141+
ReactiveMessageConsumerSpec reactiveMessageConsumerSpec = assertThat(consumer)
142+
.extracting("consumerSpec", InstanceOfAssertFactories.type(ReactiveMessageConsumerSpec.class))
143+
.actual();
144+
assertThat(reactiveMessageConsumerSpec.getTopicsPattern().pattern()).isEqualTo(fullyQualifiedTopicsPattern);
145+
verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopicsPattern);
146+
}
147+
130148
}
131149

132150
}

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import java.util.Map;
2424
import java.util.Objects;
2525
import java.util.TreeMap;
26+
import java.util.regex.Pattern;
2627

28+
import org.apache.commons.lang3.StringUtils;
2729
import org.apache.pulsar.client.api.Consumer;
2830
import org.apache.pulsar.client.api.ConsumerBuilder;
2931
import org.apache.pulsar.client.api.PulsarClient;
@@ -117,6 +119,7 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
117119
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
118120
}
119121
this.ensureTopicNamesFullyQualified(consumerBuilder);
122+
this.ensureTopicsPatternFullyQualified(consumerBuilder);
120123
try {
121124
return consumerBuilder.subscribe();
122125
}
@@ -148,4 +151,17 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder<T> builder) {
148151
}
149152
}
150153

154+
protected void ensureTopicsPatternFullyQualified(ConsumerBuilder<T> builder) {
155+
if (this.topicBuilder == null) {
156+
return;
157+
}
158+
var builderImpl = (ConsumerBuilderImpl<T>) builder;
159+
var topicsPattern = builderImpl.getConf().getTopicsPattern();
160+
if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) {
161+
var topicsPatternStr = topicsPattern.pattern();
162+
var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
163+
builderImpl.getConf().setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr));
164+
}
165+
}
166+
151167
}

spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@
2727
import java.util.Collections;
2828
import java.util.List;
2929
import java.util.Map;
30+
import java.util.concurrent.CompletableFuture;
3031

3132
import org.apache.pulsar.client.api.ConsumerBuilder;
3233
import org.apache.pulsar.client.api.PulsarClient;
3334
import org.apache.pulsar.client.api.PulsarClientException;
3435
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
3536
import org.apache.pulsar.client.api.Schema;
37+
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
38+
import org.assertj.core.api.InstanceOfAssertFactories;
3639
import org.jspecify.annotations.Nullable;
3740
import org.junit.jupiter.api.AfterEach;
3841
import org.junit.jupiter.api.BeforeEach;
@@ -271,26 +274,44 @@ void multipleConfigCustomizers() throws PulsarClientException {
271274
@Nested
272275
class CreateConsumerUsingPulsarTopicBuilder {
273276

274-
private DefaultPulsarConsumerFactory<String> consumerFactory;
275-
276-
private PulsarTopicBuilder pulsarTopicBuilder;
277-
278-
@BeforeEach
279-
void createConsumerFactory() {
280-
pulsarTopicBuilder = spy(new PulsarTopicBuilder());
281-
consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, null);
282-
consumerFactory.setTopicBuilder(pulsarTopicBuilder);
283-
}
284-
285277
@Test
286-
void withPulsarTopicBuilder() throws PulsarClientException {
278+
void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientException {
279+
var pulsarTopicBuilder = spy(new PulsarTopicBuilder());
280+
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
281+
null);
282+
consumerFactory.setTopicBuilder(pulsarTopicBuilder);
287283
try (var consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"),
288-
"with-pulsar-topic-builder-sub", null, null)) {
284+
"with-pulsar-topic-builder-ensure-topic-names-fully-qualified-sub", null, null)) {
289285
assertThat(consumer.getTopic()).isEqualTo("persistent://public/default/topic1");
290286
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic1");
291287
}
292288
}
293289

290+
@Test
291+
void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClientException {
292+
var pulsarTopicBuilder = spy(new PulsarTopicBuilder());
293+
ConsumerBuilderCustomizer<String> customizer = (builder) -> builder.topicsPattern("topic-.*");
294+
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
295+
List.of(customizer));
296+
consumerFactory.setTopicBuilder(pulsarTopicBuilder);
297+
298+
try (var consumer = consumerFactory.createConsumer(SCHEMA, null,
299+
"with-pulsar-topic-builder-ensure-topics-pattern-fully-qualified-sub", null, null)) {
300+
assertThat(consumer).isInstanceOf(PatternMultiTopicsConsumerImpl.class);
301+
var patternMultiTopicsConsumer = (PatternMultiTopicsConsumerImpl<String>) consumer;
302+
var topicsPattern = patternMultiTopicsConsumer.getPattern();
303+
assertThat(topicsPattern.inputPattern()).isEqualTo("persistent://public/default/topic-.*");
304+
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic-.*");
305+
CompletableFuture<?> watcherFuture = assertThat(patternMultiTopicsConsumer)
306+
.extracting("watcherFuture", InstanceOfAssertFactories.type(CompletableFuture.class))
307+
.actual();
308+
// Seems some bugs in PatternMultiTopicsConsumerImpl.closeAsync() method.
309+
// If watcherFuture is not completed, invoke pulsarClient.close() first
310+
// will cause exception.
311+
watcherFuture.join();
312+
}
313+
}
314+
294315
}
295316

296317
}

0 commit comments

Comments
 (0)