Skip to content

Commit 3277577

Browse files
committed
Add support for Pulsar default tenant/namespace
This commit allows Pulsar users to configure a default tenant and/or namespace to be used when producing or consuming messages to topic URLs that are not fully-qualified. The following changes accomplish this: - add `tenant` and `namespace` config props to `spring.pulsar.defaults` - auto-configure a `PulsarTopicBuilder` bean populated w/ above props - provide above topic builder to producer/consumer/reader factories (imperative and reactive) - add tests for all of the above
1 parent cc3e62d commit 3277577

File tree

8 files changed

+151
-18
lines changed

8 files changed

+151
-18
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfiguration.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.springframework.pulsar.core.PulsarProducerFactory;
5353
import org.springframework.pulsar.core.PulsarReaderFactory;
5454
import org.springframework.pulsar.core.PulsarTemplate;
55+
import org.springframework.pulsar.core.PulsarTopicBuilder;
5556
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
5657
import org.springframework.pulsar.core.SchemaResolver;
5758
import org.springframework.pulsar.core.TopicResolver;
@@ -88,24 +89,30 @@ public class PulsarAutoConfiguration {
8889
@ConditionalOnMissingBean(PulsarProducerFactory.class)
8990
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false")
9091
DefaultPulsarProducerFactory<?> pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
91-
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
92+
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider,
93+
PulsarTopicBuilder topicBuilder) {
9294
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
9395
customizersProvider);
94-
return new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
96+
DefaultPulsarProducerFactory<?> producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
9597
lambdaSafeCustomizers, topicResolver);
98+
producerFactory.setTopicBuilder(topicBuilder);
99+
return producerFactory;
96100
}
97101

98102
@Bean
99103
@ConditionalOnMissingBean(PulsarProducerFactory.class)
100104
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true)
101105
CachingPulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
102-
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
106+
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider,
107+
PulsarTopicBuilder topicBuilder) {
103108
PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache();
104109
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
105110
customizersProvider);
106-
return new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
111+
CachingPulsarProducerFactory<?> producerFactory = new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
107112
lambdaSafeCustomizers, topicResolver, cacheProperties.getExpireAfterAccess(),
108113
cacheProperties.getMaximumSize(), cacheProperties.getInitialCapacity());
114+
producerFactory.setTopicBuilder(topicBuilder);
115+
return producerFactory;
109116
}
110117

111118
private List<ProducerBuilderCustomizer<Object>> lambdaSafeProducerBuilderCustomizers(
@@ -138,13 +145,16 @@ PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
138145
@Bean
139146
@ConditionalOnMissingBean(PulsarConsumerFactory.class)
140147
DefaultPulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
141-
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider) {
148+
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider,
149+
PulsarTopicBuilder topicBuilder) {
142150
List<ConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
143151
customizers.add(this.propertiesMapper::customizeConsumerBuilder);
144152
customizers.addAll(customizersProvider.orderedStream().toList());
145153
List<ConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
146154
.of((builder) -> applyConsumerBuilderCustomizers(customizers, builder));
147-
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
155+
DefaultPulsarConsumerFactory<?> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
156+
consumerFactory.setTopicBuilder(topicBuilder);
157+
return consumerFactory;
148158
}
149159

150160
@Bean
@@ -181,13 +191,16 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
181191
@Bean
182192
@ConditionalOnMissingBean(PulsarReaderFactory.class)
183193
DefaultPulsarReaderFactory<?> pulsarReaderFactory(PulsarClient pulsarClient,
184-
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider) {
194+
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider,
195+
PulsarTopicBuilder topicBuilder) {
185196
List<ReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
186197
customizers.add(this.propertiesMapper::customizeReaderBuilder);
187198
customizers.addAll(customizersProvider.orderedStream().toList());
188199
List<ReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
189200
.of((builder) -> applyReaderBuilderCustomizers(customizers, builder));
190-
return new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers);
201+
DefaultPulsarReaderFactory<?> readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers);
202+
readerFactory.setTopicBuilder(topicBuilder);
203+
return readerFactory;
191204
}
192205

193206
@SuppressWarnings("unchecked")

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfiguration.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.pulsar.client.api.ClientBuilder;
2424
import org.apache.pulsar.client.api.PulsarClient;
2525
import org.apache.pulsar.client.api.Schema;
26+
import org.apache.pulsar.common.naming.TopicDomain;
2627
import org.apache.pulsar.common.schema.SchemaType;
2728

