Skip to content

Commit 6186598

Browse files
lanwenbsideup
andauthored
Make it possible to enable functions worker in PulsarContainer (#2711)
* Method to enable functions worker in pulsar container This change should simplify a bit pulsar functions development experience Tried here: https://github.com/lanwen/pulsar-functions-example Without waiter pulsar could return random 503, depending on how fast was a call. * treat functions worker method as a flag to enable in configure real functions upload test * upd pulsar to 2.5.1 * don't store jar * expect error for functions without worker * reduce the changeset Co-authored-by: Sergei Egorov <[email protected]>
1 parent d03768b commit 6186598

File tree

3 files changed

+53
-1
lines changed

3 files changed

+53
-1
lines changed

modules/pulsar/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ dependencies {
55

66
testCompile group: 'org.apache.pulsar', name: 'pulsar-client', version: '2.6.0'
77
testCompile group: 'org.assertj', name: 'assertj-core', version: '3.16.1'
8+
testCompile group: 'org.apache.pulsar', name: 'pulsar-client-admin', version: '2.5.1'
89
}

modules/pulsar/src/main/java/org/testcontainers/containers/PulsarContainer.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.testcontainers.containers;
22

33
import org.testcontainers.containers.wait.strategy.Wait;
4+
import org.testcontainers.containers.wait.strategy.WaitAllStrategy;
45
import org.testcontainers.utility.TestcontainersConfiguration;
56

67
/**
@@ -14,6 +15,8 @@ public class PulsarContainer extends GenericContainer<PulsarContainer> {
1415

1516
private static final String PULSAR_VERSION = "2.2.0";
1617

18+
private boolean functionsWorkerEnabled = false;
19+
1720
public PulsarContainer() {
1821
this(PULSAR_VERSION);
1922
}
@@ -25,6 +28,25 @@ public PulsarContainer(String pulsarVersion) {
2528
waitingFor(Wait.forHttp(METRICS_ENDPOINT).forStatusCode(200).forPort(BROKER_HTTP_PORT));
2629
}
2730

31+
@Override
32+
protected void configure() {
33+
super.configure();
34+
35+
if (functionsWorkerEnabled) {
36+
withCommand("/pulsar/bin/pulsar", "standalone");
37+
waitingFor(
38+
new WaitAllStrategy()
39+
.withStrategy(waitStrategy)
40+
.withStrategy(Wait.forLogMessage(".*Function worker service started.*", 1))
41+
);
42+
}
43+
}
44+
45+
public PulsarContainer withFunctionsWorker() {
46+
functionsWorkerEnabled = true;
47+
return this;
48+
}
49+
2850
public String getPulsarBrokerUrl() {
2951
return String.format("pulsar://%s:%s", getHost(), getMappedPort(BROKER_PORT));
3052
}
@@ -33,4 +55,3 @@ public String getHttpServiceUrl() {
3355
return String.format("http://%s:%s", getHost(), getMappedPort(BROKER_HTTP_PORT));
3456
}
3557
}
36-

modules/pulsar/src/test/java/org/testcontainers/containers/PulsarContainerTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.testcontainers.containers;
22

3+
import org.apache.pulsar.client.admin.PulsarAdmin;
4+
import org.apache.pulsar.client.admin.PulsarAdminException;
35
import org.apache.pulsar.client.api.Consumer;
46
import org.apache.pulsar.client.api.Message;
57
import org.apache.pulsar.client.api.Producer;
@@ -10,6 +12,7 @@
1012
import java.util.concurrent.TimeUnit;
1113

1214
import static org.assertj.core.api.Assertions.assertThat;
15+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1316

1417
public class PulsarContainerTest {
1518

@@ -23,6 +26,33 @@ public void testUsage() throws Exception {
2326
}
2427
}
2528

29+
@Test
30+
public void shouldNotEnableFunctionsWorkerByDefault() throws Exception {
31+
try (PulsarContainer pulsar = new PulsarContainer("2.5.1")) {
32+
pulsar.start();
33+
34+
PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
35+
.serviceHttpUrl(pulsar.getHttpServiceUrl())
36+
.build();
37+
38+
assertThatThrownBy(() -> pulsarAdmin.functions().getFunctions("public", "default"))
39+
.isInstanceOf(PulsarAdminException.class);
40+
}
41+
}
42+
43+
@Test
44+
public void shouldWaitForFunctionsWorkerStarted() throws Exception {
45+
try (PulsarContainer pulsar = new PulsarContainer("2.5.1").withFunctionsWorker()) {
46+
pulsar.start();
47+
48+
PulsarAdmin pulsarAdmin = PulsarAdmin.builder()
49+
.serviceHttpUrl(pulsar.getHttpServiceUrl())
50+
.build();
51+
52+
assertThat(pulsarAdmin.functions().getFunctions("public", "default")).hasSize(0);
53+
}
54+
}
55+
2656
protected void testPulsarFunctionality(String pulsarBrokerUrl) throws Exception {
2757

2858
try (

0 commit comments

Comments
 (0)