Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerBuilder;
Expand Down Expand Up @@ -85,6 +87,7 @@ public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
customizers.forEach((c) -> c.customize(consumerBuilder));
}
this.ensureTopicNamesFullyQualified(consumerBuilder);
this.ensureTopicsPatternFullyQualified(consumerBuilder);
return consumerBuilder.build();
}

Expand All @@ -100,4 +103,17 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder<T>
}
}

protected void ensureTopicsPatternFullyQualified(ReactiveMessageConsumerBuilder<T> consumerBuilder) {
if (this.topicBuilder == null) {
return;
}
var mutableSpec = consumerBuilder.getMutableSpec();
var topicsPattern = mutableSpec.getTopicsPattern();
if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) {
var topicsPatternStr = topicsPattern.pattern();
var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
mutableSpec.setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -112,7 +113,7 @@ void createConsumerWithCustomizer() {
class FactoryCreatedWithTopicBuilder {

@Test
void createConsumer() {
void createConsumerEnsureTopicNamesFullyQualified() {
var topicBuilder = spy(new PulsarTopicBuilder());
var consumerFactory = new DefaultReactivePulsarConsumerFactory<String>(
AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null);
Expand All @@ -127,6 +128,23 @@ void createConsumer() {
verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopic);
}

@Test
void createConsumerEnsureTopicsPatternFullyQualified() {
var topicBuilder = spy(new PulsarTopicBuilder());
var consumerFactory = new DefaultReactivePulsarConsumerFactory<String>(
AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null);
consumerFactory.setTopicBuilder(topicBuilder);
var inputTopicsPattern = "my-topic-.*";
var fullyQualifiedTopicsPattern = "persistent://public/default/my-topic-.*";
var consumer = consumerFactory.createConsumer(SCHEMA,
Collections.singletonList(builder -> builder.topicsPattern(Pattern.compile(inputTopicsPattern))));
ReactiveMessageConsumerSpec reactiveMessageConsumerSpec = assertThat(consumer)
.extracting("consumerSpec", InstanceOfAssertFactories.type(ReactiveMessageConsumerSpec.class))
.actual();
assertThat(reactiveMessageConsumerSpec.getTopicsPattern().pattern()).isEqualTo(fullyQualifiedTopicsPattern);
verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopicsPattern);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -117,6 +119,7 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
}
this.ensureTopicNamesFullyQualified(consumerBuilder);
this.ensureTopicsPatternFullyQualified(consumerBuilder);
try {
return consumerBuilder.subscribe();
}
Expand Down Expand Up @@ -148,4 +151,17 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder<T> builder) {
}
}

protected void ensureTopicsPatternFullyQualified(ConsumerBuilder<T> builder) {
if (this.topicBuilder == null) {
return;
}
var builderImpl = (ConsumerBuilderImpl<T>) builder;
var topicsPattern = builderImpl.getConf().getTopicsPattern();
if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) {
var topicsPatternStr = topicsPattern.pattern();
var fullyQualifiedTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
builderImpl.getConf().setTopicsPattern(Pattern.compile(fullyQualifiedTopicsPatternStr));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.jspecify.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -271,26 +274,44 @@ void multipleConfigCustomizers() throws PulsarClientException {
@Nested
class CreateConsumerUsingPulsarTopicBuilder {

private DefaultPulsarConsumerFactory<String> consumerFactory;

private PulsarTopicBuilder pulsarTopicBuilder;

@BeforeEach
void createConsumerFactory() {
pulsarTopicBuilder = spy(new PulsarTopicBuilder());
consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, null);
consumerFactory.setTopicBuilder(pulsarTopicBuilder);
}

@Test
void withPulsarTopicBuilder() throws PulsarClientException {
void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientException {
var pulsarTopicBuilder = spy(new PulsarTopicBuilder());
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
null);
consumerFactory.setTopicBuilder(pulsarTopicBuilder);
try (var consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"),
"with-pulsar-topic-builder-sub", null, null)) {
"with-pulsar-topic-builder-ensure-topic-names-fully-qualified-sub", null, null)) {
assertThat(consumer.getTopic()).isEqualTo("persistent://public/default/topic1");
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic1");
}
}

@Test
void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClientException {
var pulsarTopicBuilder = spy(new PulsarTopicBuilder());
ConsumerBuilderCustomizer<String> customizer = (builder) -> builder.topicsPattern("topic-.*");
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
List.of(customizer));
consumerFactory.setTopicBuilder(pulsarTopicBuilder);

try (var consumer = consumerFactory.createConsumer(SCHEMA, null,
"with-pulsar-topic-builder-ensure-topics-pattern-fully-qualified-sub", null, null)) {
assertThat(consumer).isInstanceOf(PatternMultiTopicsConsumerImpl.class);
var patternMultiTopicsConsumer = (PatternMultiTopicsConsumerImpl<String>) consumer;
var topicsPattern = patternMultiTopicsConsumer.getPattern();
assertThat(topicsPattern.inputPattern()).isEqualTo("persistent://public/default/topic-.*");
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic-.*");
CompletableFuture<?> watcherFuture = assertThat(patternMultiTopicsConsumer)
.extracting("watcherFuture", InstanceOfAssertFactories.type(CompletableFuture.class))
.actual();
// Seems some bugs in PatternMultiTopicsConsumerImpl.closeAsync() method.
// If watcherFuture is not completed, invoke pulsarClient.close() first
// will cause exception.
watcherFuture.join();
}
}

}

}