Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarReaderFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
Expand Down Expand Up @@ -88,24 +89,29 @@ public class PulsarAutoConfiguration {
@ConditionalOnMissingBean(PulsarProducerFactory.class)
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "false")
DefaultPulsarProducerFactory<?> pulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
customizersProvider);
return new DefaultPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
lambdaSafeCustomizers, topicResolver);
DefaultPulsarProducerFactory<?> producerFactory = new DefaultPulsarProducerFactory<>(pulsarClient,
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver);
producerFactory.setTopicBuilder(topicBuilder);
return producerFactory;
}

@Bean
@ConditionalOnMissingBean(PulsarProducerFactory.class)
@ConditionalOnProperty(name = "spring.pulsar.producer.cache.enabled", havingValue = "true", matchIfMissing = true)
CachingPulsarProducerFactory<?> cachingPulsarProducerFactory(PulsarClient pulsarClient, TopicResolver topicResolver,
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ProducerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
PulsarProperties.Producer.Cache cacheProperties = this.properties.getProducer().getCache();
List<ProducerBuilderCustomizer<Object>> lambdaSafeCustomizers = lambdaSafeProducerBuilderCustomizers(
customizersProvider);
return new CachingPulsarProducerFactory<>(pulsarClient, this.properties.getProducer().getTopicName(),
lambdaSafeCustomizers, topicResolver, cacheProperties.getExpireAfterAccess(),
cacheProperties.getMaximumSize(), cacheProperties.getInitialCapacity());
CachingPulsarProducerFactory<?> producerFactory = new CachingPulsarProducerFactory<>(pulsarClient,
this.properties.getProducer().getTopicName(), lambdaSafeCustomizers, topicResolver,
cacheProperties.getExpireAfterAccess(), cacheProperties.getMaximumSize(),
cacheProperties.getInitialCapacity());
producerFactory.setTopicBuilder(topicBuilder);
return producerFactory;
}

private List<ProducerBuilderCustomizer<Object>> lambdaSafeProducerBuilderCustomizers(
Expand Down Expand Up @@ -138,13 +144,16 @@ PulsarTemplate<?> pulsarTemplate(PulsarProducerFactory<?> pulsarProducerFactory,
@Bean
@ConditionalOnMissingBean(PulsarConsumerFactory.class)
DefaultPulsarConsumerFactory<?> pulsarConsumerFactory(PulsarClient pulsarClient,
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ConsumerBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
List<ConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeConsumerBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyConsumerBuilderCustomizers(customizers, builder));
return new DefaultPulsarConsumerFactory<>(pulsarClient, lambdaSafeCustomizers);
DefaultPulsarConsumerFactory<?> consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient,
lambdaSafeCustomizers);
consumerFactory.setTopicBuilder(topicBuilder);
return consumerFactory;
}

@Bean
Expand Down Expand Up @@ -181,13 +190,16 @@ ConcurrentPulsarListenerContainerFactory<?> pulsarListenerContainerFactory(
@Bean
@ConditionalOnMissingBean(PulsarReaderFactory.class)
DefaultPulsarReaderFactory<?> pulsarReaderFactory(PulsarClient pulsarClient,
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReaderBuilderCustomizer<?>> customizersProvider, PulsarTopicBuilder topicBuilder) {
List<ReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeReaderBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyReaderBuilderCustomizers(customizers, builder));
return new DefaultPulsarReaderFactory<>(pulsarClient, lambdaSafeCustomizers);
DefaultPulsarReaderFactory<?> readerFactory = new DefaultPulsarReaderFactory<>(pulsarClient,
lambdaSafeCustomizers);
readerFactory.setTopicBuilder(topicBuilder);
return readerFactory;
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.schema.SchemaType;

import org.springframework.beans.factory.ObjectProvider;
Expand All @@ -34,13 +35,15 @@
import org.springframework.boot.util.LambdaSafe;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.pulsar.core.DefaultPulsarClientFactory;
import org.springframework.pulsar.core.DefaultSchemaResolver;
import org.springframework.pulsar.core.DefaultTopicResolver;
import org.springframework.pulsar.core.PulsarAdminBuilderCustomizer;
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
import org.springframework.pulsar.core.PulsarClientFactory;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
import org.springframework.pulsar.core.TopicResolver;
Expand Down Expand Up @@ -176,4 +179,12 @@ PulsarFunctionAdministration pulsarFunctionAdministration(PulsarAdministration p
properties.isFailFast(), properties.isPropagateFailures(), properties.isPropagateStopFailures());
}

