File tree Expand file tree Collapse file tree 2 files changed +26
-0
lines changed
spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/core
spring-pulsar/src/main/java/org/springframework/pulsar/core Expand file tree Collapse file tree 2 files changed +26
-0
lines changed Original file line number Diff line number Diff line change @@ -98,6 +98,18 @@ protected void ensureTopicNamesFullyQualified(ReactiveMessageConsumerBuilder<T>
9898 var fullyQualifiedTopics = topics .stream ().map (this .topicBuilder ::getFullyQualifiedNameForTopic ).toList ();
9999 mutableSpec .setTopicNames (fullyQualifiedTopics );
100100 }
101+ if (mutableSpec .getDeadLetterPolicy () != null ) {
102+ var dlt = mutableSpec .getDeadLetterPolicy ().getDeadLetterTopic ();
103+ if (dlt != null ) {
104+ mutableSpec .getDeadLetterPolicy ()
105+ .setDeadLetterTopic (this .topicBuilder .getFullyQualifiedNameForTopic (dlt ));
106+ }
107+ var rlt = mutableSpec .getDeadLetterPolicy ().getRetryLetterTopic ();
108+ if (rlt != null ) {
109+ mutableSpec .getDeadLetterPolicy ()
110+ .setRetryLetterTopic (this .topicBuilder .getFullyQualifiedNameForTopic (rlt ));
111+ }
112+ }
101113 }
102114
103115}
Original file line number Diff line number Diff line change @@ -146,6 +146,20 @@ protected void ensureTopicNamesFullyQualified(ConsumerBuilder<T> builder) {
146146 var fullyQualifiedTopics = topics .stream ().map (this .topicBuilder ::getFullyQualifiedNameForTopic ).toList ();
147147 builderImpl .getConf ().setTopicNames (new HashSet <>(fullyQualifiedTopics ));
148148 }
149+ if (builderImpl .getConf ().getDeadLetterPolicy () != null ) {
150+ var dlt = builderImpl .getConf ().getDeadLetterPolicy ().getDeadLetterTopic ();
151+ if (dlt != null ) {
152+ builderImpl .getConf ()
153+ .getDeadLetterPolicy ()
154+ .setDeadLetterTopic (this .topicBuilder .getFullyQualifiedNameForTopic (dlt ));
155+ }
156+ var rlt = builderImpl .getConf ().getDeadLetterPolicy ().getRetryLetterTopic ();
157+ if (rlt != null ) {
158+ builderImpl .getConf ()
159+ .getDeadLetterPolicy ()
160+ .setRetryLetterTopic (this .topicBuilder .getFullyQualifiedNameForTopic (rlt ));
161+ }
162+ }
149163 }
150164
151165}
You can’t perform that action at this time.
0 commit comments