Skip to content

Commit 92a475d

Browse files
committed
Add Confluent Kafka examples using SASL/SCRAM auth
1 parent 2707f31 commit 92a475d

File tree

4 files changed

+137
-17
lines changed

4 files changed

+137
-17
lines changed

modules/kafka/src/test/java/org/testcontainers/AbstractKafka.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.time.Duration;
2020
import java.util.Collection;
2121
import java.util.Collections;
22+
import java.util.Map;
2223
import java.util.Properties;
2324
import java.util.UUID;
2425
import java.util.concurrent.TimeUnit;
@@ -28,7 +29,7 @@
2829

2930
public class AbstractKafka {
3031

31-
private final ImmutableMap<String, String> properties = ImmutableMap.of(
32+
private static final ImmutableMap<String, String> PLAIN_PROPERTIES = ImmutableMap.of(
3233
AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
3334
"SASL_PLAINTEXT",
3435
SaslConfigs.SASL_MECHANISM,
@@ -37,16 +38,39 @@ public class AbstractKafka {
3738
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";"
3839
);
3940

41+
private static final ImmutableMap<String, String> SCRAM_PROPERTIES = ImmutableMap.of(
42+
AdminClientConfig.SECURITY_PROTOCOL_CONFIG,
43+
"SASL_PLAINTEXT",
44+
SaslConfigs.SASL_MECHANISM,
45+
"SCRAM-SHA-256",
46+
SaslConfigs.SASL_JAAS_CONFIG,
47+
"org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";"
48+
);
49+
4050
protected void testKafkaFunctionality(String bootstrapServers) throws Exception {
4151
testKafkaFunctionality(bootstrapServers, false, 1, 1);
4252
}
4353

44-
protected void testSecureKafkaFunctionality(String bootstrapServers) throws Exception {
45-
testKafkaFunctionality(bootstrapServers, true, 1, 1);
54+
protected void testSecurePlainKafkaFunctionality(String bootstrapServers) throws Exception {
55+
testKafkaFunctionality(bootstrapServers, true, PLAIN_PROPERTIES, 1, 1);
56+
}
57+
58+
protected void testSecureScramKafkaFunctionality(String bootstrapServers) throws Exception {
59+
testKafkaFunctionality(bootstrapServers, true, SCRAM_PROPERTIES, 1, 1);
4660
}
4761

4862
protected void testKafkaFunctionality(String bootstrapServers, boolean authenticated, int partitions, int rf)
4963
throws Exception {
64+
testKafkaFunctionality(bootstrapServers, authenticated, Collections.emptyMap(), partitions, rf);
65+
}
66+
67+
protected void testKafkaFunctionality(
68+
String bootstrapServers,
69+
boolean authenticated,
70+
Map<String, String> authProperties,
71+
int partitions,
72+
int rf
73+
) throws Exception {
5074
ImmutableMap<String, String> adminClientDefaultProperties = ImmutableMap.of(
5175
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
5276
bootstrapServers
@@ -75,9 +99,9 @@ protected void testKafkaFunctionality(String bootstrapServers, boolean authentic
7599
producerProperties.putAll(producerDefaultProperties);
76100

77101
if (authenticated) {
78-
adminClientProperties.putAll(this.properties);
79-
consumerProperties.putAll(this.properties);
80-
producerProperties.putAll(this.properties);
102+
adminClientProperties.putAll(authProperties);
103+
consumerProperties.putAll(authProperties);
104+
producerProperties.putAll(authProperties);
81105
}
82106
try (
83107
AdminClient adminClient = AdminClient.create(adminClientProperties);
@@ -116,4 +140,14 @@ protected void testKafkaFunctionality(String bootstrapServers, boolean authentic
116140
consumer.unsubscribe();
117141
}
118142
}
143+
144+
protected static String getJaasConfig() {
145+
String jaasConfig =
146+
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
147+
"username=\"admin\" " +
148+
"password=\"admin\" " +
149+
"user_admin=\"admin\" " +
150+
"user_test=\"secret\";";
151+
return jaasConfig;
152+
}
119153
}

modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.testcontainers.Testcontainers;
1515
import org.testcontainers.images.builder.Transferable;
1616
import org.testcontainers.utility.DockerImageName;
17+
import org.testcontainers.utility.MountableFile;
1718

1819
import java.util.Collection;
1920
import java.util.Collections;
@@ -229,7 +230,38 @@ public void shouldConfigureAuthenticationWithSaslUsingJaas() {
229230
) {
230231
kafka.start();
231232

232-
testSecureKafkaFunctionality(kafka.getBootstrapServers());
233+
testSecurePlainKafkaFunctionality(kafka.getBootstrapServers());
234+
}
235+
}
236+
237+
@SneakyThrows
238+
@Test
239+
public void shouldConfigureAuthenticationWithSaslScramUsingJaas() {
240+
try (
241+
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.7.0")) {
242+
protected String commandKraft() {
243+
String command = "sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure\n";
244+
command +=
245+
"echo 'kafka-storage format --ignore-formatted -t \"" +
246+
"$CLUSTER_ID" +
247+
"\" --add-scram SCRAM-SHA-256=[name=admin,password=admin] -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure\n";
248+
return command;
249+
}
250+
}
251+
.withKraft()
252+
.withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT")
253+
.withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256")
254+
.withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "SCRAM-SHA-256")
255+
.withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256")
256+
.withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf")
257+
.withCopyFileToContainer(
258+
MountableFile.forClasspathResource("kafka_server_jaas.conf"),
259+
"/etc/kafka/secrets/kafka_server_jaas.conf"
260+
)
261+
) {
262+
kafka.start();
263+
264+
testSecureScramKafkaFunctionality(kafka.getBootstrapServers());
233265
}
234266
}
235267

@@ -313,14 +345,4 @@ public void enableSaslAndWithAuthenticationError() {
313345
});
314346
}
315347
}
316-
317-
private static String getJaasConfig() {
318-
String jaasConfig =
319-
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
320-
"username=\"admin\" " +
321-
"password=\"admin\" " +
322-
"user_admin=\"admin\" " +
323-
"user_test=\"secret\";";
324-
return jaasConfig;
325-
}
326348
}

