diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java index 92a01fdd..aae8e018 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java @@ -18,6 +18,7 @@ import java.util.Collections; import java.util.List; +import java.util.function.Supplier; import java.util.regex.Pattern; import org.apache.pulsar.client.api.Schema; @@ -27,6 +28,7 @@ import org.jspecify.annotations.Nullable; import org.springframework.pulsar.core.PulsarTopicBuilder; +import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -101,6 +103,24 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); mutableSpec.setTopicNames(fullyQualifiedTopics); } + + if (mutableSpec.getDeadLetterPolicy() != null) { + var deadLetterPolicy = mutableSpec.getDeadLetterPolicy(); + fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getDeadLetterTopic, + deadLetterPolicy::setDeadLetterTopic); + fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getRetryLetterTopic, + deadLetterPolicy::setRetryLetterTopic); + } + } + + protected void fullyQualifyDeadLetterPolicyTopic(Supplier topicGetter, + java.util.function.Consumer topicSetter) { + Assert.notNull(this.topicBuilder, "topicBuilder must not be null"); + var topicName = topicGetter.get(); + if (StringUtils.hasText(topicName)) { + var fqTopicName = this.topicBuilder.getFullyQualifiedNameForTopic(topicName); + topicSetter.accept(fqTopicName); + } } protected void ensureTopicsPatternFullyQualified(ReactiveMessageConsumerBuilder consumerBuilder) { diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java index 6ae80fd1..9cf47181 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java @@ -24,7 +24,9 @@ import java.util.List; import java.util.regex.Pattern; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer; @@ -145,6 +147,33 @@ void createConsumerEnsureTopicsPatternFullyQualified() { verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopicsPattern); } + @Test + void createConsumerEnsureDeadLetterPolicyTopicsFullyQualified() throws PulsarClientException { + var topicBuilder = spy(new PulsarTopicBuilder()); + var consumerFactory = new DefaultReactivePulsarConsumerFactory( + AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null); + consumerFactory.setTopicBuilder(topicBuilder); + var deadLetterTopic = "with-topic-builder-reactive-ensure-dlp-dlt-fq"; + var retryLetterTopic = "%s-retry".formatted(deadLetterTopic); + var deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(2) + .deadLetterTopic(deadLetterTopic) + .retryLetterTopic(retryLetterTopic) + .build(); + var consumer = consumerFactory.createConsumer(SCHEMA, + Collections.singletonList(builder -> builder.deadLetterPolicy(deadLetterPolicy))); + var reactiveMessageConsumerSpec = assertThat(consumer) + .extracting("consumerSpec", InstanceOfAssertFactories.type(ReactiveMessageConsumerSpec.class)) + .actual(); + + assertThat(reactiveMessageConsumerSpec.getDeadLetterPolicy().getDeadLetterTopic()) + .isEqualTo("persistent://public/default/%s".formatted(deadLetterTopic)); + assertThat(reactiveMessageConsumerSpec.getDeadLetterPolicy().getRetryLetterTopic()) + .isEqualTo("persistent://public/default/%s".formatted(retryLetterTopic)); + verify(topicBuilder).getFullyQualifiedNameForTopic(deadLetterTopic); + verify(topicBuilder).getFullyQualifiedNameForTopic(retryLetterTopic); + } + } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java index fceedaa8..c07184e7 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.function.Supplier; import java.util.regex.Pattern; import org.apache.pulsar.client.api.Consumer; @@ -34,6 +35,7 @@ import org.jspecify.annotations.Nullable; import org.springframework.pulsar.PulsarException; +import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -149,6 +151,24 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder builder) { var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics)); } + + if (builderImpl.getConf().getDeadLetterPolicy() != null) { + var deadLetterPolicy = builderImpl.getConf().getDeadLetterPolicy(); + fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getDeadLetterTopic, + deadLetterPolicy::setDeadLetterTopic); + fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getRetryLetterTopic, + deadLetterPolicy::setRetryLetterTopic); + } + } + + protected void fullyQualifyDeadLetterPolicyTopic(Supplier topicGetter, + java.util.function.Consumer topicSetter) { + Assert.notNull(this.topicBuilder, "topicBuilder must not be null"); + var topicName = topicGetter.get(); + if (StringUtils.hasText(topicName)) { + var fqTopicName = this.topicBuilder.getFullyQualifiedNameForTopic(topicName); + topicSetter.accept(fqTopicName); + } } protected void ensureTopicsPatternFullyQualified(ConsumerBuilder builder) { diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java index ae76bb82..de11437a 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; @@ -273,10 +274,10 @@ void multipleConfigCustomizers() throws PulsarClientException { } @Nested - class CreateConsumerUsingPulsarTopicBuilder { + class CreateConsumerWithTopicBuilder { @Test - void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientException { + void ensureTopicNamesFullyQualified() throws PulsarClientException { var pulsarTopicBuilder = spy(new PulsarTopicBuilder()); DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, null); @@ -289,7 +290,7 @@ void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientE } @Test - void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClientException { + void ensureTopicsPatternFullyQualified() throws PulsarClientException { var pulsarTopicBuilder = spy(new PulsarTopicBuilder()); ConsumerBuilderCustomizer customizer = (builder) -> builder.topicsPattern("topic-.*"); DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, @@ -307,6 +308,32 @@ void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClie } } + @Test + void ensureDeadLetterPolicyTopicsFullyQualified() throws PulsarClientException { + var pulsarTopicBuilder = spy(new PulsarTopicBuilder()); + var deadLetterTopic = "with-pulsar-topic-builder-ensure-dlp-dlt-fq"; + var retryLetterTopic = "%s-retry".formatted(deadLetterTopic); + var deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(2) + .deadLetterTopic(deadLetterTopic) + .retryLetterTopic(retryLetterTopic) + .build(); + ConsumerBuilderCustomizer customizer = (builder) -> builder.deadLetterPolicy(deadLetterPolicy); + DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, + List.of(customizer)); + consumerFactory.setTopicBuilder(pulsarTopicBuilder); + + try (Consumer consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"), + "%s-sub".formatted(deadLetterTopic), null, null)) { + assertThat(consumer).extracting("deadLetterPolicy.deadLetterTopic") + .isEqualTo("persistent://public/default/%s".formatted(deadLetterTopic)); + assertThat(consumer).extracting("deadLetterPolicy.retryLetterTopic") + .isEqualTo("persistent://public/default/%s".formatted(retryLetterTopic)); + verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic(deadLetterTopic); + verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic(retryLetterTopic); + } + } + // TODO remove when Pulsar client updates to 4.2.0 private void temporarilyDealWithPulsar24698(PatternMultiTopicsConsumerImpl consumer) { // See https://github.com/apache/pulsar/pull/24698