Skip to content

Commit bce10bd

Browse files
authored
Ensure default topics are fully qualified (#849)
* Ensure default topics are fully qualified Prior to this commit, only the topics specified on the `@PulsarListener` were being fully qualified in the `DefaultPulsarConsumerFactory`. With this change all topics (including the default topics driven by the Spring Boot `spring.pulsar.consumer.topics` config prop) are fully-qualified.
1 parent b16bfae commit bce10bd

File tree

2 files changed

+17
-6
lines changed

2 files changed

+17
-6
lines changed

integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2023 the original author or authors.
2+
* Copyright 2022-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -160,7 +160,8 @@ static class ConfigPropsDrivenListenerConfig {
160160
public void listen(String ignored, Consumer<String> consumer) {
161161
assertThat(consumer).extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class))
162162
.satisfies((conf) -> {
163-
assertThat(conf.getSingleTopic()).isEqualTo("plit-config-props-topic-dev");
163+
assertThat(conf.getSingleTopic())
164+
.isEqualTo("persistent://public/default/plit-config-props-topic-dev");
164165
assertThat(conf.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
165166
assertThat(conf.getSubscriptionName()).isEqualTo("plit-config-props-subs-dev");
166167
});

spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,11 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
101101
Objects.requireNonNull(schema, "Schema must be specified");
102102
ConsumerBuilder<T> consumerBuilder = this.pulsarClient.newConsumer(schema);
103103

104-
// Apply the default config customizer (preserve the topic)
104+
// Apply the default config customizer
105105
if (!CollectionUtils.isEmpty(this.defaultConfigCustomizers)) {
106106
this.defaultConfigCustomizers.forEach((customizer -> customizer.customize(consumerBuilder)));
107107
}
108+
// Preserve the passed in topics (don't let default config customizer win)
108109
if (topics != null) {
109110
replaceTopicsOnBuilder(consumerBuilder, topics);
110111
}
@@ -117,6 +118,9 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
117118
if (!CollectionUtils.isEmpty(customizers)) {
118119
customizers.forEach(customizer -> customizer.customize(consumerBuilder));
119120
}
121+
if (this.topicBuilder != null) {
122+
this.ensureTopicNamesFullyQualified(consumerBuilder);
123+
}
120124
try {
121125
return consumerBuilder.subscribe();
122126
}
@@ -126,9 +130,6 @@ public Consumer<T> createConsumer(Schema<T> schema, @Nullable Collection<String>
126130
}
127131

128132
private void replaceTopicsOnBuilder(ConsumerBuilder<T> builder, Collection<String> topics) {
129-
if (this.topicBuilder != null) {
130-
topics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
131-
}
132133
var builderImpl = (ConsumerBuilderImpl<T>) builder;
133134
builderImpl.getConf().setTopicNames(new HashSet<>(topics));
134135
}
@@ -139,4 +140,13 @@ private void replaceMetadataPropertiesOnBuilder(ConsumerBuilder<T> builder,
139140
builderImpl.getConf().setProperties(new TreeMap<>(metadataProperties));
140141
}
141142

143+
protected void ensureTopicNamesFullyQualified(ConsumerBuilder<T> builder) {
144+
var builderImpl = (ConsumerBuilderImpl<T>) builder;
145+
var topics = builderImpl.getConf().getTopicNames();
146+
if (!CollectionUtils.isEmpty(topics)) {
147+
var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList();
148+
builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics));
149+
}
150+
}
151+
142152
}

0 commit comments

Comments
 (0)