2829
import org.springframework.beans.factory.ObjectProvider;
@@ -41,6 +42,7 @@
4142
import org.springframework.pulsar.core.PulsarAdministration;
4243
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
4344
import org.springframework.pulsar.core.PulsarClientFactory;
45+
import org.springframework.pulsar.core.PulsarTopicBuilder;
4446
import org.springframework.pulsar.core.SchemaResolver;
4547
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
4648
import org.springframework.pulsar.core.TopicResolver;
@@ -176,4 +178,11 @@ PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration p
176178
properties.isFailFast(), properties.isPropagateFailures(), properties.isPropagateStopFailures());
177179
}
178180

181+
@Bean
182+
@ConditionalOnMissingBean(PulsarTopicBuilder.class)
183+
PulsarTopicBuilder pulsarTopicBuilder() {
184+
return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTenant(),
185+
this.properties.getDefaults().getNamespace());
186+
}
187+
179188
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarProperties.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,13 +250,43 @@ public Authentication getAuthentication() {
250250

251251
public static class Defaults {
252252

253+
/**
254+
* Default tenant to use when producing or consuming messages against a
255+
* non-fully-qualified topic URL. When not specified Pulsar uses a default tenant
256+
* of 'public'.
257+
*/
258+
private String tenant;
259+
260+
/**
261+
* Default namespace to use when producing or consuming messages against a
262+
* non-fully-qualified topic URL. When not specified Pulsar uses a default
263+
* namespace of 'default'.
264+
*/
265+
private String namespace;
266+
253267
/**
254268
* List of mappings from message type to topic name and schema info to use as a
255269
* defaults when a topic name and/or schema is not explicitly specified when
256270
* producing or consuming messages of the mapped type.
257271
*/
258272
private List<TypeMapping> typeMappings = new ArrayList<>();
259273

274+
public String getTenant() {
275+
return this.tenant;
276+
}
277+
278+
public void setTenant(String tenant) {
279+
this.tenant = tenant;
280+
}
281+
282+
public String getNamespace() {
283+
return this.namespace;
284+
}
285+
286+
public void setNamespace(String namespace) {
287+
this.namespace = namespace;
288+
}
289+
260290
public List<TypeMapping> getTypeMappings() {
261291
return this.typeMappings;
262292
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfiguration.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.springframework.context.annotation.Configuration;
4242
import org.springframework.context.annotation.Import;
4343
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
44+
import org.springframework.pulsar.core.PulsarTopicBuilder;
4445
import org.springframework.pulsar.core.SchemaResolver;
4546
import org.springframework.pulsar.core.TopicResolver;
4647
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
@@ -112,7 +113,8 @@ private ReactiveMessageSenderCache reactivePulsarMessageSenderCache(ProducerCach
112113
@ConditionalOnMissingBean(ReactivePulsarSenderFactory.class)
113114
DefaultReactivePulsarSenderFactory<?> reactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
114115
ObjectProvider<ReactiveMessageSenderCache> reactiveMessageSenderCache, TopicResolver topicResolver,
115-
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider) {
116+
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider,
117+
PulsarTopicBuilder topicBuilder) {
116118
List<ReactiveMessageSenderBuilderCustomizer<?>> customizers = new ArrayList<>();
117119
customizers.add(this.propertiesMapper::customizeMessageSenderBuilder);
118120
customizers.addAll(customizersProvider.orderedStream().toList());
@@ -122,6 +124,7 @@ DefaultReactivePulsarSenderFactory<?> reactivePulsarSenderFactory(ReactivePulsar
122124
.withDefaultConfigCustomizers(lambdaSafeCustomizers)
123125
.withMessageSenderCache(reactiveMessageSenderCache.getIfAvailable())
124126
.withTopicResolver(topicResolver)
127+
.withTopicBuilder(topicBuilder)
125128
.build();
126129
}
127130

@@ -136,13 +139,17 @@ private void applyMessageSenderBuilderCustomizers(List<ReactiveMessageSenderBuil
136139
@ConditionalOnMissingBean(ReactivePulsarConsumerFactory.class)
137140
DefaultReactivePulsarConsumerFactory<?> reactivePulsarConsumerFactory(
138141
ReactivePulsarClient pulsarReactivePulsarClient,
139-
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider) {
142+
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider,
143+
PulsarTopicBuilder topicBuilder) {
140144
List<ReactiveMessageConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
141145
customizers.add(this.propertiesMapper::customizeMessageConsumerBuilder);
142146
customizers.addAll(customizersProvider.orderedStream().toList());
143147
List<ReactiveMessageConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
144148
.of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder));
145-
return new DefaultReactivePulsarConsumerFactory<>(pulsarReactivePulsarClient, lambdaSafeCustomizers);
149+
DefaultReactivePulsarConsumerFactory<?> consumerFactory =
150+
new DefaultReactivePulsarConsumerFactory<>(pulsarReactivePulsarClient, lambdaSafeCustomizers);
151+
consumerFactory.setTopicBuilder(topicBuilder);
152+
return consumerFactory;
146153
}
147154

148155
@SuppressWarnings("unchecked")
@@ -167,13 +174,16 @@ DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainer
167174
@Bean
168175
@ConditionalOnMissingBean(ReactivePulsarReaderFactory.class)
169176
DefaultReactivePulsarReaderFactory<?> reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
170-
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider) {
177+
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider,
178+
PulsarTopicBuilder topicBuilder) {
171179
List<ReactiveMessageReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
172180
customizers.add(this.propertiesMapper::customizeMessageReaderBuilder);
173181
customizers.addAll(customizersProvider.orderedStream().toList());
174182
List<ReactiveMessageReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
175183
.of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder));
176-
return new DefaultReactivePulsarReaderFactory<>(reactivePulsarClient, lambdaSafeCustomizers);
184+
DefaultReactivePulsarReaderFactory<?> readerFactory = new DefaultReactivePulsarReaderFactory<>(reactivePulsarClient, lambdaSafeCustomizers);
185+
readerFactory.setTopicBuilder(topicBuilder);
186+
return readerFactory;
177187
}
178188

