Skip to content

Commit 29cfd30

Browse files
committed
Better container properties sync
* If consumer properties are directly overridden, we need to sync that with container properties. This commit tries to centralize this synching.
1 parent 71241a2 commit 29cfd30

File tree

4 files changed

+41
-4
lines changed

4 files changed

+41
-4
lines changed

spring-pulsar-spring-cloud-stream-binder/src/main/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarMessageChannelBinder.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.pulsar.client.api.PulsarClientException;
2222
import org.apache.pulsar.client.api.Schema;
23-
import org.apache.pulsar.client.api.SubscriptionType;
2423
import org.apache.pulsar.common.schema.SchemaType;
2524

2625
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
@@ -163,16 +162,14 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
163162
}
164163
var subscriptionName = PulsarBinderUtils.subscriptionName(properties.getExtension(), destination);
165164
containerProperties.setSubscriptionName(subscriptionName);
166-
if (properties.getExtension().getSubscriptionType() != SubscriptionType.Exclusive) {
167-
containerProperties.setSubscriptionType(properties.getExtension().getSubscriptionType());
168-
}
169165

170166
var baseConsumerProps = new ConsumerConfigProperties().buildProperties();
171167
var binderConsumerProps = this.binderConfigProps.getConsumer().buildProperties();
172168
var bindingConsumerProps = properties.getExtension().buildProperties();
173169
var mergedConsumerProps = PulsarBinderUtils.mergePropertiesWithPrecedence(baseConsumerProps,
174170
binderConsumerProps, bindingConsumerProps);
175171
containerProperties.getPulsarConsumerProperties().putAll(mergedConsumerProps);
172+
containerProperties.updateContainerProperties();
176173

177174
var container = new DefaultPulsarMessageListenerContainer<>(this.pulsarConsumerFactory, containerProperties);
178175
messageDrivenChannelAdapter.setMessageListenerContainer(container);

spring-pulsar-spring-cloud-stream-binder/src/test/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarExtendedBindingPropertiesTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.apache.pulsar.client.api.ProducerAccessMode;
2626
import org.apache.pulsar.client.api.SubscriptionMode;
27+
import org.apache.pulsar.client.api.SubscriptionType;
2728
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
2829
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
2930
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
@@ -34,6 +35,7 @@
3435
import org.springframework.boot.context.properties.bind.Binder;
3536
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
3637
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
38+
import org.springframework.pulsar.listener.PulsarContainerProperties;
3739
import org.springframework.pulsar.spring.cloud.stream.binder.properties.PulsarExtendedBindingProperties;
3840

3941
/**
@@ -111,4 +113,25 @@ void consumerProperties() {
111113
// @formatter:on
112114
}
113115

116+
@Test
117+
void extendedBindingsArePropagatedToContainerProperties() {
118+
Map<String, String> props = new HashMap<>();
119+
props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.subscription-name", "my-foo-sbscription");
120+
props.put("spring.cloud.stream.pulsar.bindings.my-foo.consumer.subscription-type", "Shared");
121+
122+
bind(props);
123+
124+
var bindingConsumerProps = properties.getExtendedConsumerProperties("my-foo").buildProperties();
125+
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
126+
pulsarContainerProperties.getPulsarConsumerProperties().putAll(bindingConsumerProps);
127+
128+
assertThat(pulsarContainerProperties.getSubscriptionName()).isNull();
129+
assertThat(pulsarContainerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Exclusive);
130+
131+
pulsarContainerProperties.updateContainerProperties();
132+
133+
assertThat(pulsarContainerProperties.getSubscriptionName()).isEqualTo("my-foo-sbscription");
134+
assertThat(pulsarContainerProperties.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
135+
}
136+
114137
}

spring-pulsar/src/main/java/org/springframework/pulsar/config/AbstractPulsarListenerContainerFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ else if (this.autoStartup != null) {
178178
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
179179
.acceptIfNotNull(endpoint.getConsumerProperties(),
180180
instance.getContainerProperties()::setPulsarConsumerProperties);
181+
// Update container properties if there are relevant direct consumer properties
182+
instanceProperties.updateContainerProperties();
181183
}
182184

183185
}

spring-pulsar/src/main/java/org/springframework/pulsar/listener/PulsarContainerProperties.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ public class PulsarContainerProperties {
4242

4343
private static final Duration DEFAULT_CONSUMER_START_TIMEOUT = Duration.ofSeconds(30);
4444

45+
private static final String SUBSCRIPTION_NAME = "subscriptionName";
46+
47+
private static final String SUBSCRIPTION_TYPE = "subscriptionType";
48+
4549
private Duration consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT;
4650

4751
private String[] topics;
@@ -246,4 +250,15 @@ public void setPulsarConsumerProperties(Properties pulsarConsumerProperties) {
246250
this.pulsarConsumerProperties = pulsarConsumerProperties;
247251
}
248252

253+
public void updateContainerProperties() {
254+
if (!this.pulsarConsumerProperties.isEmpty()) {
255+
if (this.pulsarConsumerProperties.containsKey(SUBSCRIPTION_NAME)) {
256+
this.subscriptionName = (String) this.pulsarConsumerProperties.get(SUBSCRIPTION_NAME);
257+
}
258+
if (this.pulsarConsumerProperties.containsKey(SUBSCRIPTION_TYPE)) {
259+
this.subscriptionType = (SubscriptionType) this.pulsarConsumerProperties.get(SUBSCRIPTION_TYPE);
260+
}
261+
}
262+
}
263+
249264
}

0 commit comments

Comments
 (0)