Skip to content

Commit 9e3eb15

Browse files
committed
Add Pulsar container factory customizers
This commit adds the ability for users to customize the auto-configured Spring for Apache Pulsar message container factories. Each container factory holds a set of container properties that is a common target for users to configure. Allowing the customization of these properties prevents a rapid increase of configuration properties.
1 parent 6cd6f75 commit 9e3eb15

File tree

8 files changed

+226
-12
lines changed

8 files changed

+226
-12
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import org.springframework.pulsar.config.ConcurrentPulsarListenerContainerFactory;
20+
21+
/**
22+
* Callback interface that can be implemented to customize a
23+
* {@link ConcurrentPulsarListenerContainerFactory}.
24+
*
25+
* @author Chris Bono
26+
*/
27+
@FunctionalInterface
28+
public interface ConcurrentPulsarListenerContainerFactoryCustomizer {
29+
30+
/**
31+
* Customize a {@link ConcurrentPulsarListenerContainerFactory}.
32+
* @param containerFactory the factory to customize
33+
*/
34+
void customize(ConcurrentPulsarListenerContainerFactory<?> containerFactory);
35+
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import org.springframework.pulsar.config.DefaultPulsarReaderContainerFactory;
20+
21+
/**
22+
* Callback interface that can be implemented to customize a
23+
* {@link DefaultPulsarReaderContainerFactory}.
24+
*
25+
* @author Chris Bono
26+
*/
27+
@FunctionalInterface
28+
public interface DefaultPulsarReaderContainerFactoryCustomizer {
29+
30+
/**
31+
* Customize a {@link DefaultPulsarReaderContainerFactory}.
32+
* @param containerFactory the factory to customize
33+
*/
34+
void customize(DefaultPulsarReaderContainerFactory<?> containerFactory);
35+
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.pulsar;
18+
19+
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
20+
21+
/**
22+
* Callback interface that can be implemented to customize a
23+
* {@link DefaultReactivePulsarListenerContainerFactory}.
24+
*
25+
* @author Chris Bono
26+
*/
27+
@FunctionalInterface
28+
public interface DefaultReactivePulsarListenerContainerFactoryCustomizer {
29+
30+
/**
31+
* Customize a {@link DefaultReactivePulsarListenerContainerFactory}.
32+
* @param containerFactory the factory to customize
33+
*/
34+
void customize(DefaultReactivePulsarListenerContainerFactory<?> containerFactory);
35+
36+
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ private void applyConsumerBuilderCustomizers(List<ConsumerBuilderCustomizer<?>>
178178
ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
179179
PulsarConsumerFactory<Object> pulsarConsumerFactory, SchemaResolver schemaResolver,
180180
TopicResolver topicResolver, ObjectProvider<PulsarAwareTransactionManager> pulsarTransactionManager,
181+
ObjectProvider<ConcurrentPulsarListenerContainerFactoryCustomizer> customizersProvider,
181182
Environment environment) {
182183
PulsarContainerProperties containerProperties = new PulsarContainerProperties();
183184
containerProperties.setSchemaResolver(schemaResolver);
@@ -187,7 +188,10 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
187188
}
188189
pulsarTransactionManager.ifUnique(containerProperties.transactions()::setTransactionManager);
189190
this.propertiesMapper.customizeContainerProperties(containerProperties);
190-
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
191+
ConcurrentPulsarListenerContainerFactory<?> containerFactory = new ConcurrentPulsarListenerContainerFactory<>(
192+
pulsarConsumerFactory, containerProperties);
193+
customizersProvider.orderedStream().forEachOrdered((customizer) -> customizer.customize(containerFactory));
194+
return containerFactory;
191195
}
192196

193197
@Bean
@@ -215,14 +219,19 @@ private void applyReaderBuilderCustomizers(List<ReaderBuilderCustomizer<?>> cust
215219
@Bean
216220
@ConditionalOnMissingBean(name = "pulsarReaderContainerFactory")
217221
DefaultPulsarReaderContainerFactory<?> pulsarReaderContainerFactory(PulsarReaderFactory<?> pulsarReaderFactory,
218-
SchemaResolver schemaResolver, Environment environment) {
222+
SchemaResolver schemaResolver,
223+
ObjectProvider<DefaultPulsarReaderContainerFactoryCustomizer> customizersProvider,
224+
Environment environment) {
219225
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
220226
readerContainerProperties.setSchemaResolver(schemaResolver);
221227
if (Threading.VIRTUAL.isActive(environment)) {
222228
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor("pulsar-reader-"));
223229
}
224230
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
225-
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);
231+
DefaultPulsarReaderContainerFactory<?> containerFactory = new DefaultPulsarReaderContainerFactory<>(
232+
pulsarReaderFactory, readerContainerProperties);
233+
customizersProvider.orderedStream().forEachOrdered((customizer) -> customizer.customize(containerFactory));
234+
return containerFactory;
226235
}
227236

228237
@Configuration(proxyBeanMethods = false)

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,16 @@ private void applyMessageConsumerBuilderCustomizers(List<ReactiveMessageConsumer
164164
@ConditionalOnMissingBean(name = "reactivePulsarListenerContainerFactory")
165165
DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainerFactory(
166166
ReactivePulsarConsumerFactory<Object> reactivePulsarConsumerFactory, SchemaResolver schemaResolver,
167-
TopicResolver topicResolver) {
167+
TopicResolver topicResolver,
168+
ObjectProvider<DefaultReactivePulsarListenerContainerFactoryCustomizer> customizersProvider) {
168169
ReactivePulsarContainerProperties<Object> containerProperties = new ReactivePulsarContainerProperties<>();
169170
containerProperties.setSchemaResolver(schemaResolver);
170171
containerProperties.setTopicResolver(topicResolver);
171172
this.propertiesMapper.customizeContainerProperties(containerProperties);
172-
return new DefaultReactivePulsarListenerContainerFactory<>(reactivePulsarConsumerFactory, containerProperties);
173+
DefaultReactivePulsarListenerContainerFactory<?> containerFactory = new DefaultReactivePulsarListenerContainerFactory<>(
174+
reactivePulsarConsumerFactory, containerProperties);
175+
customizersProvider.orderedStream().forEachOrdered((customizer) -> customizer.customize(containerFactory));
176+
return containerFactory;
173177
}
174178

175179
@Bean

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.ArrayList;
2020
import java.util.List;
21+
import java.util.Objects;
2122
import java.util.concurrent.ThreadFactory;
2223
import java.util.concurrent.TimeUnit;
2324

@@ -585,6 +586,36 @@ void whenTransactionEnabledFalseListenerContainerShouldNotUseTransactions() {
585586
});
586587
}
587588