179189
@SuppressWarnings("unchecked")

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarAutoConfigurationTests.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.springframework.pulsar.core.PulsarProducerFactory;
6868
import org.springframework.pulsar.core.PulsarReaderFactory;
6969
import org.springframework.pulsar.core.PulsarTemplate;
70+
import org.springframework.pulsar.core.PulsarTopicBuilder;
7071
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
7172
import org.springframework.pulsar.core.SchemaResolver;
7273
import org.springframework.pulsar.core.TopicResolver;
@@ -219,7 +220,8 @@ void injectsExpectedBeans() {
219220
"spring.pulsar.producer.cache.enabled=false")
220221
.run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class)
221222
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
222-
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class)));
223+
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class))
224+
.hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class)));
223225
}
224226

225227
@ParameterizedTest
@@ -375,7 +377,8 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
375377
@Test
376378
void injectsExpectedBeans() {
377379
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class)
378-
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
380+
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
381+
.hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class)));
379382
}
380383

381384
@Test
@@ -574,7 +577,8 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
574577
@Test
575578
void injectsExpectedBeans() {
576579
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class)
577-
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
580+
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
581+
.hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class)));
578582
}
579583

580584
@Test

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarConfigurationTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.springframework.pulsar.core.PulsarAdministration;
5050
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
5151
import org.springframework.pulsar.core.PulsarClientFactory;
52+
import org.springframework.pulsar.core.PulsarTopicBuilder;
5253
import org.springframework.pulsar.core.SchemaResolver;
5354
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
5455
import org.springframework.pulsar.core.TopicResolver;
@@ -320,6 +321,34 @@ void whenHasDefaultsTypeMappingAddsToSchemaResolver() {
320321

321322
}
322323

