Skip to content

Commit e60b1ac

Browse files
authored
Support adding new listeners to Apache Kafka (#9142)
Fixes #8669
1 parent ba22a7b commit e60b1ac

File tree

4 files changed

+155
-32
lines changed

4 files changed

+155
-32
lines changed

docs/modules/kafka.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,14 @@ Create a `ConfluentKafkaContainer` to use it in your tests:
4646
[Creating a ConlfuentKafkaContainer](../../modules/kafka/src/test/java/org/testcontainers/kafka/ConfluentKafkaContainerTest.java) inside_block:constructorWithVersion
4747
<!--/codeinclude-->
4848

49+
### Using org.testcontainers.kafka.KafkaContainer
50+
51+
Create a `KafkaContainer` to use it in your tests:
52+
53+
<!--codeinclude-->
54+
[Creating a KafkaContainer](../../modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java) inside_block:constructorWithVersion
55+
<!--/codeinclude-->
56+
4957
## Options
5058
5159
### <a name="zookeeper"></a> Using external Zookeeper

modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java

Lines changed: 75 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
import org.testcontainers.utility.DockerImageName;
88

99
import java.util.ArrayList;
10+
import java.util.HashSet;
1011
import java.util.List;
12+
import java.util.Set;
13+
import java.util.function.Supplier;
1114

1215
/**
1316
* Testcontainers implementation for Apache Kafka.
@@ -24,11 +27,11 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
2427

2528
private static final int KAFKA_PORT = 9092;
2629

27-
private static final String DEFAULT_INTERNAL_TOPIC_RF = "1";
28-
2930
private static final String STARTER_SCRIPT = "/tmp/testcontainers_start.sh";
3031

31-
private static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";
32+
private final Set<String> listeners = new HashSet<>();
33+
34+
private final Set<Supplier<String>> advertisedListeners = new HashSet<>();
3235

3336
public KafkaContainer(String imageName) {
3437
this(DockerImageName.parse(imageName));
@@ -39,31 +42,16 @@ public KafkaContainer(DockerImageName dockerImageName) {
3942
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME, APACHE_KAFKA_NATIVE_IMAGE_NAME);
4043

4144
withExposedPorts(KAFKA_PORT);
42-
withEnv("CLUSTER_ID", DEFAULT_CLUSTER_ID);
43-
44-
withEnv(
45-
"KAFKA_LISTENERS",
46-
"PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:9093, CONTROLLER://0.0.0.0:9094"
47-
);
48-
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT");
49-
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
50-
withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
51-
withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
52-
53-
withEnv("KAFKA_NODE_ID", "1");
54-
withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
55-
withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", DEFAULT_INTERNAL_TOPIC_RF);
56-
withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", DEFAULT_INTERNAL_TOPIC_RF);
57-
withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", DEFAULT_INTERNAL_TOPIC_RF);
58-
withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", Long.MAX_VALUE + "");
59-
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
45+
withEnv(KafkaHelper.envVars());
6046

6147
withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT);
6248
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
6349
}
6450

6551
@Override
6652
protected void configure() {
53+
KafkaHelper.resolveListeners(this, this.listeners);
54+
6755
String firstNetworkAlias = getNetworkAliases().stream().findFirst().orElse(null);
6856
String networkAlias = getNetwork() != null ? firstNetworkAlias : "localhost";
6957
String controllerQuorumVoters = String.format("%s@%s:9094", getEnvMap().get("KAFKA_NODE_ID"), networkAlias);
@@ -80,7 +68,10 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
8068
List<String> advertisedListeners = new ArrayList<>();
8169
advertisedListeners.add("PLAINTEXT://" + getBootstrapServers());
8270
advertisedListeners.add(brokerAdvertisedListener);
71+
72+
advertisedListeners.addAll(KafkaHelper.resolveAdvertisedListeners(this.advertisedListeners));
8373
String kafkaAdvertisedListeners = String.join(",", advertisedListeners);
74+
8475
String command = "#!/bin/bash\n";
8576
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
8677
command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);
@@ -89,6 +80,69 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
8980
copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
9081
}
9182

83+
/**
84+
* Add a listener in the format {@code host:port}.
85+
* Host will be included as a network alias.
86+
* <p>
87+
* Use it to register additional connections to the Kafka broker within the same container network.
88+
* <p>
89+
* The listener will be added to the list of default listeners.
90+
* <p>
91+
* Default listeners:
92+
* <ul>
93+
* <li>0.0.0.0:9092</li>
94+
* <li>0.0.0.0:9093</li>
95+
* <li>0.0.0.0:9094</li>
96+
* </ul>
97+
* <p>
98+
* The listener will be added to the list of default advertised listeners.
99+
* <p>
100+
* Default advertised listeners:
101+
* <ul>
102+
* <li>{@code container.getConfig().getHostName():9092}</li>
103+
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
104+
* </ul>
105+
* @param listener a listener with format {@code host:port}
106+
* @return this {@link KafkaContainer} instance
107+
*/
108+
public KafkaContainer withListener(String listener) {
109+
this.listeners.add(listener);
110+
this.advertisedListeners.add(() -> listener);
111+
return this;
112+
}
113+
114+
/**
115+
* Add a listener in the format {@code host:port} and a {@link Supplier} for the advertised listener.
116+
* Host from listener will be included as a network alias.
117+
* <p>
118+
* Use it to register additional connections to the Kafka broker from outside the container network
119+
* <p>
120+
* The listener will be added to the list of default listeners.
121+
* <p>
122+
* Default listeners:
123+
* <ul>
124+
* <li>0.0.0.0:9092</li>
125+
* <li>0.0.0.0:9093</li>
126+
* <li>0.0.0.0:9094</li>
127+
* </ul>
128+
* <p>
129+
* The {@link Supplier} will be added to the list of default advertised listeners.
130+
* <p>
131+
* Default advertised listeners:
132+
* <ul>
133+
* <li>{@code container.getConfig().getHostName():9092}</li>
134+
* <li>{@code container.getHost():container.getMappedPort(9093)}</li>
135+
* </ul>
136+
* @param listener a supplier that will provide a listener
137+
* @param advertisedListener a supplier that will provide a listener
138+
* @return this {@link KafkaContainer} instance
139+
*/
140+
public KafkaContainer withListener(String listener, Supplier<String> advertisedListener) {
141+
this.listeners.add(listener);
142+
this.advertisedListeners.add(advertisedListener);
143+
return this;
144+
}
145+
92146
public String getBootstrapServers() {
93147
return String.format("%s:%s", getHost(), getMappedPort(KAFKA_PORT));
94148
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.testcontainers.kafka;
2+
3+
import org.junit.Test;
4+
import org.junit.runner.RunWith;
5+
import org.junit.runners.Parameterized;
6+
import org.testcontainers.AbstractKafka;
7+
8+
@RunWith(Parameterized.class)
9+
public class CompatibleApacheKafkaImageTest extends AbstractKafka {
10+
11+
@Parameterized.Parameters(name = "{0}")
12+
public static String[] params() {
13+
return new String[] { "apache/kafka:3.8.0", "apache/kafka-native:3.8.0" };
14+
}
15+
16+
@Parameterized.Parameter
17+
public String imageName;
18+
19+
@Test
20+
public void testUsage() throws Exception {
21+
try (KafkaContainer kafka = new KafkaContainer(this.imageName)) {
22+
kafka.start();
23+
testKafkaFunctionality(kafka.getBootstrapServers());
24+
}
25+
}
26+
}
Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,61 @@
11
package org.testcontainers.kafka;
22

33
import org.junit.Test;
4-
import org.junit.runner.RunWith;
5-
import org.junit.runners.Parameterized;
64
import org.testcontainers.AbstractKafka;
5+
import org.testcontainers.KCatContainer;
6+
import org.testcontainers.containers.Network;
7+
import org.testcontainers.containers.SocatContainer;
8+
9+
import static org.assertj.core.api.Assertions.assertThat;
710

8-
@RunWith(Parameterized.class)
911
public class KafkaContainerTest extends AbstractKafka {
1012

11-
@Parameterized.Parameters(name = "{0}")
12-
public static String[] params() {
13-
return new String[] { "apache/kafka:3.8.0", "apache/kafka-native:3.8.0" };
13+
@Test
14+
public void testUsage() throws Exception {
15+
try ( // constructorWithVersion {
16+
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
17+
// }
18+
) {
19+
kafka.start();
20+
testKafkaFunctionality(kafka.getBootstrapServers());
21+
}
1422
}
1523

16-
@Parameterized.Parameter
17-
public String imageName;
24+
@Test
25+
public void testUsageWithListener() throws Exception {
26+
try (
27+
Network network = Network.newNetwork();
28+
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
29+
.withListener("kafka:19092")
30+
.withNetwork(network);
31+
KCatContainer kcat = new KCatContainer().withNetwork(network)
32+
) {
33+
kafka.start();
34+
kcat.start();
35+
36+
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
37+
String stdout = kcat
38+
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
39+
.getStdout();
40+
41+
assertThat(stdout).contains("Message produced by kcat");
42+
}
43+
}
1844

1945
@Test
20-
public void testUsage() throws Exception {
21-
try (KafkaContainer kafka = new KafkaContainer(imageName)) {
46+
public void testUsageWithListenerFromProxy() throws Exception {
47+
try (
48+
Network network = Network.newNetwork();
49+
SocatContainer socat = new SocatContainer().withNetwork(network).withTarget(2000, "kafka", 19092);
50+
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
51+
.withListener("kafka:19092", () -> socat.getHost() + ":" + socat.getMappedPort(2000))
52+
.withNetwork(network)
53+
) {
54+
socat.start();
2255
kafka.start();
23-
testKafkaFunctionality(kafka.getBootstrapServers());
56+
57+
String bootstrapServers = String.format("%s:%s", socat.getHost(), socat.getMappedPort(2000));
58+
testKafkaFunctionality(bootstrapServers);
2459
}
2560
}
2661
}

0 commit comments

Comments
 (0)