Skip to content

Commit 215c563

Browse files
committed
Add IT for config props driven listeners
This commit adds ITs to exercise the ability to configure the `@PulsarListener` / `@ReactivePulsarListener` subscription name and type using Spring Boot config props.
1 parent 136d465 commit 215c563

File tree

2 files changed

+148
-16
lines changed

2 files changed

+148
-16
lines changed

integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/PulsarListenerIntegrationTests.java

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,12 @@
2222
import java.util.concurrent.CountDownLatch;
2323
import java.util.concurrent.TimeUnit;
2424

25+
import org.apache.pulsar.client.api.Consumer;
2526
import org.apache.pulsar.client.api.Schema;
27+
import org.apache.pulsar.client.api.SubscriptionType;
28+
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
2629
import org.apache.pulsar.common.schema.SchemaType;
30+
import org.assertj.core.api.InstanceOfAssertFactories;
2731
import org.junit.jupiter.api.Test;
2832

2933
import org.springframework.boot.SpringApplication;
@@ -58,16 +62,17 @@ class PulsarListenerIntegrationTests implements PulsarTestContainerSupport {
5862

5963
private static final CountDownLatch LATCH_5 = new CountDownLatch(10);
6064

65+
private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1);
66+
6167
@Test
6268
void basicPulsarListener() throws Exception {
6369
SpringApplication app = new SpringApplication(BasicListenerConfig.class);
6470
app.setWebApplicationType(WebApplicationType.NONE);
65-
6671
try (ConfigurableApplicationContext context = app
6772
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
6873
@SuppressWarnings("unchecked")
6974
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
70-
pulsarTemplate.send("plt-basic-topic", "John Doe");
75+
pulsarTemplate.send("plit-basic-topic", "John Doe");
7176
assertThat(LATCH_1.await(20, TimeUnit.SECONDS)).isTrue();
7277
}
7378
}
@@ -76,12 +81,11 @@ void basicPulsarListener() throws Exception {
7681
void basicPulsarListenerCustomType() throws Exception {
7782
SpringApplication app = new SpringApplication(BasicListenerCustomTypeConfig.class);
7883
app.setWebApplicationType(WebApplicationType.NONE);
79-
8084
try (ConfigurableApplicationContext context = app
8185
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
8286
@SuppressWarnings("unchecked")
8387
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
84-
pulsarTemplate.send("plt-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class));
88+
pulsarTemplate.send("plit-foo-topic1", new Foo("John Doe"), Schema.JSON(Foo.class));
8589
assertThat(LATCH_2.await(20, TimeUnit.SECONDS)).isTrue();
8690
}
8791
}
@@ -90,12 +94,11 @@ void basicPulsarListenerCustomType() throws Exception {
9094
void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception {
9195
SpringApplication app = new SpringApplication(BasicListenerCustomTypeWithTypeMappingConfig.class);
9296
app.setWebApplicationType(WebApplicationType.NONE);
93-
9497
try (ConfigurableApplicationContext context = app
9598
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
9699
@SuppressWarnings("unchecked")
97100
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
98-
pulsarTemplate.send("plt-foo-topic2", new Foo("John Doe"));
101+
pulsarTemplate.send("plit-foo-topic2", new Foo("John Doe"));
99102
assertThat(LATCH_3.await(20, TimeUnit.SECONDS)).isTrue();
100103
}
101104
}
@@ -104,12 +107,11 @@ void basicPulsarListenerCustomTypeWithTypeMapping() throws Exception {
104107
void basicPulsarListenerWithTopicMapping() throws Exception {
105108
SpringApplication app = new SpringApplication(BasicListenerWithTopicMappingConfig.class);
106109
app.setWebApplicationType(WebApplicationType.NONE);
107-
108110
try (ConfigurableApplicationContext context = app
109111
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
110112
@SuppressWarnings("unchecked")
111113
PulsarTemplate<Foo> pulsarTemplate = context.getBean(PulsarTemplate.class);
112-
pulsarTemplate.send("plt-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class));
114+
pulsarTemplate.send("plit-topicMapping-topic", new Foo("Crazy8z"), Schema.JSON(Foo.class));
113115
assertThat(LATCH_4.await(20, TimeUnit.SECONDS)).isTrue();
114116
}
115117
}
@@ -118,23 +120,38 @@ void basicPulsarListenerWithTopicMapping() throws Exception {
118120
void batchPulsarListener() throws Exception {
119121
SpringApplication app = new SpringApplication(BatchListenerConfig.class);
120122
app.setWebApplicationType(WebApplicationType.NONE);
121-
122123
try (ConfigurableApplicationContext context = app
123124
.run("--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl())) {
124125
@SuppressWarnings("unchecked")
125126
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
126127
for (int i = 0; i < 10; i++) {
127-
pulsarTemplate.send("plt-batch-topic", "John Doe");
128+
pulsarTemplate.send("plit-batch-topic", "John Doe");
128129
}
129130
assertThat(LATCH_5.await(10, TimeUnit.SECONDS)).isTrue();
130131
}
131132
}
132133

134+
@Test
135+
void configPropsDrivenListener() throws Exception {
136+
SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class);
137+
app.setWebApplicationType(WebApplicationType.NONE);
138+
try (ConfigurableApplicationContext context = app.run(
139+
"--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), "--my.env=dev",
140+
"--spring.pulsar.consumer.topics=plit-config-props-topic-${my.env}",
141+
"--spring.pulsar.consumer.subscription.type=Shared",
142+
"--spring.pulsar.consumer.subscription.name=plit-config-props-subs-${my.env}")) {
143+
@SuppressWarnings("unchecked")
144+
PulsarTemplate<String> pulsarTemplate = context.getBean(PulsarTemplate.class);
145+
pulsarTemplate.send("plit-config-props-topic-dev", "hello config props driven");
146+
assertThat(LATCH_CONFIG_PROPS.await(20, TimeUnit.SECONDS)).isTrue();
147+
}
148+
}
149+
133150
@EnableAutoConfiguration
134151
@SpringBootConfiguration
135152
static class BasicListenerConfig {
136153

137-
@PulsarListener(subscriptionName = "plt-basic-sub", topics = "plt-basic-topic")
154+
@PulsarListener(subscriptionName = "plit-basic-sub", topics = "plit-basic-topic")
138155
public void listen(String ignored) {
139156
LATCH_1.countDown();
140157
}
@@ -145,7 +162,7 @@ public void listen(String ignored) {
145162
@SpringBootConfiguration
146163
static class BasicListenerCustomTypeConfig {
147164

148-
@PulsarListener(subscriptionName = "plt-foo-sub1", topics = "plt-foo-topic1", schemaType = SchemaType.JSON)
165+
@PulsarListener(subscriptionName = "plit-foo-sub1", topics = "plit-foo-topic1", schemaType = SchemaType.JSON)
149166
public void listen(Foo ignored) {
150167
LATCH_2.countDown();
151168
}
@@ -163,7 +180,7 @@ SchemaResolver customSchemaResolver() {
163180
return resolver;
164181
}
165182

166-
@PulsarListener(subscriptionName = "plt-foo-sub2", topics = "plt-foo-topic2")
183+
@PulsarListener(subscriptionName = "plit-foo-sub2", topics = "plit-foo-topic2")
167184
public void listen(Foo ignored) {
168185
LATCH_3.countDown();
169186
}
@@ -177,11 +194,11 @@ static class BasicListenerWithTopicMappingConfig {
177194
@Bean
178195
TopicResolver customTopicResolver() {
179196
DefaultTopicResolver resolver = new DefaultTopicResolver();
180-
resolver.addCustomTopicMapping(Foo.class, "plt-topicMapping-topic");
197+
resolver.addCustomTopicMapping(Foo.class, "plit-topicMapping-topic");
181198
return resolver;
182199
}
183200

184-
@PulsarListener(subscriptionName = "plt-topicMapping-sub", schemaType = SchemaType.JSON)
201+
@PulsarListener(subscriptionName = "plit-topicMapping-sub", schemaType = SchemaType.JSON)
185202
public void listen(Foo ignored) {
186203
LATCH_4.countDown();
187204
}
@@ -192,13 +209,29 @@ public void listen(Foo ignored) {
192209
@SpringBootConfiguration
193210
static class BatchListenerConfig {
194211

195-
@PulsarListener(subscriptionName = "plt-batch-sub", topics = "plt-batch-topic", batch = true)
212+
@PulsarListener(subscriptionName = "plit-batch-sub", topics = "plit-batch-topic", batch = true)
196213
public void listen(List<String> foo) {
197214
foo.forEach(t -> LATCH_5.countDown());
198215
}
199216

200217
}
201218

219+
@EnableAutoConfiguration
220+
@SpringBootConfiguration
221+
static class ConfigPropsDrivenListenerConfig {
222+
223+
@PulsarListener
224+
public void listen(String ignored, Consumer<String> consumer) {
225+
assertThat(consumer).extracting("conf", InstanceOfAssertFactories.type(ConsumerConfigurationData.class))
226+
.satisfies((conf) -> {
227+
assertThat(conf.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
228+
assertThat(conf.getSubscriptionName()).isEqualTo("plit-config-props-subs-dev");
229+
});
230+
LATCH_CONFIG_PROPS.countDown();
231+
}
232+
233+
}
234+
202235
record Foo(String value) {
203236
}
204237

integration-tests/src/intTest/java/org/springframework/pulsar/inttest/listener/ReactivePulsarListenerIntegrationTests.java

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,24 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
2124
import java.util.concurrent.CountDownLatch;
2225
import java.util.concurrent.TimeUnit;
2326

2427
import org.apache.pulsar.client.api.Message;
2528
import org.apache.pulsar.client.api.Schema;
2629
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
30+
import org.apache.pulsar.client.api.SubscriptionType;
2731
import org.apache.pulsar.common.schema.SchemaType;
2832
import org.apache.pulsar.reactive.client.api.MessageResult;
33+
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumer;
34+
import org.apache.pulsar.reactive.client.api.ReactiveMessageConsumerSpec;
35+
import org.apache.pulsar.reactive.client.api.ReactivePulsarClient;
2936
import org.junit.jupiter.api.Test;
3037

38+
import org.springframework.beans.factory.ObjectProvider;
3139
import org.springframework.boot.SpringApplication;
3240
import org.springframework.boot.SpringBootConfiguration;
3341
import org.springframework.boot.WebApplicationType;
@@ -43,8 +51,13 @@
4351
import org.springframework.pulsar.core.TopicResolver;
4452
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
4553
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
54+
import org.springframework.pulsar.reactive.core.DefaultReactivePulsarConsumerFactory;
55+
import org.springframework.pulsar.reactive.core.ReactiveMessageConsumerBuilderCustomizer;
56+
import org.springframework.pulsar.reactive.core.ReactivePulsarConsumerFactory;
4657
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
4758
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
59+
import org.springframework.test.util.ReflectionTestUtils;
60+
import org.springframework.util.ObjectUtils;
4861

4962
import reactor.core.publisher.Flux;
5063
import reactor.core.publisher.Mono;
@@ -67,6 +80,8 @@ class ReactivePulsarListenerIntegrationTests implements PulsarTestContainerSuppo
6780

6881
private static final CountDownLatch LATCH5 = new CountDownLatch(10);
6982

83+
private static final CountDownLatch LATCH_CONFIG_PROPS = new CountDownLatch(1);
84+
7085
@Test
7186
void basicListener() throws Exception {
7287
SpringApplication app = new SpringApplication(BasicListenerConfig.class);
@@ -134,6 +149,30 @@ void fluxListener() throws Exception {
134149
}
135150
}
136151

152+
@Test
153+
void configPropsDrivenListener() throws Exception {
154+
SpringApplication app = new SpringApplication(ConfigPropsDrivenListenerConfig.class);
155+
app.setWebApplicationType(WebApplicationType.NONE);
156+
try (ConfigurableApplicationContext context = app.run(
157+
"--spring.pulsar.client.serviceUrl=" + PulsarTestContainerSupport.getPulsarBrokerUrl(), "--my.env=dev",
158+
"--spring.pulsar.consumer.topics=plit-config-props-topic-${my.env}",
159+
"--spring.pulsar.consumer.subscription.type=Shared",
160+
"--spring.pulsar.consumer.subscription.name=plit-config-props-subs-${my.env}")) {
161+
var topic = "plit-config-props-topic-dev";
162+
@SuppressWarnings("unchecked")
163+
ReactivePulsarTemplate<String> pulsarTemplate = context.getBean(ReactivePulsarTemplate.class);
164+
pulsarTemplate.send(topic, "hello config props driven").block();
165+
assertThat(LATCH_CONFIG_PROPS.await(20, TimeUnit.SECONDS)).isTrue();
166+
@SuppressWarnings("unchecked")
167+
ConsumerTrackingReactivePulsarConsumerFactory<String> consumerFactory = context
168+
.getBean(ConsumerTrackingReactivePulsarConsumerFactory.class);
169+
assertThat(consumerFactory.getSpec(topic)).satisfies((consumerSpec) -> {
170+
assertThat(consumerSpec.getSubscriptionName()).isEqualTo("plit-config-props-subs-dev");
171+
assertThat(consumerSpec.getSubscriptionType()).isEqualTo(SubscriptionType.Shared);
172+
});
173+
}
174+
}
175+
137176
@EnableAutoConfiguration
138177
@SpringBootConfiguration
139178
@Import(ConsumerCustomizerConfig.class)
@@ -217,6 +256,28 @@ public Flux<MessageResult<Void>> listen(Flux<Message<String>> messages) {
217256

218257
}
219258

259+
@EnableAutoConfiguration
260+
@SpringBootConfiguration
261+
@Import(ConsumerCustomizerConfig.class)
262+
static class ConfigPropsDrivenListenerConfig {
263+
264+
@SuppressWarnings("unchecked")
265+
@Bean
266+
ConsumerTrackingReactivePulsarConsumerFactory<String> pulsarConsumerFactory(ReactivePulsarClient pulsarClient,
267+
ObjectProvider<ReactiveMessageConsumerBuilderCustomizer<String>> defaultConsumerCustomizersProvider) {
268+
DefaultReactivePulsarConsumerFactory<String> consumerFactory = new DefaultReactivePulsarConsumerFactory<>(
269+
pulsarClient, defaultConsumerCustomizersProvider.orderedStream().toList());
270+
return new ConsumerTrackingReactivePulsarConsumerFactory<>(consumerFactory);
271+
}
272+
273+
@ReactivePulsarListener(consumerCustomizer = "consumerCustomizer")
274+
public Mono<Void> listen(String ignored) {
275+
LATCH_CONFIG_PROPS.countDown();
276+
return Mono.empty();
277+
}
278+
279+
}
280+
220281
@Configuration(proxyBeanMethods = false)
221282
static class ConsumerCustomizerConfig {
222283

@@ -230,4 +291,42 @@ ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> consumerCustomize
230291
record Foo(String value) {
231292
}
232293

294+
static class ConsumerTrackingReactivePulsarConsumerFactory<T> implements ReactivePulsarConsumerFactory<T> {
295+
296+
private Map<String, ReactiveMessageConsumerSpec> topicNameToConsumerSpec = new HashMap<>();
297+
298+
private ReactivePulsarConsumerFactory<T> delegate;
299+
300+
ConsumerTrackingReactivePulsarConsumerFactory(ReactivePulsarConsumerFactory<T> delegate) {
301+
this.delegate = delegate;
302+
}
303+
304+
@Override
305+
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema) {
306+
var consumer = this.delegate.createConsumer(schema);
307+
storeSpec(consumer);
308+
return consumer;
309+
}
310+
311+
@Override
312+
public ReactiveMessageConsumer<T> createConsumer(Schema<T> schema,
313+
List<ReactiveMessageConsumerBuilderCustomizer<T>> reactiveMessageConsumerBuilderCustomizers) {
314+
var consumer = this.delegate.createConsumer(schema, reactiveMessageConsumerBuilderCustomizers);
315+
storeSpec(consumer);
316+
return consumer;
317+
}
318+
319+
private void storeSpec(ReactiveMessageConsumer<T> consumer) {
320+
var consumerSpec = (ReactiveMessageConsumerSpec) ReflectionTestUtils.getField(consumer, "consumerSpec");
321+
var topicNamesKey = !ObjectUtils.isEmpty(consumerSpec.getTopicNames()) ? consumerSpec.getTopicNames().get(0)
322+
: "no-topics-set";
323+
this.topicNameToConsumerSpec.put(topicNamesKey, consumerSpec);
324+
}
325+
326+
ReactiveMessageConsumerSpec getSpec(String topic) {
327+
return this.topicNameToConsumerSpec.get(topic);
328+
}
329+
330+
}
331+
233332
}

0 commit comments

Comments
 (0)