@Bean
@Scope("prototype")
@ConditionalOnMissingBean
PulsarTopicBuilder pulsarTopicBuilder() {
return new PulsarTopicBuilder(TopicDomain.persistent, this.properties.getDefaults().getTenant(),
this.properties.getDefaults().getNamespace());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,43 @@ public Authentication getAuthentication() {

public static class Defaults {

/**
* Default tenant to use when producing or consuming messages against a
* non-fully-qualified topic URL. When not specified Pulsar uses a default tenant
* of 'public'.
*/
private String tenant;

/**
* Default namespace to use when producing or consuming messages against a
* non-fully-qualified topic URL. When not specified Pulsar uses a default
* namespace of 'default'.
*/
private String namespace;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As these properties both relate to topics, it would be good to put them both in a topic group so they'd become spring.pulsar.defaults.topic.tenant and spring.pulsar.defaults.topic.namespace. I think that still makes sense given that typeMappings is partially topic related. What do you think though, @cbono?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great suggestion as this also gives way to adding an enabled flag to the spring.pulsar.defaults.topic to disable default topics if so desired.

/**
* List of mappings from message type to topic name and schema info to use as a
* defaults when a topic name and/or schema is not explicitly specified when
* producing or consuming messages of the mapped type.
*/
private List<TypeMapping> typeMappings = new ArrayList<>();

public String getTenant() {
return this.tenant;
}

public void setTenant(String tenant) {
this.tenant = tenant;
}

public String getNamespace() {
return this.namespace;
}

public void setNamespace(String namespace) {
this.namespace = namespace;
}

public List<TypeMapping> getTypeMappings() {
return this.typeMappings;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.pulsar.config.PulsarAnnotationSupportBeanNames;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
import org.springframework.pulsar.reactive.config.DefaultReactivePulsarListenerContainerFactory;
Expand Down Expand Up @@ -112,7 +113,8 @@ private ReactiveMessageSenderCache reactivePulsarMessageSenderCache(ProducerCach
@ConditionalOnMissingBean(ReactivePulsarSenderFactory.class)
DefaultReactivePulsarSenderFactory<?> reactivePulsarSenderFactory(ReactivePulsarClient reactivePulsarClient,
ObjectProvider<ReactiveMessageSenderCache> reactiveMessageSenderCache, TopicResolver topicResolver,
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReactiveMessageSenderBuilderCustomizer<?>> customizersProvider,
PulsarTopicBuilder topicBuilder) {
List<ReactiveMessageSenderBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeMessageSenderBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
Expand All @@ -122,6 +124,7 @@ DefaultReactivePulsarSenderFactory<?> reactivePulsarSenderFactory(ReactivePulsar
.withDefaultConfigCustomizers(lambdaSafeCustomizers)
.withMessageSenderCache(reactiveMessageSenderCache.getIfAvailable())
.withTopicResolver(topicResolver)
.withTopicBuilder(topicBuilder)
.build();
}

Expand All @@ -136,13 +139,17 @@ private void applyMessageSenderBuilderCustomizers(List<ReactiveMessageSenderBuil
@ConditionalOnMissingBean(ReactivePulsarConsumerFactory.class)
DefaultReactivePulsarConsumerFactory<?> reactivePulsarConsumerFactory(
ReactivePulsarClient pulsarReactivePulsarClient,
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<?>> customizersProvider,
PulsarTopicBuilder topicBuilder) {
List<ReactiveMessageConsumerBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeMessageConsumerBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ReactiveMessageConsumerBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyMessageConsumerBuilderCustomizers(customizers, builder));
return new DefaultReactivePulsarConsumerFactory<>(pulsarReactivePulsarClient, lambdaSafeCustomizers);
DefaultReactivePulsarConsumerFactory<?> consumerFactory = new DefaultReactivePulsarConsumerFactory<>(
pulsarReactivePulsarClient, lambdaSafeCustomizers);
consumerFactory.setTopicBuilder(topicBuilder);
return consumerFactory;
}

@SuppressWarnings("unchecked")
Expand All @@ -167,13 +174,17 @@ DefaultReactivePulsarListenerContainerFactory<?> reactivePulsarListenerContainer
@Bean
@ConditionalOnMissingBean(ReactivePulsarReaderFactory.class)
DefaultReactivePulsarReaderFactory<?> reactivePulsarReaderFactory(ReactivePulsarClient reactivePulsarClient,
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider) {
ObjectProvider<ReactiveMessageReaderBuilderCustomizer<?>> customizersProvider,
PulsarTopicBuilder topicBuilder) {
List<ReactiveMessageReaderBuilderCustomizer<?>> customizers = new ArrayList<>();
customizers.add(this.propertiesMapper::customizeMessageReaderBuilder);
customizers.addAll(customizersProvider.orderedStream().toList());
List<ReactiveMessageReaderBuilderCustomizer<Object>> lambdaSafeCustomizers = List
.of((builder) -> applyMessageReaderBuilderCustomizers(customizers, builder));
return new DefaultReactivePulsarReaderFactory<>(reactivePulsarClient, lambdaSafeCustomizers);
DefaultReactivePulsarReaderFactory<?> readerFactory = new DefaultReactivePulsarReaderFactory<>(
reactivePulsarClient, lambdaSafeCustomizers);
readerFactory.setTopicBuilder(topicBuilder);
return readerFactory;
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.springframework.pulsar.core.PulsarProducerFactory;
import org.springframework.pulsar.core.PulsarReaderFactory;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.ReaderBuilderCustomizer;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.TopicResolver;
Expand Down Expand Up @@ -219,7 +220,8 @@ void injectsExpectedBeans() {
"spring.pulsar.producer.cache.enabled=false")
.run((context) -> assertThat(context).getBean(DefaultPulsarProducerFactory.class)
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class)));
.hasFieldOrPropertyWithValue("topicResolver", context.getBean(TopicResolver.class))
.hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class)));
}

@ParameterizedTest
Expand Down Expand Up @@ -375,7 +377,8 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
@Test
void injectsExpectedBeans() {
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarConsumerFactory.class)
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
.hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class)));
}

