From 56c614937d7fb5a475a4d16155b8b570c7266e90 Mon Sep 17 00:00:00 2001 From: Andrey Litvitski Date: Sat, 20 Sep 2025 20:56:41 +0300 Subject: [PATCH 1/2] Allow dead letter topics to be non-fully-qualified. The consumer factories now respect the configured PulsarTopicBuilder and will fully-qualify the `deadLetterTopic` and `retryLetterTopic` on the `DeadLetterPolicy`. This allows users to not have to fully-qualify these topic names. Resolves #1241 Signed-off-by: Andrey Litvitski --- .../core/DefaultReactivePulsarConsumerFactory.java | 12 ++++++++++++ .../pulsar/core/DefaultPulsarConsumerFactory.java | 14 ++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java index 92a01fdd..b34e910c 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java @@ -101,6 +101,18 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); mutableSpec.setTopicNames(fullyQualifiedTopics); } + if (mutableSpec.getDeadLetterPolicy() != null) { + var dlt = mutableSpec.getDeadLetterPolicy().getDeadLetterTopic(); + if (dlt != null) { + mutableSpec.getDeadLetterPolicy() + .setDeadLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(dlt)); + } + var rlt = mutableSpec.getDeadLetterPolicy().getRetryLetterTopic(); + if (rlt != null) { + mutableSpec.getDeadLetterPolicy() + .setRetryLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(rlt)); + } + } } protected void ensureTopicsPatternFullyQualified(ReactiveMessageConsumerBuilder consumerBuilder) { diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java index fceedaa8..772bb068 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java @@ -149,6 +149,20 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder builder) { var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics)); } + if (builderImpl.getConf().getDeadLetterPolicy() != null) { + var dlt = builderImpl.getConf().getDeadLetterPolicy().getDeadLetterTopic(); + if (dlt != null) { + builderImpl.getConf() + .getDeadLetterPolicy() + .setDeadLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(dlt)); + } + var rlt = builderImpl.getConf().getDeadLetterPolicy().getRetryLetterTopic(); + if (rlt != null) { + builderImpl.getConf() + .getDeadLetterPolicy() + .setRetryLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(rlt)); + } + } } protected void ensureTopicsPatternFullyQualified(ConsumerBuilder builder) { From eca7b8687f386832163fbeddff1a6cec34aed20b Mon Sep 17 00:00:00 2001 From: onobc Date: Sun, 21 Sep 2025 15:38:59 -0500 Subject: [PATCH 2/2] Polish "Allow dead letter topics to be non-fully-qualified" Adds to the previous commit by... - adding a test for the fully qualified dead/retry letter topics - de-duplicate the logic for the retry and dead letter topic Signed-off-by: onobc --- .../DefaultReactivePulsarConsumerFactory.java | 28 ++++++++++------ ...ultReactivePulsarConsumerFactoryTests.java | 29 ++++++++++++++++ .../core/DefaultPulsarConsumerFactory.java | 30 ++++++++++------- .../DefaultPulsarConsumerFactoryTests.java | 33 +++++++++++++++++-- 4 files changed, 95 insertions(+), 25 deletions(-) diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java index b34e910c..aae8e018 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java @@ -18,6 +18,7 @@ import java.util.Collections; import java.util.List; +import java.util.function.Supplier; import java.util.regex.Pattern; import org.apache.pulsar.client.api.Schema; @@ -27,6 +28,7 @@ import org.jspecify.annotations.Nullable; import org.springframework.pulsar.core.PulsarTopicBuilder; +import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -101,17 +103,23 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); mutableSpec.setTopicNames(fullyQualifiedTopics); } + if (mutableSpec.getDeadLetterPolicy() != null) { - var dlt = mutableSpec.getDeadLetterPolicy().getDeadLetterTopic(); - if (dlt != null) { - mutableSpec.getDeadLetterPolicy() - .setDeadLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(dlt)); - } - var rlt = mutableSpec.getDeadLetterPolicy().getRetryLetterTopic(); - if (rlt != null) { - mutableSpec.getDeadLetterPolicy() - .setRetryLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(rlt)); - } + var deadLetterPolicy = mutableSpec.getDeadLetterPolicy(); + fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getDeadLetterTopic, + deadLetterPolicy::setDeadLetterTopic); + fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getRetryLetterTopic, + deadLetterPolicy::setRetryLetterTopic); + } + } + + protected void fullyQualifyDeadLetterPolicyTopic(Supplier topicGetter, + java.util.function.Consumer topicSetter) { + Assert.notNull(this.topicBuilder, "topicBuilder must not be null"); + var topicName = topicGetter.get(); + if (StringUtils.hasText(topicName)) { + var fqTopicName = this.topicBuilder.getFullyQualifiedNameForTopic(topicName); + topicSetter.accept(fqTopicName); } } diff --git a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java index 6ae80fd1..9cf47181 100644 --- a/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java +++ b/spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactoryTests.java @@ -24,7 +24,9 @@ import java.util.List; import java.util.regex.Pattern; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory; import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer; @@ -145,6 +147,33 @@ void createConsumerEnsureTopicsPatternFullyQualified() { verify(topicBuilder).getFullyQualifiedNameForTopic(inputTopicsPattern); } + @Test + void createConsumerEnsureDeadLetterPolicyTopicsFullyQualified() throws PulsarClientException { + var topicBuilder = spy(new PulsarTopicBuilder()); + var consumerFactory = new DefaultReactivePulsarConsumerFactory( + AdaptedReactivePulsarClientFactory.create((PulsarClient) null), null); + consumerFactory.setTopicBuilder(topicBuilder); + var deadLetterTopic = "with-topic-builder-reactive-ensure-dlp-dlt-fq"; + var retryLetterTopic = "%s-retry".formatted(deadLetterTopic); + var deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(2) + .deadLetterTopic(deadLetterTopic) + .retryLetterTopic(retryLetterTopic) + .build(); + var consumer = consumerFactory.createConsumer(SCHEMA, + Collections.singletonList(builder -> builder.deadLetterPolicy(deadLetterPolicy))); + var reactiveMessageConsumerSpec = assertThat(consumer) + .extracting("consumerSpec", InstanceOfAssertFactories.type(ReactiveMessageConsumerSpec.class)) + .actual(); + + assertThat(reactiveMessageConsumerSpec.getDeadLetterPolicy().getDeadLetterTopic()) + .isEqualTo("persistent://public/default/%s".formatted(deadLetterTopic)); + assertThat(reactiveMessageConsumerSpec.getDeadLetterPolicy().getRetryLetterTopic()) + .isEqualTo("persistent://public/default/%s".formatted(retryLetterTopic)); + verify(topicBuilder).getFullyQualifiedNameForTopic(deadLetterTopic); + verify(topicBuilder).getFullyQualifiedNameForTopic(retryLetterTopic); + } + } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java index 772bb068..c07184e7 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Objects; import java.util.TreeMap; +import java.util.function.Supplier; import java.util.regex.Pattern; import org.apache.pulsar.client.api.Consumer; @@ -34,6 +35,7 @@ import org.jspecify.annotations.Nullable; import org.springframework.pulsar.PulsarException; +import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -149,19 +151,23 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder builder) { var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics)); } + if (builderImpl.getConf().getDeadLetterPolicy() != null) { - var dlt = builderImpl.getConf().getDeadLetterPolicy().getDeadLetterTopic(); - if (dlt != null) { - builderImpl.getConf() - .getDeadLetterPolicy() - .setDeadLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(dlt)); - } - var rlt = builderImpl.getConf().getDeadLetterPolicy().getRetryLetterTopic(); - if (rlt != null) { - builderImpl.getConf() - .getDeadLetterPolicy() - .setRetryLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(rlt)); - } + var deadLetterPolicy = builderImpl.getConf().getDeadLetterPolicy(); + fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getDeadLetterTopic, + deadLetterPolicy::setDeadLetterTopic); + fullyQualifyDeadLetterPolicyTopic(deadLetterPolicy::getRetryLetterTopic, + deadLetterPolicy::setRetryLetterTopic); + } + } + + protected void fullyQualifyDeadLetterPolicyTopic(Supplier topicGetter, + java.util.function.Consumer topicSetter) { + Assert.notNull(this.topicBuilder, "topicBuilder must not be null"); + var topicName = topicGetter.get(); + if (StringUtils.hasText(topicName)) { + var fqTopicName = this.topicBuilder.getFullyQualifiedNameForTopic(topicName); + topicSetter.accept(fqTopicName); } } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java index ae76bb82..de11437a 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactoryTests.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; @@ -273,10 +274,10 @@ void multipleConfigCustomizers() throws PulsarClientException { } @Nested - class CreateConsumerUsingPulsarTopicBuilder { + class CreateConsumerWithTopicBuilder { @Test - void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientException { + void ensureTopicNamesFullyQualified() throws PulsarClientException { var pulsarTopicBuilder = spy(new PulsarTopicBuilder()); DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, null); @@ -289,7 +290,7 @@ void withPulsarTopicBuilderEnsureTopicNamesFullyQualified() throws PulsarClientE } @Test - void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClientException { + void ensureTopicsPatternFullyQualified() throws PulsarClientException { var pulsarTopicBuilder = spy(new PulsarTopicBuilder()); ConsumerBuilderCustomizer customizer = (builder) -> builder.topicsPattern("topic-.*"); DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, @@ -307,6 +308,32 @@ void withPulsarTopicBuilderEnsureTopicsPatternFullyQualified() throws PulsarClie } } + @Test + void ensureDeadLetterPolicyTopicsFullyQualified() throws PulsarClientException { + var pulsarTopicBuilder = spy(new PulsarTopicBuilder()); + var deadLetterTopic = "with-pulsar-topic-builder-ensure-dlp-dlt-fq"; + var retryLetterTopic = "%s-retry".formatted(deadLetterTopic); + var deadLetterPolicy = DeadLetterPolicy.builder() + .maxRedeliverCount(2) + .deadLetterTopic(deadLetterTopic) + .retryLetterTopic(retryLetterTopic) + .build(); + ConsumerBuilderCustomizer customizer = (builder) -> builder.deadLetterPolicy(deadLetterPolicy); + DefaultPulsarConsumerFactory consumerFactory = new DefaultPulsarConsumerFactory<>(pulsarClient, + List.of(customizer)); + consumerFactory.setTopicBuilder(pulsarTopicBuilder); + + try (Consumer consumer = consumerFactory.createConsumer(SCHEMA, Collections.singletonList("topic1"), + "%s-sub".formatted(deadLetterTopic), null, null)) { + assertThat(consumer).extracting("deadLetterPolicy.deadLetterTopic") + .isEqualTo("persistent://public/default/%s".formatted(deadLetterTopic)); + assertThat(consumer).extracting("deadLetterPolicy.retryLetterTopic") + .isEqualTo("persistent://public/default/%s".formatted(retryLetterTopic)); + verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic(deadLetterTopic); + verify(pulsarTopicBuilder).getFullyQualifiedNameForTopic(retryLetterTopic); + } + } + // TODO remove when Pulsar client updates to 4.2.0 private void temporarilyDealWithPulsar24698(PatternMultiTopicsConsumerImpl consumer) { // See https://github.com/apache/pulsar/pull/24698