Skip to content

Commit 8300f1a

Browse files
committed
Subscription Type Propagation in the Binder
1 parent 19cc0ab commit 8300f1a

File tree

1 file changed

+4
-0
lines changed

1 file changed

+4
-0
lines changed

spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.pulsar.client.api.PulsarClientException;
2222
import org.apache.pulsar.client.api.Schema;
23+
import org.apache.pulsar.client.api.SubscriptionType;
2324
import org.apache.pulsar.common.schema.SchemaType;
2425

2526
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
@@ -162,6 +163,9 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
162163
}
163164
var subscriptionName = PulsarBinderUtils.subscriptionName(properties.getExtension(), destination);
164165
containerProperties.setSubscriptionName(subscriptionName);
166+
if (properties.getExtension().getSubscriptionType() != SubscriptionType.Exclusive) {
167+
containerProperties.setSubscriptionType(properties.getExtension().getSubscriptionType());
168+
}
165169

166170
var baseConsumerProps = new ConsumerConfigProperties().buildProperties();
167171
var binderConsumerProps = this.binderConfigProps.getConsumer().buildProperties();

0 commit comments

Comments
 (0)