589+
@Test
590+
void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
591+
this.contextRunner.withUserConfiguration(ListenerContainerFactoryCustomizersConfig.class)
592+
.run((context) -> assertThat(context).getBean(ConcurrentPulsarListenerContainerFactory.class)
593+
.hasFieldOrPropertyWithValue("containerProperties.subscriptionName", ":bar:foo"));
594+
}
595+
596+
@TestConfiguration(proxyBeanMethods = false)
597+
static class ListenerContainerFactoryCustomizersConfig {
598+
599+
@Bean
600+
@Order(200)
601+
ConcurrentPulsarListenerContainerFactoryCustomizer customizerFoo() {
602+
return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo");
603+
}
604+
605+
@Bean
606+
@Order(100)
607+
ConcurrentPulsarListenerContainerFactoryCustomizer customizerBar() {
608+
return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar");
609+
}
610+
611+
private void appendToSubscriptionName(ConcurrentPulsarListenerContainerFactory<?> containerFactory,
612+
String valueToAppend) {
613+
String name = Objects.toString(containerFactory.getContainerProperties().getSubscriptionName(), "");
614+
containerFactory.getContainerProperties().setSubscriptionName(name.concat(valueToAppend));
615+
}
616+
617+
}
618+
588619
}
589620

590621
@Nested
@@ -617,7 +648,7 @@ void hasNoTopicBuilderWhenTopicDefaultsAreDisabled() {
617648
}
618649