modules/kafka/src/test/java/org/testcontainers/kafka/ConfluentKafkaContainerTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package org.testcontainers.kafka;
22

3+
import com.github.dockerjava.api.command.InspectContainerResponse;
4+
import lombok.SneakyThrows;
35
import org.junit.Test;
46
import org.testcontainers.AbstractKafka;
57
import org.testcontainers.KCatContainer;
68
import org.testcontainers.containers.Network;
79
import org.testcontainers.containers.SocatContainer;
10+
import org.testcontainers.utility.MountableFile;
811

912
import static org.assertj.core.api.Assertions.assertThat;
1013

@@ -62,4 +65,60 @@ public void testUsageWithListenerFromProxy() throws Exception {
6265
testKafkaFunctionality(bootstrapServers);
6366
}
6467
}
68+
69+
@SneakyThrows
70+
@Test
71+
public void shouldConfigureAuthenticationWithSaslUsingJaas() {
72+
try (
73+
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.7.0")
74+
.withEnv(
75+
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
76+
"PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT"
77+
)
78+
.withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "PLAIN")
79+
.withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "PLAIN")
80+
.withEnv("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "PLAIN")
81+
.withEnv("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG", getJaasConfig())
82+
.withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG", getJaasConfig())
83+
) {
84+
kafka.start();
85+
86+
testSecurePlainKafkaFunctionality(kafka.getBootstrapServers());
87+
}
88+
}
89+
90+
@SneakyThrows
91+
@Test
92+
public void shouldConfigureAuthenticationWithSaslScramUsingJaas() {
93+
try (
94+
ConfluentKafkaContainer kafka = new ConfluentKafkaContainer("confluentinc/cp-kafka:7.7.0") {
95+
@SneakyThrows
96+
@Override
97+
protected void containerIsStarting(InspectContainerResponse containerInfo) {
98+
String command =
99+
"echo 'kafka-storage format --ignore-formatted -t \"" +
100+
"$CLUSTER_ID" +
101+
"\" --add-scram SCRAM-SHA-256=[name=admin,password=admin] -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure";
102+
execInContainer("bash", "-c", command);
103+
super.containerIsStarting(containerInfo);
104+
}
105+
}
106+
.withEnv(
107+
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
108+
"PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT"
109+
)
110+
.withEnv("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256")
111+
.withEnv("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "SCRAM-SHA-256")
112+
.withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256")
113+
.withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/secrets/kafka_server_jaas.conf")
114+
.withCopyFileToContainer(
115+
MountableFile.forClasspathResource("kafka_server_jaas.conf"),
116+
"/etc/kafka/secrets/kafka_server_jaas.conf"
117+
)
118+
) {
119+
kafka.start();
120+
121+
testSecureScramKafkaFunctionality(kafka.getBootstrapServers());
122+
}
123+
}
65124
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
KafkaServer {
2+
org.apache.kafka.common.security.scram.ScramLoginModule required
3+
username="admin"
4+
password="admin";
5+
};

0 commit comments

Comments
 (0)