Skip to content

Commit 3880dd4

Browse files
authored
Rework KafkaContainer (do not run a sub-process) (#2078)
* Rework KafkaContainer (do not run a sub-process) * revert unrelated change * Escape environment variables * Start ZK before Kafka
1 parent 96c8e05 commit 3880dd4

File tree

2 files changed

+27
-13
lines changed

2 files changed

+27
-13
lines changed

modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,22 @@
44
import com.github.dockerjava.api.command.InspectContainerResponse;
55
import com.github.dockerjava.core.command.ExecStartResultCallback;
66
import lombok.SneakyThrows;
7+
import org.testcontainers.images.builder.Transferable;
78
import org.testcontainers.utility.TestcontainersConfiguration;
89

10+
import java.nio.charset.StandardCharsets;
911
import java.util.concurrent.TimeUnit;
12+
import java.util.stream.Collectors;
13+
import java.util.stream.Stream;
1014

1115
/**
1216
* This container wraps Confluent Kafka and Zookeeper (optionally)
1317
*
1418
*/
1519
public class KafkaContainer extends GenericContainer<KafkaContainer> {
1620

21+
private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
22+
1723
public static final int KAFKA_PORT = 9093;
1824

1925
public static final int ZOOKEEPER_PORT = 2181;
@@ -97,7 +103,7 @@ public String getBootstrapServers() {
97103

98104
@Override
99105
protected void doStart() {
100-
withCommand("sleep infinity");
106+
withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
101107

102108
if (externalZookeeperConnect == null) {
103109
addExposedPort(ZOOKEEPER_PORT);
@@ -124,17 +130,24 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole
124130
zookeeperConnect = startZookeeper();
125131
}
126132

127-
String internalIp = containerInfo.getNetworkSettings().getIpAddress();
128-
129-
ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(getContainerId())
130-
.withCmd("sh", "-c", "" +
131-
"export KAFKA_ZOOKEEPER_CONNECT=" + zookeeperConnect + "\n" +
132-
"export KAFKA_ADVERTISED_LISTENERS=" + getBootstrapServers() + "," + String.format("BROKER://%s:9092", internalIp) + "\n" +
133-
"/etc/confluent/docker/run"
133+
String command = "#!/bin/bash \n";
134+
command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n";
135+
command += "export KAFKA_ADVERTISED_LISTENERS='" + Stream
136+
.concat(
137+
Stream.of(getBootstrapServers()),
138+
containerInfo.getNetworkSettings().getNetworks().values().stream()
139+
.map(it -> "BROKER://" + it.getIpAddress() + ":9092")
134140
)
135-
.exec();
141+
.collect(Collectors.joining(",")) + "'\n";
136142

137-
dockerClient.execStartCmd(execCreateCmdResponse.getId()).exec(new ExecStartResultCallback()).awaitStarted(10, TimeUnit.SECONDS);
143+
command += ". /etc/confluent/docker/bash-config \n";
144+
command += "/etc/confluent/docker/configure \n";
145+
command += "/etc/confluent/docker/launch \n";
146+
147+
copyFileToContainer(
148+
Transferable.of(command.getBytes(StandardCharsets.UTF_8), 700),
149+
STARTER_SCRIPT
150+
);
138151
}
139152

140153
@SneakyThrows(InterruptedException.class)

modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import java.util.Arrays;
1717
import java.util.UUID;
1818
import java.util.concurrent.TimeUnit;
19-
import java.util.stream.Stream;
2019

2120
import static org.assertj.core.api.Assertions.assertThat;
2221
import static org.assertj.core.api.Assertions.tuple;
@@ -46,7 +45,8 @@ public void testExternalZookeeperWithKafkaNetwork() throws Exception {
4645
.withNetworkAliases("zookeeper")
4746
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
4847
) {
49-
Stream.of(kafka, zookeeper).parallel().forEach(GenericContainer::start);
48+
zookeeper.start();
49+
kafka.start();
5050

5151
testKafkaFunctionality(kafka.getBootstrapServers());
5252
}
@@ -66,7 +66,8 @@ public void testExternalZookeeperWithExternalNetwork() throws Exception {
6666
.withNetworkAliases("zookeeper")
6767
.withEnv("ZOOKEEPER_CLIENT_PORT", "2181");
6868
) {
69-
Stream.of(kafka, zookeeper).parallel().forEach(GenericContainer::start);
69+
zookeeper.start();
70+
kafka.start();
7071

7172
testKafkaFunctionality(kafka.getBootstrapServers());
7273
}

0 commit comments

Comments
 (0)