Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,11 @@
package com.azure.spring.cloud.autoconfigure.implementation.kafka;

import com.azure.spring.cloud.autoconfigure.implementation.context.properties.AzureGlobalProperties;
import com.azure.spring.cloud.autoconfigure.implementation.kafka.authentication.KafkaAuthenticationStrategy;
import com.azure.spring.cloud.autoconfigure.implementation.kafka.authentication.KafkaOAuth2AuthenticationStrategy;
import com.azure.spring.cloud.core.implementation.properties.PropertyMapper;
import com.azure.spring.cloud.core.properties.AzureProperties;
import com.azure.spring.cloud.service.implementation.jaas.Jaas;
import com.azure.spring.cloud.service.implementation.jaas.JaasResolver;
import com.azure.spring.cloud.service.implementation.kafka.AzureKafkaPropertiesUtils;
import com.azure.spring.cloud.service.implementation.kafka.KafkaOAuth2AuthenticateCallbackHandler;
import org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
Expand All @@ -25,48 +21,71 @@
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.azure.spring.cloud.core.implementation.util.AzureSpringIdentifier.AZURE_SPRING_EVENT_HUBS_KAFKA_OAUTH;
import static com.azure.spring.cloud.core.implementation.util.AzureSpringIdentifier.VERSION;
import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import static org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL;
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
import static org.springframework.util.StringUtils.delimitedListToStringArray;

/**
* Abstract base class for Kafka properties BeanPostProcessors.
* <p>
* This class provides common functionality for configuring Kafka authentication for Azure Event Hubs
* using different strategies. It uses the Strategy pattern to delegate authentication configuration
* to pluggable {@link KafkaAuthenticationStrategy} implementations.
* </p>
* <p>
* The processor intercepts Kafka properties beans during Spring bean initialization and applies
* authentication configuration to producer, consumer, and admin properties based on the configured
* strategy.
* </p>
* <p>
* <b>Authentication Flow:</b>
* <ol>
* <li>Bean post processor detects Kafka properties beans during initialization</li>
* <li>Retrieves AzureGlobalProperties for credential configuration</li>
* <li>For each set of properties (producer/consumer/admin):
* <ul>
* <li>Checks if authentication strategy should be applied via {@link KafkaAuthenticationStrategy#shouldApply}</li>
* <li>If applicable, applies authentication via {@link KafkaAuthenticationStrategy#applyAuthentication}</li>
* <li>Configures Kafka user agent for telemetry</li>
* <li>Clears Azure-specific properties from raw properties map</li>
* </ul>
* </li>
* </ol>
* </p>
* <p>
* By default, uses {@link KafkaOAuth2AuthenticationStrategy} for OAuth2/Microsoft Entra ID authentication.
* Subclasses can provide alternative strategies via the constructor.
* </p>
*
* @param <T> the type of Kafka properties bean to process (e.g., {@link KafkaProperties})
* @see KafkaAuthenticationStrategy
* @see KafkaOAuth2AuthenticationStrategy
*/
abstract class AbstractKafkaPropertiesBeanPostProcessor<T> implements BeanPostProcessor, ApplicationContextAware {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractKafkaPropertiesBeanPostProcessor.class);
static final String SECURITY_PROTOCOL_CONFIG_SASL = SASL_SSL.name();
static final String SASL_MECHANISM_OAUTH = OAUTHBEARER_MECHANISM;
static final String AZURE_CONFIGURED_JAAS_OPTIONS_KEY = "azure.configured";
static final String AZURE_CONFIGURED_JAAS_OPTIONS_VALUE = "true";
static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH =
KafkaOAuth2AuthenticateCallbackHandler.class.getName();
protected ApplicationContext applicationContext;
protected static final PropertyMapper PROPERTY_MAPPER = new PropertyMapper();
private static final Map<String, String> KAFKA_OAUTH_CONFIGS;
private static final String LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE = "OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.";
private static final String LOG_OAUTH_AUTOCONFIGURATION_CONFIGURE = "Spring Cloud Azure auto-configuration for Kafka OAUTHBEARER authentication will be loaded to configure your Kafka security and sasl properties to support Azure Identity credentials.";
private static final String LOG_OAUTH_AUTOCONFIGURATION_RECOMMENDATION = "Currently {} authentication mechanism is used, recommend to use Spring Cloud Azure auto-configuration for Kafka OAUTHBEARER authentication"
+ " which supports various Azure Identity credentials. To leverage the auto-configuration for OAuth2, you can just remove all your security, sasl and credential configurations of Kafka and Event Hubs."
+ " And configure Kafka bootstrap servers instead, which can be set as spring.kafka.boostrap-servers=EventHubsNamespacesFQDN:9093.";

