Skip to content

Commit a46488d

Browse files
committed
Move default subscription type to factory
This commit moves the default subscription type from the `@PulsarListener` annotation to the container factory (props) which allows the `spring.pulsar.consumer.subscription-type` config prop to be respected. See spring-projects/spring-boot#42053
1 parent 9064623 commit a46488d

File tree

3 files changed

+53
-1
lines changed

3 files changed

+53
-1
lines changed

spring-pulsar/src/main/java/org/springframework/pulsar/annotation/PulsarListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@
8181
* @return single element array with the subscription type or empty array to indicate
8282
* no type chosen by user
8383
*/
84-
SubscriptionType[] subscriptionType() default { SubscriptionType.Exclusive };
84+
SubscriptionType[] subscriptionType() default {};
8585

8686
/**
8787
* Pulsar schema type for this listener.

spring-pulsar/src/main/java/org/springframework/pulsar/config/ConcurrentPulsarListenerContainerFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.util.Collection;
2121
import java.util.HashSet;
2222

23+
import org.apache.pulsar.client.api.SubscriptionType;
24+
2325
import org.springframework.pulsar.core.PulsarConsumerFactory;
2426
import org.springframework.pulsar.listener.ConcurrentPulsarMessageListenerContainer;
2527
import org.springframework.pulsar.listener.PulsarContainerProperties;
@@ -74,6 +76,7 @@ protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(Pu
7476
PulsarContainerProperties properties = new PulsarContainerProperties();
7577
properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver());
7678
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());
79+
properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType());
7780

7881
var parentTxnProps = this.getContainerProperties().transactions();
7982
var childTxnProps = properties.transactions();
@@ -102,6 +105,10 @@ protected ConcurrentPulsarMessageListenerContainer<T> createContainerInstance(Pu
102105
if (endpoint.getSubscriptionType() != null) {
103106
properties.setSubscriptionType(endpoint.getSubscriptionType());
104107
}
108+
// Default to Exclusive if not set on container props or endpoint
109+
if (properties.getSubscriptionType() == null) {
110+
properties.setSubscriptionType(SubscriptionType.Exclusive);
111+
}
105112

106113
properties.setSchemaType(endpoint.getSchemaType());
107114

spring-pulsar/src/test/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainerTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,51 @@ private record PulsarListenerMockComponents(PulsarConsumerFactory<String> consum
212212
Consumer<String> consumer, ConcurrentPulsarMessageListenerContainer<String> concurrentContainer) {
213213
}
214214

215+
@SuppressWarnings("unchecked")
216+
@Nested
217+
class SubscriptionTypeFrom {
218+
219+
@Test
220+
void factoryPropsUsedWhenNotSetOnEndpoint() {
221+
var factoryProps = new PulsarContainerProperties();
222+
factoryProps.setSubscriptionType(SubscriptionType.Shared);
223+
var containerFactory = new ConcurrentPulsarListenerContainerFactory<String>(
224+
mock(PulsarConsumerFactory.class), factoryProps);
225+
var endpoint = mock(PulsarListenerEndpoint.class);
226+
when(endpoint.getConcurrency()).thenReturn(1);
227+
var createdContainer = containerFactory.createListenerContainer(endpoint);
228+
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
229+
.isEqualTo(SubscriptionType.Shared);
230+
}
231+
232+
@Test
233+
void endpointTakesPrecedenceOverFactoryProps() {
234+
var factoryProps = new PulsarContainerProperties();
235+
factoryProps.setSubscriptionType(SubscriptionType.Shared);
236+
var containerFactory = new ConcurrentPulsarListenerContainerFactory<String>(
237+
mock(PulsarConsumerFactory.class), factoryProps);
238+
var endpoint = mock(PulsarListenerEndpoint.class);
239+
when(endpoint.getConcurrency()).thenReturn(1);
240+
when(endpoint.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
241+
var createdContainer = containerFactory.createListenerContainer(endpoint);
242+
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
243+
.isEqualTo(SubscriptionType.Failover);
244+
}
245+
246+
@Test
247+
void defaultUsedWhenNotSetOnEndpointNorFactoryProps() {
248+
var factoryProps = new PulsarContainerProperties();
249+
var containerFactory = new ConcurrentPulsarListenerContainerFactory<String>(
250+
mock(PulsarConsumerFactory.class), factoryProps);
251+
var endpoint = mock(PulsarListenerEndpoint.class);
252+
when(endpoint.getConcurrency()).thenReturn(1);
253+
var createdContainer = containerFactory.createListenerContainer(endpoint);
254+
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
255+
.isEqualTo(SubscriptionType.Exclusive);
256+
}
257+
258+
}
259+
215260
@Nested
216261
class ObservationConfigurationTests {
217262

0 commit comments

Comments
 (0)