Skip to content

Commit bbdba00

Browse files
committed
[Samples] Make sample-reactive testable
1 parent 3f9fad4 commit bbdba00

File tree

8 files changed

+242
-147
lines changed

8 files changed

+242
-147
lines changed

spring-pulsar-sample-apps/sample-reactive/build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ ext['pulsar-reactive.version'] = "${pulsarReactiveVersion}"
2222

2323
dependencies {
2424
implementation "org.springframework.boot:spring-boot-starter-pulsar-reactive"
25+
developmentOnly 'org.springframework.boot:spring-boot-docker-compose'
26+
testImplementation project(':spring-pulsar-test')
27+
testRuntimeOnly 'ch.qos.logback:logback-classic'
28+
testImplementation "org.springframework.boot:spring-boot-starter-test"
29+
testImplementation "org.springframework.boot:spring-boot-testcontainers"
30+
testImplementation 'org.testcontainers:junit-jupiter'
31+
testImplementation 'org.testcontainers:pulsar'
2532
}
2633

2734
test {
@@ -36,4 +43,6 @@ bootRun {
3643
"--add-opens", "java.base/java.util=ALL-UNNAMED",
3744
"--add-opens", "java.base/sun.net=ALL-UNNAMED"
3845
]
46+
// when run from command line, path must be set relative to module dir
47+
systemProperty 'spring.docker.compose.file', 'compose.yaml'
3948
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
services:
2+
pulsar:
3+
image: 'apachepulsar/pulsar:3.1.2'
4+
ports:
5+
- '6650'
6+
- '8080'
7+
command: 'bin/pulsar standalone'
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2022-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example;
18+
19+
import org.apache.pulsar.client.api.Message;
20+
import org.apache.pulsar.client.api.Schema;
21+
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
22+
import org.apache.pulsar.common.schema.SchemaType;
23+
import org.apache.pulsar.reactive.client.api.MessageResult;
24+
import org.apache.pulsar.reactive.client.api.MessageSpec;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import org.springframework.boot.ApplicationRunner;
29+
import org.springframework.boot.SpringApplication;
30+
import org.springframework.boot.autoconfigure.SpringBootApplication;
31+
import org.springframework.context.annotation.Bean;
32+
import org.springframework.context.annotation.Configuration;
33+
import org.springframework.pulsar.annotation.PulsarListener;
34+
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
35+
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListenerMessageConsumerBuilderCustomizer;
36+
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
37+
38+
import reactor.core.publisher.Flux;
39+
import reactor.core.publisher.Mono;
40+
41+
@SpringBootApplication
42+
public class ReactiveSpringPulsarBootApp {
43+
44+
private static final Logger LOG = LoggerFactory.getLogger(ReactiveSpringPulsarBootApp.class);
45+
46+
public static void main(String[] args) {
47+
SpringApplication.run(ReactiveSpringPulsarBootApp.class, args);
48+
}
49+
50+
@Configuration(proxyBeanMethods = false)
51+
static class ReactiveTemplateWithSimpleReactiveListener {
52+
53+
private static final String TOPIC = "sample-reactive-topic1";
54+
55+
@Bean
56+
ApplicationRunner sendPrimitiveMessagesToPulsarTopic(ReactivePulsarTemplate<String> template) {
57+
return (args) -> Flux.range(0, 10)
58+
.map((i) -> MessageSpec.of("ReactiveTemplateWithSimpleReactiveListener:" + i))
59+
.as(messages -> template.send(TOPIC, messages))
60+
.doOnNext((msr) -> LOG.info("++++++PRODUCE {}------", msr.getMessageSpec().getValue()))
61+
.subscribe();
62+
}
63+
64+
@ReactivePulsarListener(topics = TOPIC, consumerCustomizer = "subscriptionInitialPositionEarliest")
65+
public Mono<Void> listenSimple(String msg) {
66+
LOG.info("++++++CONSUME {}------", msg);
67+
return Mono.empty();
68+
}
69+
70+
}
71+
72+
@Configuration(proxyBeanMethods = false)
73+
static class ReactiveTemplateWithStreamingReactiveListener {
74+
75+
private static final String TOPIC = "sample-reactive-topic2";
76+
77+
@Bean
78+
ApplicationRunner sendComplexMessagesToPulsarTopic(ReactivePulsarTemplate<Foo> template) {
79+
var schema = Schema.JSON(Foo.class);
80+
return (args) -> Flux.range(0, 10)
81+
.map((i) -> MessageSpec.of(new Foo("Foo-" + i, "Bar-" + i)))
82+
.as(messages -> template.send(TOPIC, messages, schema))
83+
.doOnNext((msr) -> LOG.info("++++++PRODUCE {}------", msr.getMessageSpec().getValue()))
84+
.subscribe();
85+
}
86+
87+
@ReactivePulsarListener(topics = TOPIC, stream = true, schemaType = SchemaType.JSON,
88+
consumerCustomizer = "subscriptionInitialPositionEarliest")
89+
public Flux<MessageResult<Void>> listenStreaming(Flux<Message<Foo>> messages) {
90+
return messages
91+
.doOnNext((msg) -> LOG.info("++++++CONSUME {}------", msg.getValue()))
92+
.map(MessageResult::acknowledge);
93+
}
94+
95+
}
96+
97+
@Configuration(proxyBeanMethods = false)
98+
static class ConsumerCustomizerConfig {
99+
100+
@Bean
101+
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> subscriptionInitialPositionEarliest() {
102+
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
103+
}
104+
105+
}
106+
107+
@Configuration(proxyBeanMethods = false)
108+
static class ReactiveTemplateWithImperativeListener {
109+
110+
private static final String TOPIC = "sample-reactive-topic3";
111+
112+
@Bean
113+
ApplicationRunner sendMessagesToPulsarTopic(ReactivePulsarTemplate<String> template) {
114+
return (args) -> Flux.range(0, 10)
115+
.map((i) -> MessageSpec.of("ReactiveTemplateWithImperativeListener:" + i))
116+
.as(messages -> template.send(TOPIC, messages))
117+
.doOnNext((msr) -> LOG.info("++++++PRODUCE {}------", msr.getMessageSpec().getValue()))
118+
.subscribe();
119+
}
120+
121+
@PulsarListener(topics = TOPIC)
122+
void listenSimple(String msg) {
123+
LOG.info("++++++CONSUME {}------", msg);
124+
}
125+
126+
}
127+
128+
record Foo(String foo, String bar) {
129+
}
130+
131+
}

spring-pulsar-sample-apps/sample-reactive/src/main/java/org.springframework.pulsar.example/package-info.java renamed to spring-pulsar-sample-apps/sample-reactive/src/main/java/com/example/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
*/
44
@NonNullApi
55
@NonNullFields
6-
package org.springframework.pulsar.example;
6+
package com.example;
77

88
import org.springframework.lang.NonNullApi;
99
import org.springframework.lang.NonNullFields;

spring-pulsar-sample-apps/sample-reactive/src/main/java/org.springframework.pulsar.example/ReactiveSpringPulsarBootApp.java

Lines changed: 0 additions & 143 deletions
This file was deleted.
Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1-
logging:
2-
level:
3-
org.apache.pulsar: warn
1+
spring:
2+
docker:
3+
compose:
4+
# when run from Intellij via "Run" button, path must be set from project root
5+
file: spring-pulsar-sample-apps/sample-reactive/compose.yaml
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example;
18+
19+
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.function.Function;
23+
import java.util.stream.IntStream;
24+
25+
import com.example.ReactiveSpringPulsarBootApp.Foo;
26+
import org.awaitility.Awaitility;
27+
import org.junit.jupiter.api.Test;
28+
import org.junit.jupiter.api.extension.ExtendWith;
29+
30+
import org.springframework.boot.test.context.SpringBootTest;
31+
import org.springframework.boot.test.system.CapturedOutput;
32+
import org.springframework.boot.test.system.OutputCaptureExtension;
33+
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
34+
import org.springframework.test.context.DynamicPropertyRegistry;
35+
import org.springframework.test.context.DynamicPropertySource;
36+
37+
import static org.assertj.core.api.Assertions.assertThat;
38+
39+
@SpringBootTest
40+
@ExtendWith(OutputCaptureExtension.class)
41+
class ReactiveSpringPulsarBootAppTests implements PulsarTestContainerSupport {
42+
43+
@DynamicPropertySource
44+
static void pulsarProperties(DynamicPropertyRegistry registry) {
45+
registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerUrl);
46+
registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl);
47+
}
48+
49+
@Test
50+
void reactiveTemplateWithSimpleReactiveListener(CapturedOutput output) {
51+
verifyProduceConsume(output,10, (i) -> "ReactiveTemplateWithSimpleReactiveListener:" + i);
52+
}
53+
54+
@Test
55+
void reactiveTemplateWithStreamingReactiveListener(CapturedOutput output) {
56+
verifyProduceConsume(output,10, (i) -> new Foo("Foo-" + i, "Bar-" + i));
57+
}
58+
59+
@Test
60+
void reactiveTemplateWithImperativeListener(CapturedOutput output) {
61+
verifyProduceConsume(output,10, (i) -> "ReactiveTemplateWithImperativeListener:" + i);
62+
}
63+
64+
private void verifyProduceConsume(CapturedOutput output, int numExpectedMessages,
65+
Function<Integer, Object> expectedMessageFactory) {
66+
List < String > expectedOutput = new ArrayList<>();
67+
IntStream.range(0, numExpectedMessages).forEachOrdered((i) -> {
68+
var msg = expectedMessageFactory.apply(i);
69+
expectedOutput.add("++++++PRODUCE %s------".formatted(msg));
70+
expectedOutput.add("++++++CONSUME %s------".formatted(msg));
71+
});
72+
Awaitility.waitAtMost(Duration.ofSeconds(15))
73+
.untilAsserted(() -> assertThat(output).contains(expectedOutput));
74+
}
75+
}

0 commit comments

Comments
 (0)