Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
Expand All @@ -27,6 +28,7 @@

import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
* Default implementation for {@link ReactivePulsarConsumerFactory}.
Expand Down Expand Up @@ -85,6 +87,7 @@ public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
customizers.forEach((c) -> c.customize(consumerBuilder));
}
this.ensureTopicNamesFullyQualified(consumerBuilder);
this.ensureTopicsPatternFullyQualified(consumerBuilder);
return consumerBuilder.build();
}

Expand All @@ -100,4 +103,19 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder<T>
}
}

protected void ensureTopicsPatternFullyQualified(ReactiveMessageConsumerBuilder<T> consumerBuilder) {
if (this.topicBuilder == null) {
return;
}
var mutableSpec = consumerBuilder.getMutableSpec();
var topicsPattern = mutableSpec.getTopicsPattern();
if (topicsPattern != null && StringUtils.hasText(topicsPattern.pattern())) {
var topicsPatternStr = topicsPattern.pattern();
var fqTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
if (!topicsPatternStr.equals(fqTopicsPatternStr)) {
mutableSpec.setTopicsPattern(Pattern.compile(fqTopicsPatternStr));
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
Expand Down Expand Up @@ -112,7 +113,7 @@ void createConsumerWithCustomizer() {
class FactoryCreatedWithTopicBuilder {

@Test
void createConsumer() {
void createConsumerEnsureTopicNamesFullyQualified() {
var topicBuilder = spy(new PulsarTopicBuilder());
var consumerFactory = new DefaultReactivePulsarConsumerFactory<String>(
AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null);
Expand All @@ -127,6 +128,23 @@ void createConsumer() {
verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopic);
}

@Test
void createConsumerEnsureTopicsPatternFullyQualified() {
var topicBuilder = spy(new PulsarTopicBuilder());
var consumerFactory = new DefaultReactivePulsarConsumerFactory<String>(
AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null);
consumerFactory.setTopicBuilder(topicBuilder);
var inputTopicsPattern = "my-topic-.*";
var fullyQualifiedTopicsPattern = "persistent://public/default/my-topic-.*";
var consumer = consumerFactory.createConsumer(SCHEMA,
Collections.singletonList(builder -> builder.topicsPattern(Pattern.compile(inputTopicsPattern))));
var reactiveMessageConsumerSpec = assertThat(consumer)
.extracting("consumerSpec", InstanceOfAssertFactories.type(ReactiveMessageConsumerSpec.class))
.actual();
assertThat(reactiveMessageConsumerSpec.getTopicsPattern().pattern()).isEqualTo(fullyQualifiedTopicsPattern);
verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopicsPattern);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.regex.Pattern;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
Expand All @@ -34,6 +35,7 @@

import org.springframework.pulsar.PulsarException;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

/**
* Default implementation for {@link PulsarConsumerFactory}.
Expand Down Expand Up @@ -117,6 +119,7 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
}
this.ensureTopicNamesFullyQualified(consumerBuilder);
this.ensureTopicsPatternFullyQualified(consumerBuilder);
try {
return consumerBuilder.subscribe();
}
Expand Down Expand Up @@ -148,4 +151,19 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder<T> builder) {
}
}

protected void ensureTopicsPatternFullyQualified(ConsumerBuilder<T> builder) {
if (this.topicBuilder == null) {
return;
}
var builderImpl = (ConsumerBuilderImpl<T>) builder;
var topicsPattern = builderImpl.getConf().getTopicsPattern();
if (topicsPattern != null && StringUtils.hasText(topicsPattern.pattern())) {
var topicsPatternStr = topicsPattern.pattern();
var fqTopicsPatternStr = this.topicBuilder.getFullyQualifiedNameForTopic(topicsPatternStr);
if (!topicsPatternStr.equals(fqTopicsPatternStr)) {
builderImpl.getConf().setTopicsPattern(Pattern.compile(fqTopicsPatternStr));
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.jspecify.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -271,26 +275,50 @@ void multipleConfigCustomizers() throws PulsarClientException {
@Nested
class CreateConsumerUsingPulsarTopicBuilder {

private DefaultPulsarConsumerFactory<String> consumerFactory;

private PulsarTopicBuilder pulsarTopicBuilder;

@BeforeEach
void createConsumerFactory() {
pulsarTopicBuilder = spy(new PulsarTopicBuilder());
consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, null);
@Test
void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientException {
var pulsarTopicBuilder = spy(new PulsarTopicBuilder());
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
null);
consumerFactory.setTopicBuilder(pulsarTopicBuilder);
try (Consumer<String> consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"),
"with-pulsar-topic-builder-ensure-topic-names-fully-qualified-sub", null, null)) {
assertThat(consumer.getTopic()).isEqualTo("persistent://public/default/topic1");
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic1");
}
}

@Test
void withPulsarTopicBuilder() throws PulsarClientException {
try (var consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"),
"with-pulsar-topic-builder-sub", null, null)) {
assertThat(consumer.getTopic()).isEqualTo("persistent://public/default/topic1");
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic1");
void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClientException {
var pulsarTopicBuilder = spy(new PulsarTopicBuilder());
ConsumerBuilderCustomizer<String> customizer = (builder) -> builder.topicsPattern("topic-.*");
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
List.of(customizer));
consumerFactory.setTopicBuilder(pulsarTopicBuilder);

try (var consumer = consumerFactory.createConsumer(SCHEMA, null,
"with-pulsar-topic-builder-ensure-topics-pattern-fully-qualified-sub", null, null)) {
assertThat(consumer).isInstanceOf(PatternMultiTopicsConsumerImpl.class);
var patternMultiTopicsConsumer = (PatternMultiTopicsConsumerImpl<String>) consumer;
var topicsPattern = patternMultiTopicsConsumer.getPattern();
assertThat(topicsPattern.inputPattern()).isEqualTo("persistent://public/default/topic-.*");
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic-.*");
temporarilyDealWithPulsar24698(patternMultiTopicsConsumer);
}
}

// TODO remove when Pulsar client updates to 4.2.0
private void temporarilyDealWithPulsar24698(PatternMultiTopicsConsumerImpl<String> consumer) {
// See https://github.com/apache/pulsar/pull/24698
// If this is not here there will be numerous exceptions when
// PulsarClient.close
CompletableFuture<?> watcherFuture = assertThat(consumer)
.extracting("watcherFuture", InstanceOfAssertFactories.type(CompletableFuture.class))
.actual();
watcherFuture.join();

}

}

}