619650
@Test
620-
<T> void whenHasUserDefinedCustomizersAppliesInCorrectOrder() {
651+
<T> void whenHasUserDefinedReaderBuilderCustomizersAppliesInCorrectOrder() {
621652
this.contextRunner.withPropertyValues("spring.pulsar.reader.name=fromPropsCustomizer")
622653
.withUserConfiguration(ReaderBuilderCustomizersConfig.class)
623654
.run((context) -> {
@@ -654,6 +685,13 @@ void whenVirtualThreadsAreEnabledOnJava20AndEarlierReaderShouldNotUseVirtualThre
654685
});
655686
}
656687

688+
@Test
689+
void whenHasUserDefinedFactoryCustomizersAppliesInCorrectOrder() {
690+
this.contextRunner.withUserConfiguration(ReaderContainerFactoryCustomizersConfig.class)
691+
.run((context) -> assertThat(context).getBean(DefaultPulsarReaderContainerFactory.class)
692+
.hasFieldOrPropertyWithValue("containerProperties.readerListener", ":bar:foo"));
693+
}
694+
657695
@TestConfiguration(proxyBeanMethods = false)
658696
static class ReaderBuilderCustomizersConfig {
659697

@@ -671,6 +709,29 @@ ReaderBuilderCustomizer<?> customizerBar() {
671709

672710
}
673711

712+
@TestConfiguration(proxyBeanMethods = false)
713+
static class ReaderContainerFactoryCustomizersConfig {
714+
715+
@Bean
716+
@Order(200)
717+
DefaultPulsarReaderContainerFactoryCustomizer customizerFoo() {
718+
return (containerFactory) -> appendToReaderListener(containerFactory, ":foo");
719+
}
720+
721+
@Bean
722+
@Order(100)
723+
DefaultPulsarReaderContainerFactoryCustomizer customizerBar() {
724+
return (containerFactory) -> appendToReaderListener(containerFactory, ":bar");
725+
}
726+
727+
private void appendToReaderListener(DefaultPulsarReaderContainerFactory<?> containerFactory,
728+
String valueToAppend) {
729+
String name = Objects.toString(containerFactory.getContainerProperties().getReaderListener(), "");
730+
containerFactory.getContainerProperties().setReaderListener(name.concat(valueToAppend));
731+
}
732+
733+
}
734+
674735
}
675736

676737
@Nested

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.ArrayList;
2121
import java.util.List;
22+
import java.util.Objects;
2223
import java.util.function.Supplier;
2324

2425
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -382,6 +383,36 @@ void injectsExpectedBeans() {
382383
});
383384
}
384385

386+
@Test
387+
void whenHasUserDefinedFactoryCustomizersAppliesInCorrectOrder() {
388+
this.contextRunner.withUserConfiguration(ListenerContainerFactoryCustomizersConfig.class)
389+
.run((context) -> assertThat(context).getBean(DefaultReactivePulsarListenerContainerFactory.class)
390+
.hasFieldOrPropertyWithValue("containerProperties.subscriptionName", ":bar:foo"));
391+
}
392+
393+
@TestConfiguration(proxyBeanMethods = false)
394+
static class ListenerContainerFactoryCustomizersConfig {
395+
396+
@Bean
397+
@Order(200)
398+
DefaultReactivePulsarListenerContainerFactoryCustomizer customizerFoo() {
399+
return (containerFactory) -> appendToSubscriptionName(containerFactory, ":foo");
400+
}
401+
402+
@Bean
403+
@Order(100)
404+
DefaultReactivePulsarListenerContainerFactoryCustomizer customizerBar() {
405+
return (containerFactory) -> appendToSubscriptionName(containerFactory, ":bar");
406+
}
407+
408+
private void appendToSubscriptionName(DefaultReactivePulsarListenerContainerFactory<?> containerFactory,
409+
String valueToAppend) {
410+
String name = Objects.toString(containerFactory.getContainerProperties().getSubscriptionName(), "");
411+
containerFactory.getContainerProperties().setSubscriptionName(name.concat(valueToAppend));
412+
}
413+
414+
}
415+
385416
}
386417

