From b2f704cbc50f342cb17a7979a27b575da764582a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez?= Date: Wed, 1 Oct 2025 10:39:07 -0600 Subject: [PATCH] Add Pulsar container implementation under org.testcontainers.pulsar --- docs/modules/pulsar.md | 10 +- .../containers/PulsarContainer.java | 3 + .../pulsar/PulsarContainer.java | 103 ++++++++++++++++++ .../AbstractPulsar.java | 2 +- .../CompatibleApachePulsarImageTest.java | 2 +- .../PulsarContainerTest.java | 4 +- 6 files changed, 115 insertions(+), 9 deletions(-) create mode 100644 modules/pulsar/src/main/java/org/testcontainers/pulsar/PulsarContainer.java rename modules/pulsar/src/test/java/org/testcontainers/{containers => pulsar}/AbstractPulsar.java (98%) rename modules/pulsar/src/test/java/org/testcontainers/{containers => pulsar}/CompatibleApachePulsarImageTest.java (97%) rename modules/pulsar/src/test/java/org/testcontainers/{containers => pulsar}/PulsarContainerTest.java (96%) diff --git a/docs/modules/pulsar.md b/docs/modules/pulsar.md index f74c315cbdc..5c0c2497f4e 100644 --- a/docs/modules/pulsar.md +++ b/docs/modules/pulsar.md @@ -9,13 +9,13 @@ It's based on the official Apache Pulsar docker image, it is recommended to read Create a `PulsarContainer` to use it in your tests: -[Create a Pulsar container](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithVersion +[Create a Pulsar container](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithVersion Then you can retrieve the broker and the admin url: -[Get broker and admin urls](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:coordinates +[Get broker and admin urls](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:coordinates ## Options @@ -26,7 +26,7 @@ If you need to set Pulsar configuration variables you can use the native APIs an For example, if you want to enable `brokerDeduplicationEnabled`: -[Set configuration variables](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithEnv +[Set configuration variables](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithEnv ### Pulsar IO @@ -34,7 +34,7 @@ For example, if you want to enable `brokerDeduplicationEnabled`: If you need to test Pulsar IO framework you can enable the Pulsar Functions Worker: -[Create a Pulsar container with functions worker](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithFunctionsWorker +[Create a Pulsar container with functions worker](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithFunctionsWorker ### Pulsar Transactions @@ -42,7 +42,7 @@ If you need to test Pulsar IO framework you can enable the Pulsar Functions Work If you need to test Pulsar Transactions you can enable the transactions feature: -[Create a Pulsar container with transactions](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithTransactions +[Create a Pulsar container with transactions](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithTransactions diff --git a/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java b/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java index 027e030c0de..49475d09511 100644 --- a/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java +++ b/modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java @@ -14,7 +14,10 @@ *
  • Pulsar: 6650
  • *
  • HTTP: 8080
  • * + * + * @deprecated use {@link org.testcontainers.pulsar.PulsarContainer} instead. */ +@Deprecated public class PulsarContainer extends GenericContainer { public static final int BROKER_PORT = 6650; diff --git a/modules/pulsar/src/main/java/org/testcontainers/pulsar/PulsarContainer.java b/modules/pulsar/src/main/java/org/testcontainers/pulsar/PulsarContainer.java new file mode 100644 index 00000000000..8936a7523c4 --- /dev/null +++ b/modules/pulsar/src/main/java/org/testcontainers/pulsar/PulsarContainer.java @@ -0,0 +1,103 @@ +package org.testcontainers.pulsar; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.WaitAllStrategy; +import org.testcontainers.utility.DockerImageName; + +/** + * Testcontainers implementation for Apache Pulsar. + *

    + * Supported images: {@code apachepulsar/pulsar}, {@code apachepulsar/pulsar-all} + *

    + * Exposed ports: + *

    + */ +public class PulsarContainer extends GenericContainer { + + public static final int BROKER_PORT = 6650; + + public static final int BROKER_HTTP_PORT = 8080; + + private static final String ADMIN_CLUSTERS_ENDPOINT = "/admin/v2/clusters"; + + /** + * See SystemTopicNames. + */ + private static final String TRANSACTION_TOPIC_ENDPOINT = + "/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions"; + + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apachepulsar/pulsar"); + + private final WaitAllStrategy waitAllStrategy = new WaitAllStrategy(); + + private boolean functionsWorkerEnabled = false; + + private boolean transactionsEnabled = false; + + @Deprecated + public PulsarContainer(String dockerImageName) { + this(DockerImageName.parse(dockerImageName)); + } + + public PulsarContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME, DockerImageName.parse("apachepulsar/pulsar-all")); + withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT); + setWaitStrategy(waitAllStrategy); + } + + @Override + protected void configure() { + super.configure(); + setupCommandAndEnv(); + } + + public PulsarContainer withFunctionsWorker() { + functionsWorkerEnabled = true; + return this; + } + + public PulsarContainer withTransactions() { + transactionsEnabled = true; + return this; + } + + public String getPulsarBrokerUrl() { + return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT)); + } + + public String getHttpServiceUrl() { + return String.format("http://%s:%s", getHost(), getMappedPort(BROKER_HTTP_PORT)); + } + + protected void setupCommandAndEnv() { + String standaloneBaseCommand = + "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf " + "&& bin/pulsar standalone"; + + if (!functionsWorkerEnabled) { + standaloneBaseCommand += " --no-functions-worker -nss"; + } + + withCommand("/bin/bash", "-c", standaloneBaseCommand); + + final String clusterName = getEnvMap().getOrDefault("PULSAR_PREFIX_clusterName", "standalone"); + final String response = String.format("[\"%s\"]", clusterName); + waitAllStrategy.withStrategy( + Wait.forHttp(ADMIN_CLUSTERS_ENDPOINT).forPort(BROKER_HTTP_PORT).forResponsePredicate(response::equals) + ); + + if (transactionsEnabled) { + withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true"); + waitAllStrategy.withStrategy( + Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT) + ); + } + if (functionsWorkerEnabled) { + waitAllStrategy.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1)); + } + } +} diff --git a/modules/pulsar/src/test/java/org/testcontainers/containers/AbstractPulsar.java b/modules/pulsar/src/test/java/org/testcontainers/pulsar/AbstractPulsar.java similarity index 98% rename from modules/pulsar/src/test/java/org/testcontainers/containers/AbstractPulsar.java rename to modules/pulsar/src/test/java/org/testcontainers/pulsar/AbstractPulsar.java index c704ce15d41..c86b594b163 100644 --- a/modules/pulsar/src/test/java/org/testcontainers/containers/AbstractPulsar.java +++ b/modules/pulsar/src/test/java/org/testcontainers/pulsar/AbstractPulsar.java @@ -1,4 +1,4 @@ -package org.testcontainers.containers; +package org.testcontainers.pulsar; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.PulsarAdmin; diff --git a/modules/pulsar/src/test/java/org/testcontainers/containers/CompatibleApachePulsarImageTest.java b/modules/pulsar/src/test/java/org/testcontainers/pulsar/CompatibleApachePulsarImageTest.java similarity index 97% rename from modules/pulsar/src/test/java/org/testcontainers/containers/CompatibleApachePulsarImageTest.java rename to modules/pulsar/src/test/java/org/testcontainers/pulsar/CompatibleApachePulsarImageTest.java index 702a7223151..0661994466d 100644 --- a/modules/pulsar/src/test/java/org/testcontainers/containers/CompatibleApachePulsarImageTest.java +++ b/modules/pulsar/src/test/java/org/testcontainers/pulsar/CompatibleApachePulsarImageTest.java @@ -1,4 +1,4 @@ -package org.testcontainers.containers; +package org.testcontainers.pulsar; import org.apache.pulsar.client.admin.PulsarAdmin; import org.junit.jupiter.params.ParameterizedTest; diff --git a/modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java b/modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java similarity index 96% rename from modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java rename to modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java index 93e9410ada8..32d96c88453 100644 --- a/modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java +++ b/modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java @@ -1,4 +1,4 @@ -package org.testcontainers.containers; +package org.testcontainers.pulsar; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -19,7 +19,7 @@ void testUsage() throws Exception { try ( // do not use PULSAR_IMAGE to make the doc looks easier // constructorWithVersion { - PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.0.0")); + PulsarContainer pulsar = new PulsarContainer("apachepulsar/pulsar:3.0.0"); // } ) { pulsar.start();