Skip to content

Commit a03a93c

Browse files
committed
p9092
1 parent 2e0ef57 commit a03a93c

File tree

6 files changed

+130
-23
lines changed

6 files changed

+130
-23
lines changed

modules/kafka/diff

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
diff --git i/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaHelper.java w/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaHelper.java
2+
index 61e790d47..50c40b16e 100644
3+
--- i/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaHelper.java
4+
+++ w/modules/kafka/src/main/java/org/testcontainers/kafka/KafkaHelper.java
5+
@@ -72,6 +72,11 @@ class KafkaHelper {
6+
.collect(Collectors.toSet());
7+
8+
List<String> listenersToTransform = new ArrayList<>(listenersSuppliers);
9+
+ List<String> networkAliases = kafkaContainer.getNetworkAliases();
10+
+ if (listenersToTransform.isEmpty() && !networkAliases.isEmpty()) {
11+
+ System.out.println("qqq " + networkAliases.get(0) + ":" + KAFKA_PORT);
12+
+ listenersToTransform.add(networkAliases.get(0) + ":" + KAFKA_PORT);
13+
+ }
14+
for (int i = 0; i < listenersToTransform.size(); i++) {
15+
String protocol = String.format("%s-%d", PROTOCOL_PREFIX, i);
16+
String listener = listenersToTransform.get(i);
17+
diff --git i/modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java w/modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java
18+
index e81b52574..06a2b1835 100644
19+
--- i/modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java
20+
+++ w/modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java
21+
@@ -21,6 +21,19 @@ public class KafkaContainerTest extends AbstractKafka {
22+
}
23+
}
24+
25+
+ @Test
26+
+ public void testUsageWithNetworkAlias() throws Exception {
27+
+ try (
28+
+ Network network = Network.newNetwork();
29+
+ KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
30+
+ .withNetworkAliases("mykafka")
31+
+ .withNetwork(network)
32+
+ ) {
33+
+ kafka.start();
34+
+ testUsage("mykafka:9092", network);
35+
+ }
36+
+ }
37+
+
38+
@Test
39+
public void testUsageWithListener() throws Exception {
40+
try (
41+
@@ -60,4 +73,17 @@ public class KafkaContainerTest extends AbstractKafka {
42+
testKafkaFunctionality(bootstrapServers);
43+
}
44+
}
45+
+
46+
+ private void testUsage(String listener, Network network) throws Exception {
47+
+ try (KCatContainer kcat = new KCatContainer().withNetwork(network)) {
48+
+ kcat.start();
49+
+
50+
+ kcat.execInContainer("kcat", "-b", listener, "-t", "msgs", "-P", "-l", "/data/msgs.txt");
51+
+ String stdout = kcat
52+
+ .execInContainer("kcat", "-b", listener, "-C", "-t", "msgs", "-c", "1")
53+
+ .getStdout();
54+
+
55+
+ assertThat(stdout).contains("Message produced by kcat");
56+
+ }
57+
+ }
58+
}

modules/kafka/src/main/java/org/testcontainers/kafka/KafkaContainer.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,22 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
5959
containerInfo.getConfig().getHostName(),
6060
"9093"
6161
);
62+
63+
List<String> networkAliases = getNetworkAliases();
64+
String plaintextServer;
65+
if (listeners.isEmpty() && networkAliases.size() > 1) {
66+
// 0 is the random network alias generated by GenericContainer
67+
plaintextServer = networkAliases.get(1) + ":" + KAFKA_PORT;
68+
} else {
69+
plaintextServer = getBootstrapServers();
70+
}
71+
6272
List<String> advertisedListeners = new ArrayList<>();
63-
advertisedListeners.add("PLAINTEXT://" + getBootstrapServers());
73+
advertisedListeners.add("PLAINTEXT://" + plaintextServer);
6474
advertisedListeners.add(brokerAdvertisedListener);
6575

6676
advertisedListeners.addAll(KafkaHelper.resolveAdvertisedListeners(this.advertisedListeners));
6777
String kafkaAdvertisedListeners = String.join(",", advertisedListeners);
68-
6978
String command = "#!/bin/bash\n";
7079
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
7180
command += String.format("export KAFKA_ADVERTISED_LISTENERS=%s\n", kafkaAdvertisedListeners);

modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.testcontainers;
22

33
import com.google.common.collect.ImmutableMap;
4+
import lombok.SneakyThrows;
45
import org.apache.kafka.clients.admin.AdminClient;
56
import org.apache.kafka.clients.admin.AdminClientConfig;
67
import org.apache.kafka.clients.admin.NewTopic;
@@ -15,6 +16,7 @@
1516
import org.apache.kafka.common.serialization.StringDeserializer;
1617
import org.apache.kafka.common.serialization.StringSerializer;
1718
import org.awaitility.Awaitility;
19+
import org.testcontainers.containers.Network;
1820

