Skip to content

Commit dc3f12b

Browse files
authored
Fix kafka connection string auth configuration condition and improve BPP (Azure#43854)
1 parent 09f4641 commit dc3f12b

24 files changed

+205
-111
lines changed

sdk/spring/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ This section includes changes in `spring-cloud-azure-autoconfigure` module.
99

1010
#### Bugs Fixed
1111
- Fix bug: Registered the empty value for ineligible definition, it causes NPE when sending message via bean `StreamBridge`. [#43366](https://github.com/Azure/azure-sdk-for-java/issues/43366).
12+
- Fix bug: Not working when using Spring Kafka and Kafka Binder via connection string auth [#43853](https://github.com/Azure/azure-sdk-for-java/issues/43853).
1213

1314
### Spring Messaging Azure Service Bus
1415
This section includes changes in the `spring-messaging-azure-servicebus` module.

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/context/AzureServiceClientBuilderFactoryConfiguration.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44
package com.azure.spring.cloud.autoconfigure.implementation.context;
55

66
import com.azure.spring.cloud.core.implementation.factory.AbstractAzureServiceClientBuilderFactory;
7-
import org.springframework.beans.factory.config.BeanDefinition;
87
import org.springframework.context.annotation.Bean;
98
import org.springframework.context.annotation.Configuration;
10-
import org.springframework.context.annotation.Role;
119

1210
/**
1311
* {@code @Configuration} class that registers a {@link AzureServiceClientBuilderFactoryPostProcessor}
@@ -16,15 +14,13 @@
1614
* @since 5.19.0
1715
*/
1816
@Configuration(proxyBeanMethods = false)
19-
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
2017
class AzureServiceClientBuilderFactoryConfiguration {
2118

2219
/**
2320
* The BeanPostProcessor to apply the default token credential and resolver to all service client builder factories.
2421
* @return the BPP.
2522
*/
2623
@Bean
27-
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
2824
static AzureServiceClientBuilderFactoryPostProcessor builderFactoryBeanPostProcessor() {
2925
return new AzureServiceClientBuilderFactoryPostProcessor();
3026
}

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfiguration.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@
1414
import org.slf4j.LoggerFactory;
1515
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
1616
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
17+
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
1718
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
1819
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
1920
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
2021
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
2122
import org.springframework.context.annotation.Bean;
2223
import org.springframework.context.annotation.Configuration;
23-
import org.springframework.context.annotation.Import;
2424
import org.springframework.core.env.Environment;
2525
import org.springframework.kafka.core.KafkaTemplate;
2626

@@ -36,7 +36,6 @@
3636
@ConditionalOnClass(KafkaTemplate.class)
3737
@ConditionalOnProperty(value = "spring.cloud.azure.eventhubs.kafka.enabled", havingValue = "true", matchIfMissing = true)
3838
@AutoConfigureAfter({ AzureEventHubsAutoConfiguration.class, AzureEventHubsResourceManagerAutoConfiguration.class })
39-
@Import(KafkaPropertiesConfiguration.class)
4039
public class AzureEventHubsKafkaAutoConfiguration {
4140

4241
private static final Logger LOGGER = LoggerFactory.getLogger(AzureEventHubsKafkaAutoConfiguration.class);
@@ -56,4 +55,10 @@ StaticConnectionStringProvider<AzureServiceType.EventHubs> eventHubsKafkaConnect
5655

5756
return new StaticConnectionStringProvider<>(AzureServiceType.EVENT_HUBS, connectionString);
5857
}
58+
59+
@Bean
60+
@ConditionalOnBean(value = AzureServiceType.EventHubs.class, parameterizedContainer = ServiceConnectionStringProvider.class)
61+
static KafkaPropertiesBeanPostProcessor kafkaPropertiesBeanPostProcessor() {
62+
return new KafkaPropertiesBeanPostProcessor();
63+
}
5964
}

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesBeanPostProcessor.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@
88
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
1010
import org.springframework.beans.BeansException;
11+
import org.springframework.beans.factory.ObjectProvider;
1112
import org.springframework.beans.factory.config.BeanPostProcessor;
1213
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
14+
import org.springframework.context.ApplicationContext;
15+
import org.springframework.context.ApplicationContextAware;
16+
import org.springframework.core.ResolvableType;
1317

1418
import java.util.ArrayList;
1519
import java.util.Collections;
@@ -22,16 +26,12 @@
2226
/**
2327
* {@link BeanPostProcessor} for {@link KafkaProperties} to configure connection string credentials.
2428
*/
25-
class KafkaPropertiesBeanPostProcessor implements BeanPostProcessor {
29+
class KafkaPropertiesBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware {
2630

2731
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPropertiesBeanPostProcessor.class);
2832
private static final String SASL_CONFIG_VALUE = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";%s";
2933

30-
private final ServiceConnectionStringProvider<AzureServiceType.EventHubs> connectionStringProvider;
31-
32-
KafkaPropertiesBeanPostProcessor(ServiceConnectionStringProvider<AzureServiceType.EventHubs> connectionStringProvider) {
33-
this.connectionStringProvider = connectionStringProvider;
34-
}
34+
private ApplicationContext applicationContext;
3535

3636
@Override
3737
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
@@ -43,8 +43,17 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
4343
+ " instead, which can be set as spring.kafka.boostrap-servers=EventHubsNamespacesFQDN:9093.");
4444

4545
KafkaProperties kafkaProperties = (KafkaProperties) bean;
46-
String connectionString = connectionStringProvider.getConnectionString();
46+
ResolvableType provider = ResolvableType.forClassWithGenerics(ServiceConnectionStringProvider.class, AzureServiceType.EventHubs.class);
47+
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.EventHubs>> beanProvider = applicationContext.getBeanProvider(provider);
48+
49+
ServiceConnectionStringProvider<AzureServiceType.EventHubs> connectionStringProvider = beanProvider.getIfAvailable();
50+
if (connectionStringProvider == null) {
51+
LOGGER.debug("Cannot find a bean of type ServiceConnectionStringProvider<AzureServiceType.EventHubs>, "
52+
+ "Spring Cloud Azure will skip performing JAAS enhancements on the KafkaProperties bean.");
53+
return bean;
54+
}
4755

56+
String connectionString = connectionStringProvider.getConnectionString();
4857
String bootstrapServer = new EventHubsConnectionString(connectionString).getFullyQualifiedNamespace() + ":9093";
4958
kafkaProperties.setBootstrapServers(new ArrayList<>(Collections.singletonList(bootstrapServer)));
5059
kafkaProperties.getProperties().put(SECURITY_PROTOCOL_CONFIG, SASL_SSL.name());
@@ -55,4 +64,8 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
5564
return bean;
5665
}
5766

67+
@Override
68+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
69+
this.applicationContext = applicationContext;
70+
}
5871
}

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesConfiguration.java

