Skip to content

Commit 56ad97a

Browse files
committed
Add ReactivePulsarListener support
1 parent 16dbc9c commit 56ad97a

File tree

4 files changed

+138
-86
lines changed

4 files changed

+138
-86
lines changed

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactory.java

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.Arrays;
2020
import java.util.List;
21+
import java.util.concurrent.atomic.AtomicInteger;
2122

2223
import org.apache.pulsar.client.api.Schema;
2324
import org.apache.pulsar.client.api.SubscriptionType;
@@ -39,6 +40,10 @@
3940
*/
4041
public class DefaultReactivePulsarListenerContainerFactory<T> implements ReactivePulsarListenerContainerFactory<T> {
4142

43+
private static final String SUBSCRIPTION_NAME_PREFIX = "org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#";
44+
45+
private static final AtomicInteger COUNTER = new AtomicInteger();
46+
4247
protected final LogAccessor logger = new LogAccessor(this.getClass());
4348

4449
private final ReactivePulsarConsumerFactory<T> consumerFactory;
@@ -84,58 +89,54 @@ public void setFluxListener(Boolean fluxListener) {
8489
@SuppressWarnings("unchecked")
8590
public DefaultReactivePulsarMessageListenerContainer<T> createContainerInstance(
8691
ReactivePulsarListenerEndpoint<T> endpoint) {
87-
88-
ReactivePulsarContainerProperties<T> properties = new ReactivePulsarContainerProperties<>();
89-
properties.setSchemaResolver(this.getContainerProperties().getSchemaResolver());
90-
properties.setTopicResolver(this.getContainerProperties().getTopicResolver());
91-
properties.setSubscriptionType(this.getContainerProperties().getSubscriptionType());
92-
92+
var containerProps = new ReactivePulsarContainerProperties<T>();
93+
var factoryProps = this.getContainerProperties();
94+
95+
// Map factory props (defaults) to the container props
96+
containerProps.setSchemaResolver(factoryProps.getSchemaResolver());
97+
containerProps.setTopicResolver(factoryProps.getTopicResolver());
98+
containerProps.setSubscriptionType(factoryProps.getSubscriptionType());
99+
containerProps.setSubscriptionName(factoryProps.getSubscriptionName());
100+
containerProps.setSchemaType(factoryProps.getSchemaType());
101+
containerProps.setConcurrency(factoryProps.getConcurrency());
102+
containerProps.setUseKeyOrderedProcessing(factoryProps.isUseKeyOrderedProcessing());
103+
104+
// Map relevant props from the endpoint to the container props
93105
if (!CollectionUtils.isEmpty(endpoint.getTopics())) {
94-
properties.setTopics(endpoint.getTopics());
106+
containerProps.setTopics(endpoint.getTopics());
95107
}
96-
97108
if (StringUtils.hasText(endpoint.getTopicPattern())) {
98-
properties.setTopicsPattern(endpoint.getTopicPattern());
109+
containerProps.setTopicsPattern(endpoint.getTopicPattern());
99110
}
100-
101-
if (StringUtils.hasText(endpoint.getSubscriptionName())) {
102-
properties.setSubscriptionName(endpoint.getSubscriptionName());
103-
}
104-
105111
if (endpoint.getSubscriptionType() != null) {
106-
properties.setSubscriptionType(endpoint.getSubscriptionType());
112+
containerProps.setSubscriptionType(endpoint.getSubscriptionType());
107113
}
108-
// Default to Exclusive if not set on container props or endpoint
109-
if (properties.getSubscriptionType() == null) {
110-
properties.setSubscriptionType(SubscriptionType.Exclusive);
114+
// Default subscription type to Exclusive when not set elsewhere
115+
if (containerProps.getSubscriptionType() == null) {
116+
containerProps.setSubscriptionType(SubscriptionType.Exclusive);
111117
}
112-
113-
if (endpoint.getSchemaType() != null) {
114-
properties.setSchemaType(endpoint.getSchemaType());
118+
if (StringUtils.hasText(endpoint.getSubscriptionName())) {
119+
containerProps.setSubscriptionName(endpoint.getSubscriptionName());
115120
}
116-
else {
117-
properties.setSchemaType(this.containerProperties.getSchemaType());
121+
// Default subscription name to generated when not set elsewhere
122+
if (!StringUtils.hasText(containerProps.getSubscriptionName())) {
123+
var generatedName = SUBSCRIPTION_NAME_PREFIX + COUNTER.getAndIncrement();
124+
containerProps.setSubscriptionName(generatedName);
118125
}
119-
120-
if (properties.getSchema() == null) {
121-
properties.setSchema((Schema<T>) Schema.BYTES);
126+
if (endpoint.getSchemaType() != null) {
127+
containerProps.setSchemaType(endpoint.getSchemaType());
122128
}
123-
124-
if (endpoint.getConcurrency() != null) {
125-
properties.setConcurrency(endpoint.getConcurrency());
129+
// Default to BYTES if not set elsewhere
130+
if (containerProps.getSchema() == null) {
131+
containerProps.setSchema((Schema<T>) Schema.BYTES);
126132
}
127-
else {
128-
properties.setConcurrency(this.containerProperties.getConcurrency());
133+
if (endpoint.getConcurrency() != null) {
134+
containerProps.setConcurrency(endpoint.getConcurrency());
129135
}
130-
131136
if (endpoint.getUseKeyOrderedProcessing() != null) {
132-
properties.setUseKeyOrderedProcessing(endpoint.getUseKeyOrderedProcessing());
137+
containerProps.setUseKeyOrderedProcessing(endpoint.getUseKeyOrderedProcessing());
133138
}
134-
else {
135-
properties.setUseKeyOrderedProcessing(this.containerProperties.isUseKeyOrderedProcessing());
136-
}
137-
138-
return new DefaultReactivePulsarMessageListenerContainer<>(this.getConsumerFactory(), properties);
139+
return new DefaultReactivePulsarMessageListenerContainer<>(this.getConsumerFactory(), containerProps);
139140
}
140141

141142
@Override

spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/annotation/ReactivePulsarListenerAnnotationBeanPostProcessor.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -230,11 +230,11 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene
230230
ReactivePulsarListener reactivePulsarListener, Object bean, String[] topics, String topicPattern) {
231231
endpoint.setBean(bean);
232232
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
233-
endpoint.setSubscriptionName(getEndpointSubscriptionName(reactivePulsarListener));
234233
endpoint.setId(getEndpointId(reactivePulsarListener));
235234
endpoint.setTopics(topics);
236235
endpoint.setTopicPattern(topicPattern);
237236
resolveSubscriptionType(endpoint, reactivePulsarListener);
237+
resolveSubscriptionName(endpoint, reactivePulsarListener);
238238
endpoint.setSchemaType(reactivePulsarListener.schemaType());
239239
String concurrency = reactivePulsarListener.concurrency();
240240
if (StringUtils.hasText(concurrency)) {
@@ -257,11 +257,18 @@ private void processReactivePulsarListenerAnnotation(MethodReactivePulsarListene
257257
}
258258

259259
private void resolveSubscriptionType(MethodReactivePulsarListenerEndpoint<?> endpoint,
260-
ReactivePulsarListener reactivePulsarListener) {
261-
Assert.state(reactivePulsarListener.subscriptionType().length <= 1,
260+
ReactivePulsarListener listener) {
261+
Assert.state(listener.subscriptionType().length <= 1,
262262
() -> "ReactivePulsarListener.subscriptionType must have 0 or 1 elements");
263-
if (reactivePulsarListener.subscriptionType().length == 1) {
264-
endpoint.setSubscriptionType(reactivePulsarListener.subscriptionType()[0]);
263+
if (listener.subscriptionType().length == 1) {
264+
endpoint.setSubscriptionType(listener.subscriptionType()[0]);
265+
}
266+
}
267+
268+
private void resolveSubscriptionName(MethodReactivePulsarListenerEndpoint<?> endpoint,
269+
ReactivePulsarListener listener) {
270+
if (StringUtils.hasText(listener.subscriptionName())) {
271+
endpoint.setSubscriptionName(resolveExpressionAsString(listener.subscriptionName(), "subscriptionName"));
265272
}
266273
}
267274

@@ -322,13 +329,6 @@ private void resolveConsumerCustomizer(MethodReactivePulsarListenerEndpoint<?> e
322329
}
323330
}
324331

325-
private String getEndpointSubscriptionName(ReactivePulsarListener reactivePulsarListener) {
326-
if (StringUtils.hasText(reactivePulsarListener.subscriptionName())) {
327-
return resolveExpressionAsString(reactivePulsarListener.subscriptionName(), "subscriptionName");
328-
}
329-
return GENERATED_ID_PREFIX + this.counter.getAndIncrement();
330-
}
331-
332332
private String getEndpointId(ReactivePulsarListener reactivePulsarListener) {
333333
if (StringUtils.hasText(reactivePulsarListener.id())) {
334334
return resolveExpressionAsString(reactivePulsarListener.id(), "id");

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/config/DefaultReactivePulsarListenerContainerFactoryTests.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,55 @@ void defaultUsedWhenNotSetOnEndpointNorFactoryProps() {
7878

7979
}
8080

81+
@SuppressWarnings("unchecked")
82+
@Nested
83+
class SubscriptionNameFrom {
84+
85+
@Test
86+
void factoryPropsUsedWhenNotSetOnEndpoint() {
87+
var factoryProps = new ReactivePulsarContainerProperties<String>();
88+
factoryProps.setSubscriptionName("my-factory-subscription");
89+
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
90+
mock(ReactivePulsarConsumerFactory.class), factoryProps);
91+
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
92+
when(endpoint.getConcurrency()).thenReturn(1);
93+
var createdContainer = containerFactory.createListenerContainer(endpoint);
94+
assertThat(createdContainer.getContainerProperties().getSubscriptionName())
95+
.isEqualTo("my-factory-subscription");
96+
}
97+
98+
@Test
99+
void endpointTakesPrecedenceOverFactoryProps() {
100+
var factoryProps = new ReactivePulsarContainerProperties<String>();
101+
factoryProps.setSubscriptionName("my-factory-subscription");
102+
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
103+
mock(ReactivePulsarConsumerFactory.class), factoryProps);
104+
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
105+
when(endpoint.getConcurrency()).thenReturn(1);
106+
when(endpoint.getSubscriptionName()).thenReturn("my-endpoint-subscription");
107+
var createdContainer = containerFactory.createListenerContainer(endpoint);
108+
assertThat(createdContainer.getContainerProperties().getSubscriptionName())
109+
.isEqualTo("my-endpoint-subscription");
110+
}
111+
112+
@Test
113+
void defaultUsedWhenNotSetOnEndpointNorFactoryProps() {
114+
var factoryProps = new ReactivePulsarContainerProperties<String>();
115+
var containerFactory = new DefaultReactivePulsarListenerContainerFactory<String>(
116+
mock(ReactivePulsarConsumerFactory.class), factoryProps);
117+
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
118+
when(endpoint.getConcurrency()).thenReturn(1);
119+
120+
var container1 = containerFactory.createListenerContainer(endpoint);
121+
assertThat(container1.getContainerProperties().getSubscriptionName())
122+
.startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#");
123+
var container2 = containerFactory.createListenerContainer(endpoint);
124+
assertThat(container2.getContainerProperties().getSubscriptionName())
125+
.startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#");
126+
assertThat(container1.getContainerProperties().getSubscriptionName())
127+
.isNotEqualTo(container2.getContainerProperties().getSubscriptionName());
128+
}
129+
130+
}
131+
81132
}

spring-pulsar-reactive/src/test/java/org/springframework/pulsar/reactive/listener/ReactivePulsarListenerTests.java

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.pulsar.reactive.client.api.MessageResult;
4545
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
4646
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
47+
import org.assertj.core.api.InstanceOfAssertFactories;
4748
import org.junit.jupiter.api.Nested;
4849
import org.junit.jupiter.api.Test;
4950

@@ -72,7 +73,7 @@
7273
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersCustomObjectMapperTest.PulsarHeadersCustomObjectMapperTestConfig;
7374
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.PulsarHeadersTest.PulsarListenerWithHeadersConfig;
7475
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.StreamingListenerTestCases.StreamingListenerTestCasesConfig;
75-
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionTypeTests.SubscriptionTypeTestsConfig;
76+
import org.springframework.pulsar.reactive.listener.ReactivePulsarListenerTests.SubscriptionNameTests.SubscriptionNameTestsConfig;
7677
import org.springframework.pulsar.reactive.support.MessageUtils;
7778
import org.springframework.pulsar.support.PulsarHeaders;
7879
import org.springframework.pulsar.support.header.JsonPulsarHeaderMapper;
@@ -815,80 +816,79 @@ Mono<Void> listen2(String message) {
815816
}
816817

817818
@Nested
818-
@ContextConfiguration(classes = SubscriptionTypeTestsConfig.class)
819-
class SubscriptionTypeTests {
819+
@ContextConfiguration(classes = SubscriptionNameTestsConfig.class)
820+
class SubscriptionNameTests {
820821

821-
static final CountDownLatch latchTypeNotSet = new CountDownLatch(1);
822+
static final CountDownLatch latchNameNotSet = new CountDownLatch(1);
822823

823-
static final CountDownLatch latchTypeSetOnAnnotation = new CountDownLatch(1);
824+
static final CountDownLatch latchNameSetOnAnnotation = new CountDownLatch(1);
824825

825-
static final CountDownLatch latchTypeSetOnCustomizer = new CountDownLatch(1);
826+
static final CountDownLatch latchNameSetOnCustomizer = new CountDownLatch(1);
826827

827828
@Test
828-
void defaultTypeFromContainerFactoryUsedWhenTypeNotSetAnywhere(
829+
void defaultNameFromContainerFactoryUsedWhenNameNotSetAnywhere(
829830
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
830-
var topic = "rpl-latchTypeNotSet-topic";
831-
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
832-
.isEqualTo(SubscriptionType.Exclusive);
831+
var topic = "rpl-latchNameNotSet-topic";
832+
assertThat(consumerFactory.getSpec(topic))
833+
.extracting(ReactiveMessageConsumerSpec::getSubscriptionName, InstanceOfAssertFactories.STRING)
834+
.startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#");
833835
pulsarTemplate.send(topic, "hello-" + topic);
834-
assertThat(latchTypeNotSet.await(5, TimeUnit.SECONDS)).isTrue();
836+
assertThat(latchNameNotSet.await(5, TimeUnit.SECONDS)).isTrue();
835837
}
836838

837839
@Test
838-
void typeSetOnAnnotationOverridesDefaultTypeFromContainerFactory(
840+
void nameSetOnAnnotationOverridesDefaultNameFromContainerFactory(
839841
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
840-
var topic = "rpl-typeSetOnAnnotation-topic";
841-
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
842-
.isEqualTo(SubscriptionType.Key_Shared);
842+
var topic = "rpl-nameSetOnAnnotation-topic";
843+
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionName)
844+
.isEqualTo("from-annotation");
843845
pulsarTemplate.send(topic, "hello-" + topic);
844-
assertThat(latchTypeSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue();
846+
assertThat(latchNameSetOnAnnotation.await(5, TimeUnit.SECONDS)).isTrue();
845847
}
846848

847849
@Test
848-
void typeSetOnCustomizerOverridesTypeSetOnAnnotation(
850+
void nameSetOnCustomizerOverridesNameSetOnAnnotation(
849851
@Autowired ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory) throws Exception {
850-
var topic = "rpl-typeSetOnCustomizer-topic";
851-
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionType)
852-
.isEqualTo(SubscriptionType.Failover);
852+
var topic = "rpl-nameSetOnCustomizer-topic";
853+
assertThat(consumerFactory.getSpec(topic)).extracting(ReactiveMessageConsumerSpec::getSubscriptionName)
854+
.isEqualTo("from-customizer");
853855
pulsarTemplate.send(topic, "hello-" + topic);
854-
assertThat(latchTypeSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue();
856+
assertThat(latchNameSetOnCustomizer.await(5, TimeUnit.SECONDS)).isTrue();
855857
}
856858

857859
@Configuration(proxyBeanMethods = false)
858-
static class SubscriptionTypeTestsConfig {
860+
static class SubscriptionNameTestsConfig {
859861

860862
@Bean
861-
ReactiveMessageConsumerBuilderCustomizer<String> consumerFactoryDefaultSubTypeCustomizer() {
862-
return (b) -> b.subscriptionType(SubscriptionType.Shared);
863+
ReactiveMessageConsumerBuilderCustomizer<String> consumerFactoryDefaultSubNameCustomizer() {
864+
return (b) -> b.subscriptionName("from-consumer-factory");
863865
}
864866

865-
@ReactivePulsarListener(topics = "rpl-latchTypeNotSet-topic", subscriptionName = "rpl-latchTypeNotSet-sub",
867+
@ReactivePulsarListener(topics = "rpl-latchNameNotSet-topic",
866868
consumerCustomizer = "subscriptionInitialPositionEarliest")
867-
Mono<Void> listenWithoutTypeSetAnywhere(String ignored) {
868-
latchTypeNotSet.countDown();
869+
Mono<Void> listenWithoutNameSetAnywhere(String ignored) {
870+
latchNameNotSet.countDown();
869871
return Mono.empty();
870872
}
871873

872-
@ReactivePulsarListener(topics = "rpl-typeSetOnAnnotation-topic",
873-
subscriptionName = "rpl-typeSetOnAnnotation-sub", subscriptionType = SubscriptionType.Key_Shared,
874+
@ReactivePulsarListener(topics = "rpl-nameSetOnAnnotation-topic", subscriptionName = "from-annotation",
874875
consumerCustomizer = "subscriptionInitialPositionEarliest")
875-
Mono<Void> listenWithTypeSetOnAnnotation(String ignored) {
876-
latchTypeSetOnAnnotation.countDown();
876+
Mono<Void> listenWithNameSetOnAnnotation(String ignored) {
877+
latchNameSetOnAnnotation.countDown();
877878
return Mono.empty();
878879
}
879880

880-
@ReactivePulsarListener(topics = "rpl-typeSetOnCustomizer-topic",
881-
subscriptionName = "rpl-typeSetOnCustomizer-sub", subscriptionType = SubscriptionType.Key_Shared,
881+
@ReactivePulsarListener(topics = "rpl-nameSetOnCustomizer-topic", subscriptionName = "from-annotation",
882882
consumerCustomizer = "myCustomizer")
883-
Mono<Void> listenWithTypeSetOnCustomizer(String ignored) {
884-
latchTypeSetOnCustomizer.countDown();
883+
Mono<Void> listenWithNameSetOnCustomizer(String ignored) {
884+
latchNameSetOnCustomizer.countDown();
885885
return Mono.empty();
886886
}
887887

888888
@Bean
889889
public ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myCustomizer() {
890890
return cb -> cb.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
891-
.subscriptionType(SubscriptionType.Failover);
891+
.subscriptionName("from-customizer");
892892
}
893893

894894
}

0 commit comments

Comments
 (0)