|
3 | 3 |
|
4 | 4 | package com.azure.spring.cloud.stream.binder.eventhubs.implementation.config; |
5 | 5 |
|
| 6 | +import com.azure.core.credential.TokenCredential; |
6 | 7 | import com.azure.messaging.eventhubs.CheckpointStore; |
7 | 8 | import com.azure.messaging.eventhubs.EventHubClientBuilder; |
8 | 9 | import com.azure.messaging.eventhubs.EventProcessorClientBuilder; |
9 | 10 | import com.azure.spring.cloud.autoconfigure.implementation.eventhubs.properties.AzureEventHubsProperties; |
| 11 | +import com.azure.spring.cloud.core.credential.AzureCredentialResolver; |
10 | 12 | import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer; |
11 | 13 | import com.azure.spring.cloud.resourcemanager.implementation.provisioning.EventHubsProvisioner; |
12 | 14 | import com.azure.spring.cloud.stream.binder.eventhubs.config.EventHubsProcessorFactoryCustomizer; |
|
19 | 21 | import com.azure.spring.cloud.stream.binder.eventhubs.core.implementation.provisioning.EventHubsChannelProvisioner; |
20 | 22 | import com.azure.spring.cloud.stream.binder.eventhubs.implementation.provisioning.EventHubsChannelResourceManagerProvisioner; |
21 | 23 | import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter; |
| 24 | +import com.azure.spring.messaging.eventhubs.core.DefaultEventHubsNamespaceProducerFactory; |
| 25 | +import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate; |
22 | 26 | import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode; |
23 | 27 | import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer; |
24 | 28 | import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties; |
|
32 | 36 | import org.springframework.cloud.stream.binder.Binder; |
33 | 37 | import org.springframework.context.annotation.Bean; |
34 | 38 | import org.springframework.context.annotation.Configuration; |
| 39 | +import org.springframework.test.util.ReflectionTestUtils; |
35 | 40 |
|
| 41 | +import java.lang.reflect.Field; |
36 | 42 | import java.time.Duration; |
37 | 43 | import java.time.Instant; |
38 | 44 |
|
@@ -93,9 +99,6 @@ void shouldConfigureArmChannelProvisionerWhenResourceManagerProvided() { |
93 | 99 | }); |
94 | 100 | } |
95 | 101 |
|
96 | | - // conniey: Remove warning suppression when azure-messaging-eventhubs is updated to 5.21.0. |
97 | | - // https://github.com/Azure/azure-sdk-for-java/issues/46359 |
98 | | - @SuppressWarnings("deprecation") |
99 | 102 | @Test |
100 | 103 | void testExtendedBindingPropertiesShouldBind() { |
101 | 104 | String producerConnectionString = String.format(CONNECTION_STRING_FORMAT, "fake-producer-namespace"); |
@@ -281,4 +284,116 @@ public void customize(Object builder) { |
281 | 284 | } |
282 | 285 | } |
283 | 286 |
|
| 287 | + @Test |
| 288 | + void testCustomTokenCredentialConfiguration() { |
| 289 | + String connectionString = String.format(CONNECTION_STRING_FORMAT, "test-namespace"); |
| 290 | + |
| 291 | + this.contextRunner |
| 292 | + .withConfiguration(AutoConfigurations.of(CustomTokenCredentialConfiguration.class)) |
| 293 | + .withPropertyValues( |
| 294 | + "spring.cloud.azure.eventhubs.connection-string=" + connectionString, |
| 295 | + "spring.cloud.azure.eventhubs.credential.token-credential-bean-name=customTokenCredential" |
| 296 | + ) |
| 297 | + .run(context -> { |
| 298 | + // Verify that the custom token credential bean exists |
| 299 | + assertThat(context).hasBean("customTokenCredential"); |
| 300 | + TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class); |
| 301 | + assertThat(customCredential).isNotNull(); |
| 302 | + |
| 303 | + // Verify that the properties contain the correct credential bean name |
| 304 | + AzureEventHubsProperties eventHubsProperties = context.getBean(AzureEventHubsProperties.class); |
| 305 | + assertThat(eventHubsProperties).isNotNull(); |
| 306 | + assertThat(eventHubsProperties.getCredential()).isNotNull(); |
| 307 | + assertThat(eventHubsProperties.getCredential().getTokenCredentialBeanName()) |
| 308 | + .as("The token-credential-bean-name property should be set to customTokenCredential") |
| 309 | + .isEqualTo("customTokenCredential"); |
| 310 | + |
| 311 | + // Verify the EventHubsProducerFactoryCustomizer is configured and can apply credential settings |
| 312 | + assertThat(context).hasSingleBean(EventHubsProducerFactoryCustomizer.class); |
| 313 | + EventHubsProducerFactoryCustomizer producerFactoryCustomizer = |
| 314 | + context.getBean(EventHubsProducerFactoryCustomizer.class); |
| 315 | + assertThat(producerFactoryCustomizer).isNotNull(); |
| 316 | + |
| 317 | + // Verify it's the default customizer with token credential resolver |
| 318 | + assertThat(producerFactoryCustomizer) |
| 319 | + .isInstanceOf(EventHubsBinderConfiguration.DefaultProducerFactoryCustomizer.class); |
| 320 | + }); |
| 321 | + } |
| 322 | + |
| 323 | + @Test |
| 324 | + void testCustomTokenCredentialConfigurationWithBinder() { |
| 325 | + String connectionString = String.format(CONNECTION_STRING_FORMAT, "test-namespace"); |
| 326 | + |
| 327 | + this.contextRunner |
| 328 | + .withConfiguration(AutoConfigurations.of(CustomTokenCredentialConfiguration.class)) |
| 329 | + .withBean(CheckpointStore.class, () -> mock(CheckpointStore.class)) |
| 330 | + .withPropertyValues( |
| 331 | + "spring.cloud.azure.eventhubs.connection-string=" + connectionString, |
| 332 | + "spring.cloud.azure.eventhubs.credential.token-credential-bean-name=customTokenCredential", |
| 333 | + "spring.cloud.azure.eventhubs.namespace=test-namespace" |
| 334 | + ) |
| 335 | + .run(context -> { |
| 336 | + assertThat(context).hasSingleBean(EventHubsMessageChannelBinder.class); |
| 337 | + EventHubsMessageChannelBinder binder = context.getBean(EventHubsMessageChannelBinder.class); |
| 338 | + |
| 339 | + TokenCredential customCredential = context.getBean("customTokenCredential", TokenCredential.class); |
| 340 | + AzureEventHubsProperties eventHubsProperties = context.getBean(AzureEventHubsProperties.class); |
| 341 | + |
| 342 | + // Test Producer Factory |
| 343 | + // Verify that credential resolver is properly configured in the producer factory created by binder |
| 344 | + EventHubsTemplate eventHubsTemplate = ReflectionTestUtils.invokeMethod(binder, "getEventHubTemplate"); |
| 345 | + assertThat(eventHubsTemplate).isNotNull(); |
| 346 | + |
| 347 | + DefaultEventHubsNamespaceProducerFactory producerFactory = (DefaultEventHubsNamespaceProducerFactory) ReflectionTestUtils.getField(eventHubsTemplate, "producerFactory"); |
| 348 | + assertThat(producerFactory).isNotNull(); |
| 349 | + |
| 350 | + // Use reflection to access the tokenCredentialResolver field in producer factory |
| 351 | + Field producerResolverField = producerFactory.getClass().getDeclaredField("tokenCredentialResolver"); |
| 352 | + producerResolverField.setAccessible(true); |
| 353 | + Object producerResolver = producerResolverField.get(producerFactory); |
| 354 | + assertThat(producerResolver) |
| 355 | + .as("TokenCredentialResolver should be configured in the binder's producer factory") |
| 356 | + .isNotNull(); |
| 357 | + |
| 358 | + // Verify that producer resolver can resolve the custom credential |
| 359 | + @SuppressWarnings("unchecked") |
| 360 | + AzureCredentialResolver<TokenCredential> typedProducerResolver = |
| 361 | + (AzureCredentialResolver<TokenCredential>) producerResolver; |
| 362 | + TokenCredential producerResolvedCredential = typedProducerResolver.resolve(eventHubsProperties); |
| 363 | + assertThat(producerResolvedCredential) |
| 364 | + .as("The resolved credential in binder's producer factory should be the customTokenCredential bean") |
| 365 | + .isSameAs(customCredential); |
| 366 | + |
| 367 | + // Test Processor Factory |
| 368 | + // Get the ProcessorFactory through reflection (it's created lazily in getProcessorFactory) |
| 369 | + Object processorFactory = ReflectionTestUtils.invokeMethod(binder, "getProcessorFactory"); |
| 370 | + assertThat(processorFactory).isNotNull(); |
| 371 | + |
| 372 | + // Use reflection to access the tokenCredentialResolver field in processor factory |
| 373 | + Field processorResolverField = processorFactory.getClass().getDeclaredField("tokenCredentialResolver"); |
| 374 | + processorResolverField.setAccessible(true); |
| 375 | + Object processorResolver = processorResolverField.get(processorFactory); |
| 376 | + assertThat(processorResolver) |
| 377 | + .as("TokenCredentialResolver should be configured in the binder's processor factory") |
| 378 | + .isNotNull(); |
| 379 | + |
| 380 | + // Verify that processor resolver can resolve the custom credential |
| 381 | + @SuppressWarnings("unchecked") |
| 382 | + AzureCredentialResolver<TokenCredential> typedProcessorResolver = |
| 383 | + (AzureCredentialResolver<TokenCredential>) processorResolver; |
| 384 | + TokenCredential processorResolvedCredential = typedProcessorResolver.resolve(eventHubsProperties); |
| 385 | + assertThat(processorResolvedCredential) |
| 386 | + .as("The resolved credential in binder's processor factory should be the customTokenCredential bean") |
| 387 | + .isSameAs(customCredential); |
| 388 | + }); |
| 389 | + } |
| 390 | + |
| 391 | + @Configuration |
| 392 | + public static class CustomTokenCredentialConfiguration { |
| 393 | + @Bean |
| 394 | + public TokenCredential customTokenCredential() { |
| 395 | + return mock(TokenCredential.class); |
| 396 | + } |
| 397 | + } |
| 398 | + |
284 | 399 | } |
0 commit comments