Skip to content

Commit a0a05cd

Browse files
committed
[Samples] Make sample-pulsar-binder testable
1 parent 3667169 commit a0a05cd

File tree

8 files changed

+190
-134
lines changed

8 files changed

+190
-134
lines changed

spring-pulsar-sample-apps/sample-pulsar-binder/build.gradle

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,15 @@ ext['pulsar.version'] = "${pulsarVersion}"
2222
dependencies {
2323
implementation "org.springframework.boot:spring-boot-starter-pulsar"
2424
implementation "org.springframework.cloud:spring-cloud-stream-binder-pulsar:${springCloudStreamVersion}"
25+
developmentOnly 'org.springframework.boot:spring-boot-docker-compose'
26+
27+
testImplementation project(':spring-pulsar-test')
28+
testRuntimeOnly 'ch.qos.logback:logback-classic'
29+
testImplementation "org.springframework.boot:spring-boot-starter-test"
30+
testImplementation "org.springframework.boot:spring-boot-testcontainers"
31+
testImplementation 'org.testcontainers:junit-jupiter'
32+
testImplementation 'org.testcontainers:pulsar'
33+
2534
}
2635

2736
test {
@@ -36,4 +45,6 @@ bootRun {
3645
"--add-opens", "java.base/java.util=ALL-UNNAMED",
3746
"--add-opens", "java.base/sun.net=ALL-UNNAMED"
3847
]
48+
// when run from command line, path must be set relative to module dir
49+
systemProperty 'spring.docker.compose.file', 'compose.yaml'
3950
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
services:
2+
pulsar:
3+
image: 'apachepulsar/pulsar:3.1.2'
4+
ports:
5+
- '6650'
6+
- '8080'
7+
command: 'bin/pulsar standalone'
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2022-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example;
18+
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
import java.util.function.Consumer;
21+
import java.util.function.Function;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import org.springframework.boot.ApplicationRunner;
27+
import org.springframework.boot.SpringApplication;
28+
import org.springframework.boot.autoconfigure.SpringBootApplication;
29+
import org.springframework.cloud.stream.function.StreamBridge;
30+
import org.springframework.context.annotation.Bean;
31+
32+
@SpringBootApplication
33+
public class SpringPulsarBinderSampleApp {
34+
35+
private static final Logger LOG = LoggerFactory.getLogger(SpringPulsarBinderSampleApp.class);
36+
37+
public static void main(String[] args) {
38+
SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
39+
}
40+
41+
private AtomicInteger counter = new AtomicInteger();
42+
43+
@Bean
44+
ApplicationRunner fooSupplier(StreamBridge streamBridge) {
45+
return (args) -> {
46+
for (int i = 0; i < 10; i++) {
47+
var foo = new Foo("fooSupplier:" + i);
48+
streamBridge.send("fooSupplier-out-0", foo);
49+
LOG.info("++++++SOURCE {}------", foo);
50+
}
51+
};
52+
}
53+
54+
@Bean
55+
public Function<Foo, Bar> fooProcessor() {
56+
return (foo) -> {
57+
var bar = new Bar(foo);
58+
LOG.info("++++++PROCESSOR {} --> {}------", foo, bar);
59+
return bar;
60+
};
61+
}
62+
63+
@Bean
64+
public Consumer<Bar> barLogger() {
65+
return (bar) -> LOG.info("++++++SINK {}------", bar);
66+
}
67+
68+
record Foo(String value) {
69+
}
70+
71+
record Bar(Foo value) {
72+
}
73+
74+
}

spring-pulsar-sample-apps/sample-pulsar-binder/src/main/java/org/springframework/pulsar/sample/binder/package-info.java renamed to spring-pulsar-sample-apps/sample-pulsar-binder/src/main/java/com/example/package-info.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
*/
44
@NonNullApi
55
@NonNullFields
6-
package org.springframework.pulsar.sample.binder;
6+
package com.example;
77

88
import org.springframework.lang.NonNullApi;
99
import org.springframework.lang.NonNullFields;

spring-pulsar-sample-apps/sample-pulsar-binder/src/main/java/org/springframework/pulsar/sample/binder/SpringPulsarBinderSampleApp.java

Lines changed: 0 additions & 104 deletions
This file was deleted.
Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,44 @@
11
spring:
22
cloud:
33
function:
4-
definition: timeSupplier;timeProcessor;timeLogger
4+
definition: fooProcessor;barLogger
55
stream:
6+
output-bindings: fooSupplier-out-0
67
bindings:
7-
timeSupplier-out-0:
8+
fooSupplier-out-0:
89
producer:
910
use-native-encoding: true
10-
timeProcessor-in-0:
11-
destination: timeSupplier-out-0
11+
fooProcessor-in-0:
12+
destination: fooSupplier-out-0
1213
consumer:
1314
use-native-decoding: true
14-
timeProcessor-out-0:
15-
destination: timeProcessor-out-0
15+
fooProcessor-out-0:
16+
destination: fooProcessor-out-0
1617
producer:
1718
use-native-encoding: true
18-
timeLogger-in-0:
19-
destination: timeProcessor-out-0
20-
consumer:
21-
use-native-decoding: true
22-
timeLoggerToDlt-in-0:
23-
destination: timeProcessor-out-0
19+
barLogger-in-0:
20+
destination: fooProcessor-out-0
2421
consumer:
2522
use-native-decoding: true
2623
pulsar:
2724
bindings:
28-
timeSupplier-out-0:
25+
fooSupplier-out-0:
2926
producer:
3027
schema-type: JSON
31-
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
32-
timeProcessor-in-0:
28+
message-type: com.example.SpringPulsarBinderSampleApp.Foo
29+
fooProcessor-in-0:
3330
consumer:
3431
schema-type: JSON
35-
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
36-
timeProcessor-out-0:
32+
message-type: com.example.SpringPulsarBinderSampleApp.Foo
33+
fooProcessor-out-0:
3734
producer:
3835
schema-type: JSON
39-
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
40-
timeLogger-in-0:
41-
consumer:
42-
schema-type: JSON
43-
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
44-
timeLoggerToDlt-in-0:
36+
message-type: com.example.SpringPulsarBinderSampleApp.Bar
37+
barLogger-in-0:
4538
consumer:
46-
subscription-type: Shared
47-
negative-ack-redelivery-delay: 1s
48-
dead-letter-policy:
49-
dead-letter-topic: notification-dlq
50-
max-redeliver-count: 5
5139
schema-type: JSON
52-
message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
40+
message-type: com.example.SpringPulsarBinderSampleApp.Bar
41+
docker:
42+
compose:
43+
# when run from Intellij via "Run" button, path must be set from project root
44+
file: spring-pulsar-sample-apps/sample-pulsar-binder/compose.yaml
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2012-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example;
18+
19+
import java.time.Duration;
20+
import java.util.ArrayList;
21+
import java.util.stream.IntStream;
22+
23+
import com.example.SpringPulsarBinderSampleApp.Bar;
24+
import com.example.SpringPulsarBinderSampleApp.Foo;
25+
import org.awaitility.Awaitility;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.extension.ExtendWith;
28+
29+
import org.springframework.boot.test.context.SpringBootTest;
30+
import org.springframework.boot.test.system.CapturedOutput;
31+
import org.springframework.boot.test.system.OutputCaptureExtension;
32+
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
33+
import org.springframework.test.context.DynamicPropertyRegistry;
34+
import org.springframework.test.context.DynamicPropertySource;
35+
36+
import static org.assertj.core.api.Assertions.assertThat;
37+
38+
@SpringBootTest
39+
@ExtendWith(OutputCaptureExtension.class)
40+
class SpringPulsarBinderSampleAppTests implements PulsarTestContainerSupport {
41+
42+
@DynamicPropertySource
43+
static void pulsarProperties(DynamicPropertyRegistry registry) {
44+
registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerUrl);
45+
registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceUrl);
46+
}
47+
48+
@Test
49+
void produceConsumeWithPrimitiveMessageType(CapturedOutput output) {
50+
var expectedOutput = new ArrayList<String>();
51+
IntStream.range(0, 10).forEachOrdered((i) -> {
52+
var foo = new Foo("fooSupplier:" + i);
53+
var bar = new Bar(foo);
54+
expectedOutput.add("++++++SOURCE %s------".formatted(foo));
55+
expectedOutput.add("++++++PROCESSOR %s --> %s------".formatted(foo, bar));
56+
expectedOutput.add("++++++SINK %s------".formatted(bar));
57+
});
58+
Awaitility.waitAtMost(Duration.ofSeconds(15))
59+
.untilAsserted(() -> assertThat(output).contains(expectedOutput));
60+
}
61+
62+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<configuration>
2+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
3+
<encoder>
4+
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern>
5+
</encoder>
6+
</appender>
7+
<root level="WARN">
8+
<appender-ref ref="STDOUT"/>
9+
</root>
10+
<logger name="com.example" level="INFO"/>
11+
<logger name="com.github.dockerjava" level="ERROR"/>
12+
<logger name="org.apache.pulsar.common.util.netty" level="ERROR" />
13+
<logger name="org.testcontainers" level="ERROR"/>
14+
</configuration>

0 commit comments

Comments
 (0)