Skip to content

Commit d937d4b

Browse files
author
oneby-wang
committed
feat: extract ensureTopicsPatternFullyQualified method and add unit tests
Signed-off-by: oneby-wang <[email protected]>
1 parent b250408 commit d937d4b

File tree

4 files changed

+69
-14
lines changed

4 files changed

+69
-14
lines changed

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
8787
customizers.forEach((c) -> c.customize(consumerBuilder));
8888
}
8989
this.ensureTopicNamesFullyQualified(consumerBuilder);
90+
this.ensureTopicsPatternFullyQualified(consumerBuilder);
9091
return consumerBuilder.build();
9192
}
9293

@@ -100,6 +101,13 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder<T>
100101
var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
101102
mutableSpec.setTopicNames(fullyQualifiedTopics);
102103
}
104+
}
105+
106+
protected void ensureTopicsPatternFullyQualified(ReactiveMessageConsumerBuilder<T> consumerBuilder) {
107+
if (this.topicBuilder == null) {
108+
return;
109+
}
110+
var mutableSpec = consumerBuilder.getMutableSpec();
103111
var topicsPattern = mutableSpec.getTopicsPattern();
104112
if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) {
105113
var topicsPatternStr = topicsPattern.pattern();

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.util.Collections;
2424
import java.util.List;
25+
import java.util.regex.Pattern;
2526

2627
import org.apache.pulsar.client.api.PulsarClient;
2728
import org.apache.pulsar.client.api.Schema;
@@ -112,7 +113,7 @@ void createConsumerWithCustomizer() {
112113
class FactoryCreatedWithTopicBuilder {
113114

114115
@Test
115-
void createConsumer() {
116+
void createConsumerEnsureTopicNamesFullyQualified() {
116117
var topicBuilder = spy(new PulsarTopicBuilder());
117118
var consumerFactory = new DefaultReactivePulsarConsumerFactory<String>(
118119
AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null);
@@ -127,6 +128,23 @@ void createConsumer() {
127128
verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopic);
128129
}
129130

131+
@Test
132+
void createConsumerEnsureTopicsPatternFullyQualified() {
133+
var topicBuilder = spy(new PulsarTopicBuilder());
134+
var consumerFactory = new DefaultReactivePulsarConsumerFactory<String>(
135+
AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null);
136+
consumerFactory.setTopicBuilder(topicBuilder);
137+
var inputTopicsPattern = "my-topic-.*";
138+
var fullyQualifiedTopicsPattern = "persistent://public/default/my-topic-.*";
139+
var consumer = consumerFactory.createConsumer(SCHEMA,
140+
Collections.singletonList(builder -> builder.topicsPattern(Pattern.compile(inputTopicsPattern))));
141+
ReactiveMessageConsumerSpec reactiveMessageConsumerSpec = assertThat(consumer)
142+
.extracting("consumerSpec", InstanceOfAssertFactories.type(ReactiveMessageConsumerSpec.class))
143+
.actual();
144+
assertThat(reactiveMessageConsumerSpec.getTopicsPattern().pattern()).isEqualTo(fullyQualifiedTopicsPattern);
145+
verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopicsPattern);
146+
}
147+
130148
}
131149

132150
}

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
119119
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
120120
}
121121
this.ensureTopicNamesFullyQualified(consumerBuilder);
122+
this.ensureTopicsPatternFullyQualified(consumerBuilder);
122123
try {
123124
return consumerBuilder.subscribe();
124125
}
@@ -148,6 +149,13 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder<T> builder) {
148149
var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
149150
builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics));
150151
}
152+
}
153+
154+
protected void ensureTopicsPatternFullyQualified(ConsumerBuilder<T> builder) {
155+
if (this.topicBuilder == null) {
156+
return;
157+
}
158+
var builderImpl = (ConsumerBuilderImpl<T>) builder;
151159
var topicsPattern = builderImpl.getConf().getTopicsPattern();
152160
if (topicsPattern != null && StringUtils.isNoneBlank(topicsPattern.pattern())) {
153161
var topicsPatternStr = topicsPattern.pattern();

spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,15 @@
2727
import java.util.Collections;
2828
import java.util.List;
2929
import java.util.Map;
30+
import java.util.concurrent.CompletableFuture;
3031

3132
import org.apache.pulsar.client.api.ConsumerBuilder;
3233
import org.apache.pulsar.client.api.PulsarClient;
3334
import org.apache.pulsar.client.api.PulsarClientException;
3435
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
3536
import org.apache.pulsar.client.api.Schema;
37+
import org.apache.pulsar.client.impl.PatternMultiTopicsConsumerImpl;
38+
import org.assertj.core.api.InstanceOfAssertFactories;
3639
import org.jspecify.annotations.Nullable;
3740
import org.junit.jupiter.api.AfterEach;
3841
import org.junit.jupiter.api.BeforeEach;
@@ -271,26 +274,44 @@ void multipleConfigCustomizers() throws PulsarClientException {
271274
@Nested
272275
class CreateConsumerUsingPulsarTopicBuilder {
273276

274-
private DefaultPulsarConsumerFactory<String> consumerFactory;
275-
276-
private PulsarTopicBuilder pulsarTopicBuilder;
277-
278-
@BeforeEach
279-
void createConsumerFactory() {
280-
pulsarTopicBuilder = spy(new PulsarTopicBuilder());
281-
consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, null);
282-
consumerFactory.setTopicBuilder(pulsarTopicBuilder);
283-
}
284-
285277
@Test
286-
void withPulsarTopicBuilder() throws PulsarClientException {
278+
void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientException {
279+
var pulsarTopicBuilder = spy(new PulsarTopicBuilder());
280+
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
281+
null);
282+
consumerFactory.setTopicBuilder(pulsarTopicBuilder);
287283
try (var consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"),
288-
"with-pulsar-topic-builder-sub", null, null)) {
284+
"with-pulsar-topic-builder-ensure-topic-names-fully-qualified-sub", null, null)) {
289285
assertThat(consumer.getTopic()).isEqualTo("persistent://public/default/topic1");
290286
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic1");
291287
}
292288
}
293289

290+
@Test
291+
void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClientException {
292+
var pulsarTopicBuilder = spy(new PulsarTopicBuilder());
293+
ConsumerBuilderCustomizer<String> customizer = (builder) -> builder.topicsPattern("topic-.*");
294+
DefaultPulsarConsumerFactory<String> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
295+
List.of(customizer));
296+
consumerFactory.setTopicBuilder(pulsarTopicBuilder);
297+
298+
try (var consumer = consumerFactory.createConsumer(SCHEMA, null,
299+
"with-pulsar-topic-builder-ensure-topics-pattern-fully-qualified-sub", null, null)) {
300+
assertThat(consumer).isInstanceOf(PatternMultiTopicsConsumerImpl.class);
301+
var patternMultiTopicsConsumer = (PatternMultiTopicsConsumerImpl<String>) consumer;
302+
var topicsPattern = patternMultiTopicsConsumer.getPattern();
303+
assertThat(topicsPattern.inputPattern()).isEqualTo("persistent://public/default/topic-.*");
304+
verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic("topic-.*");
305+
CompletableFuture<?> watcherFuture = assertThat(patternMultiTopicsConsumer)
306+
.extracting("watcherFuture", InstanceOfAssertFactories.type(CompletableFuture.class))
307+
.actual();
308+
// Seems some bugs in PatternMultiTopicsConsumerImpl.closeAsync() method.
309+
// If watcherFuture is not completed, invoke pulsarClient.close() first
310+
// will cause exception.
311+
watcherFuture.join();
312+
}
313+
}
314+
294315
}
295316

296317
}

0 commit comments

Comments
 (0)