diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java index efc3b98d..eb2c8c27 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java @@ -18,7 +18,9 @@ import java.util.Collections; import java.util.List; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder; @@ -85,6 +87,7 @@ public ReactiveMessageConsumer createConsumer(Schema schema, customizers.forEach((c) -> c.customize(consumerBuilder)); } this.ensureTopicNamesFullyQualified(consumerBuilder); + this.ensureTopicsPatternFullyQualified(consumerBuilder); return consumerBuilder.build(); } @@ -100,4 +103,17 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder } } + protected void ensureTopicsPatternFullyQualified(ReactiveMessageConsumerBuilder consumerBuilder) { + if (this.topicBuilder == null) { + return; + } + var mutableSpec = consumerBuilder.getMutableSpec(); + var topicsPattern = mutableSpec.getTopicsPattern(); + if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) { + var topicsPatternStr = topicsPattern.pattern(); + var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr); + mutableSpec.setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr)); + } + } + } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java index de5fb8de..d8617899 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.List; +import java.util.regex.Pattern; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -112,7 +113,7 @@ void createConsumerWithCustomizer() { class FactoryCreatedWithTopicBuilder { @Test - void createConsumer() { + void createConsumerEnsureTopicNamesFullyQualified() { var topicBuilder = spy(new PulsarTopicBuilder()); var consumerFactory = new DefaultReactivePulsarConsumerFactory( AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null); @@ -127,6 +128,23 @@ void createConsumer() { verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopic); } + @Test + void createConsumerEnsureTopicsPatternFullyQualified() { + var topicBuilder = spy(new PulsarTopicBuilder()); + var consumerFactory = new DefaultReactivePulsarConsumerFactory( + AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null); + consumerFactory.setTopicBuilder(topicBuilder); + var inputTopicsPattern = "my-topic-.*"; + var fullyQualifiedTopicsPattern = "persistent://public/default/my-topic-.*"; + var consumer = consumerFactory.createConsumer(SCHEMA, + Collections.singletonList(builder -> builder.topicsPattern(Pattern.compile(inputTopicsPattern)))); + ReactiveMessageConsumerSpec reactiveMessageConsumerSpec = assertThat(consumer) + .extracting("consumerSpec", InstanceOfAssertFactories.type(ReactiveMessageConsumerSpec.class)) + .actual(); + assertThat(reactiveMessageConsumerSpec.getTopicsPattern().pattern()).isEqualTo(fullyQualifiedTopicsPattern); + verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopicsPattern); + } + } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java index ec547c8d..52252832 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java @@ -23,7 +23,9 @@ import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; @@ -117,6 +119,7 @@ public Consumer createConsumer(Schema schema, @Nullable Collection customizers.forEach(customizer -> customizer.customize(consumerBuilder)); } this.ensureTopicNamesFullyQualified(consumerBuilder); + this.ensureTopicsPatternFullyQualified(consumerBuilder); try { return consumerBuilder.subscribe(); } @@ -148,4 +151,17 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder builder) { } } + protected void ensureTopicsPatternFullyQualified(ConsumerBuilder builder) { + if (this.topicBuilder == null) { + return; + } + var builderImpl = (ConsumerBuilderImpl) builder; + var topicsPattern = builderImpl.getConf().getTopicsPattern(); + if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) { + var topicsPatternStr = topicsPattern.pattern(); + var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr); + builderImpl.getConf().setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr)); + } + } + } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java index cca21470..c9f7883e 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java @@ -27,12 +27,15 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl; +import org.assertj.core.api.InstanceOfAssertFactories; import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -271,26 +274,44 @@ void multipleConfigCustomizers() throws PulsarClientException { @Nested class CreateConsumerUsingPulsarTopicBuilder { - private DefaultPulsarConsumerFactory consumerFactory; - - private PulsarTopicBuilder pulsarTopicBuilder; - - @BeforeEach - void createConsumerFactory() { - pulsarTopicBuilder = spy(new PulsarTopicBuilder()); - consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, null); - consumerFactory.setTopicBuilder(pulsarTopicBuilder); - } - @Test - void withPulsarTopicBuilder() throws PulsarClientException { + void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientException { + var pulsarTopicBuilder = spy(new PulsarTopicBuilder()); + DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, + null); + consumerFactory.setTopicBuilder(pulsarTopicBuilder); try (var consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"), - "with-pulsar-topic-builder-sub", null, null)) { + "with-pulsar-topic-builder-ensure-topic-names-fully-qualified-sub", null, null)) { assertThat(consumer.getTopic()).isEqualTo("persistent://public/default/topic1"); verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic1"); } } + @Test + void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClientException { + var pulsarTopicBuilder = spy(new PulsarTopicBuilder()); + ConsumerBuilderCustomizer customizer = (builder) -> builder.topicsPattern("topic-.*"); + DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, + List.of(customizer)); + consumerFactory.setTopicBuilder(pulsarTopicBuilder); + + try (var consumer = consumerFactory.createConsumer(SCHEMA, null, + "with-pulsar-topic-builder-ensure-topics-pattern-fully-qualified-sub", null, null)) { + assertThat(consumer).isInstanceOf(PatternMultiTopicsConsumerImpl.class); + var patternMultiTopicsConsumer = (PatternMultiTopicsConsumerImpl) consumer; + var topicsPattern = patternMultiTopicsConsumer.getPattern(); + assertThat(topicsPattern.inputPattern()).isEqualTo("persistent://public/default/topic-.*"); + verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic-.*"); + CompletableFuture watcherFuture = assertThat(patternMultiTopicsConsumer) + .extracting("watcherFuture", InstanceOfAssertFactories.type(CompletableFuture.class)) + .actual(); + // Seems some bugs in PatternMultiTopicsConsumerImpl.closeAsync() method. + // If watcherFuture is not completed, invoke pulsarClient.close() first + // will cause exception. + watcherFuture.join(); + } + } + } }