11package org .testcontainers .containers ;
22
33import com .google .common .collect .ImmutableMap ;
4+ import lombok .SneakyThrows ;
45import org .apache .kafka .clients .admin .AdminClient ;
56import org .apache .kafka .clients .admin .AdminClientConfig ;
67import org .apache .kafka .clients .admin .NewTopic ;
1112import org .apache .kafka .clients .producer .KafkaProducer ;
1213import org .apache .kafka .clients .producer .ProducerConfig ;
1314import org .apache .kafka .clients .producer .ProducerRecord ;
15+ import org .apache .kafka .common .config .SaslConfigs ;
16+ import org .apache .kafka .common .errors .SaslAuthenticationException ;
17+ import org .apache .kafka .common .errors .TopicAuthorizationException ;
1418import org .apache .kafka .common .serialization .StringDeserializer ;
1519import org .apache .kafka .common .serialization .StringSerializer ;
20+ import org .awaitility .Awaitility ;
1621import org .junit .Test ;
1722import org .rnorth .ducttape .unreliables .Unreliables ;
1823import org .testcontainers .Testcontainers ;
2227import java .time .Duration ;
2328import java .util .Collection ;
2429import java .util .Collections ;
30+ import java .util .Properties ;
2531import java .util .UUID ;
2632import java .util .concurrent .TimeUnit ;
2733
2834import static org .assertj .core .api .Assertions .assertThat ;
35+ import static org .assertj .core .api .Assertions .assertThatThrownBy ;
2936import static org .assertj .core .api .Assertions .tuple ;
3037
3138public class KafkaContainerTest {
@@ -38,6 +45,15 @@ public class KafkaContainerTest {
3845 "confluentinc/cp-zookeeper:4.0.0"
3946 );
4047
48+ private final ImmutableMap <String , String > properties = ImmutableMap .of (
49+ AdminClientConfig .SECURITY_PROTOCOL_CONFIG ,
50+ "SASL_PLAINTEXT" ,
51+ SaslConfigs .SASL_MECHANISM ,
52+ "PLAIN" ,
53+ SaslConfigs .SASL_JAAS_CONFIG ,
54+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\" admin\" password=\" admin\" ;"
55+ );
56+
4157 @ Test
4258 public void testUsage () throws Exception {
4359 try (KafkaContainer kafka = new KafkaContainer (KAFKA_TEST_IMAGE )) {
@@ -220,34 +236,166 @@ public void testUsageWithListener() throws Exception {
220236 }
221237 }
222238
223- protected void testKafkaFunctionality (String bootstrapServers ) throws Exception {
224- testKafkaFunctionality (bootstrapServers , 1 , 1 );
239+ @ SneakyThrows
240+ @ Test
241+ public void shouldConfigureAuthenticationWithSaslUsingJaas () {
242+ try (
243+ KafkaContainer kafka = new KafkaContainer (DockerImageName .parse ("confluentinc/cp-kafka:6.2.1" ))
244+ .withEnv ("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" , "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT" )
245+ .withEnv ("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL" , "PLAIN" )
246+ .withEnv ("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS" , "PLAIN" )
247+ .withEnv ("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS" , "PLAIN" )
248+ .withEnv ("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG" , getJaasConfig ())
249+ .withEnv ("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG" , getJaasConfig ())
250+ ) {
251+ kafka .start ();
252+
253+ testSecureKafkaFunctionality (kafka .getBootstrapServers ());
254+ }
225255 }
226256
227- protected void testKafkaFunctionality (String bootstrapServers , int partitions , int rf ) throws Exception {
257+ @ SneakyThrows
258+ @ Test
259+ public void enableSaslWithUnsuccessfulTopicCreation () {
228260 try (
261+ KafkaContainer kafka = new KafkaContainer (DockerImageName .parse ("confluentinc/cp-kafka:6.2.1" ))
262+ .withEnv ("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" , "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT" )
263+ .withEnv ("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL" , "PLAIN" )
264+ .withEnv ("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS" , "PLAIN" )
265+ .withEnv ("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS" , "PLAIN" )
266+ .withEnv ("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG" , getJaasConfig ())
267+ .withEnv ("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG" , getJaasConfig ())
268+ .withEnv ("KAFKA_AUTHORIZER_CLASS_NAME" , "kafka.security.authorizer.AclAuthorizer" )
269+ .withEnv ("KAFKA_SUPER_USERS" , "User:admin" )
270+ ) {
271+ kafka .start ();
272+
229273 AdminClient adminClient = AdminClient .create (
230- ImmutableMap .of (AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG , bootstrapServers )
274+ ImmutableMap .of (
275+ AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG ,
276+ kafka .getBootstrapServers (),
277+ AdminClientConfig .SECURITY_PROTOCOL_CONFIG ,
278+ "SASL_PLAINTEXT" ,
279+ SaslConfigs .SASL_MECHANISM ,
280+ "PLAIN" ,
281+ SaslConfigs .SASL_JAAS_CONFIG ,
282+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\" test\" password=\" secret\" ;"
283+ )
231284 );
232- KafkaProducer <String , String > producer = new KafkaProducer <>(
285+
286+ String topicName = "messages-" + UUID .randomUUID ();
287+ Collection <NewTopic > topics = Collections .singletonList (new NewTopic (topicName , 1 , (short ) 1 ));
288+
289+ Awaitility
290+ .await ()
291+ .untilAsserted (() -> {
292+ assertThatThrownBy (() -> adminClient .createTopics (topics ).all ().get (30 , TimeUnit .SECONDS ))
293+ .hasCauseInstanceOf (TopicAuthorizationException .class );
294+ });
295+ }
296+ }
297+
298+ @ SneakyThrows
299+ @ Test
300+ public void enableSaslAndWithAuthenticationError () {
301+ String jaasConfig = getJaasConfig ();
302+ try (
303+ KafkaContainer kafka = new KafkaContainer (DockerImageName .parse ("confluentinc/cp-kafka:6.2.1" ))
304+ .withEnv ("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" , "PLAINTEXT:SASL_PLAINTEXT,BROKER:SASL_PLAINTEXT" )
305+ .withEnv ("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL" , "PLAIN" )
306+ .withEnv ("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS" , "PLAIN" )
307+ .withEnv ("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS" , "PLAIN" )
308+ .withEnv ("KAFKA_LISTENER_NAME_BROKER_PLAIN_SASL_JAAS_CONFIG" , jaasConfig )
309+ .withEnv ("KAFKA_LISTENER_NAME_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG" , jaasConfig )
310+ ) {
311+ kafka .start ();
312+
313+ AdminClient adminClient = AdminClient .create (
233314 ImmutableMap .of (
234- ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ,
235- bootstrapServers ,
236- ProducerConfig .CLIENT_ID_CONFIG ,
237- UUID .randomUUID ().toString ()
238- ),
315+ AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG ,
316+ kafka .getBootstrapServers (),
317+ AdminClientConfig .SECURITY_PROTOCOL_CONFIG ,
318+ "SASL_PLAINTEXT" ,
319+ SaslConfigs .SASL_MECHANISM ,
320+ "PLAIN" ,
321+ SaslConfigs .SASL_JAAS_CONFIG ,
322+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\" test\" password=\" secretx\" ;"
323+ )
324+ );
325+
326+ String topicName = "messages-" + UUID .randomUUID ();
327+ Collection <NewTopic > topics = Collections .singletonList (new NewTopic (topicName , 1 , (short ) 1 ));
328+
329+ Awaitility
330+ .await ()
331+ .untilAsserted (() -> {
332+ assertThatThrownBy (() -> adminClient .createTopics (topics ).all ().get (30 , TimeUnit .SECONDS ))
333+ .hasCauseInstanceOf (SaslAuthenticationException .class );
334+ });
335+ }
336+ }
337+
338+ private static String getJaasConfig () {
339+ String jaasConfig =
340+ "org.apache.kafka.common.security.plain.PlainLoginModule required " +
341+ "username=\" admin\" " +
342+ "password=\" admin\" " +
343+ "user_admin=\" admin\" " +
344+ "user_test=\" secret\" ;" ;
345+ return jaasConfig ;
346+ }
347+
348+ private void testKafkaFunctionality (String bootstrapServers ) throws Exception {
349+ testKafkaFunctionality (bootstrapServers , false , 1 , 1 );
350+ }
351+
352+ private void testSecureKafkaFunctionality (String bootstrapServers ) throws Exception {
353+ testKafkaFunctionality (bootstrapServers , true , 1 , 1 );
354+ }
355+
356+ private void testKafkaFunctionality (String bootstrapServers , boolean authenticated , int partitions , int rf )
357+ throws Exception {
358+ ImmutableMap <String , String > adminClientDefaultProperties = ImmutableMap .of (
359+ AdminClientConfig .BOOTSTRAP_SERVERS_CONFIG ,
360+ bootstrapServers
361+ );
362+ Properties adminClientProperties = new Properties ();
363+ adminClientProperties .putAll (adminClientDefaultProperties );
364+
365+ ImmutableMap <String , String > consumerDefaultProperties = ImmutableMap .of (
366+ ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG ,
367+ bootstrapServers ,
368+ ConsumerConfig .GROUP_ID_CONFIG ,
369+ "tc-" + UUID .randomUUID (),
370+ ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ,
371+ "earliest"
372+ );
373+ Properties consumerProperties = new Properties ();
374+ consumerProperties .putAll (consumerDefaultProperties );
375+
376+ ImmutableMap <String , String > producerDefaultProperties = ImmutableMap .of (
377+ ProducerConfig .BOOTSTRAP_SERVERS_CONFIG ,
378+ bootstrapServers ,
379+ ProducerConfig .CLIENT_ID_CONFIG ,
380+ UUID .randomUUID ().toString ()
381+ );
382+ Properties producerProperties = new Properties ();
383+ producerProperties .putAll (producerDefaultProperties );
384+
385+ if (authenticated ) {
386+ adminClientProperties .putAll (this .properties );
387+ consumerProperties .putAll (this .properties );
388+ producerProperties .putAll (this .properties );
389+ }
390+ try (
391+ AdminClient adminClient = AdminClient .create (adminClientProperties );
392+ KafkaProducer <String , String > producer = new KafkaProducer <>(
393+ producerProperties ,
239394 new StringSerializer (),
240395 new StringSerializer ()
241396 );
242397 KafkaConsumer <String , String > consumer = new KafkaConsumer <>(
243- ImmutableMap .of (
244- ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG ,
245- bootstrapServers ,
246- ConsumerConfig .GROUP_ID_CONFIG ,
247- "tc-" + UUID .randomUUID (),
248- ConsumerConfig .AUTO_OFFSET_RESET_CONFIG ,
249- "earliest"
250- ),
398+ consumerProperties ,
251399 new StringDeserializer (),
252400 new StringDeserializer ()
253401 );
0 commit comments