Skip to content

Commit 0273027

Browse files
authored
Add SSL integration tests (#442)
* Also move inttests to subpackages based on functionality * Also add app tests (non-ssl)
1 parent a48eaf2 commit 0273027

27 files changed

+844
-4
lines changed

integration-tests/integration-tests.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ dependencies {
1616
intTestImplementation project(':spring-pulsar-test')
1717
intTestRuntimeOnly 'ch.qos.logback:logback-classic'
1818
intTestRuntimeOnly 'org.junit.platform:junit-platform-launcher'
19+
intTestImplementation 'org.awaitility:awaitility'
1920
intTestImplementation "org.springframework.boot:spring-boot-starter-test:${springBootVersion}"
2021
intTestImplementation "org.springframework.boot:spring-boot-starter-amqp:${springBootVersion}"
2122
intTestImplementation "org.springframework.boot:spring-boot-starter-pulsar:${springBootVersion}"
2223
intTestImplementation "org.springframework.boot:spring-boot-starter-pulsar-reactive:${springBootVersion}"
24+
intTestImplementation "org.springframework.boot:spring-boot-testcontainers:${springBootVersion}"
2325
intTestImplementation 'org.testcontainers:junit-jupiter'
2426
intTestImplementation 'org.testcontainers:pulsar'
2527
intTestImplementation 'org.testcontainers:rabbitmq'
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.inttest.app;
18+
19+
import org.apache.commons.logging.Log;
20+
import org.apache.commons.logging.LogFactory;
21+
22+
import org.springframework.boot.ApplicationRunner;
23+
import org.springframework.boot.SpringBootConfiguration;
24+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
25+
import org.springframework.context.annotation.Bean;
26+
import org.springframework.context.annotation.Profile;
27+
import org.springframework.pulsar.annotation.PulsarListener;
28+
import org.springframework.pulsar.core.PulsarTemplate;
29+
import org.springframework.pulsar.core.PulsarTopic;
30+
31+
@SpringBootConfiguration
32+
@EnableAutoConfiguration
33+
@Profile("smoketest.pulsar.imperative")
34+
class ImperativeAppConfig {
35+
36+
private static final Log LOG = LogFactory.getLog(ImperativeAppConfig.class);
37+
38+
private static final String TOPIC = "pulsar-inttest-topic";
39+
40+
@Bean
41+
PulsarTopic pulsarTestTopic() {
42+
return PulsarTopic.builder(TOPIC).numberOfPartitions(1).build();
43+
}
44+
45+
@Bean
46+
ApplicationRunner sendMessagesToPulsarTopic(PulsarTemplate<SampleMessage> template) {
47+
return (args) -> {
48+
for (int i = 0; i < 10; i++) {
49+
template.send(TOPIC, new SampleMessage(i, "message:" + i));
50+
LOG.info("++++++PRODUCE IMPERATIVE:(" + i + ")------");
51+
}
52+
};
53+
}
54+
55+
@PulsarListener(topics = TOPIC)
56+
void consumeMessagesFromPulsarTopic(SampleMessage msg) {
57+
LOG.info("++++++CONSUME IMPERATIVE:(" + msg.id() + ")------");
58+
}
59+
60+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.inttest.app;
18+
19+
import org.testcontainers.junit.jupiter.Container;
20+
import org.testcontainers.junit.jupiter.Testcontainers;
21+
22+
import org.springframework.test.context.DynamicPropertyRegistry;
23+
import org.springframework.test.context.DynamicPropertySource;
24+
25+
@Testcontainers(disabledWithoutDocker = true)
26+
public interface PulsarContainerWithJksBasedSslTestSupport {
27+
28+
/**
29+
* Pulsar container with JKS based TLS enabled.
30+
*/
31+
@Container
32+
PulsarContainerWithSsl PULSAR_CONTAINER = PulsarContainerWithSsl.withJksBasedTls();
33+
34+
@DynamicPropertySource
35+
static void pulsarProperties(DynamicPropertyRegistry registry) {
36+
registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerTlsUrl);
37+
registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceTlsUrl);
38+
}
39+
40+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.inttest.app;
18+
19+
import org.testcontainers.junit.jupiter.Container;
20+
import org.testcontainers.junit.jupiter.Testcontainers;
21+
22+
import org.springframework.test.context.DynamicPropertyRegistry;
23+
import org.springframework.test.context.DynamicPropertySource;
24+
25+
@Testcontainers(disabledWithoutDocker = true)
26+
public interface PulsarContainerWithPemBasedSslTestSupport {
27+
28+
/**
29+
* Pulsar container with JKS based TLS enabled.
30+
*/
31+
@Container
32+
PulsarContainerWithSsl PULSAR_CONTAINER = PulsarContainerWithSsl.withPemBasedTls();
33+
34+
@DynamicPropertySource
35+
static void pulsarProperties(DynamicPropertyRegistry registry) {
36+
registry.add("spring.pulsar.client.service-url", PULSAR_CONTAINER::getPulsarBrokerTlsUrl);
37+
registry.add("spring.pulsar.admin.service-url", PULSAR_CONTAINER::getHttpServiceTlsUrl);
38+
}
39+
40+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.inttest.app;
18+
19+
import java.time.Duration;
20+
21+
import org.testcontainers.containers.PulsarContainer;
22+
import org.testcontainers.utility.MountableFile;
23+
24+
import org.springframework.pulsar.test.support.PulsarTestContainerSupport;
25+
26+
/**
27+
* A {@link PulsarContainer} with TLS (SSL) configuration.
28+
*
29+
* @author Chris Bono
30+
*/
31+
final class PulsarContainerWithSsl extends PulsarContainer {
32+
33+
static final int BROKER_TLS_PORT = 6651;
34+
35+
static final int BROKER_HTTP_TLS_PORT = 8081;
36+
37+
static PulsarContainerWithSsl withJksBasedTls() {
38+
return new PulsarContainerWithSsl(false);
39+
}
40+
41+
static PulsarContainerWithSsl withPemBasedTls() {
42+
return new PulsarContainerWithSsl(true);
43+
}
44+
45+
private PulsarContainerWithSsl(boolean pemBasedTls) {
46+
super(PulsarTestContainerSupport.getPulsarImage());
47+
withStartupAttempts(2);
48+
withStartupTimeout(Duration.ofMinutes(3));
49+
withEnv("PF_ENV_DEBUG", "1");
50+
51+
// TLS ports
52+
addExposedPorts(BROKER_TLS_PORT, BROKER_HTTP_TLS_PORT);
53+
withEnv("PULSAR_PREFIX_brokerServicePortTls", String.valueOf(BROKER_TLS_PORT));
54+
withEnv("PULSAR_PREFIX_webServicePortTls", String.valueOf(BROKER_HTTP_TLS_PORT));
55+
56+
// Enable mTLS
57+
withEnv("PULSAR_PREFIX_tlsEnabled", "true");
58+
withEnv("PULSAR_PREFIX_tlsRequireTrustedClientCertOnConnect", "true");
59+
60+
if (pemBasedTls) {
61+
// PEM based TLS
62+
withCopyFileToContainer(MountableFile.forClasspathResource("/ssl/pem/test-ca.crt"),
63+
"/pulsar/ssl/app/test-ca.crt");
64+
withCopyFileToContainer(MountableFile.forClasspathResource("/ssl/pem/test-server.crt"),
65+
"/pulsar/ssl/app/test-server.crt");
66+
withCopyFileToContainer(MountableFile.forClasspathResource("/ssl/pem/test-server.key"),
67+
"/pulsar/ssl/app/test-server.key");
68+
withCopyFileToContainer(MountableFile.forClasspathResource("/ssl/pem/test-client.crt"),
69+
"/pulsar/ssl/app/test-client.crt");
70+
withCopyFileToContainer(MountableFile.forClasspathResource("/ssl/pem/test-client.key"),
71+
"/pulsar/ssl/app/test-client.key");
72+
73+
// Pulsar client config
74+
withEnv("PULSAR_PREFIX_tlsTrustCertsFilePath", "/pulsar/ssl/app/test-ca.crt");
75+
withEnv("PULSAR_PREFIX_tlsCertificateFilePath", "/pulsar/ssl/app/test-server.crt");
76+
withEnv("PULSAR_PREFIX_tlsKeyFilePath", "/pulsar/ssl/app/test-server.key");
77+
78+
// Admin client config
79+
withEnv("PULSAR_PREFIX_brokerClientTlsEnabled", "true");
80+
withEnv("PULSAR_PREFIX_brokerClientTrustCertsFilePath", "/pulsar/ssl/app/test-ca.crt");
81+
withEnv("PULSAR_PREFIX_brokerClientCertificateFilePath", "/pulsar/ssl/app/test-client.crt");
82+
withEnv("PULSAR_PREFIX_brokerClientKeyFilePath", "/pulsar/ssl/app/test-client.key");
83+
}
84+
else {
85+
// JKS based TLS
86+
withCopyFileToContainer(MountableFile.forClasspathResource("/ssl/jks/test-ca.p12"),
87+
"/pulsar/ssl/app/test-ca.p12");
88+
withCopyFileToContainer(MountableFile.forClasspathResource("/ssl/jks/test-server.p12"),
89+
"/pulsar/ssl/app/test-server.p12");
90+
withCopyFileToContainer(MountableFile.forClasspathResource("/ssl/jks/test-client.p12"),
91+
"/pulsar/ssl/app/test-client.p12");
92+
93+
// Enable key store TLS
94+
withEnv("PULSAR_PREFIX_tlsEnabledWithKeyStore", "true");
95+
96+
// Pulsar client config
97+
withEnv("PULSAR_PREFIX_tlsEnabledWithKeyStore", "true");
98+
withEnv("PULSAR_PREFIX_tlsKeyStoreType", "PKCS12");
99+
withEnv("PULSAR_PREFIX_tlsKeyStore", "/pulsar/ssl/app/test-server.p12");
100+
withEnv("PULSAR_PREFIX_tlsKeyStorePassword", "password");
101+
withEnv("PULSAR_PREFIX_tlsTrustStoreType", "PKCS12");
102+
withEnv("PULSAR_PREFIX_tlsTrustStore", "/pulsar/ssl/app/test-ca.p12");
103+
withEnv("PULSAR_PREFIX_tlsTrustStorePassword", "password");
104+
105+
// Admin client config
106+
withEnv("PULSAR_PREFIX_brokerClientTlsEnabled", "true");
107+
withEnv("PULSAR_PREFIX_brokerClientTlsEnabledWithKeyStore", "true");
108+
withEnv("PULSAR_PREFIX_brokerClientTlsKeyStoreType", "PKCS12");
109+
withEnv("PULSAR_PREFIX_brokerClientTlsKeyStore", "/pulsar/ssl/app/test-client.p12");
110+
withEnv("PULSAR_PREFIX_brokerClientTlsKeyStorePassword", "password");
111+
withEnv("PULSAR_PREFIX_brokerClientTlsTrustStoreType", "PKCS12");
112+
withEnv("PULSAR_PREFIX_brokerClientTlsTrustStore", "/pulsar/ssl/app/test-ca.p12");
113+
withEnv("PULSAR_PREFIX_brokerClientTlsTrustStorePassword", "password");
114+
}
115+
116+
}
117+
118+
String getPulsarBrokerTlsUrl() {
119+
return String.format("pulsar+ssl://%s:%s", getHost(), getMappedPort(BROKER_TLS_PORT));
120+
}
121+
122+
String getHttpServiceTlsUrl() {
123+
return String.format("https://%s:%s", getHost(), getMappedPort(BROKER_HTTP_TLS_PORT));
124+
}
125+
126+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2012-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.pulsar.inttest.app;
18+
19+
import org.apache.commons.logging.Log;
20+
import org.apache.commons.logging.LogFactory;
21+
import org.apache.pulsar.reactive.client.api.MessageSpec;
22+
23+
import org.springframework.boot.ApplicationRunner;
24+
import org.springframework.boot.SpringBootConfiguration;
25+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
26+
import org.springframework.context.annotation.Bean;
27+
import org.springframework.context.annotation.Profile;
28+
import org.springframework.pulsar.core.PulsarTopic;
29+
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
30+
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
31+
32+
import reactor.core.publisher.Flux;
33+
import reactor.core.publisher.Mono;
34+
35+
@SpringBootConfiguration
36+
@EnableAutoConfiguration
37+
@Profile("smoketest.pulsar.reactive")
38+
class ReactiveAppConfig {
39+
40+
private static final Log LOG = LogFactory.getLog(ReactiveAppConfig.class);
41+
42+
private static final String TOPIC = "pulsar-reactive-inttest-topic";
43+
44+
@Bean
45+
PulsarTopic pulsarTestTopic() {
46+
return PulsarTopic.builder(TOPIC).numberOfPartitions(1).build();
47+
}
48+
49+
@Bean
50+
ApplicationRunner sendMessagesToPulsarTopic(ReactivePulsarTemplate<SampleMessage> template) {
51+
return (args) -> Flux.range(0, 10)
52+
.map((i) -> new SampleMessage(i, "message:" + i))
53+
.map(MessageSpec::of)
54+
.as((msgs) -> template.send(TOPIC, msgs))
55+
.doOnNext((sendResult) -> LOG
56+
.info("++++++PRODUCE REACTIVE:(" + sendResult.getMessageSpec().getValue().id() + ")------"))
57+
.subscribe();
58+
}
59+
60+
@ReactivePulsarListener(topics = TOPIC)
61+
Mono<Void> consumeMessagesFromPulsarTopic(SampleMessage msg) {
62+
LOG.info("++++++CONSUME REACTIVE:(" + msg.id() + ")------");
63+
return Mono.empty();
64+
}
65+
66+
}

0 commit comments

Comments
 (0)