From a0a5d0bc136c29b405908e6ad449c9d4dda6bede Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Sat, 31 Aug 2024 09:16:22 -0500 Subject: [PATCH] Move default subscription name to factory This commit moves the default subscription name from the `@PulsarListener` and `@ReactivePulsarListener` annotation to the corresponding container factory (props) which allows the `spring.pulsar.consumer.subscription.name` config prop to be respected. See https://github.com/spring-projects/spring-boot/issues/42053 --- .../reference/pulsar/message-consumption.adoc | 3 - .../reactive-message-consumption.adoc | 5 - ...eactivePulsarListenerContainerFactory.java | 77 +++++----- ...arListenerAnnotationBeanPostProcessor.java | 24 ++-- ...vePulsarListenerContainerFactoryTests.java | 51 +++++++ .../listener/ReactivePulsarListenerTests.java | 72 +++++----- ...arListenerAnnotationBeanPostProcessor.java | 16 +-- ...currentPulsarListenerContainerFactory.java | 70 +++++----- .../PulsarListenerEndpointRegistrar.java | 2 +- ...ntPulsarListenerContainerFactoryTests.java | 131 ++++++++++++++++++ ...ntPulsarMessageListenerContainerTests.java | 45 ------ .../pulsar/listener/PulsarListenerTests.java | 74 ++++++++++ 12 files changed, 391 insertions(+), 179 deletions(-) create mode 100644 spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar/message-consumption.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar/message-consumption.adoc index 3546702aa..1382a8043 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar/message-consumption.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/pulsar/message-consumption.adoc @@ -11,9 +11,6 @@ When you use Spring Boot support, it automatically enables this annotation and c `PulsarMessageListenerContainer` uses a `PulsarConsumerFactory` to create and manage the Pulsar consumer the underlying Pulsar consumer that it uses to consume messages. Spring Boot provides this consumer factory which you can further configure by specifying the {spring-boot-pulsar-config-props}[`spring.pulsar.consumer.*`] application properties. -**Most** of the configured properties on the factory will be respected in the listener with the following **exceptions**: - -TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is instead generated when not specified on the annotation. Let us revisit the `PulsarListener` code snippet we saw in the quick-tour section: diff --git a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar/reactive-message-consumption.adoc b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar/reactive-message-consumption.adoc index 74ca5f6f5..c3ed29d4c 100644 --- a/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar/reactive-message-consumption.adoc +++ b/spring-pulsar-docs/src/main/antora/modules/ROOT/pages/reference/reactive-pulsar/reactive-message-consumption.adoc @@ -118,11 +118,6 @@ NOTE: There is no support for using `org.apache.pulsar.client.api.Messages` i === Configuration - Application Properties The listener relies on the `ReactivePulsarConsumerFactory` to create and manage the underlying Pulsar consumer that it uses to consume messages. Spring Boot provides this consumer factory which you can further configure by specifying the {spring-boot-pulsar-config-props}[`spring.pulsar.consumer.*`] application properties. -**Most** of the configured properties on the factory will be respected in the listener with the following **exceptions**: - -TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is instead generated when not specified on the annotation. - -TIP: The `spring.pulsar.consumer.subscription.type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default. === Generic records with AUTO_CONSUME If there is no chance to know the type of schema of a Pulsar topic in advance, you can use the `AUTO_CONSUME` schema type to consume generic records. diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java index 26b157f2d..f9a105c23 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java @@ -18,6 +18,7 @@ import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; @@ -39,6 +40,10 @@ */ public class DefaultReactivePulsarListenerContainerFactory implements ReactivePulsarListenerContainerFactory { + private static final String SUBSCRIPTION_NAME_PREFIX = "org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#"; + + private static final AtomicInteger COUNTER = new AtomicInteger(); + protected final LogAccessor logger = new LogAccessor(this.getClass()); private final ReactivePulsarConsumerFactory consumerFactory; @@ -84,58 +89,54 @@ public void setFluxListener(Boolean fluxListener) { @SuppressWarnings("unchecked") public DefaultReactivePulsarMessageListenerContainer createContainerInstance( ReactivePulsarListenerEndpoint endpoint) { - - ReactivePulsarContainerProperties properties = new ReactivePulsarContainerProperties<>(); - properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver()); - properties.setTopicResolver(this.getContainerProperties().getTopicResolver()); - properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType()); - + var containerProps = new ReactivePulsarContainerProperties(); + var factoryProps = this.getContainerProperties(); + + // Map factory props (defaults) to the container props + containerProps.setSchemaResolver(factoryProps.getSchemaResolver()); + containerProps.setTopicResolver(factoryProps.getTopicResolver()); + containerProps.setSubscriptionType(factoryProps.getSubscriptionType()); + containerProps.setSubscriptionName(factoryProps.getSubscriptionName()); + containerProps.setSchemaType(factoryProps.getSchemaType()); + containerProps.setConcurrency(factoryProps.getConcurrency()); + containerProps.setUseKeyOrderedProcessing(factoryProps.isUseKeyOrderedProcessing()); + + // Map relevant props from the endpoint to the container props if (!CollectionUtils.isEmpty(endpoint.getTopics())) { - properties.setTopics(endpoint.getTopics()); + containerProps.setTopics(endpoint.getTopics()); } - if (StringUtils.hasText(endpoint.getTopicPattern())) { - properties.setTopicsPattern(endpoint.getTopicPattern()); + containerProps.setTopicsPattern(endpoint.getTopicPattern()); } - - if (StringUtils.hasText(endpoint.getSubscriptionName())) { - properties.setSubscriptionName(endpoint.getSubscriptionName()); - } - if (endpoint.getSubscriptionType() != null) { - properties.setSubscriptionType(endpoint.getSubscriptionType()); + containerProps.setSubscriptionType(endpoint.getSubscriptionType()); } - // Default to Exclusive if not set on container props or endpoint - if (properties.getSubscriptionType() == null) { - properties.setSubscriptionType(SubscriptionType.Exclusive); + // Default subscription type to Exclusive when not set elsewhere + if (containerProps.getSubscriptionType() == null) { + containerProps.setSubscriptionType(SubscriptionType.Exclusive); } - - if (endpoint.getSchemaType() != null) { - properties.setSchemaType(endpoint.getSchemaType()); + if (StringUtils.hasText(endpoint.getSubscriptionName())) { + containerProps.setSubscriptionName(endpoint.getSubscriptionName()); } - else { - properties.setSchemaType(this.containerProperties.getSchemaType()); + // Default subscription name to generated when not set elsewhere + if (!StringUtils.hasText(containerProps.getSubscriptionName())) { + var generatedName = SUBSCRIPTION_NAME_PREFIX + COUNTER.getAndIncrement(); + containerProps.setSubscriptionName(generatedName); } - - if (properties.getSchema() == null) { - properties.setSchema((Schema) Schema.BYTES); + if (endpoint.getSchemaType() != null) { + containerProps.setSchemaType(endpoint.getSchemaType()); } - - if (endpoint.getConcurrency() != null) { - properties.setConcurrency(endpoint.getConcurrency()); + // Default to BYTES if not set elsewhere + if (containerProps.getSchema() == null) { + containerProps.setSchema((Schema) Schema.BYTES); } - else { - properties.setConcurrency(this.containerProperties.getConcurrency()); + if (endpoint.getConcurrency() != null) { + containerProps.setConcurrency(endpoint.getConcurrency()); } - if (endpoint.getUseKeyOrderedProcessing() != null) { - properties.setUseKeyOrderedProcessing(endpoint.getUseKeyOrderedProcessing()); + containerProps.setUseKeyOrderedProcessing(endpoint.getUseKeyOrderedProcessing()); } - else { - properties.setUseKeyOrderedProcessing(this.containerProperties.isUseKeyOrderedProcessing()); - } - - return new DefaultReactivePulsarMessageListenerContainer<>(this.getConsumerFactory(), properties); + return new DefaultReactivePulsarMessageListenerContainer<>(this.getConsumerFactory(), containerProps); } @Override diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java index fd4c1bc1d..3e78f7f3b 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java @@ -230,11 +230,11 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene ReactivePulsarListener reactivePulsarListener, Object bean, String[] topics, String topicPattern) { endpoint.setBean(bean); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); - endpoint.setSubscriptionName(getEndpointSubscriptionName(reactivePulsarListener)); endpoint.setId(getEndpointId(reactivePulsarListener)); endpoint.setTopics(topics); endpoint.setTopicPattern(topicPattern); resolveSubscriptionType(endpoint, reactivePulsarListener); + resolveSubscriptionName(endpoint, reactivePulsarListener); endpoint.setSchemaType(reactivePulsarListener.schemaType()); String concurrency = reactivePulsarListener.concurrency(); if (StringUtils.hasText(concurrency)) { @@ -257,11 +257,18 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene } private void resolveSubscriptionType(MethodReactivePulsarListenerEndpoint endpoint, - ReactivePulsarListener reactivePulsarListener) { - Assert.state(reactivePulsarListener.subscriptionType().length <= 1, + ReactivePulsarListener listener) { + Assert.state(listener.subscriptionType().length <= 1, () -> "ReactivePulsarListener.subscriptionType must have 0 or 1 elements"); - if (reactivePulsarListener.subscriptionType().length == 1) { - endpoint.setSubscriptionType(reactivePulsarListener.subscriptionType()[0]); + if (listener.subscriptionType().length == 1) { + endpoint.setSubscriptionType(listener.subscriptionType()[0]); + } + } + + private void resolveSubscriptionName(MethodReactivePulsarListenerEndpoint endpoint, + ReactivePulsarListener listener) { + if (StringUtils.hasText(listener.subscriptionName())) { + endpoint.setSubscriptionName(resolveExpressionAsString(listener.subscriptionName(), "subscriptionName")); } } @@ -322,13 +329,6 @@ private void resolveConsumerCustomizer(MethodReactivePulsarListenerEndpoint e } } - private String getEndpointSubscriptionName(ReactivePulsarListener reactivePulsarListener) { - if (StringUtils.hasText(reactivePulsarListener.subscriptionName())) { - return resolveExpressionAsString(reactivePulsarListener.subscriptionName(), "subscriptionName"); - } - return GENERATED_ID_PREFIX + this.counter.getAndIncrement(); - } - private String getEndpointId(ReactivePulsarListener reactivePulsarListener) { if (StringUtils.hasText(reactivePulsarListener.id())) { return resolveExpressionAsString(reactivePulsarListener.id(), "id"); diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java index 02971919a..e05d5bd02 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java @@ -78,4 +78,55 @@ void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { } + @SuppressWarnings("unchecked") + @Nested + class SubscriptionNameFrom { + + @Test + void factoryPropsUsedWhenNotSetOnEndpoint() { + var factoryProps = new ReactivePulsarContainerProperties(); + factoryProps.setSubscriptionName("my-factory-subscription"); + var containerFactory = new DefaultReactivePulsarListenerContainerFactory( + mock(ReactivePulsarConsumerFactory.class), factoryProps); + var endpoint = mock(ReactivePulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionName()) + .isEqualTo("my-factory-subscription"); + } + + @Test + void endpointTakesPrecedenceOverFactoryProps() { + var factoryProps = new ReactivePulsarContainerProperties(); + factoryProps.setSubscriptionName("my-factory-subscription"); + var containerFactory = new DefaultReactivePulsarListenerContainerFactory( + mock(ReactivePulsarConsumerFactory.class), factoryProps); + var endpoint = mock(ReactivePulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + when(endpoint.getSubscriptionName()).thenReturn("my-endpoint-subscription"); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionName()) + .isEqualTo("my-endpoint-subscription"); + } + + @Test + void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { + var factoryProps = new ReactivePulsarContainerProperties(); + var containerFactory = new DefaultReactivePulsarListenerContainerFactory( + mock(ReactivePulsarConsumerFactory.class), factoryProps); + var endpoint = mock(ReactivePulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + + var container1 = containerFactory.createListenerContainer(endpoint); + assertThat(container1.getContainerProperties().getSubscriptionName()) + .startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#"); + var container2 = containerFactory.createListenerContainer(endpoint); + assertThat(container2.getContainerProperties().getSubscriptionName()) + .startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#"); + assertThat(container1.getContainerProperties().getSubscriptionName()) + .isNotEqualTo(container2.getContainerProperties().getSubscriptionName()); + } + + } + } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java index e8469b7ee..073546503 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java @@ -44,6 +44,7 @@ import org.apache.pulsar.reactive.client.api.MessageResult; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec; +import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -72,7 +73,7 @@ import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig; import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig; -import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.SubscriptionTypeTestsConfig; +import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionNameTests.SubscriptionNameTestsConfig; import org.springframework.pulsar.reactive.support.MessageUtils; import org.springframework.pulsar.support.PulsarHeaders; import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper; @@ -815,80 +816,79 @@ Mono listen2(String message) { } @Nested - @ContextConfiguration(classes = SubscriptionTypeTestsConfig.class) - class SubscriptionTypeTests { + @ContextConfiguration(classes = SubscriptionNameTestsConfig.class) + class SubscriptionNameTests { - static final CountDownLatch latchTypeNotSet = new CountDownLatch(1); + static final CountDownLatch latchNameNotSet = new CountDownLatch(1); - static final CountDownLatch latchTypeSetOnAnnotation = new CountDownLatch(1); + static final CountDownLatch latchNameSetOnAnnotation = new CountDownLatch(1); - static final CountDownLatch latchTypeSetOnCustomizer = new CountDownLatch(1); + static final CountDownLatch latchNameSetOnCustomizer = new CountDownLatch(1); @Test - void defaultTypeFromContainerFactoryUsedWhenTypeNotSetAnywhere( + void defaultNameFromContainerFactoryUsedWhenNameNotSetAnywhere( @Autowired ConsumerTrackingReactivePulsarConsumerFactory consumerFactory) throws Exception { - var topic = "rpl-latchTypeNotSet-topic"; - assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType) - .isEqualTo(SubscriptionType.Exclusive); + var topic = "rpl-latchNameNotSet-topic"; + assertThat(consumerFactory.getSpec(topic)) + .extracting(ReactiveMessageConsumerSpec::getSubscriptionName, InstanceOfAssertFactories.STRING) + .startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#"); pulsarTemplate.send(topic, "hello-" + topic); - assertThat(latchTypeNotSet.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchNameNotSet.await(5, TimeUnit.SECONDS)).isTrue(); } @Test - void typeSetOnAnnotationOverridesDefaultTypeFromContainerFactory( + void nameSetOnAnnotationOverridesDefaultNameFromContainerFactory( @Autowired ConsumerTrackingReactivePulsarConsumerFactory consumerFactory) throws Exception { - var topic = "rpl-typeSetOnAnnotation-topic"; - assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType) - .isEqualTo(SubscriptionType.Key_Shared); + var topic = "rpl-nameSetOnAnnotation-topic"; + assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionName) + .isEqualTo("from-annotation"); pulsarTemplate.send(topic, "hello-" + topic); - assertThat(latchTypeSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchNameSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue(); } @Test - void typeSetOnCustomizerOverridesTypeSetOnAnnotation( + void nameSetOnCustomizerOverridesNameSetOnAnnotation( @Autowired ConsumerTrackingReactivePulsarConsumerFactory consumerFactory) throws Exception { - var topic = "rpl-typeSetOnCustomizer-topic"; - assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType) - .isEqualTo(SubscriptionType.Failover); + var topic = "rpl-nameSetOnCustomizer-topic"; + assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionName) + .isEqualTo("from-customizer"); pulsarTemplate.send(topic, "hello-" + topic); - assertThat(latchTypeSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(latchNameSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue(); } @Configuration(proxyBeanMethods = false) - static class SubscriptionTypeTestsConfig { + static class SubscriptionNameTestsConfig { @Bean - ReactiveMessageConsumerBuilderCustomizer consumerFactoryDefaultSubTypeCustomizer() { - return (b) -> b.subscriptionType(SubscriptionType.Shared); + ReactiveMessageConsumerBuilderCustomizer consumerFactoryDefaultSubNameCustomizer() { + return (b) -> b.subscriptionName("from-consumer-factory"); } - @ReactivePulsarListener(topics = "rpl-latchTypeNotSet-topic", subscriptionName = "rpl-latchTypeNotSet-sub", + @ReactivePulsarListener(topics = "rpl-latchNameNotSet-topic", consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenWithoutTypeSetAnywhere(String ignored) { - latchTypeNotSet.countDown(); + Mono listenWithoutNameSetAnywhere(String ignored) { + latchNameNotSet.countDown(); return Mono.empty(); } - @ReactivePulsarListener(topics = "rpl-typeSetOnAnnotation-topic", - subscriptionName = "rpl-typeSetOnAnnotation-sub", subscriptionType = SubscriptionType.Key_Shared, + @ReactivePulsarListener(topics = "rpl-nameSetOnAnnotation-topic", subscriptionName = "from-annotation", consumerCustomizer = "subscriptionInitialPositionEarliest") - Mono listenWithTypeSetOnAnnotation(String ignored) { - latchTypeSetOnAnnotation.countDown(); + Mono listenWithNameSetOnAnnotation(String ignored) { + latchNameSetOnAnnotation.countDown(); return Mono.empty(); } - @ReactivePulsarListener(topics = "rpl-typeSetOnCustomizer-topic", - subscriptionName = "rpl-typeSetOnCustomizer-sub", subscriptionType = SubscriptionType.Key_Shared, + @ReactivePulsarListener(topics = "rpl-nameSetOnCustomizer-topic", subscriptionName = "from-annotation", consumerCustomizer = "myCustomizer") - Mono listenWithTypeSetOnCustomizer(String ignored) { - latchTypeSetOnCustomizer.countDown(); + Mono listenWithNameSetOnCustomizer(String ignored) { + latchNameSetOnCustomizer.countDown(); return Mono.empty(); } @Bean public ReactivePulsarListenerMessageConsumerBuilderCustomizer myCustomizer() { return cb -> cb.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscriptionType(SubscriptionType.Failover); + .subscriptionName("from-customizer"); } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListenerAnnotationBeanPostProcessor.java b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListenerAnnotationBeanPostProcessor.java index 0100169f1..8a6aee9dd 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListenerAnnotationBeanPostProcessor.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListenerAnnotationBeanPostProcessor.java @@ -214,10 +214,10 @@ private void processPulsarListenerAnnotation(MethodPulsarListenerEndpoint end PulsarListener pulsarListener, Object bean, String[] topics, String topicPattern) { endpoint.setBean(bean); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); - endpoint.setSubscriptionName(getEndpointSubscriptionName(pulsarListener)); endpoint.setId(getEndpointId(pulsarListener)); endpoint.setTopics(topics); endpoint.setTopicPattern(topicPattern); + resolveSubscriptionName(endpoint, pulsarListener); resolveSubscriptionType(endpoint, pulsarListener); endpoint.setSchemaType(pulsarListener.schemaType()); endpoint.setAckMode(pulsarListener.ackMode()); @@ -252,6 +252,13 @@ private void resolveSubscriptionType(MethodPulsarListenerEndpoint endpoint, P } } + private void resolveSubscriptionName(MethodPulsarListenerEndpoint endpoint, PulsarListener pulsarListener) { + if (StringUtils.hasText(pulsarListener.subscriptionName())) { + endpoint + .setSubscriptionName(resolveExpressionAsString(pulsarListener.subscriptionName(), "subscriptionName")); + } + } + @SuppressWarnings({ "rawtypes" }) private void resolvePulsarConsumerErrorHandler(MethodPulsarListenerEndpoint endpoint, PulsarListener pulsarListener) { @@ -385,13 +392,6 @@ else if (value instanceof Collection values) { } } - private String getEndpointSubscriptionName(PulsarListener pulsarListener) { - if (StringUtils.hasText(pulsarListener.subscriptionName())) { - return resolveExpressionAsString(pulsarListener.subscriptionName(), "subscriptionName"); - } - return GENERATED_ID_PREFIX + this.counter.getAndIncrement(); - } - private String getEndpointId(PulsarListener pulsarListener) { if (StringUtils.hasText(pulsarListener.id())) { return resolveExpressionAsString(pulsarListener.id(), "id"); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java index abe86a0d0..ea1b6fb9d 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java @@ -19,6 +19,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.pulsar.client.api.SubscriptionType; @@ -39,6 +40,10 @@ public class ConcurrentPulsarListenerContainerFactory extends AbstractPulsarListenerContainerFactory, T> { + private static final String SUBSCRIPTION_NAME_PREFIX = "org.springframework.Pulsar.PulsarListenerEndpointContainer#"; + + private static final AtomicInteger COUNTER = new AtomicInteger(); + private Integer concurrency; public ConcurrentPulsarListenerContainerFactory(PulsarConsumerFactory consumerFactory, @@ -72,47 +77,50 @@ public Collection getTopics() { @Override protected ConcurrentPulsarMessageListenerContainer createContainerInstance(PulsarListenerEndpoint endpoint) { - - PulsarContainerProperties properties = new PulsarContainerProperties(); - properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver()); - properties.setTopicResolver(this.getContainerProperties().getTopicResolver()); - properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType()); - - var parentTxnProps = this.getContainerProperties().transactions(); - var childTxnProps = properties.transactions(); - childTxnProps.setEnabled(parentTxnProps.isEnabled()); - childTxnProps.setRequired(parentTxnProps.isRequired()); - childTxnProps.setTimeout(parentTxnProps.getTimeout()); - childTxnProps.setTransactionDefinition(parentTxnProps.getTransactionDefinition()); - childTxnProps.setTransactionManager(parentTxnProps.getTransactionManager()); - + var factoryProps = this.getContainerProperties(); + var containerProps = new PulsarContainerProperties(); + + // Map factory props (defaults) to the container props + containerProps.setSchemaResolver(factoryProps.getSchemaResolver()); + containerProps.setTopicResolver(factoryProps.getTopicResolver()); + containerProps.setSubscriptionType(factoryProps.getSubscriptionType()); + containerProps.setSubscriptionName(factoryProps.getSubscriptionName()); + var factoryTxnProps = factoryProps.transactions(); + var containerTxnProps = containerProps.transactions(); + containerTxnProps.setEnabled(factoryTxnProps.isEnabled()); + containerTxnProps.setRequired(factoryTxnProps.isRequired()); + containerTxnProps.setTimeout(factoryTxnProps.getTimeout()); + containerTxnProps.setTransactionDefinition(factoryTxnProps.getTransactionDefinition()); + containerTxnProps.setTransactionManager(factoryTxnProps.getTransactionManager()); + + // Map relevant props from the endpoint to the container props if (!CollectionUtils.isEmpty(endpoint.getTopics())) { - properties.setTopics(new HashSet<>(endpoint.getTopics())); + containerProps.setTopics(new HashSet<>(endpoint.getTopics())); } - if (StringUtils.hasText(endpoint.getTopicPattern())) { - properties.setTopicsPattern(endpoint.getTopicPattern()); + containerProps.setTopicsPattern(endpoint.getTopicPattern()); } - - if (StringUtils.hasText(endpoint.getSubscriptionName())) { - properties.setSubscriptionName(endpoint.getSubscriptionName()); - } - if (endpoint.isBatchListener()) { - properties.setBatchListener(endpoint.isBatchListener()); + containerProps.setBatchListener(endpoint.isBatchListener()); + } + if (StringUtils.hasText(endpoint.getSubscriptionName())) { + containerProps.setSubscriptionName(endpoint.getSubscriptionName()); } - if (endpoint.getSubscriptionType() != null) { - properties.setSubscriptionType(endpoint.getSubscriptionType()); + containerProps.setSubscriptionType(endpoint.getSubscriptionType()); } - // Default to Exclusive if not set on container props or endpoint - if (properties.getSubscriptionType() == null) { - properties.setSubscriptionType(SubscriptionType.Exclusive); + // Default subscription name to generated when not set elsewhere + if (!StringUtils.hasText(containerProps.getSubscriptionName())) { + var generatedName = SUBSCRIPTION_NAME_PREFIX + COUNTER.getAndIncrement(); + containerProps.setSubscriptionName(generatedName); } + // Default subscription type to Exclusive when not set elsewhere + if (containerProps.getSubscriptionType() == null) { + containerProps.setSubscriptionType(SubscriptionType.Exclusive); + } + containerProps.setSchemaType(endpoint.getSchemaType()); - properties.setSchemaType(endpoint.getSchemaType()); - - return new ConcurrentPulsarMessageListenerContainer<>(this.getConsumerFactory(), properties); + return new ConcurrentPulsarMessageListenerContainer<>(this.getConsumerFactory(), containerProps); } @Override diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarListenerEndpointRegistrar.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarListenerEndpointRegistrar.java index c3485dd07..c929f2011 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarListenerEndpointRegistrar.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/PulsarListenerEndpointRegistrar.java @@ -118,7 +118,7 @@ else if (this.containerFactoryBeanName != null) { public void registerEndpoint(ListenerEndpoint endpoint, @Nullable ListenerContainerFactory factory) { Assert.notNull(endpoint, "Endpoint must be set"); - Assert.hasText(endpoint.getSubscriptionName(), "Endpoint id must be set"); + Assert.hasText(endpoint.getId(), "Endpoint id must be set"); // Factory may be null, we defer the resolution right before actually creating the // container PulsarListenerEndpointDescriptor descriptor = new PulsarListenerEndpointDescriptor(endpoint, factory); diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java new file mode 100644 index 000000000..a808cb9bd --- /dev/null +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactoryTests.java @@ -0,0 +1,131 @@ +/* + * Copyright 2023-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.pulsar.config; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.pulsar.client.api.SubscriptionType; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import org.springframework.pulsar.core.PulsarConsumerFactory; +import org.springframework.pulsar.listener.PulsarContainerProperties; + +/** + * Unit tests for {@link ConcurrentPulsarListenerContainerFactory}. + */ +class ConcurrentPulsarListenerContainerFactoryTests { + + @SuppressWarnings("unchecked") + @Nested + class SubscriptionTypeFrom { + + @Test + void factoryPropsUsedWhenNotSetOnEndpoint() { + var factoryProps = new PulsarContainerProperties(); + factoryProps.setSubscriptionType(SubscriptionType.Shared); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionType()) + .isEqualTo(SubscriptionType.Shared); + } + + @Test + void endpointTakesPrecedenceOverFactoryProps() { + var factoryProps = new PulsarContainerProperties(); + factoryProps.setSubscriptionType(SubscriptionType.Shared); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + when(endpoint.getSubscriptionType()).thenReturn(SubscriptionType.Failover); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionType()) + .isEqualTo(SubscriptionType.Failover); + } + + @Test + void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { + var factoryProps = new PulsarContainerProperties(); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionType()) + .isEqualTo(SubscriptionType.Exclusive); + } + + } + + @SuppressWarnings("unchecked") + @Nested + class SubscriptionNameFrom { + + @Test + void factoryPropsUsedWhenNotSetOnEndpoint() { + var factoryProps = new PulsarContainerProperties(); + factoryProps.setSubscriptionName("my-factory-subscription"); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionName()) + .isEqualTo("my-factory-subscription"); + } + + @Test + void endpointTakesPrecedenceOverFactoryProps() { + var factoryProps = new PulsarContainerProperties(); + factoryProps.setSubscriptionName("my-factory-subscription"); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + when(endpoint.getSubscriptionName()).thenReturn("my-endpoint-subscription"); + var createdContainer = containerFactory.createListenerContainer(endpoint); + assertThat(createdContainer.getContainerProperties().getSubscriptionName()) + .isEqualTo("my-endpoint-subscription"); + } + + @Test + void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { + var factoryProps = new PulsarContainerProperties(); + var containerFactory = new ConcurrentPulsarListenerContainerFactory( + mock(PulsarConsumerFactory.class), factoryProps); + var endpoint = mock(PulsarListenerEndpoint.class); + when(endpoint.getConcurrency()).thenReturn(1); + + var container1 = containerFactory.createListenerContainer(endpoint); + assertThat(container1.getContainerProperties().getSubscriptionName()) + .startsWith("org.springframework.Pulsar.PulsarListenerEndpointContainer#"); + var container2 = containerFactory.createListenerContainer(endpoint); + assertThat(container2.getContainerProperties().getSubscriptionName()) + .startsWith("org.springframework.Pulsar.PulsarListenerEndpointContainer#"); + assertThat(container1.getContainerProperties().getSubscriptionName()) + .isNotEqualTo(container2.getContainerProperties().getSubscriptionName()); + } + + } + +} diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java index 1967efc75..e1f9d7ca9 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java @@ -212,51 +212,6 @@ private record PulsarListenerMockComponents(PulsarConsumerFactory consum Consumer consumer, ConcurrentPulsarMessageListenerContainer concurrentContainer) { } - @SuppressWarnings("unchecked") - @Nested - class SubscriptionTypeFrom { - - @Test - void factoryPropsUsedWhenNotSetOnEndpoint() { - var factoryProps = new PulsarContainerProperties(); - factoryProps.setSubscriptionType(SubscriptionType.Shared); - var containerFactory = new ConcurrentPulsarListenerContainerFactory( - mock(PulsarConsumerFactory.class), factoryProps); - var endpoint = mock(PulsarListenerEndpoint.class); - when(endpoint.getConcurrency()).thenReturn(1); - var createdContainer = containerFactory.createListenerContainer(endpoint); - assertThat(createdContainer.getContainerProperties().getSubscriptionType()) - .isEqualTo(SubscriptionType.Shared); - } - - @Test - void endpointTakesPrecedenceOverFactoryProps() { - var factoryProps = new PulsarContainerProperties(); - factoryProps.setSubscriptionType(SubscriptionType.Shared); - var containerFactory = new ConcurrentPulsarListenerContainerFactory( - mock(PulsarConsumerFactory.class), factoryProps); - var endpoint = mock(PulsarListenerEndpoint.class); - when(endpoint.getConcurrency()).thenReturn(1); - when(endpoint.getSubscriptionType()).thenReturn(SubscriptionType.Failover); - var createdContainer = containerFactory.createListenerContainer(endpoint); - assertThat(createdContainer.getContainerProperties().getSubscriptionType()) - .isEqualTo(SubscriptionType.Failover); - } - - @Test - void defaultUsedWhenNotSetOnEndpointNorFactoryProps() { - var factoryProps = new PulsarContainerProperties(); - var containerFactory = new ConcurrentPulsarListenerContainerFactory( - mock(PulsarConsumerFactory.class), factoryProps); - var endpoint = mock(PulsarListenerEndpoint.class); - when(endpoint.getConcurrency()).thenReturn(1); - var createdContainer = containerFactory.createListenerContainer(endpoint); - assertThat(createdContainer.getContainerProperties().getSubscriptionType()) - .isEqualTo(SubscriptionType.Exclusive); - } - - } - @Nested class ObservationConfigurationTests { diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java index 8f15f855e..c4289fe43 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java @@ -47,6 +47,7 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; import org.assertj.core.api.AbstractObjectAssert; +import org.assertj.core.api.AbstractStringAssert; import org.assertj.core.api.InstanceOfAssertFactories; import org.awaitility.Awaitility; import org.junit.jupiter.api.Nested; @@ -73,6 +74,7 @@ import org.springframework.pulsar.core.SchemaResolver; import org.springframework.pulsar.core.TopicResolver; import org.springframework.pulsar.listener.PulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig; +import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionNameTests.SubscriptionNameTestsConfig; import org.springframework.pulsar.listener.PulsarListenerTests.SubscriptionTypeTests.SubscriptionTypeTestsConfig; import org.springframework.pulsar.support.PulsarHeaders; import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper; @@ -1125,4 +1127,76 @@ public PulsarListenerConsumerBuilderCustomizer myCustomizer() { } + @Nested + @ContextConfiguration(classes = SubscriptionNameTestsConfig.class) + class SubscriptionNameTests { + + static final CountDownLatch latchNameNotSet = new CountDownLatch(1); + + static final CountDownLatch latchNameSetOnAnnotation = new CountDownLatch(1); + + static final CountDownLatch latchNameSetOnCustomizer = new CountDownLatch(1); + + @Test + void defaultNameFromContainerFactoryUsedWhenNameNotSetAnywhere() throws Exception { + pulsarTemplate.send("latchNameNotSet-topic", "hello-latchNameNotSet"); + assertThat(latchNameNotSet.await(5, TimeUnit.SECONDS)).isTrue(); + } + + @Test + void nameSetOnAnnotationOverridesDefaultNameFromContainerFactory() throws Exception { + pulsarTemplate.send("nameSetOnAnnotation-topic", "hello-nameSetOnAnnotation"); + assertThat(latchNameSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue(); + } + + @Test + void nameSetOnCustomizerOverridesNameSetOnAnnotation() throws Exception { + pulsarTemplate.send("nameSetOnCustomizer-topic", "hello-nameSetOnCustomizer"); + assertThat(latchNameSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue(); + } + + @Configuration(proxyBeanMethods = false) + static class SubscriptionNameTestsConfig { + + @Bean + ConsumerBuilderCustomizer consumerFactoryCustomizerSubNameIsIgnored() { + return (b) -> b.subscriptionName("from-consumer-factory"); + } + + @PulsarListener(topics = "latchNameNotSet-topic") + void listenWithNameNotSet(String ignored, Consumer consumer) { + assertSubscriptionName(consumer) + .startsWith("org.springframework.Pulsar.PulsarListenerEndpointContainer#"); + latchNameNotSet.countDown(); + } + + @PulsarListener(topics = "nameSetOnAnnotation-topic", subscriptionName = "from-annotation") + void listenWithNameSetOnAnnotation(String ignored, Consumer consumer) { + assertSubscriptionName(consumer).isEqualTo("from-annotation"); + latchNameSetOnAnnotation.countDown(); + } + + @PulsarListener(topics = "nameSetOnCustomizer-topic", subscriptionName = "from-annotation", + consumerCustomizer = "myCustomizer") + void listenWithNameSetOnCustomizer(String ignored, Consumer consumer) { + assertSubscriptionName(consumer).isEqualTo("from-customizer"); + latchNameSetOnCustomizer.countDown(); + } + + @Bean + public PulsarListenerConsumerBuilderCustomizer myCustomizer() { + return cb -> cb.subscriptionName("from-customizer"); + } + + @SuppressWarnings("rawtypes") + private static AbstractStringAssert assertSubscriptionName(Consumer consumer) { + return assertThat(consumer) + .extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class)) + .extracting(ConsumerConfigurationData::getSubscriptionName, InstanceOfAssertFactories.STRING); + } + + } + + } + }