324+
@Nested
325+
class TopicBuilderTests {
326+
327+
private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner;
328+
329+
@Test
330+
void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
331+
PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class);
332+
this.contextRunner.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder)
333+
.run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder));
334+
}
335+
336+
@Test
337+
void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() {
338+
List<String> properties = new ArrayList<>();
339+
properties.add("spring.pulsar.defaults.tenant=my-tenant");
340+
properties.add("spring.pulsar.defaults.namespace=my-namespace");
341+
this.contextRunner.withPropertyValues(properties.toArray(String[]::new))
342+
.run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class)
343+
.asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class))
344+
.satisfies((topicBuilder -> {
345+
assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant");
346+
assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace");
347+
})));
348+
}
349+
350+
}
351+
323352
@Nested
324353
class FunctionAdministrationTests {
325354

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarPropertiesTests.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ void bindAuthentication() {
152152
}
153153

154154
@Nested
155-
class DefaultsProperties {
155+
class DefaultsTypeMappingProperties {
156156

157157
@Test
158158
void bindWhenNoTypeMappings() {
@@ -242,6 +242,29 @@ record TestMessage(String value) {
242242

243243
}
244244

245+
@Nested
246+
class DefaultsTenantNamespaceProperties {
247+
248+
@Test
249+
void bindWhenValuesNotSpecified() {
250+
assertThat(new PulsarProperties().getDefaults()).satisfies((defaults) -> {
251+
assertThat(defaults.getTenant()).isNull();
252+
assertThat(defaults.getNamespace()).isNull();
253+
});
254+
}
255+
256+
@Test
257+
void bindWhenValuesSpecified() {
258+
Map<String, String> map = new HashMap<>();
259+
map.put("spring.pulsar.defaults.tenant", "my-tenant");
260+
map.put("spring.pulsar.defaults.namespace", "my-namespace");
261+
PulsarProperties.Defaults properties = bindProperties(map).getDefaults();
262+
assertThat(properties.getTenant()).isEqualTo("my-tenant");
263+
assertThat(properties.getNamespace()).isEqualTo("my-namespace");
264+
}
265+
266+
}
267+
245268
@Nested
246269
class FunctionProperties {
247270

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/pulsar/PulsarReactiveAutoConfigurationTests.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.springframework.pulsar.core.DefaultSchemaResolver;
4949
import org.springframework.pulsar.core.DefaultTopicResolver;
5050
import org.springframework.pulsar.core.PulsarAdministration;
51+
import org.springframework.pulsar.core.PulsarTopicBuilder;
5152
import org.springframework.pulsar.core.SchemaResolver;
5253
import org.springframework.pulsar.core.TopicResolver;
5354
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
@@ -177,6 +178,9 @@ void injectsExpectedBeans() {
177178
assertThat(senderFactory)
178179
.extracting("topicResolver", InstanceOfAssertFactories.type(TopicResolver.class))
179180
.isSameAs(context.getBean(TopicResolver.class));
181+
assertThat(senderFactory)
182+
.extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class))
183+
.isSameAs(context.getBean(PulsarTopicBuilder.class));
180184
});
181185
}
182186

@@ -252,13 +256,19 @@ class ConsumerFactoryTests {
252256
@Test
253257
void injectsExpectedBeans() {
254258
ReactivePulsarClient client = mock(ReactivePulsarClient.class);
255-
this.contextRunner.withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client)
259+
PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class);
260+
this.contextRunner
261+
.withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client)
262+
.withBean("customTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder)
256263
.run((context) -> {
257264
ReactivePulsarConsumerFactory<?> consumerFactory = context
258265
.getBean(DefaultReactivePulsarConsumerFactory.class);
259266
assertThat(consumerFactory)
260267
.extracting("reactivePulsarClient", InstanceOfAssertFactories.type(ReactivePulsarClient.class))
261268
.isSameAs(client);
269+
assertThat(consumerFactory)
270+
.extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class))
271+
.isSameAs(topicBuilder);
262272
});
263273
}
264274

@@ -362,14 +372,19 @@ class ReaderFactoryTests {
362372
@Test
363373
void injectsExpectedBeans() {
364374
ReactivePulsarClient client = mock(ReactivePulsarClient.class);
375+
PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class);
365376
this.contextRunner.withPropertyValues("spring.pulsar.reader.name=test-reader")
366377
.withBean("customReactivePulsarClient", ReactivePulsarClient.class, () -> client)
378+
.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder)
367379
.run((context) -> {
368380
DefaultReactivePulsarReaderFactory<?> readerFactory = context
369381
.getBean(DefaultReactivePulsarReaderFactory.class);
370382
assertThat(readerFactory)
371383
.extracting("reactivePulsarClient", InstanceOfAssertFactories.type(ReactivePulsarClient.class))
372384
.isSameAs(client);
385+
assertThat(readerFactory)
386+
.extracting("topicBuilder", InstanceOfAssertFactories.type(PulsarTopicBuilder.class))
387+
.isSameAs(topicBuilder);
373388
});
374389
}
375390

0 commit comments

Comments
 (0)