Skip to content

Commit 4abffe6

Browse files
authored
Add Pulsar container implementation under org.testcontainers.pulsar (#11089)
1 parent 067963c commit 4abffe6

File tree

6 files changed

+115
-9
lines changed

6 files changed

+115
-9
lines changed

docs/modules/pulsar.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ It's based on the official Apache Pulsar docker image, it is recommended to read
99
Create a `PulsarContainer` to use it in your tests:
1010

1111
<!--codeinclude-->
12-
[Create a Pulsar container](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithVersion
12+
[Create a Pulsar container](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithVersion
1313
<!--/codeinclude-->
1414

1515
Then you can retrieve the broker and the admin url:
1616

1717
<!--codeinclude-->
18-
[Get broker and admin urls](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:coordinates
18+
[Get broker and admin urls](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:coordinates
1919
<!--/codeinclude-->
2020

2121
## Options
@@ -26,23 +26,23 @@ If you need to set Pulsar configuration variables you can use the native APIs an
2626
For example, if you want to enable `brokerDeduplicationEnabled`:
2727

2828
<!--codeinclude-->
29-
[Set configuration variables](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithEnv
29+
[Set configuration variables](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithEnv
3030
<!--/codeinclude-->
3131

3232
### Pulsar IO
3333

3434
If you need to test Pulsar IO framework you can enable the Pulsar Functions Worker:
3535

3636
<!--codeinclude-->
37-
[Create a Pulsar container with functions worker](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithFunctionsWorker
37+
[Create a Pulsar container with functions worker](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithFunctionsWorker
3838
<!--/codeinclude-->
3939

4040
### Pulsar Transactions
4141

4242
If you need to test Pulsar Transactions you can enable the transactions feature:
4343

4444
<!--codeinclude-->
45-
[Create a Pulsar container with transactions](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithTransactions
45+
[Create a Pulsar container with transactions](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithTransactions
4646
<!--/codeinclude-->
4747

4848

modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
* <li>Pulsar: 6650</li>
1515
* <li>HTTP: 8080</li>
1616
* </ul>
17+
*
18+
* @deprecated use {@link org.testcontainers.pulsar.PulsarContainer} instead.
1719
*/
20+
@Deprecated
1821
public class PulsarContainer extends GenericContainer<PulsarContainer> {
1922

2023
public static final int BROKER_PORT = 6650;
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package org.testcontainers.pulsar;
2+
3+
import org.testcontainers.containers.GenericContainer;
4+
import org.testcontainers.containers.wait.strategy.Wait;
5+
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
6+
import org.testcontainers.utility.DockerImageName;
7+
8+
/**
9+
* Testcontainers implementation for Apache Pulsar.
10+
* <p>
11+
* Supported images: {@code apachepulsar/pulsar}, {@code apachepulsar/pulsar-all}
12+
* <p>
13+
* Exposed ports:
14+
* <ul>
15+
* <li>Pulsar: 6650</li>
16+
* <li>HTTP: 8080</li>
17+
* </ul>
18+
*/
19+
public class PulsarContainer extends GenericContainer<PulsarContainer> {
20+
21+
public static final int BROKER_PORT = 6650;
22+
23+
public static final int BROKER_HTTP_PORT = 8080;
24+
25+
private static final String ADMIN_CLUSTERS_ENDPOINT = "/admin/v2/clusters";
26+
27+
/**
28+
* See <a href="https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java">SystemTopicNames</a>.
29+
*/
30+
private static final String TRANSACTION_TOPIC_ENDPOINT =
31+
"/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions";
32+
33+
private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apachepulsar/pulsar");
34+
35+
private final WaitAllStrategy waitAllStrategy = new WaitAllStrategy();
36+
37+
private boolean functionsWorkerEnabled = false;
38+
39+
private boolean transactionsEnabled = false;
40+
41+
@Deprecated
42+
public PulsarContainer(String dockerImageName) {
43+
this(DockerImageName.parse(dockerImageName));
44+
}
45+
46+
public PulsarContainer(final DockerImageName dockerImageName) {
47+
super(dockerImageName);
48+
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME, DockerImageName.parse("apachepulsar/pulsar-all"));
49+
withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
50+
setWaitStrategy(waitAllStrategy);
51+
}
52+
53+
@Override
54+
protected void configure() {
55+
super.configure();
56+
setupCommandAndEnv();
57+
}
58+
59+
public PulsarContainer withFunctionsWorker() {
60+
functionsWorkerEnabled = true;
61+
return this;
62+
}
63+
64+
public PulsarContainer withTransactions() {
65+
transactionsEnabled = true;
66+
return this;
67+
}
68+
69+
public String getPulsarBrokerUrl() {
70+
return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT));
71+
}
72+
73+
public String getHttpServiceUrl() {
74+
return String.format("http://%s:%s", getHost(), getMappedPort(BROKER_HTTP_PORT));
75+
}
76+
77+
protected void setupCommandAndEnv() {
78+
String standaloneBaseCommand =
79+
"/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf " + "&& bin/pulsar standalone";
80+
81+
if (!functionsWorkerEnabled) {
82+
standaloneBaseCommand += " --no-functions-worker -nss";
83+
}
84+
85+
withCommand("/bin/bash", "-c", standaloneBaseCommand);
86+
87+
final String clusterName = getEnvMap().getOrDefault("PULSAR_PREFIX_clusterName", "standalone");
88+
final String response = String.format("[\"%s\"]", clusterName);
89+
waitAllStrategy.withStrategy(
90+
Wait.forHttp(ADMIN_CLUSTERS_ENDPOINT).forPort(BROKER_HTTP_PORT).forResponsePredicate(response::equals)
91+
);
92+
93+
if (transactionsEnabled) {
94+
withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
95+
waitAllStrategy.withStrategy(
96+
Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT)
97+
);
98+
}
99+
if (functionsWorkerEnabled) {
100+
waitAllStrategy.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1));
101+
}
102+
}
103+
}

modules/pulsar/src/test/java/org/testcontainers/containers/AbstractPulsar.java renamed to modules/pulsar/src/test/java/org/testcontainers/pulsar/AbstractPulsar.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.testcontainers.containers;
1+
package org.testcontainers.pulsar;
22

33
import org.apache.pulsar.client.admin.ListTopicsOptions;
44
import org.apache.pulsar.client.admin.PulsarAdmin;

modules/pulsar/src/test/java/org/testcontainers/containers/CompatibleApachePulsarImageTest.java renamed to modules/pulsar/src/test/java/org/testcontainers/pulsar/CompatibleApachePulsarImageTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.testcontainers.containers;
1+
package org.testcontainers.pulsar;
22

33
import org.apache.pulsar.client.admin.PulsarAdmin;
44
import org.junit.jupiter.params.ParameterizedTest;

modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java renamed to modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.testcontainers.containers;
1+
package org.testcontainers.pulsar;
22

33
import org.apache.pulsar.client.admin.PulsarAdmin;
44
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -19,7 +19,7 @@ void testUsage() throws Exception {
1919
try (
2020
// do not use PULSAR_IMAGE to make the doc looks easier
2121
// constructorWithVersion {
22-
PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.0.0"));
22+
PulsarContainer pulsar = new PulsarContainer("apachepulsar/pulsar:3.0.0");
2323
// }
2424
) {
2525
pulsar.start();

0 commit comments

Comments
 (0)