Skip to content

[Bug]: #9784

@Valery-Pechur

Description

@Valery-Pechur

Module

Kafka

Testcontainers version

1.20.4

Using the latest Testcontainers version?

Yes

Host OS

Windows

Host Arch

x64

Docker version

Docker version 27.0.3

What happened?

Authentication error when starting test container

@UtilityClass
@Slf4j
public class KafkaInitializer {

    private static final String EMP_KAFKA_BOOTSTRAP_SERVERS_PROPERTY_NAME = "kafka.bootstrap-servers=";
    private static final String AUTH_USERNAME = "kafka-test";
    private static final String AUTH_PASSWORD = "kafka-test-password";

    private static final Map<String, String> KAFKA_ENV_WITH_AUTHORIZATION =  Map.ofEntries(

            Map.entry("KAFKA_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256,PLAIN"),
            Map.entry("KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256"),
            Map.entry("KAFKA_LISTENER_NAME_BROKER_SASL_ENABLED_MECHANISMS", "SCRAM-SHA-256"),
            Map.entry("KAFKA_LISTENER_NAME_CONTROLLER_SASL_ENABLED_MECHANISMS", "PLAIN"),
            Map.entry("KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL", "SCRAM-SHA-256"),
            Map.entry("KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL", "PLAIN"),

            Map.entry("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"),

            Map.entry("KAFKA_LISTENER_NAME_PLAINTEXT_SCRAM-SHA-256_SASL_JAAS_CONFIG",
                      "org.apache.kafka.common.security.scram.ScramLoginModule required username='%s' password='%s';"
                              .formatted(AUTH_USERNAME, AUTH_PASSWORD)),

            Map.entry("KAFKA_LISTENER_NAME_BROKER_SCRAM-SHA-256_SASL_JAAS_CONFIG",
                      "org.apache.kafka.common.security.scram.ScramLoginModule required username='%s' password='%s';"
                              .formatted(AUTH_USERNAME, AUTH_PASSWORD)),

            Map.entry("KAFKA_LISTENER_NAME_CONTROLLER_PLAIN_SASL_JAAS_CONFIG",
                      "org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='%s';"
                              .formatted(AUTH_USERNAME, AUTH_PASSWORD))
    );

    @Container
    private static final ConfluentKafkaContainer KAFKA_CONTAINER = new ConfluentKafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.7.0")
    );

    public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

        @Override
        public void initialize(@NotNull ConfigurableApplicationContext applicationContext) {
            try {
           
                KAFKA_CONTAINER.withEnv(KAFKA_ENV_WITH_AUTHORIZATION)
                        .start();


            KAFKA_CONTAINER.execInContainer(
                        "/usr/bin/kafka-storage", "format", "-t", " $CLUSTER_ID",
                        "--config", "/etc/kafka/kraft/server.properties",
                        "--add-scram", "SCRAM-SHA-256=[name=%s,password=%s]".formatted(AUTH_USERNAME, AUTH_PASSWORD),
                        "--ignore-formatted"
                );

                log.info("KafkaContainer for tests started");
            } catch (Throwable throwable) {
                log.error("KafkaContainer not started: {}", throwable.getMessage());
            }

            try {
                String propertyBootstrapServers = EMP_KAFKA_BOOTSTRAP_SERVERS_PROPERTY_NAME + KAFKA_CONTAINER.getBootstrapServers();
                TestPropertyValues.of(
                        propertyBootstrapServers
                ).applyTo(applicationContext.getEnvironment());
                log.info("TestPropertyValues of KafkaContainer: propertyBootstrapServers = {} was applied", propertyBootstrapServers);
            } catch (Throwable throwable) {
                log.error(
                        "TestPropertyValues of KafkaContainer don't apply for applicationContext: {}",
                        throwable.getMessage()
                );
                throw throwable;
            }
        }
    }
}

Additionally, the following settings are set:

  if (authUsername != null && authPassword != null) {
            result.put(SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
            result.put(SASL_MECHANISM, "SCRAM-SHA-256");
            result.put(SASL_JAAS_CONFIG, String.format(SASL_JAAS_CONFIG_TEMPLATE, authUsername, authPassword));
        }

These settings are set in DefaultKafkaConsumerFactory<>(consumerProperties, new StringDeserializer(), deserializer); and in ProducerFactory

Relevant log output

[ERROR] [Consumer clientId=consumer-user-role-test-1, groupId=user-role-test] Connection to node -1 (localhost/127.0.0.1:51382) failed authentication due to: Unexpected handshake request with client mechanism SCRAM-SHA-256, enabled mechanisms are []
[ERROR] Fatal consumer exception; stopping container
[ERROR] [Producer clientId=producer-1] Connection to node -1 (localhost/127.0.0.1:51382) failed authentication due to: Unexpected handshake request with client mechanism SCRAM-SHA-256, enabled mechanisms are []

Additional Information

Before changing the container, the following code worked:

 private static final Map<String, String> KAFKA_ENV_WITH_AUTHORIZATION = Map.of(
            "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:SASL_PLAINTEXT",
            "KAFKA_LISTENER_NAME_PLAINTEXT_SASL_ENABLED_MECHANISMS", ScramMechanism.SCRAM_SHA_256.mechanismName(),
            "KAFKA_LISTENER_NAME_PLAINTEXT_SCRAM-SHA-256_SASL_JAAS_CONFIG",
            "org.apache.kafka.common.security.scram.ScramLoginModule required username='%s' password='%s';"
                    .formatted(AUTH_USERNAME, AUTH_PASSWORD)
    );

   @Container
    private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.7.0")
    );

    public static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

        @Override
        public void initialize(@NotNull ConfigurableApplicationContext applicationContext) {
            try {
      
                KAFKA_CONTAINER.withEnv(KAFKA_ENV_WITH_AUTHORIZATION).start();

                KAFKA_CONTAINER.execInContainer(
                        "kafka-configs", "--zookeeper", "localhost:2181", "--alter",
                        "--add-config", "SCRAM-SHA-256=[iterations=4096,password=%s]".formatted(AUTH_PASSWORD),
                        "--entity-type", "users", "--entity-name", AUTH_USERNAME
                );
                log.info("KafkaContainer for tests started");

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions