Skip to content

Commit bd0e04e

Browse files
authored
Add request-reply pattern of ServiceBusTemplate (Azure#44859)
1 parent a458671 commit bd0e04e

File tree

12 files changed

+806
-19
lines changed

12 files changed

+806
-19
lines changed

sdk/spring/CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,19 @@
11
# Release History
22

3+
## 5.22.0-beta.1 (Unreleased)
4+
5+
### Spring Cloud Azure Autoconfigure
6+
This section includes changes in `spring-cloud-azure-autoconfigure` module.
7+
8+
#### Features Added
9+
- Register a new bean `ServiceBusConsumerFactory` to support request-reply pattern of `ServiceBusTemplate`.
10+
11+
### Spring Messaging Azure Service Bus
12+
This section includes changes in the `spring-messaging-azure-servicebus` module.
13+
14+
#### Features Added
15+
- `ServiceBusTemplate` supports request-reply pattern.
16+
317
## 5.21.0 (2025-03-20)
418
- This release is compatible with Spring Boot 3.4.0-3.4.2, 3.3.0-3.3.6, 3.2.0-3.2.12, 3.1.0-3.1.12, 3.0.0-3.0.13. (Note: 3.4.x (x>2), 3.3.y (y>6) and 3.2.z (z>12) should be supported, but they aren't tested with this release.)
519
- This release is compatible with Spring Cloud 2024.0.0, 2023.0.0-2023.0.4, 2022.0.0-2022.0.5. (Note: 2024.0.x(x>0) and 2023.0.y (y>4) should be supported, but they aren't tested with this release.)

sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/AzureServiceBusMessagingAutoConfiguration.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717
import com.azure.spring.messaging.PropertiesSupplier;
1818
import com.azure.spring.messaging.converter.AzureMessageConverter;
1919
import com.azure.spring.messaging.implementation.converter.ObjectMapperHolder;
20+
import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceConsumerFactory;
2021
import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProcessorFactory;
2122
import com.azure.spring.messaging.servicebus.core.DefaultServiceBusNamespaceProducerFactory;
23+
import com.azure.spring.messaging.servicebus.core.ServiceBusConsumerFactory;
2224
import com.azure.spring.messaging.servicebus.core.ServiceBusProcessorFactory;
2325
import com.azure.spring.messaging.servicebus.core.ServiceBusProducerFactory;
2426
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
27+
import com.azure.spring.messaging.servicebus.core.properties.ConsumerProperties;
2528
import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties;
2629
import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties;
2730
import com.azure.spring.messaging.servicebus.core.properties.ProducerProperties;
@@ -103,6 +106,27 @@ ServiceBusProcessorFactory defaultServiceBusNamespaceProcessorFactory(
103106
}
104107
}
105108

109+
@Configuration(proxyBeanMethods = false)
110+
static class ConsumerContainerConfiguration {
111+
112+
@Bean
113+
@ConditionalOnMissingBean
114+
ServiceBusConsumerFactory defaultServiceBusNamespaceConsumerFactory(
115+
NamespaceProperties properties,
116+
ObjectProvider<PropertiesSupplier<ConsumerIdentifier, ConsumerProperties>> suppliers,
117+
ObjectProvider<AzureTokenCredentialResolver> tokenCredentialResolvers,
118+
ObjectProvider<TokenCredential> defaultTokenCredentials,
119+
ObjectProvider<AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder>> customizers,
120+
ObjectProvider<AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder>> sessionReceiverCustomizers) {
121+
DefaultServiceBusNamespaceConsumerFactory factory = new DefaultServiceBusNamespaceConsumerFactory(properties, suppliers.getIfAvailable());
122+
factory.setDefaultCredential(defaultTokenCredentials.getIfAvailable());
123+
factory.setTokenCredentialResolver(tokenCredentialResolvers.getIfAvailable());
124+
customizers.orderedStream().forEach(factory::addCustomizer);
125+
sessionReceiverCustomizers.orderedStream().forEach(factory::addSessionReceiverCustomizer);
126+
return factory;
127+
}
128+
}
129+
106130
@Configuration(proxyBeanMethods = false)
107131
static class ServiceBusTemplateConfiguration {
108132

@@ -141,9 +165,10 @@ AzureMessageConverter<ServiceBusReceivedMessage, ServiceBusMessage> serviceBusMe
141165
@ConditionalOnMissingBean
142166
@ConditionalOnBean(ServiceBusProducerFactory.class)
143167
ServiceBusTemplate serviceBusTemplate(AzureServiceBusProperties properties,
144-
ServiceBusProducerFactory senderClientFactory,
168+
ServiceBusProducerFactory producerFactory,
169+
ServiceBusConsumerFactory consumerFactory,
145170
AzureMessageConverter<ServiceBusReceivedMessage, ServiceBusMessage> messageConverter) {
146-
ServiceBusTemplate serviceBusTemplate = new ServiceBusTemplate(senderClientFactory);
171+
ServiceBusTemplate serviceBusTemplate = new ServiceBusTemplate(producerFactory, consumerFactory);
147172
serviceBusTemplate.setMessageConverter(messageConverter);
148173
if (properties.getProducer().getEntityType() != null) {
149174
serviceBusTemplate.setDefaultEntityType(properties.getProducer().getEntityType());

sdk/spring/spring-cloud-azure-autoconfigure/src/test/java/com/azure/spring/cloud/autoconfigure/implementation/servicebus/AzureServiceBusMessagingAutoConfigurationTests.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@ void disableServiceBusShouldNotConfigure() {
2929
"spring.cloud.azure.servicebus.enabled=false",
3030
"spring.cloud.azure.servicebus.namespace=test-namespace"
3131
)
32-
.run(context -> assertThat(context).doesNotHaveBean(AzureServiceBusMessagingAutoConfiguration.ProcessorContainerConfiguration.class));
32+
.run(context -> {
33+
assertThat(context).doesNotHaveBean(AzureServiceBusMessagingAutoConfiguration.ProcessorContainerConfiguration.class);
34+
assertThat(context).doesNotHaveBean(AzureServiceBusMessagingAutoConfiguration.ConsumerContainerConfiguration.class);
35+
});
3336
}
3437

3538
@Test
@@ -39,18 +42,24 @@ void withoutServiceBusTemplateShouldNotConfigure() {
3942
.withPropertyValues(
4043
"spring.cloud.azure.servicebus.namespace=test-namespace"
4144
)
42-
.run(context -> assertThat(context).doesNotHaveBean(AzureServiceBusMessagingAutoConfiguration.ProcessorContainerConfiguration.class));
45+
.run(context -> {
46+
assertThat(context).doesNotHaveBean(AzureServiceBusMessagingAutoConfiguration.ProcessorContainerConfiguration.class);
47+
assertThat(context).doesNotHaveBean(AzureServiceBusMessagingAutoConfiguration.ConsumerContainerConfiguration.class);
48+
});
4349
}
4450

4551
@Test
4652
void withoutServiceBusConnectionShouldNotConfigure() {
4753
this.contextRunner
4854
.withUserConfiguration(AzureServiceBusPropertiesTestConfiguration.class)
49-
.run(context -> assertThat(context).doesNotHaveBean(AzureServiceBusMessagingAutoConfiguration.ProcessorContainerConfiguration.class));
55+
.run(context -> {
56+
assertThat(context).doesNotHaveBean(AzureServiceBusMessagingAutoConfiguration.ProcessorContainerConfiguration.class);
57+
assertThat(context).doesNotHaveBean(AzureServiceBusMessagingAutoConfiguration.ConsumerContainerConfiguration.class);
58+
});
5059
}
5160

5261
@Test
53-
void connectionInfoAndCheckpointStoreProvidedShouldConfigure() {
62+
void connectionInfoProvidedShouldConfigure() {
5463
this.contextRunner
5564
.withPropertyValues(
5665
"spring.cloud.azure.servicebus.connection-string=" + String.format(CONNECTION_STRING_FORMAT, "test-namespace")
@@ -59,6 +68,7 @@ void connectionInfoAndCheckpointStoreProvidedShouldConfigure() {
5968
.run(context -> {
6069
assertThat(context).hasSingleBean(ServiceBusProcessorFactory.class);
6170
assertThat(context).hasSingleBean(AzureServiceBusMessagingAutoConfiguration.ProcessorContainerConfiguration.class);
71+
assertThat(context).hasSingleBean(AzureServiceBusMessagingAutoConfiguration.ConsumerContainerConfiguration.class);
6272
});
6373
}
6474

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.messaging.servicebus.core;
5+
6+
import com.azure.core.credential.TokenCredential;
7+
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
8+
import com.azure.messaging.servicebus.ServiceBusSessionReceiverClient;
9+
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
10+
import com.azure.spring.cloud.core.credential.AzureCredentialResolver;
11+
import com.azure.spring.cloud.core.customizer.AzureServiceClientBuilderCustomizer;
12+
import com.azure.spring.cloud.core.implementation.util.AzureSpringIdentifier;
13+
import com.azure.spring.cloud.service.implementation.servicebus.factory.ServiceBusSessionReceiverClientBuilderFactory;
14+
import com.azure.spring.cloud.service.servicebus.properties.ServiceBusEntityType;
15+
import com.azure.spring.messaging.ConsumerIdentifier;
16+
import com.azure.spring.messaging.PropertiesSupplier;
17+
import com.azure.spring.messaging.servicebus.core.properties.ConsumerProperties;
18+
import com.azure.spring.messaging.servicebus.core.properties.NamespaceProperties;
19+
import com.azure.spring.messaging.servicebus.implementation.properties.merger.ConsumerPropertiesParentMerger;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
import org.springframework.beans.factory.DisposableBean;
23+
import org.springframework.lang.Nullable;
24+
25+
import java.util.ArrayList;
26+
import java.util.HashMap;
27+
import java.util.List;
28+
import java.util.Map;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
31+
/**
32+
* The {@link ServiceBusConsumerFactory} implementation to produce new {@link ServiceBusSessionReceiverClient} instances
33+
* for provided {@link NamespaceProperties} and optional producer {@link PropertiesSupplier} on each
34+
* {@link #createReceiver} invocation.
35+
* <p>
36+
* {@link ServiceBusSessionReceiverClient} produced by this factory will share the same namespace level configuration, but
37+
* if a configuration entry is provided at both producer and namespace level, the producer level configuration will
38+
* take advantage.
39+
* </p>
40+
* @since 5.22.0
41+
*/
42+
public final class DefaultServiceBusNamespaceConsumerFactory implements ServiceBusConsumerFactory, DisposableBean {
43+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultServiceBusNamespaceConsumerFactory.class);
44+
private final List<Listener> listeners = new ArrayList<>();
45+
private final NamespaceProperties namespaceProperties;
46+
private final PropertiesSupplier<ConsumerIdentifier, ConsumerProperties> propertiesSupplier;
47+
private final Map<String, ServiceBusSessionReceiverClient> clients = new ConcurrentHashMap<>();
48+
private final ConsumerPropertiesParentMerger parentMerger = new ConsumerPropertiesParentMerger();
49+
private final List<AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder>> customizers = new ArrayList<>();
50+
private final List<AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder>> sessionReceiverCustomizers = new ArrayList<>();
51+
private final Map<String, List<AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder>>> dedicatedSessionReceiverCustomizers = new HashMap<>();
52+
private AzureCredentialResolver<TokenCredential> tokenCredentialResolver = null;
53+
private TokenCredential defaultCredential = null;
54+
55+
/**
56+
* Construct a factory with the provided namespace level configuration.
57+
* @param namespaceProperties the namespace properties
58+
*/
59+
public DefaultServiceBusNamespaceConsumerFactory(NamespaceProperties namespaceProperties) {
60+
this(namespaceProperties, key -> null);
61+
}
62+
63+
/**
64+
* Construct a factory with the provided namespace level configuration and producer {@link PropertiesSupplier}.
65+
* @param namespaceProperties the namespace properties.
66+
* @param supplier the {@link PropertiesSupplier} to supply {@link ConsumerProperties} for each queue/topic entity.
67+
*/
68+
public DefaultServiceBusNamespaceConsumerFactory(NamespaceProperties namespaceProperties,
69+
PropertiesSupplier<ConsumerIdentifier, ConsumerProperties> supplier) {
70+
this.namespaceProperties = namespaceProperties;
71+
this.propertiesSupplier = supplier == null ? key -> null : supplier;
72+
}
73+
74+
@Override
75+
public ServiceBusSessionReceiverClient createReceiver(String name) {
76+
return doCreateReceiver(name, null);
77+
}
78+
79+
@Override
80+
public ServiceBusSessionReceiverClient createReceiver(String name, ServiceBusEntityType entityType) {
81+
ConsumerProperties consumerProperties = this.propertiesSupplier.getProperties(new ConsumerIdentifier(name)) != null
82+
? this.propertiesSupplier.getProperties(new ConsumerIdentifier(name)) : new ConsumerProperties();
83+
if (entityType != null) {
84+
consumerProperties.setEntityType(entityType);
85+
}
86+
return doCreateReceiver(name, consumerProperties);
87+
}
88+
89+
@Override
90+
public void addListener(Listener listener) {
91+
this.listeners.add(listener);
92+
}
93+
94+
@Override
95+
public boolean removeListener(Listener listener) {
96+
return this.listeners.remove(listener);
97+
}
98+
99+
@Override
100+
public void destroy() {
101+
clients.forEach((name, receiver) -> {
102+
listeners.forEach(l -> l.receiverRemoved(name, receiver));
103+
receiver.close();
104+
});
105+
this.clients.clear();
106+
this.listeners.clear();
107+
}
108+
109+
private ServiceBusSessionReceiverClient doCreateReceiver(String name, @Nullable ConsumerProperties properties) {
110+
return clients.computeIfAbsent(name, entityName -> {
111+
ConsumerProperties consumerProperties = parentMerger.merge(properties, this.namespaceProperties);
112+
consumerProperties.setEntityName(entityName);
113+
114+
ServiceBusSessionReceiverClient receiverClient;
115+
if (Boolean.TRUE.equals(consumerProperties.getSessionEnabled())) {
116+
ServiceBusSessionReceiverClientBuilderFactory factory =
117+
new ServiceBusSessionReceiverClientBuilderFactory(consumerProperties, this.customizers);
118+
119+
factory.setDefaultTokenCredential(this.defaultCredential);
120+
factory.setTokenCredentialResolver(this.tokenCredentialResolver);
121+
factory.setSpringIdentifier(AzureSpringIdentifier.AZURE_SPRING_INTEGRATION_SERVICE_BUS);
122+
123+
ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder builder = factory.build();
124+
125+
customizeBuilder(name, builder);
126+
127+
builder.receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE);
128+
builder.disableAutoComplete();
129+
LOGGER.debug("Set RECEIVE_AND_DELETE mode for request-reply-pattern receiver client, "
130+
+ "'enableAutoComplete' is not needed in for RECEIVE_AND_DELETE mode.");
131+
132+
receiverClient = builder.buildClient();
133+
134+
this.listeners.forEach(l -> l.receiverAdded(entityName, receiverClient));
135+
} else {
136+
receiverClient = null;
137+
LOGGER.warn("Receiver client is null. Define a bean PropertiesSupplier<ConsumerIdentifier, ConsumerProperties> to enable consumer 'session-enabled'.");
138+
}
139+
return receiverClient;
140+
});
141+
}
142+
143+
/**
144+
* Set the token credential resolver.
145+
*
146+
* @param tokenCredentialResolver The token credential resolver.
147+
*/
148+
public void setTokenCredentialResolver(AzureCredentialResolver<TokenCredential> tokenCredentialResolver) {
149+
this.tokenCredentialResolver = tokenCredentialResolver;
150+
}
151+
152+
/**
153+
* Set the default credential for all clients generated from this factory.
154+
*
155+
* @param defaultCredential The default credential.
156+
*/
157+
public void setDefaultCredential(TokenCredential defaultCredential) {
158+
this.defaultCredential = defaultCredential;
159+
}
160+
161+
/**
162+
* Add a {@link ServiceBusClientBuilder}
163+
* customizer to customize the shared client builder created in this factory, it's used to build other sender clients.
164+
*
165+
* @param customizer the provided builder customizer.
166+
*/
167+
public void addCustomizer(AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder> customizer) {
168+
if (customizer == null) {
169+
LOGGER.debug("The provided '{}' customizer is null, will ignore it.", ServiceBusClientBuilder.class.getName());
170+
} else {
171+
this.customizers.add(customizer);
172+
}
173+
}
174+
175+
/**
176+
* Add a {@link ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder}
177+
* customizer to customize all the session clients created from this factory.
178+
* @param customizer the provided builder customizer.
179+
*/
180+
public void addSessionReceiverCustomizer(AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder> customizer) {
181+
if (customizer == null) {
182+
LOGGER.debug("The provided '{}' customizer is null, will ignore it.",
183+
ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder.class.getName());
184+
return;
185+
}
186+
this.sessionReceiverCustomizers.add(customizer);
187+
}
188+
189+
/**
190+
* Add a session receiver client builder customizer to customize the clients created from this factory with Service Bus
191+
* entity name of value {@code entityName}.
192+
*
193+
* @param entityName the entity name of the client.
194+
* @param customizer the provided customizer.
195+
*/
196+
public void addDedicatedSessionReceiverCustomizer(String entityName,
197+
AzureServiceClientBuilderCustomizer<ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder> customizer) {
198+
if (customizer == null) {
199+
LOGGER.debug("The provided '{}' dedicated customizer is null, will ignore it.",
200+
ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder.class.getName());
201+
} else {
202+
this.dedicatedSessionReceiverCustomizers
203+
.computeIfAbsent(entityName, key -> new ArrayList<>())
204+
.add(customizer);
205+
}
206+
}
207+
208+
private void customizeBuilder(String entityName, ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder builder) {
209+
this.sessionReceiverCustomizers.forEach(customizer -> customizer.customize(builder));
210+
this.dedicatedSessionReceiverCustomizers.getOrDefault(entityName, new ArrayList<>())
211+
.forEach(customizer -> customizer.customize(builder));
212+
}
213+
214+
}

0 commit comments

Comments
 (0)