diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java index efc3b98d..707b685b 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core/DefaultReactivePulsarConsumerFactory.java @@ -98,6 +98,18 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); mutableSpec.setTopicNames(fullyQualifiedTopics); } + if (mutableSpec.getDeadLetterPolicy() != null) { + var dlt = mutableSpec.getDeadLetterPolicy().getDeadLetterTopic(); + if (dlt != null) { + mutableSpec.getDeadLetterPolicy() + .setDeadLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(dlt)); + } + var rlt = mutableSpec.getDeadLetterPolicy().getRetryLetterTopic(); + if (rlt != null) { + mutableSpec.getDeadLetterPolicy() + .setRetryLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(rlt)); + } + } } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java index ec547c8d..a1440a64 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultPulsarConsumerFactory.java @@ -146,6 +146,20 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder builder) { var fullyQualifiedTopics = topics.stream().map(this.topicBuilder::getFullyQualifiedNameForTopic).toList(); builderImpl.getConf().setTopicNames(new HashSet<>(fullyQualifiedTopics)); } + if (builderImpl.getConf().getDeadLetterPolicy() != null) { + var dlt = builderImpl.getConf().getDeadLetterPolicy().getDeadLetterTopic(); + if (dlt != null) { + builderImpl.getConf() + .getDeadLetterPolicy() + .setDeadLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(dlt)); + } + var rlt = builderImpl.getConf().getDeadLetterPolicy().getRetryLetterTopic(); + if (rlt != null) { + builderImpl.getConf() + .getDeadLetterPolicy() + .setRetryLetterTopic(this.topicBuilder.getFullyQualifiedNameForTopic(rlt)); + } + } } }