Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions docs/modules/pulsar.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ It's based on the official Apache Pulsar docker image, it is recommended to read
Create a `PulsarContainer` to use it in your tests:

<!--codeinclude-->
[Create a Pulsar container](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithVersion
[Create a Pulsar container](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithVersion
<!--/codeinclude-->

Then you can retrieve the broker and the admin url:

<!--codeinclude-->
[Get broker and admin urls](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:coordinates
[Get broker and admin urls](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:coordinates
<!--/codeinclude-->

## Options
Expand All @@ -26,23 +26,23 @@ If you need to set Pulsar configuration variables you can use the native APIs an
For example, if you want to enable `brokerDeduplicationEnabled`:

<!--codeinclude-->
[Set configuration variables](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithEnv
[Set configuration variables](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithEnv
<!--/codeinclude-->

### Pulsar IO

If you need to test Pulsar IO framework you can enable the Pulsar Functions Worker:

<!--codeinclude-->
[Create a Pulsar container with functions worker](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithFunctionsWorker
[Create a Pulsar container with functions worker](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithFunctionsWorker
<!--/codeinclude-->

### Pulsar Transactions

If you need to test Pulsar Transactions you can enable the transactions feature:

<!--codeinclude-->
[Create a Pulsar container with transactions](../../modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java) inside_block:constructorWithTransactions
[Create a Pulsar container with transactions](../../modules/pulsar/src/test/java/org/testcontainers/pulsar/PulsarContainerTest.java) inside_block:constructorWithTransactions
<!--/codeinclude-->


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
* <li>Pulsar: 6650</li>
* <li>HTTP: 8080</li>
* </ul>
*
* @deprecated use {@link org.testcontainers.pulsar.PulsarContainer} instead.
*/
@Deprecated
public class PulsarContainer extends GenericContainer<PulsarContainer> {

public static final int BROKER_PORT = 6650;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package org.testcontainers.pulsar;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
import org.testcontainers.utility.DockerImageName;

/**
* Testcontainers implementation for Apache Pulsar.
* <p>
* Supported images: {@code apachepulsar/pulsar}, {@code apachepulsar/pulsar-all}
* <p>
* Exposed ports:
* <ul>
* <li>Pulsar: 6650</li>
* <li>HTTP: 8080</li>
* </ul>
*/
public class PulsarContainer extends GenericContainer<PulsarContainer> {

public static final int BROKER_PORT = 6650;

public static final int BROKER_HTTP_PORT = 8080;

private static final String ADMIN_CLUSTERS_ENDPOINT = "/admin/v2/clusters";

/**
* See <a href="https://github.com/apache/pulsar/blob/master/pulsar-common/src/main/java/org/apache/pulsar/common/naming/SystemTopicNames.java">SystemTopicNames</a>.
*/
private static final String TRANSACTION_TOPIC_ENDPOINT =
"/admin/v2/persistent/pulsar/system/transaction_coordinator_assign/partitions";

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apachepulsar/pulsar");

private final WaitAllStrategy waitAllStrategy = new WaitAllStrategy();

private boolean functionsWorkerEnabled = false;

private boolean transactionsEnabled = false;

@Deprecated
public PulsarContainer(String dockerImageName) {
this(DockerImageName.parse(dockerImageName));
}

public PulsarContainer(final DockerImageName dockerImageName) {
super(dockerImageName);
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME, DockerImageName.parse("apachepulsar/pulsar-all"));
withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
setWaitStrategy(waitAllStrategy);
}

@Override
protected void configure() {
super.configure();
setupCommandAndEnv();
}

public PulsarContainer withFunctionsWorker() {
functionsWorkerEnabled = true;
return this;
}

public PulsarContainer withTransactions() {
transactionsEnabled = true;
return this;
}

public String getPulsarBrokerUrl() {
return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT));
}

public String getHttpServiceUrl() {
return String.format("http://%s:%s", getHost(), getMappedPort(BROKER_HTTP_PORT));
}

protected void setupCommandAndEnv() {
String standaloneBaseCommand =
"/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf " + "&& bin/pulsar standalone";

if (!functionsWorkerEnabled) {
standaloneBaseCommand += " --no-functions-worker -nss";
}

withCommand("/bin/bash", "-c", standaloneBaseCommand);

final String clusterName = getEnvMap().getOrDefault("PULSAR_PREFIX_clusterName", "standalone");
final String response = String.format("[\"%s\"]", clusterName);
waitAllStrategy.withStrategy(
Wait.forHttp(ADMIN_CLUSTERS_ENDPOINT).forPort(BROKER_HTTP_PORT).forResponsePredicate(response::equals)
);

if (transactionsEnabled) {
withEnv("PULSAR_PREFIX_transactionCoordinatorEnabled", "true");
waitAllStrategy.withStrategy(
Wait.forHttp(TRANSACTION_TOPIC_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT)
);
}
if (functionsWorkerEnabled) {
waitAllStrategy.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.testcontainers.containers;
package org.testcontainers.pulsar;

import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.testcontainers.containers;
package org.testcontainers.pulsar;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.testcontainers.containers;
package org.testcontainers.pulsar;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
Expand All @@ -19,7 +19,7 @@ void testUsage() throws Exception {
try (
// do not use PULSAR_IMAGE to make the doc looks easier
// constructorWithVersion {
PulsarContainer pulsar = new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.0.0"));
PulsarContainer pulsar = new PulsarContainer("apachepulsar/pulsar:3.0.0");
// }
) {
pulsar.start();
Expand Down
Loading