static {
KAFKA_OAUTH_CONFIGS = Map.of(SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL_CONFIG_SASL, SASL_MECHANISM,
SASL_MECHANISM_OAUTH, SASL_LOGIN_CALLBACK_HANDLER_CLASS, SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH);
private AzureGlobalProperties azureGlobalProperties;
private final KafkaAuthenticationStrategy authenticationStrategy;

/**
* Constructor that initializes with the default OAuth2 authentication strategy.
*/
protected AbstractKafkaPropertiesBeanPostProcessor() {
this(new KafkaOAuth2AuthenticationStrategy());
}

private AzureGlobalProperties azureGlobalProperties;
/**
* Constructor that allows injection of a custom authentication strategy.
*
* @param authenticationStrategy the authentication strategy to use
*/
protected AbstractKafkaPropertiesBeanPostProcessor(KafkaAuthenticationStrategy authenticationStrategy) {
this.authenticationStrategy = authenticationStrategy;
}

@SuppressWarnings("unchecked")
@Override
Expand All @@ -76,14 +95,14 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
azureGlobalProperties = beanProvider.getIfAvailable();
if (azureGlobalProperties == null) {
LOGGER.debug("Cannot find a bean of type AzureGlobalProperties, "
+ "Spring Cloud Azure will skip performing JAAS enhancements on the {} bean.", beanName);
+ "Spring Cloud Azure will skip performing authentication configuration on the {} bean.", beanName);
return bean;
}