@Test
Expand Down Expand Up @@ -574,7 +577,8 @@ void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
@Test
void injectsExpectedBeans() {
this.contextRunner.run((context) -> assertThat(context).getBean(DefaultPulsarReaderFactory.class)
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class)));
.hasFieldOrPropertyWithValue("pulsarClient", context.getBean(PulsarClient.class))
.hasFieldOrPropertyWithValue("topicBuilder", context.getBean(PulsarTopicBuilder.class)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.springframework.pulsar.core.PulsarAdministration;
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;
import org.springframework.pulsar.core.PulsarClientFactory;
import org.springframework.pulsar.core.PulsarTopicBuilder;
import org.springframework.pulsar.core.SchemaResolver;
import org.springframework.pulsar.core.SchemaResolver.SchemaResolverCustomizer;
import org.springframework.pulsar.core.TopicResolver;
Expand Down Expand Up @@ -320,6 +321,34 @@ void whenHasDefaultsTypeMappingAddsToSchemaResolver() {

}

@Nested
class TopicBuilderTests {

private final ApplicationContextRunner contextRunner = PulsarConfigurationTests.this.contextRunner;

@Test
void whenHasUserDefinedBeanDoesNotAutoConfigureBean() {
PulsarTopicBuilder topicBuilder = mock(PulsarTopicBuilder.class);
this.contextRunner.withBean("customPulsarTopicBuilder", PulsarTopicBuilder.class, () -> topicBuilder)
.run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class).isSameAs(topicBuilder));
}

@Test
void whenHasDefaultsTenantAndNamespaceAppliedToTopicBuilder() {
List<String> properties = new ArrayList<>();
properties.add("spring.pulsar.defaults.tenant=my-tenant");
properties.add("spring.pulsar.defaults.namespace=my-namespace");
this.contextRunner.withPropertyValues(properties.toArray(String[]::new))
.run((context) -> assertThat(context).getBean(PulsarTopicBuilder.class)
.asInstanceOf(InstanceOfAssertFactories.type(PulsarTopicBuilder.class))
.satisfies((topicBuilder) -> {
assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultTenant", "my-tenant");
assertThat(topicBuilder).hasFieldOrPropertyWithValue("defaultNamespace", "my-namespace");
}));
}

}

@Nested
class FunctionAdministrationTests {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ void bindAuthentication() {
}

@Nested
class DefaultsProperties {
class DefaultsTypeMappingProperties {

@Test
void bindWhenNoTypeMappings() {
Expand Down Expand Up @@ -242,6 +242,29 @@ record TestMessage(String value) {

}

@Nested
class DefaultsTenantNamespaceProperties {

@Test
void bindWhenValuesNotSpecified() {
assertThat(new PulsarProperties().getDefaults()).satisfies((defaults) -> {
assertThat(defaults.getTenant()).isNull();
assertThat(defaults.getNamespace()).isNull();
});
}

@Test
void bindWhenValuesSpecified() {
Map<String, String> map = new HashMap<>();
map.put("spring.pulsar.defaults.tenant", "my-tenant");
map.put("spring.pulsar.defaults.namespace", "my-namespace");
PulsarProperties.Defaults properties = bindProperties(map).getDefaults();
assertThat(properties.getTenant()).isEqualTo("my-tenant");
assertThat(properties.getNamespace()).isEqualTo("my-namespace");
}

}

@Nested
class FunctionProperties {

Expand Down
Loading
Loading