Lines changed: 0 additions & 35 deletions
This file was deleted.

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jdbc/AzureJdbcAutoConfiguration.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
package com.azure.spring.cloud.autoconfigure.implementation.jdbc;
44

55
import com.azure.identity.extensions.implementation.template.AzureAuthenticationTemplate;
6-
import org.springframework.beans.factory.config.BeanDefinition;
76
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
87
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
98
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
@@ -13,7 +12,6 @@
1312
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
1413
import org.springframework.context.annotation.Bean;
1514
import org.springframework.context.annotation.Configuration;
16-
import org.springframework.context.annotation.Role;
1715

1816

1917
/**
@@ -23,15 +21,13 @@
2321
* @since 4.5.0
2422
*/
2523
@Configuration(proxyBeanMethods = false)
26-
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
2724
@ConditionalOnBean(DataSourceProperties.class)
2825
@ConditionalOnClass(AzureAuthenticationTemplate.class)
2926
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
3027
public class AzureJdbcAutoConfiguration {
3128

3229
@Bean
3330
@ConditionalOnMissingBean
34-
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
3531
static JdbcPropertiesBeanPostProcessor jdbcConfigurationPropertiesBeanPostProcessor() {
3632
return new JdbcPropertiesBeanPostProcessor();
3733
}

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/AzureServiceBusJmsPropertiesBeanPostProcessor.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,27 @@
99
import org.springframework.beans.BeansException;
1010
import org.springframework.beans.factory.ObjectProvider;
1111
import org.springframework.beans.factory.config.BeanPostProcessor;
12+
import org.springframework.context.ApplicationContext;
13+
import org.springframework.context.ApplicationContextAware;
14+
import org.springframework.core.ResolvableType;
1215
import org.springframework.util.StringUtils;
1316

14-
class AzureServiceBusJmsPropertiesBeanPostProcessor implements BeanPostProcessor {
17+
class AzureServiceBusJmsPropertiesBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware {
1518

16-
private final ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders;
19+
private ApplicationContext applicationContext;
1720

18-
AzureServiceBusJmsPropertiesBeanPostProcessor(ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders) {
19-
this.connectionStringProviders = connectionStringProviders;
21+
@Override
22+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
23+
this.applicationContext = applicationContext;
2024
}
2125

2226
@Override
2327
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
2428
if (bean instanceof AzureServiceBusJmsProperties) {
2529
AzureServiceBusJmsProperties jmsProperties = (AzureServiceBusJmsProperties) bean;
2630
if (!StringUtils.hasText(jmsProperties.getConnectionString())) {
31+
ResolvableType providerType = ResolvableType.forClassWithGenerics(ServiceConnectionStringProvider.class, AzureServiceType.ServiceBus.class);
32+
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders = applicationContext.getBeanProvider(providerType);
2733
connectionStringProviders.ifAvailable(provider -> jmsProperties.setConnectionString(provider.getConnectionString()));
2834
}
2935
}

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/jms/ServiceBusJmsPropertiesConfiguration.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,9 @@
55

66
import com.azure.spring.cloud.autoconfigure.implementation.condition.ConditionalOnMissingProperty;
77
import com.azure.spring.cloud.autoconfigure.implementation.jms.properties.AzureServiceBusJmsProperties;
8-
import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
9-
import com.azure.spring.cloud.core.service.AzureServiceType;
10-
import org.springframework.beans.factory.ObjectProvider;
11-
import org.springframework.beans.factory.config.BeanDefinition;
128
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
139
import org.springframework.context.annotation.Bean;
1410
import org.springframework.context.annotation.Configuration;
15-
import org.springframework.context.annotation.Role;
1611

1712
/**
1813
* {@code @Configuration} class that registers a {@link AzureServiceBusJmsPropertiesBeanPostProcessor}
@@ -21,15 +16,12 @@
2116
* @since 5.19.0
2217
*/
2318
@Configuration(proxyBeanMethods = false)
24-
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
2519
class ServiceBusJmsPropertiesConfiguration {
2620

2721
@Bean
2822
@ConditionalOnMissingBean
29-
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
3023
@ConditionalOnMissingProperty(prefix = "spring.jms.servicebus", name = "connection-string")
31-
static AzureServiceBusJmsPropertiesBeanPostProcessor azureServiceBusJmsPropertiesBeanPostProcessor(
32-
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.ServiceBus>> connectionStringProviders) {
33-
return new AzureServiceBusJmsPropertiesBeanPostProcessor(connectionStringProviders);
24+
static AzureServiceBusJmsPropertiesBeanPostProcessor azureServiceBusJmsPropertiesBeanPostProcessor() {
25+
return new AzureServiceBusJmsPropertiesBeanPostProcessor();
3426
}
3527
}

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaPropertiesBeanPostProcessor.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@
1313
import org.apache.kafka.common.requests.ApiVersionsRequest;
1414
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
1515
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
1617
import org.springframework.beans.BeansException;
18+
import org.springframework.beans.factory.ObjectProvider;
1719
import org.springframework.beans.factory.config.BeanPostProcessor;
1820
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
21+
import org.springframework.context.ApplicationContext;
22+
import org.springframework.context.ApplicationContextAware;
1923
import org.springframework.util.ReflectionUtils;
2024

2125
import java.lang.reflect.InvocationTargetException;
@@ -39,14 +43,16 @@
3943
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
4044
import static org.springframework.util.StringUtils.delimitedListToStringArray;
4145

42-
abstract class AbstractKafkaPropertiesBeanPostProcessor<T> implements BeanPostProcessor {
46+
abstract class AbstractKafkaPropertiesBeanPostProcessor<T> implements BeanPostProcessor, ApplicationContextAware {
4347

48+
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractKafkaPropertiesBeanPostProcessor.class);
4449
static final String SECURITY_PROTOCOL_CONFIG_SASL = SASL_SSL.name();
4550
static final String SASL_MECHANISM_OAUTH = OAUTHBEARER_MECHANISM;
4651
static final String AZURE_CONFIGURED_JAAS_OPTIONS_KEY = "azure.configured";
4752
static final String AZURE_CONFIGURED_JAAS_OPTIONS_VALUE = "true";
4853
static final String SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH =
4954
KafkaOAuth2AuthenticateCallbackHandler.class.getName();
55+
protected ApplicationContext applicationContext;
5056
protected static final PropertyMapper PROPERTY_MAPPER = new PropertyMapper();
5157
private static final Map<String, String> KAFKA_OAUTH_CONFIGS;
5258
private static final String LOG_OAUTH_DETAILED_PROPERTY_CONFIGURE = "OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.";
@@ -63,18 +69,21 @@ abstract class AbstractKafkaPropertiesBeanPostProcessor<T> implements BeanPostPr
6369
KAFKA_OAUTH_CONFIGS = Collections.unmodifiableMap(configs);
6470
}
6571

66-
private final AzureGlobalProperties azureGlobalProperties;
67-
68-
AbstractKafkaPropertiesBeanPostProcessor(AzureGlobalProperties azureGlobalProperties) {
69-
this.azureGlobalProperties = azureGlobalProperties;
70-
}
72+
private AzureGlobalProperties azureGlobalProperties;
7173

7274
@SuppressWarnings("unchecked")
7375
@Override
7476
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
7577
if (needsPostProcess(bean)) {
76-
T properties = (T) bean;
78+
ObjectProvider<AzureGlobalProperties> beanProvider = applicationContext.getBeanProvider(AzureGlobalProperties.class);
79+
azureGlobalProperties = beanProvider.getIfAvailable();
80+
if (azureGlobalProperties == null) {
81+
LOGGER.debug("Cannot find a bean of type AzureGlobalProperties, "
82+
+ "Spring Cloud Azure will skip performing JAAS enhancements on the {} bean.", beanName);
83+
return bean;
84+
}
7785

86+
T properties = (T) bean;
7887
replaceAzurePropertiesWithJaas(getMergedProducerProperties(properties), getRawProducerProperties(properties));
7988
replaceAzurePropertiesWithJaas(getMergedConsumerProperties(properties), getRawConsumerProperties(properties));
8089
replaceAzurePropertiesWithJaas(getMergedAdminProperties(properties), getRawAdminProperties(properties));
@@ -129,6 +138,11 @@ public Object postProcessBeforeInitialization(Object bean, String beanName) thro
129138

130139
protected abstract Logger getLogger();
131140

141+
@Override
142+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
143+
this.applicationContext = applicationContext;
144+
}
145+
132146
/**
133147
* Process Kafka Spring properties for any customized operations.
134148
* @param properties the Kafka Spring properties

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,12 @@
22
// Licensed under the MIT License.
33
package com.azure.spring.cloud.autoconfigure.implementation.kafka;
44

5-
import com.azure.spring.cloud.autoconfigure.implementation.context.properties.AzureGlobalProperties;
6-
import org.springframework.beans.factory.config.BeanDefinition;
75
import org.springframework.beans.factory.config.BeanPostProcessor;
86
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
97
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
108
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
119
import org.springframework.context.annotation.Bean;
1210
import org.springframework.context.annotation.Configuration;
13-
import org.springframework.context.annotation.Role;
1411
import org.springframework.kafka.core.KafkaTemplate;
1512

1613
import static com.azure.spring.cloud.autoconfigure.implementation.context.AzureContextUtils.PASSWORDLESS_KAFKA_PROPERTIES_BEAN_POST_PROCESSOR_BEAN_NAME;
@@ -22,14 +19,12 @@
2219
* @since 4.3.0
2320
*/
2421
@Configuration(proxyBeanMethods = false)
25-
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
2622
@ConditionalOnClass(KafkaTemplate.class)
2723
@ConditionalOnProperty(value = "spring.cloud.azure.eventhubs.kafka.enabled", havingValue = "true", matchIfMissing = true)
2824
public class AzureEventHubsKafkaOAuth2AutoConfiguration {
2925

3026
@Bean(PASSWORDLESS_KAFKA_PROPERTIES_BEAN_POST_PROCESSOR_BEAN_NAME)
31-
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
32-
static BeanPostProcessor kafkaPropertiesBeanPostProcessor(AzureGlobalProperties properties) {
33-
return new KafkaPropertiesBeanPostProcessor(properties);
27+
static BeanPostProcessor kafkaPropertiesBeanPostProcessor() {
28+
return new KafkaPropertiesBeanPostProcessor();
3429
}
3530
}

0 commit comments

Comments
 (0)