387418
@Nested

spring-boot-project/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/pulsar.adoc

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,11 @@ include-code::MyBean[]
150150
Spring Boot auto-configuration provides all the components necessary for `PulsarListener`, such as the `PulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying Pulsar consumers.
151151
You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties.
152152

153-
If you need more control over the consumer factory configuration, consider registering one or more `ConsumerBuilderCustomizer` beans.
153+
If you need more control over the configuration of the consumer factory used by the container factory to create consumers, consider registering one or more `ConsumerBuilderCustomizer` beans.
154154
These customizers are applied to all consumers created by the factory, and therefore all `@PulsarListener` instances.
155155
You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@PulsarListener` annotation.
156156

157-
157+
If you need more control over the actual container factory configuration, consider registering one or more `ConcurrentPulsarListenerContainerFactoryCustomizer` beans.
158158

159159
[[messaging.pulsar.receiving-reactive]]
160160
== Receiving a Message Reactively
@@ -165,13 +165,13 @@ The following component creates a reactive listener endpoint on the `someTopic`
165165
include-code::MyBean[]
166166

167167
Spring Boot auto-configuration provides all the components necessary for `ReactivePulsarListener`, such as the `ReactivePulsarListenerContainerFactory` and the consumer factory it uses to construct the underlying reactive Pulsar consumers.
168-
You can configure these components by specifying any of the `spring.pulsar.listener.*` and `spring.pulsar.consumer.*` prefixed application properties.
168+
You can configure these components by specifying any of the `spring.pulsar.listener.\*` and `spring.pulsar.consumer.*` prefixed application properties.
169169

170-
If you need more control over the consumer factory configuration, consider registering one or more `ReactiveMessageConsumerBuilderCustomizer` beans.
170+
If you need more control over the configuration of the consumer factory used by the container factory to create consumers, consider registering one or more `ReactiveMessageConsumerBuilderCustomizer` beans.
171171
These customizers are applied to all consumers created by the factory, and therefore all `@ReactivePulsarListener` instances.
172172
You can also customize a single listener by setting the `consumerCustomizer` attribute of the `@ReactivePulsarListener` annotation.
173173

174-
174+
If you need more control over the actual container factory configuration, consider registering one or more `DefaultReactivePulsarListenerContainerFactoryCustomizer` beans.
175175

176176
[[messaging.pulsar.reading]]
177177
== Reading a Message
@@ -187,10 +187,11 @@ include-code::MyBean[]
187187
The `@PulsarReader` relies on a `PulsarReaderFactory` to create the underlying Pulsar reader.
188188
Spring Boot auto-configuration provides this reader factory which can be customized by setting any of the `spring.pulsar.reader.*` prefixed application properties.
189189

190-
If you need more control over the reader factory configuration, consider registering one or more `ReaderBuilderCustomizer` beans.
190+
If you need more control over the configuration of the reader factory used by the container factory to create readers, consider registering one or more `ReaderBuilderCustomizer` beans.
191191
These customizers are applied to all readers created by the factory, and therefore all `@PulsarReader` instances.
192192
You can also customize a single listener by setting the `readerCustomizer` attribute of the `@PulsarReader` annotation.
193193

194+
If you need more control over the actual container factory configuration, consider registering one or more `DefaultPulsarReaderContainerFactoryCustomizer` beans.
194195

195196

196197
[[messaging.pulsar.reading-reactive]]

0 commit comments

Comments
 (0)