diff --git a/docs/modules/pulsar.md b/docs/modules/pulsar.md
index f74c315cbdc..5c0c2497f4e 100644
--- a/docs/modules/pulsar.md
+++ b/docs/modules/pulsar.md
@@ -9,13 +9,13 @@ It's based on the official Apache Pulsar docker image, it is recommended to read
Create a `PulsarContainer` to use it in your tests:
-[Create a Pulsar container](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithVersion
+[Create a Pulsar container](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithVersion
Then you can retrieve the broker and the admin url:
-[Get broker and admin urls](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:coordinates
+[Get broker and admin urls](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:coordinates
## Options
@@ -26,7 +26,7 @@ If you need to set Pulsar configuration variables you can use the native APIs an
For example, if you want to enable `brokerDeduplicationEnabled`:
-[Set configuration variables](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithEnv
+[Set configuration variables](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithEnv
### Pulsar IO
@@ -34,7 +34,7 @@ For example, if you want to enable `brokerDeduplicationEnabled`:
If you need to test Pulsar IO framework you can enable the Pulsar Functions Worker:
-[Create a Pulsar container with functions worker](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithFunctionsWorker
+[Create a Pulsar container with functions worker](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithFunctionsWorker
### Pulsar Transactions
@@ -42,7 +42,7 @@ If you need to test Pulsar IO framework you can enable the Pulsar Functions Work
If you need to test Pulsar Transactions you can enable the transactions feature:
-[Create a Pulsar container with transactions](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithTransactions
+[Create a Pulsar container with transactions](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithTransactions
diff --git a/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java b/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java
index 027e030c0de..49475d09511 100644
--- a/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java
+++ b/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java
@@ -14,7 +14,10 @@
*
Pulsar: 6650
* HTTP: 8080
*
+ *
+ * @deprecated use {@link org.testcontainers.pulsar.PulsarContainer} instead.
*/
+@Deprecated
public class PulsarContainer extends GenericContainer {
public static final int BROKER_PORT = 6650;
diff --git a/modules/pulsar/src/main/java/org/testcontainers/pulsar/PulsarContainer.java b/modules/pulsar/src/main/java/org/testcontainers/pulsar/PulsarContainer.java
new file mode 100644
index 00000000000..8936a7523c4
--- /dev/null
+++ b/modules/pulsar/src/main/java/org/testcontainers/pulsar/PulsarContainer.java
@@ -0,0 +1,103 @@
+package org.testcontainers.pulsar;
+
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+/**
+ * Testcontainers implementation for Apache Pulsar.
+ *
+ * Supported images: {@code apachepulsar/pulsar}, {@code apachepulsar/pulsar-all}
+ *
+ * Exposed ports:
+ *
+ * - Pulsar: 6650
+ * - HTTP: 8080
+ *
+ */
+public class PulsarContainer extends GenericContainer {
+
+ public static final int BROKER_PORT = 6650;
+
+ public static final int BROKER_HTTP_PORT = 8080;
+
+ private static final String ADMIN_CLUSTERS_ENDPOINT = "/admin/v2/clusters";
+
+ /**
+ * See SystemTopicNames.
+ */
+ private static final String TRANSACTION_TOPIC_ENDPOINT =
+ "/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions";
+
+ private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apachepulsar/pulsar");
+
+ private final WaitAllStrategy waitAllStrategy = new WaitAllStrategy();
+
+ private boolean functionsWorkerEnabled = false;
+
+ private boolean transactionsEnabled = false;
+
+ @Deprecated
+ public PulsarContainer(String dockerImageName) {
+ this(DockerImageName.parse(dockerImageName));
+ }
+
+ public PulsarContainer(final DockerImageName dockerImageName) {
+ super(dockerImageName);
+ dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME, DockerImageName.parse("apachepulsar/pulsar-all"));
+ withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
+ setWaitStrategy(waitAllStrategy);
+ }
+
+ @Override
+ protected void configure() {
+ super.configure();
+ setupCommandAndEnv();
+ }
+
+ public PulsarContainer withFunctionsWorker() {
+ functionsWorkerEnabled = true;
+ return this;
+ }
+
+ public PulsarContainer withTransactions() {
+ transactionsEnabled = true;
+ return this;
+ }
+
+ public String getPulsarBrokerUrl() {
+ return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT));
+ }
+
+ public String getHttpServiceUrl() {
+ return String.format("http://%s:%s", getHost(), getMappedPort(BROKER_HTTP_PORT));
+ }
+
+ protected void setupCommandAndEnv() {
+ String standaloneBaseCommand =
+ "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf " + "&& bin/pulsar standalone";
+
+ if (!functionsWorkerEnabled) {
+ standaloneBaseCommand += " --no-functions-worker -nss";
+ }
+
+ withCommand("/bin/bash", "-c", standaloneBaseCommand);
+
+ final String clusterName = getEnvMap().getOrDefault("PULSAR_PREFIX_clusterName", "standalone");
+ final String response = String.format("[\"%s\"]", clusterName);
+ waitAllStrategy.withStrategy(
+ Wait.forHttp(ADMIN_CLUSTERS_ENDPOINT).forPort(BROKER_HTTP_PORT).forResponsePredicate(response::equals)
+ );
+
+ if (transactionsEnabled) {
+ withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
+ waitAllStrategy.withStrategy(
+ Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT)
+ );
+ }
+ if (functionsWorkerEnabled) {
+ waitAllStrategy.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1));
+ }
+ }
+}
diff --git a/modules/pulsar/src/test/java/org/testcontainers/containers/AbstractPulsar.java b/modules/pulsar/src/test/java/org/testcontainers/pulsar/AbstractPulsar.java
similarity index 98%
rename from modules/pulsar/src/test/java/org/testcontainers/containers/AbstractPulsar.java
rename to modules/pulsar/src/test/java/org/testcontainers/pulsar/AbstractPulsar.java
index c704ce15d41..c86b594b163 100644
--- a/modules/pulsar/src/test/java/org/testcontainers/containers/AbstractPulsar.java
+++ b/modules/pulsar/src/test/java/org/testcontainers/pulsar/AbstractPulsar.java
@@ -1,4 +1,4 @@
-package org.testcontainers.containers;
+package org.testcontainers.pulsar;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdmin;
diff --git a/modules/pulsar/src/test/java/org/testcontainers/containers/CompatibleApachePulsarImageTest.java b/modules/pulsar/src/test/java/org/testcontainers/pulsar/CompatibleApachePulsarImageTest.java
similarity index 97%
rename from modules/pulsar/src/test/java/org/testcontainers/containers/CompatibleApachePulsarImageTest.java
rename to modules/pulsar/src/test/java/org/testcontainers/pulsar/CompatibleApachePulsarImageTest.java
index 702a7223151..0661994466d 100644
--- a/modules/pulsar/src/test/java/org/testcontainers/containers/CompatibleApachePulsarImageTest.java
+++ b/modules/pulsar/src/test/java/org/testcontainers/pulsar/CompatibleApachePulsarImageTest.java
@@ -1,4 +1,4 @@
-package org.testcontainers.containers;
+package org.testcontainers.pulsar;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.junit.jupiter.params.ParameterizedTest;
diff --git a/modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java b/modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java
similarity index 96%
rename from modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java
rename to modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java
index 93e9410ada8..32d96c88453 100644
--- a/modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java
+++ b/modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java
@@ -1,4 +1,4 @@
-package org.testcontainers.containers;
+package org.testcontainers.pulsar;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -19,7 +19,7 @@ void testUsage() throws Exception {
try (
// do not use PULSAR_IMAGE to make the doc looks easier
// constructorWithVersion {
- PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.0.0"));
+ PulsarContainer pulsar = new PulsarContainer("apachepulsar/pulsar:3.0.0");
// }
) {
pulsar.start();