From b250408c5a84121ddfacd3bd439b609ea424e315 Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Tue, 2 Sep 2025 13:38:53 +0800 Subject: [PATCH 1/2] feat: support relative topicPattern without requiring full tenant/namespace prefix #1240 Signed-off-by: oneby-wang --- .../core/DefaultReactivePulsarConsumerFactory.java | 8 ++++++++ .../pulsar/core/DefaultPulsarConsumerFactory.java | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java index efc3b98d..e9cf8823 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java @@ -18,7 +18,9 @@ import java.util.Collections; import java.util.List; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder; @@ -98,6 +100,12 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); mutableSpec.setTopicNames(fullyQualifiedTopics); } + var topicsPattern = mutableSpec.getTopicsPattern(); + if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) { + var topicsPatternStr = topicsPattern.pattern(); + var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr); + mutableSpec.setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr)); + } } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java index ec547c8d..587864ef 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java @@ -23,7 +23,9 @@ import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; @@ -146,6 +148,12 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder builder) { var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics)); } + var topicsPattern = builderImpl.getConf().getTopicsPattern(); + if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) { + var topicsPatternStr = topicsPattern.pattern(); + var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr); + builderImpl.getConf().setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr)); + } } } From d937d4b5e6fc76ad05e3a0bc029033a5438cfd7c Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Wed, 3 Sep 2025 14:03:51 +0800 Subject: [PATCH 2/2] feat: extract ensureTopicsPatternFullyQualified method and add unit tests Signed-off-by: oneby-wang --- .../DefaultReactivePulsarConsumerFactory.java | 8 ++++ ...ultReactivePulsarConsumerFactoryTests.java | 20 +++++++- .../core/DefaultPulsarConsumerFactory.java | 8 ++++ .../DefaultPulsarConsumerFactoryTests.java | 47 ++++++++++++++----- 4 files changed, 69 insertions(+), 14 deletions(-) diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java index e9cf8823..eb2c8c27 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java @@ -87,6 +87,7 @@ public ReactiveMessageConsumer createConsumer(Schema schema, customizers.forEach((c) -> c.customize(consumerBuilder)); } this.ensureTopicNamesFullyQualified(consumerBuilder); + this.ensureTopicsPatternFullyQualified(consumerBuilder); return consumerBuilder.build(); } @@ -100,6 +101,13 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); mutableSpec.setTopicNames(fullyQualifiedTopics); } + } + + protected void ensureTopicsPatternFullyQualified(ReactiveMessageConsumerBuilder consumerBuilder) { + if (this.topicBuilder == null) { + return; + } + var mutableSpec = consumerBuilder.getMutableSpec(); var topicsPattern = mutableSpec.getTopicsPattern(); if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) { var topicsPatternStr = topicsPattern.pattern(); diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java index de5fb8de..d8617899 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; +import java.util.regex.Pattern; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -112,7 +113,7 @@ void createConsumerWithCustomizer() { class FactoryCreatedWithTopicBuilder { @Test - void createConsumer() { + void createConsumerEnsureTopicNamesFullyQualified() { var topicBuilder = spy(new PulsarTopicBuilder()); var consumerFactory = new DefaultReactivePulsarConsumerFactory( AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null); @@ -127,6 +128,23 @@ void createConsumer() { verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopic); } + @Test + void createConsumerEnsureTopicsPatternFullyQualified() { + var topicBuilder = spy(new PulsarTopicBuilder()); + var consumerFactory = new DefaultReactivePulsarConsumerFactory( + AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null); + consumerFactory.setTopicBuilder(topicBuilder); + var inputTopicsPattern = "my-topic-.*"; + var fullyQualifiedTopicsPattern = "persistent://public/default/my-topic-.*"; + var consumer = consumerFactory.createConsumer(SCHEMA, + Collections.singletonList(builder -> builder.topicsPattern(Pattern.compile(inputTopicsPattern)))); + ReactiveMessageConsumerSpec reactiveMessageConsumerSpec = assertThat(consumer) + .extracting("consumerSpec", InstanceOfAssertFactories.type(ReactiveMessageConsumerSpec.class)) + .actual(); + assertThat(reactiveMessageConsumerSpec.getTopicsPattern().pattern()).isEqualTo(fullyQualifiedTopicsPattern); + verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopicsPattern); + } + } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java index 587864ef..52252832 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java @@ -119,6 +119,7 @@ public Consumer createConsumer(Schema schema, @Nullable Collection customizers.forEach(customizer -> customizer.customize(consumerBuilder)); } this.ensureTopicNamesFullyQualified(consumerBuilder); + this.ensureTopicsPatternFullyQualified(consumerBuilder); try { return consumerBuilder.subscribe(); } @@ -148,6 +149,13 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder builder) { var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics)); } + } + + protected void ensureTopicsPatternFullyQualified(ConsumerBuilder builder) { + if (this.topicBuilder == null) { + return; + } + var builderImpl = (ConsumerBuilderImpl) builder; var topicsPattern = builderImpl.getConf().getTopicsPattern(); if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) { var topicsPatternStr = topicsPattern.pattern(); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java index cca21470..c9f7883e 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java @@ -27,12 +27,15 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl; +import org.assertj.core.api.InstanceOfAssertFactories; import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -271,26 +274,44 @@ void multipleConfigCustomizers() throws PulsarClientException { @Nested class CreateConsumerUsingPulsarTopicBuilder { - private DefaultPulsarConsumerFactory consumerFactory; - - private PulsarTopicBuilder pulsarTopicBuilder; - - @BeforeEach - void createConsumerFactory() { - pulsarTopicBuilder = spy(new PulsarTopicBuilder()); - consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, null); - consumerFactory.setTopicBuilder(pulsarTopicBuilder); - } - @Test - void withPulsarTopicBuilder() throws PulsarClientException { + void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientException { + var pulsarTopicBuilder = spy(new PulsarTopicBuilder()); + DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, + null); + consumerFactory.setTopicBuilder(pulsarTopicBuilder); try (var consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"), - "with-pulsar-topic-builder-sub", null, null)) { + "with-pulsar-topic-builder-ensure-topic-names-fully-qualified-sub", null, null)) { assertThat(consumer.getTopic()).isEqualTo("persistent://public/default/topic1"); verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic1"); } } + @Test + void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClientException { + var pulsarTopicBuilder = spy(new PulsarTopicBuilder()); + ConsumerBuilderCustomizer customizer = (builder) -> builder.topicsPattern("topic-.*"); + DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, + List.of(customizer)); + consumerFactory.setTopicBuilder(pulsarTopicBuilder); + + try (var consumer = consumerFactory.createConsumer(SCHEMA, null, + "with-pulsar-topic-builder-ensure-topics-pattern-fully-qualified-sub", null, null)) { + assertThat(consumer).isInstanceOf(PatternMultiTopicsConsumerImpl.class); + var patternMultiTopicsConsumer = (PatternMultiTopicsConsumerImpl) consumer; + var topicsPattern = patternMultiTopicsConsumer.getPattern(); + assertThat(topicsPattern.inputPattern()).isEqualTo("persistent://public/default/topic-.*"); + verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic-.*"); + CompletableFuture watcherFuture = assertThat(patternMultiTopicsConsumer) + .extracting("watcherFuture", InstanceOfAssertFactories.type(CompletableFuture.class)) + .actual(); + // Seems some bugs in PatternMultiTopicsConsumerImpl.closeAsync() method. + // If watcherFuture is not completed, invoke pulsarClient.close() first + // will cause exception. + watcherFuture.join(); + } + } + } }