T properties = (T) bean;
replaceAzurePropertiesWithJaas(getMergedProducerProperties(properties), getRawProducerProperties(properties));
replaceAzurePropertiesWithJaas(getMergedConsumerProperties(properties), getRawConsumerProperties(properties));
replaceAzurePropertiesWithJaas(getMergedAdminProperties(properties), getRawAdminProperties(properties));
applyAuthentication(getMergedProducerProperties(properties), getRawProducerProperties(properties));
applyAuthentication(getMergedConsumerProperties(properties), getRawConsumerProperties(properties));
applyAuthentication(getMergedAdminProperties(properties), getRawAdminProperties(properties));
customizeProcess(properties);
}
return bean;
Expand Down Expand Up @@ -149,8 +168,7 @@ protected void customizeProcess(T properties) {


protected void clearAzureProperties(Map<String, String> properties) {
AzureKafkaPropertiesUtils.AzureKafkaPasswordlessPropertiesMapping.getPropertyKeys()
.forEach(properties::remove);
authenticationStrategy.clearAzureProperties(properties);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -178,75 +196,32 @@ protected Map<String, Object> invokeBuildKafkaProperties(KafkaProperties kafkaPr
}

/**
* This method executes two operations:
* <p>
* 1. When this configuration meets Azure Kafka passwordless startup requirements, convert all Azure properties
* in Kafka to {@link Jaas}, and configure the JAAS configuration back to Kafka.
* </p>
* <p>
* 2. Clear any Azure properties in Kafka properties.
* </p>
* @param mergedProperties the merged Kafka properties which can contain Azure properties to resolve JAAS from
* @param rawPropertiesMap the raw Kafka properties Map to configure JAAS to and remove Azure Properties from
* Applies authentication configuration to the Kafka properties.
* This method uses the configured authentication strategy to:
* 1. Apply authentication settings if the strategy determines it should
* 2. Configure Kafka user agent
* 3. Clear any Azure-specific properties from the raw properties map
*
* @param mergedProperties the merged Kafka properties which can contain Azure properties
* @param rawPropertiesMap the raw Kafka properties Map to configure authentication to
*/
private void replaceAzurePropertiesWithJaas(Map<String, Object> mergedProperties, Map<String, String> rawPropertiesMap) {
resolveJaasForAzure(mergedProperties)
.ifPresent(jaas -> {
configJaasToKafkaRawProperties(jaas, rawPropertiesMap);
logConfigureOAuthProperties();
configureKafkaUserAgent();
});
clearAzureProperties(rawPropertiesMap);
}

private Optional<Jaas> resolveJaasForAzure(Map<String, Object> mergedProperties) {
if (needConfigureSaslOAuth(mergedProperties)) {
JaasResolver resolver = new JaasResolver();
Jaas jaas = resolver.resolve((String) mergedProperties.get(SASL_JAAS_CONFIG))
.orElse(new Jaas(OAuthBearerLoginModule.class.getName()));
setAzurePropertiesToJaasOptionsIfAbsent(azureGlobalProperties, jaas);
setKafkaPropertiesToJaasOptions(mergedProperties, jaas);
jaas.getOptions().put(AZURE_CONFIGURED_JAAS_OPTIONS_KEY, AZURE_CONFIGURED_JAAS_OPTIONS_VALUE);
return Optional.of(jaas);
} else {
return Optional.empty();
private void applyAuthentication(Map<String, Object> mergedProperties, Map<String, String> rawPropertiesMap) {
if (authenticationStrategy.shouldApply(mergedProperties)) {
authenticationStrategy.applyAuthentication(mergedProperties, rawPropertiesMap, azureGlobalProperties);
configureKafkaUserAgent();
}
}

private void configJaasToKafkaRawProperties(Jaas jaas, Map<String, String> rawPropertiesMap) {
rawPropertiesMap.putAll(KAFKA_OAUTH_CONFIGS);
rawPropertiesMap.put(SASL_JAAS_CONFIG, jaas.toString());
authenticationStrategy.clearAzureProperties(rawPropertiesMap);
}

/**
* Configure necessary OAuth properties for kafka properties and log for the changes.
* Check if SASL OAuth authentication should be configured.
* This method delegates to the authentication strategy.
*
* @param sourceProperties the source kafka properties for admin/consumer/producer to detect
* @return whether OAuth authentication should be configured
*/
private void logConfigureOAuthProperties() {
getLogger().info(LOG_OAUTH_AUTOCONFIGURATION_CONFIGURE);
getLogger().debug(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL_CONFIG_SASL);
getLogger().debug(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SASL_MECHANISM, SASL_MECHANISM_OAUTH);
getLogger().debug(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SASL_JAAS_CONFIG, "***the value involves credentials and will not be logged***");
getLogger().debug(LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE, SASL_LOGIN_CALLBACK_HANDLER_CLASS,
SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH);
}


private void setKafkaPropertiesToJaasOptions(Map<String, ?> properties, Jaas jaas) {
AzureKafkaPropertiesUtils.AzureKafkaPasswordlessPropertiesMapping.getPropertyKeys()
.forEach(k -> PROPERTY_MAPPER.from(properties.get(k)).to(p -> jaas.getOptions().put(k, (String) p)));
}

private void setAzurePropertiesToJaasOptionsIfAbsent(AzureProperties azureProperties, Jaas jaas) {
convertAzurePropertiesToMap(azureProperties)
.forEach((k, v) -> jaas.getOptions().putIfAbsent(k, v));
}

private Map<String, String> convertAzurePropertiesToMap(AzureProperties properties) {
Map<String, String> configs = new HashMap<>();
for (AzureKafkaPropertiesUtils.AzureKafkaPasswordlessPropertiesMapping m : AzureKafkaPropertiesUtils.AzureKafkaPasswordlessPropertiesMapping.values()) {
PROPERTY_MAPPER.from(m.getter().apply(properties)).to(p -> configs.put(m.propertyKey(), p));
}
return configs;
boolean needConfigureSaslOAuth(Map<String, Object> sourceProperties) {
return authenticationStrategy.shouldApply(sourceProperties);
}

/**
Expand Down Expand Up @@ -280,68 +255,4 @@ static synchronized void configureKafkaUserAgent() {
}
}

/**
* Detect whether we need to configure SASL/OAUTHBEARER properties for {@link KafkaProperties}. Will configure when
* the security protocol is not configured, or it's set as SASL_SSL with sasl mechanism as null or OAUTHBEAR.
*
* @param sourceProperties the source kafka properties for admin/consumer/producer to detect
* @return whether we need to configure with Spring Cloud Azure MSI support or not.
*/
boolean needConfigureSaslOAuth(Map<String, Object> sourceProperties) {
return meetAzureBootstrapServerConditions(sourceProperties) && meetSaslOAuthConditions(sourceProperties);
}

private boolean meetSaslOAuthConditions(Map<String, Object> sourceProperties) {
String securityProtocol = (String) sourceProperties.get(SECURITY_PROTOCOL_CONFIG);
String saslMechanism = (String) sourceProperties.get(SASL_MECHANISM);
String jaasConfig = (String) sourceProperties.get(SASL_JAAS_CONFIG);
if (meetSaslProtocolConditions(securityProtocol) && meetSaslOAuth2MechanismConditions(saslMechanism)
&& meetJaasConditions(jaasConfig)) {
return true;
}
getLogger().info(LOG_OAUTH_AUTOCONFIGURATION_RECOMMENDATION, saslMechanism);
return false;
}

private boolean meetSaslProtocolConditions(String securityProtocol) {
return securityProtocol == null || SECURITY_PROTOCOL_CONFIG_SASL.equalsIgnoreCase(securityProtocol);
}

private boolean meetSaslOAuth2MechanismConditions(String saslMechanism) {
return saslMechanism == null || SASL_MECHANISM_OAUTH.equalsIgnoreCase(saslMechanism);
}
private boolean meetJaasConditions(String jaasConfig) {
if (jaasConfig == null) {
return true;
}
JaasResolver resolver = new JaasResolver();
return resolver.resolve(jaasConfig)
.map(jaas -> AZURE_CONFIGURED_JAAS_OPTIONS_VALUE.equals(
jaas.getOptions().get(AZURE_CONFIGURED_JAAS_OPTIONS_KEY)))
.orElse(false);
}

private boolean meetAzureBootstrapServerConditions(Map<String, Object> sourceProperties) {
Object bootstrapServers = sourceProperties.get(BOOTSTRAP_SERVERS_CONFIG);
List<String> serverList;
if (bootstrapServers instanceof String) {
serverList = Arrays.asList(delimitedListToStringArray((String) bootstrapServers, ","));
} else if (bootstrapServers instanceof Iterable<?>) {
serverList = new ArrayList<>();
for (Object obj : (Iterable) bootstrapServers) {
if (obj instanceof String) {
serverList.add((String) obj);
} else {
getLogger().debug("Kafka bootstrap server configuration doesn't meet passwordless requirements.");
return false;
}
}
} else {
getLogger().debug("Kafka bootstrap server configuration doesn't meet passwordless requirements.");
return false;
}

return serverList.size() == 1 && serverList.get(0).endsWith(":9093");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.spring.cloud.autoconfigure.implementation.kafka.authentication;

import com.azure.spring.cloud.autoconfigure.implementation.context.properties.AzureGlobalProperties;

import java.util.Map;

/**
* Strategy interface for Kafka authentication configuration.
* Implementations of this interface handle different authentication methods
* for connecting to Kafka/Event Hubs.
*
* @since 6.1.0
*/
public interface KafkaAuthenticationStrategy {

/**
* Determines if this authentication strategy should be applied based on the provided Kafka properties.
*
* @param kafkaProperties the merged Kafka properties (producer/consumer/admin)
* @return true if this strategy should be applied, false otherwise
*/
boolean shouldApply(Map<String, Object> kafkaProperties);

/**
* Applies the authentication configuration to the raw Kafka properties map.
*
* @param mergedProperties the merged Kafka properties which may contain Azure-specific properties
* @param rawPropertiesMap the raw Kafka properties Map to configure authentication settings to
* @param azureGlobalProperties the global Azure properties for credential configuration
*/
void applyAuthentication(Map<String, Object> mergedProperties,
Map<String, String> rawPropertiesMap,
AzureGlobalProperties azureGlobalProperties);

/**
* Removes any Azure-specific properties from the raw properties map.
* This is called after authentication has been applied.
*
* @param rawPropertiesMap the raw Kafka properties Map to clean
*/
void clearAzureProperties(Map<String, String> rawPropertiesMap);
}
Loading
Loading