Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ Spring Boot provides this consumer factory which you can further configure by sp

TIP: The `spring.pulsar.consumer.subscription.name` property is ignored and is instead generated when not specified on the annotation.

TIP: The `spring.pulsar.consumer.subscription.type` property is ignored and is instead taken from the value on the annotation. However, you can set the `subscriptionType = {}` on the annotation to instead use the property value as the default.


Let us revisit the `PulsarListener` code snippet we saw in the quick-tour section:

[source, java]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.List;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

import org.springframework.core.log.LogAccessor;
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
Expand Down Expand Up @@ -87,6 +88,7 @@ public DefaultReactivePulsarMessageListenerContainer<T> createContainerInstance(
ReactivePulsarContainerProperties<T> properties = new ReactivePulsarContainerProperties<>();
properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver());
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());
properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType());

if (!CollectionUtils.isEmpty(endpoint.getTopics())) {
properties.setTopics(endpoint.getTopics());
Expand All @@ -103,8 +105,9 @@ public DefaultReactivePulsarMessageListenerContainer<T> createContainerInstance(
if (endpoint.getSubscriptionType() != null) {
properties.setSubscriptionType(endpoint.getSubscriptionType());
}
else {
properties.setSubscriptionType(this.containerProperties.getSubscriptionType());
// Default to Exclusive if not set on container props or endpoint
if (properties.getSubscriptionType() == null) {
properties.setSubscriptionType(SubscriptionType.Exclusive);
}

if (endpoint.getSchemaType() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
* @return single element array with the subscription type or empty array to indicate
* no type chosen by user
*/
SubscriptionType[] subscriptionType() default { SubscriptionType.Exclusive };
SubscriptionType[] subscriptionType() default {};

/**
* Pulsar schema type for this listener.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.reactive.config;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.pulsar.client.api.SubscriptionType;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
import org.springframework.pulsar.reactive.listener.ReactivePulsarContainerProperties;

/**
* Unit tests for {@link DefaultReactivePulsarListenerContainerFactory}.
*/
class DefaultReactivePulsarListenerContainerFactoryTests {

@SuppressWarnings("unchecked")
@Nested
class SubscriptionTypeFrom {

@Test
void factoryPropsUsedWhenNotSetOnEndpoint() {
var factoryProps = new ReactivePulsarContainerProperties<String>();
factoryProps.setSubscriptionType(SubscriptionType.Shared);
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
var createdContainer = containerFactory.createListenerContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
.isEqualTo(SubscriptionType.Shared);
}

@Test
void endpointTakesPrecedenceOverFactoryProps() {
var factoryProps = new ReactivePulsarContainerProperties<String>();
factoryProps.setSubscriptionType(SubscriptionType.Shared);
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
when(endpoint.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
var createdContainer = containerFactory.createListenerContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
.isEqualTo(SubscriptionType.Failover);
}

@Test
void defaultUsedWhenNotSetOnEndpointNorFactoryProps() {
var factoryProps = new ReactivePulsarContainerProperties<String>();
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
var createdContainer = containerFactory.createListenerContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
.isEqualTo(SubscriptionType.Exclusive);

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithDefaultType.WithDefaultTypeConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.WithSpecificTypes.WithSpecificTypesConfig;
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.SubscriptionTypeTestsConfig;
import org.springframework.pulsar.reactive.support.MessageUtils;
import org.springframework.pulsar.support.PulsarHeaders;
import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper;
Expand Down Expand Up @@ -816,117 +815,80 @@ Mono<Void> listen2(String message) {
}

@Nested
@ContextConfiguration(classes = SubscriptionTypeTestsConfig.class)
class SubscriptionTypeTests {

@Nested
@ContextConfiguration(classes = WithDefaultTypeConfig.class)
class WithDefaultType {
static final CountDownLatch latchTypeNotSet = new CountDownLatch(1);

static final CountDownLatch latchTypeNotSet = new CountDownLatch(1);
static final CountDownLatch latchTypeSetOnAnnotation = new CountDownLatch(1);

@Test
void whenTypeNotSetAnywhereThenFallbackTypeIsUsed(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
assertThat(consumerFactory.topicNameToConsumerSpec).hasEntrySatisfying("rpl-typeNotSetAnywhere-topic",
(consumerSpec) -> assertThat(consumerSpec.getSubscriptionType())
.isEqualTo(SubscriptionType.Exclusive));
pulsarTemplate.send("rpl-typeNotSetAnywhere-topic", "hello-rpl-typeNotSetAnywhere");
assertThat(latchTypeNotSet.await(10, TimeUnit.SECONDS)).isTrue();
}

@Configuration(proxyBeanMethods = false)
static class WithDefaultTypeConfig {

@ReactivePulsarListener(topics = "rpl-typeNotSetAnywhere-topic",
subscriptionName = "rpl-typeNotSetAnywhere-sub",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithoutTypeSetAnywhere(String ignored) {
latchTypeNotSet.countDown();
return Mono.empty();
}

}
static final CountDownLatch latchTypeSetOnCustomizer = new CountDownLatch(1);

@Test
void defaultTypeFromContainerFactoryUsedWhenTypeNotSetAnywhere(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
var topic = "rpl-latchTypeNotSet-topic";
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Exclusive);
pulsarTemplate.send(topic, "hello-" + topic);
assertThat(latchTypeNotSet.await(5, TimeUnit.SECONDS)).isTrue();
}

@Nested
@ContextConfiguration(classes = WithSpecificTypesConfig.class)
class WithSpecificTypes {

static final CountDownLatch latchTypeSetConsumerFactory = new CountDownLatch(1);
@Test
void typeSetOnAnnotationOverridesDefaultTypeFromContainerFactory(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
var topic = "rpl-typeSetOnAnnotation-topic";
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Key_Shared);
pulsarTemplate.send(topic, "hello-" + topic);
assertThat(latchTypeSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue();
}

static final CountDownLatch latchTypeSetAnnotation = new CountDownLatch(1);
@Test
void typeSetOnCustomizerOverridesTypeSetOnAnnotation(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
var topic = "rpl-typeSetOnCustomizer-topic";
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Failover);
pulsarTemplate.send(topic, "hello-" + topic);
assertThat(latchTypeSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue();
}

static final CountDownLatch latchWithCustomizer = new CountDownLatch(1);
@Configuration(proxyBeanMethods = false)
static class SubscriptionTypeTestsConfig {

@Test
void whenTypeSetOnlyInConsumerFactoryThenConsumerFactoryTypeIsUsed(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
assertThat(consumerFactory.getSpec("rpl-typeSetConsumerFactory-topic"))
.extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Shared);
pulsarTemplate.send("rpl-typeSetConsumerFactory-topic", "hello-rpl-typeSetConsumerFactory");
assertThat(latchTypeSetConsumerFactory.await(10, TimeUnit.SECONDS)).isTrue();
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerFactoryDefaultSubTypeCustomizer() {
return (b) -> b.subscriptionType(SubscriptionType.Shared);
}

@Test
void whenTypeSetOnAnnotationThenAnnotationTypeIsUsed(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
assertThat(consumerFactory.getSpec("rpl-typeSetAnnotation-topic"))
.extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Key_Shared);
pulsarTemplate.send("rpl-typeSetAnnotation-topic", "hello-rpl-typeSetAnnotation");
assertThat(latchTypeSetAnnotation.await(10, TimeUnit.SECONDS)).isTrue();
@ReactivePulsarListener(topics = "rpl-latchTypeNotSet-topic", subscriptionName = "rpl-latchTypeNotSet-sub",
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithoutTypeSetAnywhere(String ignored) {
latchTypeNotSet.countDown();
return Mono.empty();
}

@Test
void whenTypeSetWithCustomizerThenCustomizerTypeIsUsed(
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
assertThat(consumerFactory.getSpec("rpl-typeSetCustomizer-topic"))
.extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
.isEqualTo(SubscriptionType.Failover);
pulsarTemplate.send("rpl-typeSetCustomizer-topic", "hello-rpl-typeSetCustomizer");
assertThat(latchWithCustomizer.await(10, TimeUnit.SECONDS)).isTrue();
@ReactivePulsarListener(topics = "rpl-typeSetOnAnnotation-topic",
subscriptionName = "rpl-typeSetOnAnnotation-sub", subscriptionType = SubscriptionType.Key_Shared,
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithTypeSetOnAnnotation(String ignored) {
latchTypeSetOnAnnotation.countDown();
return Mono.empty();
}

@Configuration(proxyBeanMethods = false)
static class WithSpecificTypesConfig {

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerFactoryDefaultSubTypeCustomizer() {
return (b) -> b.subscriptionType(SubscriptionType.Shared);
}

@ReactivePulsarListener(topics = "rpl-typeSetConsumerFactory-topic",
subscriptionName = "rpl-typeSetConsumerFactory-sub", subscriptionType = {},
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithTypeSetOnlyOnConsumerFactory(String ignored) {
latchTypeSetConsumerFactory.countDown();
return Mono.empty();
}

@ReactivePulsarListener(topics = "rpl-typeSetAnnotation-topic",
subscriptionName = "rpl-typeSetAnnotation-sub", subscriptionType = SubscriptionType.Key_Shared,
consumerCustomizer = "subscriptionInitialPositionEarliest")
Mono<Void> listenWithTypeSetOnAnnotation(String ignored) {
latchTypeSetAnnotation.countDown();
return Mono.empty();
}

@ReactivePulsarListener(topics = "rpl-typeSetCustomizer-topic",
subscriptionName = "rpl-typeSetCustomizer-sub", subscriptionType = SubscriptionType.Key_Shared,
consumerCustomizer = "myCustomizer")
Mono<Void> listenWithTypeSetInCustomizer(String ignored) {
latchWithCustomizer.countDown();
return Mono.empty();
}

@Bean
public ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> cb.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Failover);
}
@ReactivePulsarListener(topics = "rpl-typeSetOnCustomizer-topic",
subscriptionName = "rpl-typeSetOnCustomizer-sub", subscriptionType = SubscriptionType.Key_Shared,
consumerCustomizer = "myCustomizer")
Mono<Void> listenWithTypeSetOnCustomizer(String ignored) {
latchTypeSetOnCustomizer.countDown();
return Mono.empty();
}

@Bean
public ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> cb.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionType(SubscriptionType.Failover);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
* @return single element array with the subscription type or empty array to indicate
* no type chosen by user
*/
SubscriptionType[] subscriptionType() default { SubscriptionType.Exclusive };
SubscriptionType[] subscriptionType() default {};

/**
* Pulsar schema type for this listener.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.Collection;
import java.util.HashSet;

import org.apache.pulsar.client.api.SubscriptionType;

import org.springframework.pulsar.core.PulsarConsumerFactory;
import org.springframework.pulsar.listener.ConcurrentPulsarMessageListenerContainer;
import org.springframework.pulsar.listener.PulsarContainerProperties;
Expand Down Expand Up @@ -74,6 +76,7 @@ protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(Pu
PulsarContainerProperties properties = new PulsarContainerProperties();
properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver());
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());
properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType());

var parentTxnProps = this.getContainerProperties().transactions();
var childTxnProps = properties.transactions();
Expand Down Expand Up @@ -102,6 +105,10 @@ protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(Pu
if (endpoint.getSubscriptionType() != null) {
properties.setSubscriptionType(endpoint.getSubscriptionType());
}
// Default to Exclusive if not set on container props or endpoint
if (properties.getSubscriptionType() == null) {
properties.setSubscriptionType(SubscriptionType.Exclusive);
}

properties.setSchemaType(endpoint.getSchemaType());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,9 @@ else if (messageListener != null) {
topicNames, this.containerProperties.getSubscriptionName(), properties, customizers);
Assert.state(this.consumer != null, "Unable to create a consumer");

// If subtype is null - update it based on the actual subtype of the
// underlying consumer
if (this.subscriptionType == null) {
updateSubscriptionTypeFromConsumer(this.consumer);
}
// Update sub type from underlying consumer as customizer from annotation
// may have updated it
updateSubscriptionTypeFromConsumer(this.consumer);
}
catch (PulsarException e) {
DefaultPulsarMessageListenerContainer.this.logger.error(e, () -> "Pulsar exception.");
Expand Down
Loading