1921
import java.time.Duration;
2022
import java.util.Collection;
@@ -150,4 +152,19 @@ protected static String getJaasConfig() {
150152
"user_test=\"secret\";";
151153
return jaasConfig;
152154
}
155+
156+
@SneakyThrows
157+
protected void assertKafka(String listener, Network network) {
158+
try (KCatContainer kcat = new KCatContainer().withNetwork(network)) {
159+
kcat.start();
160+
161+
kcat.execInContainer("kcat", "-b", listener, "-t", "msgs", "-P", "-l", "/data/msgs.txt");
162+
String stdout = kcat
163+
.execInContainer("kcat", "-b", listener, "-C", "-t", "msgs", "-c", "1")
164+
.getStdout();
165+
166+
assertThat(stdout).contains("Message produced by kcat");
167+
}
168+
}
169+
153170
}

modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,19 @@ public void testKraftPrecedenceOverEmbeddedZookeeper() throws Exception {
185185
}
186186
}
187187

188+
@Test
189+
public void testUsageWithNetworkAlias() {
190+
try (
191+
Network network = Network.newNetwork();
192+
KafkaContainer kafka = new KafkaContainer(KAFKA_KRAFT_TEST_IMAGE)
193+
.withNetworkAliases("mykafka")
194+
.withNetwork(network)
195+
) {
196+
kafka.start();
197+
assertKafka("mykafka:9092", network);
198+
}
199+
}
200+
188201
@Test
189202
public void testUsageWithListener() throws Exception {
190203
try (

modules/kafka/src/test/java/org/testcontainers/kafka/ConfluentKafkaContainerTest.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,19 @@ public void testUsage() throws Exception {
2424
}
2525
}
2626

27+
@Test
28+
public void testUsageWithNetworkAlias() {
29+
try (
30+
Network network = Network.newNetwork();
31+
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.4.0")
32+
.withNetworkAliases("mykafka")
33+
.withNetwork(network)
34+
) {
35+
kafka.start();
36+
assertKafka("mykafka:9092", network);
37+
}
38+
}
39+
2740
@Test
2841
public void testUsageWithListener() throws Exception {
2942
try (
@@ -36,14 +49,7 @@ public void testUsageWithListener() throws Exception {
3649
KCatContainer kcat = new KCatContainer().withNetwork(network)
3750
) {
3851
kafka.start();
39-
kcat.start();
40-
41-
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
42-
String stdout = kcat
43-
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
44-
.getStdout();
45-
46-
assertThat(stdout).contains("Message produced by kcat");
52+
assertKafka("kafka:19092", network);
4753
}
4854
}
4955

modules/kafka/src/test/java/org/testcontainers/kafka/KafkaContainerTest.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,9 @@
22

33
import org.junit.Test;
44
import org.testcontainers.AbstractKafka;
5-
import org.testcontainers.KCatContainer;
65
import org.testcontainers.containers.Network;
76
import org.testcontainers.containers.SocatContainer;
87

9-
import static org.assertj.core.api.Assertions.assertThat;
10-
118
public class KafkaContainerTest extends AbstractKafka {
129

1310
@Test
@@ -22,25 +19,32 @@ public void testUsage() throws Exception {
2219
}
2320

2421
@Test
25-
public void testUsageWithListener() throws Exception {
22+
public void testUsageWithNetworkAlias() {
23+
try (
24+
// registerAlias {
25+
Network network = Network.newNetwork();
26+
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
27+
.withNetworkAliases("kafka")
28+
.withNetwork(network);
29+
// }
30+
) {
31+
kafka.start();
32+
assertKafka("kafka:9092", network);
33+
}
34+
}
35+
36+
@Test
37+
public void testUsageWithListener() {
2638
try (
2739
Network network = Network.newNetwork();
2840
// registerListener {
2941
KafkaContainer kafka = new KafkaContainer("apache/kafka-native:3.8.0")
3042
.withListener("kafka:19092")
3143
.withNetwork(network);
3244
// }
33-
KCatContainer kcat = new KCatContainer().withNetwork(network)
3445
) {
3546
kafka.start();
36-
kcat.start();
37-
38-
kcat.execInContainer("kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt");
39-
String stdout = kcat
40-
.execInContainer("kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1")
41-
.getStdout();
42-
43-
assertThat(stdout).contains("Message produced by kcat");
47+
assertKafka("kafka:19092", network);
4448
}
4549
}
4650

0 commit comments

Comments
 (0)