Skip to content

Commit c81f596

Browse files
authored
Allow broker session acquire timeout to propagate to acceptSession sync api (Azure#42838)
* Allow broker session acquire timeout to propagate to acceptSession sync api * Cleanup the java-doc, code comments * cleanup code, and doc * changelog, comment cleanup * add feature entry * readability improvements * adding unit tests for ServiceBusSessionAcquirer * Improve existing behaviour of throwing IllegalStateException by including cause as TimeoutException * spot check, minor log tweak
1 parent cd80cf8 commit c81f596

File tree

8 files changed

+423
-65
lines changed

8 files changed

+423
-65
lines changed

sdk/servicebus/azure-messaging-servicebus/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
### Features Added
66

77
- Enabled RequestResponseChannelCache (CBS, Management channel cache) and ReactorSessionCache by default. ([42641](https://github.com/Azure/azure-sdk-for-java/pull/42641))
8+
- Improves the synchronous `acceptNextSession` and `acceptSession` APIs of `ServiceBusSessionReceiverClient` to reduce the chances of the broker holding session lock for some time when client-side timeout occurs. ([42838](https://github.com/Azure/azure-sdk-for-java/pull/42838))
89

910
### Breaking Changes
1011

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusClientBuilder.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2108,8 +2108,12 @@ SessionsMessagePump buildPumpForProcessor(ClientLogger logger,
21082108
final ConnectionCacheWrapper connectionCacheWrapper = new ConnectionCacheWrapper(
21092109
getOrCreateConnectionCache(messageSerializer, meter, useSessionChannelCache));
21102110

2111-
final ServiceBusSessionAcquirer sessionAcquirer = new ServiceBusSessionAcquirer(logger, clientIdentifier,
2112-
entityPath, entityType, receiveMode, retryOptions.getTryTimeout(), connectionCacheWrapper);
2111+
// For session enabled ServiceBusProcessorClient, the session acquire should be retried if broker timeout
2112+
// due to no session (see the type ServiceBusSessionAcquirer).
2113+
final boolean timeoutRetryDisabled = false;
2114+
final ServiceBusSessionAcquirer sessionAcquirer
2115+
= new ServiceBusSessionAcquirer(logger, clientIdentifier, entityPath, entityType, receiveMode,
2116+
retryOptions.getTryTimeout(), timeoutRetryDisabled, connectionCacheWrapper);
21132117

21142118
final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(
21152119
createTracer(clientOptions), meter, connectionCacheWrapper.getFullyQualifiedNamespace(), entityPath,
@@ -2138,9 +2142,8 @@ SessionsMessagePump buildPumpForProcessor(ClientLogger logger,
21382142
* queueName()} or {@link #topicName(String) topicName()}, respectively.
21392143
*/
21402144
public ServiceBusSessionReceiverAsyncClient buildAsyncClient() {
2141-
final boolean isSessionReactorReceiveOnV2
2142-
= v2StackSupport.isSessionReactorAsyncReceiveEnabled(configuration);
2143-
return buildAsyncClient(true, isSessionReactorReceiveOnV2);
2145+
final boolean isV2 = v2StackSupport.isSessionReactorAsyncReceiveEnabled(configuration);
2146+
return buildAsyncClient(false, isV2);
21442147
}
21452148

21462149
/**
@@ -2157,19 +2160,27 @@ public ServiceBusSessionReceiverAsyncClient buildAsyncClient() {
21572160
* queueName()} or {@link #topicName(String) topicName()}, respectively.
21582161
*/
21592162
public ServiceBusSessionReceiverClient buildClient() {
2160-
final boolean isSessionSyncReceiveOnV2 = v2StackSupport.isSessionSyncReceiveEnabled(configuration);
2163+
final boolean isV2 = v2StackSupport.isSessionSyncReceiveEnabled(configuration);
21612164
final boolean isPrefetchDisabled = prefetchCount == 0;
2162-
return new ServiceBusSessionReceiverClient(buildAsyncClient(false, isSessionSyncReceiveOnV2),
2163-
isPrefetchDisabled, MessageUtils.getTotalTimeout(retryOptions));
2165+
return new ServiceBusSessionReceiverClient(buildAsyncClient(true, isV2), isPrefetchDisabled,
2166+
MessageUtils.getTotalTimeout(retryOptions));
21642167
}
21652168

2166-
// Common function to build Session-Enabled Receiver-Client - For Async[Reactor]Client Or to back SyncClient.
2167-
private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed, boolean isV2) {
2169+
/**
2170+
* Common function to build a {@link ServiceBusSessionReceiverAsyncClient} which is either used directly
2171+
* as asynchronous client or to back a synchronous {@link ServiceBusSessionReceiverClient}.
2172+
*
2173+
* @param isForSyncMode {@code true} if the client is build to back synchronous client.
2174+
* @param isV2 whether V2 stack should be enabled.
2175+
*
2176+
* @return async client to obtain session from session enabled entity.
2177+
*/
2178+
private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isForSyncMode, boolean isV2) {
21682179
final MessagingEntityType entityType
21692180
= validateEntityPaths(connectionStringEntityName, topicName, queueName);
21702181
final String entityPath = getEntityPath(entityType, queueName, topicName, subscriptionName, SubQueue.NONE);
21712182

2172-
if (!isAutoCompleteAllowed && enableAutoComplete) {
2183+
if (isForSyncMode && enableAutoComplete) {
21732184
LOGGER.warning(
21742185
"'enableAutoComplete' is not supported in synchronous client except through callback receive.");
21752186
enableAutoComplete = false;
@@ -2208,12 +2219,16 @@ private ServiceBusSessionReceiverAsyncClient buildAsyncClient(boolean isAutoComp
22082219
clientIdentifier = UUID.randomUUID().toString();
22092220
}
22102221

2222+
// For ServiceBusSessionReceiverClient, the session acquire should not be retried if broker timeout due to
2223+
// no session (see the type ServiceBusSessionAcquirer), such timeout are propagated to the acceptNextSession() caller.
2224+
final boolean timeoutRetryDisabled = isV2 && isForSyncMode;
2225+
22112226
final ServiceBusReceiverInstrumentation instrumentation = new ServiceBusReceiverInstrumentation(
22122227
createTracer(clientOptions), meter, connectionCacheWrapper.getFullyQualifiedNamespace(), entityPath,
22132228
subscriptionName, ReceiverKind.ASYNC_RECEIVER);
22142229
return new ServiceBusSessionReceiverAsyncClient(connectionCacheWrapper.getFullyQualifiedNamespace(),
22152230
entityPath, entityType, receiverOptions, connectionCacheWrapper, instrumentation, messageSerializer,
2216-
onClientClose, clientIdentifier);
2231+
onClientClose, clientIdentifier, timeoutRetryDisabled);
22172232
}
22182233
}
22192234

sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusSessionAcquirer.java

Lines changed: 164 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33

44
package com.azure.messaging.servicebus;
55

6+
import com.azure.core.amqp.AmqpClientOptions;
7+
import com.azure.core.amqp.AmqpRetryOptions;
68
import com.azure.core.amqp.exception.AmqpErrorCondition;
79
import com.azure.core.amqp.exception.AmqpException;
810
import com.azure.core.amqp.implementation.StringUtil;
11+
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler2;
912
import com.azure.core.util.logging.ClientLogger;
1013
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
1114
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
@@ -26,26 +29,59 @@
2629

2730
import static com.azure.core.amqp.implementation.ClientConstants.ENTITY_PATH_KEY;
2831

32+
/**
33+
* A type to acquire a session from a session enabled Service Bus entity. If the broker cannot find a session within
34+
* the timeout, it returns a timeout error. The acquirer retries on timeout unless disabled via {@code timeoutRetryDisabled}.
35+
* <p>
36+
* The {@code timeoutRetryDisabled} is true when the session acquirer is used for synchronous {@link ServiceBusSessionReceiverClient}.
37+
* This allows the synchronous 'acceptNextSession()' API to propagate the broker timeout error if no session is available.
38+
* The 'acceptNextSession()' has a client-side timeout that is set slightly longer than the broker's timeout, ensuring
39+
* the broker's timeout usually triggers first (the client-side timeout still helps in case of unexpected hanging).
40+
* For ServiceBusSessionReceiverClient, if the library retries session-acquire on broker timeout, the client-side sync
41+
* timeout might expire while waiting. When client-side timeout expires like this, library cannot cancel the outstanding
42+
* acquire request to the broker, which means, the broker may still lock a session for an acquire request that nobody
43+
* is waiting on, resulting that session to be unavailable for any other 'acceptNextSession()' until initial broker
44+
* lock expires. Hence, library propagate the broker timeout error in ServiceBusSessionReceiverClient case.
45+
* </p>
46+
* <p>
47+
* For session enabled {@link ServiceBusProcessorClient} and {@link ServiceBusSessionReceiverAsyncClient},
48+
* the {@code timeoutRetryDisabled} is false, hence session acquirer retries on broker timeout.
49+
* </p>
50+
*/
2951
final class ServiceBusSessionAcquirer {
3052
private static final String TRACKING_ID_KEY = "trackingId";
3153
private final ClientLogger logger;
3254
private final String identifier;
3355
private final String entityPath;
3456
private final MessagingEntityType entityType;
35-
private final Duration sessionActiveTimeout;
57+
private final Duration tryTimeout;
58+
private final boolean timeoutRetryDisabled;
3659
private final ServiceBusReceiveMode receiveMode;
3760
private final ConnectionCacheWrapper connectionCacheWrapper;
3861
private final Mono<ServiceBusManagementNode> sessionManagement;
3962

63+
/**
64+
* Creates ServiceBusSessionAcquirer to acquire session from a session enabled entity.
65+
*
66+
* @param logger the logger to use.
67+
* @param identifier the client identifier, currently callsites uses {@link AmqpClientOptions#getIdentifier()}.
68+
* @param entityPath path to the session enabled entity.
69+
* @param entityType the entity type (e.g., queue, topic subscription)
70+
* @param receiveMode the mode of receiving messages from the acquired session.
71+
* @param tryTimeout the try timeout, currently callsites uses {@link AmqpRetryOptions#getTryTimeout()}}.
72+
* @param timeoutRetryDisabled if session acquire retry should be disabled when broker timeout on no session.
73+
* @param connectionCacheWrapper the connection cache.
74+
*/
4075
ServiceBusSessionAcquirer(ClientLogger logger, String identifier, String entityPath, MessagingEntityType entityType,
41-
ServiceBusReceiveMode receiveMode, Duration sessionActiveTimeout,
76+
ServiceBusReceiveMode receiveMode, Duration tryTimeout, boolean timeoutRetryDisabled,
4277
ConnectionCacheWrapper connectionCacheWrapper) {
4378
assert connectionCacheWrapper.isV2();
4479
this.logger = logger;
4580
this.identifier = identifier;
4681
this.entityPath = entityPath;
4782
this.entityType = entityType;
48-
this.sessionActiveTimeout = sessionActiveTimeout;
83+
this.tryTimeout = tryTimeout;
84+
this.timeoutRetryDisabled = timeoutRetryDisabled;
4985
this.receiveMode = receiveMode;
5086
this.connectionCacheWrapper = connectionCacheWrapper;
5187
this.sessionManagement = connectionCacheWrapper.getConnection()
@@ -78,53 +114,135 @@ Mono<Session> acquire(String sessionId) {
78114
return acquireIntern(sessionId);
79115
}
80116

117+
/**
118+
* Tries to acquire a session from the broker by opening an AMQP receive link. When the acquire attempt timeout then
119+
* the api will retry if {@code timeoutRetryDisabled} is set to {@code false}. In case an error needs to be propagated,
120+
* api publishes the error using bounded-elastic Thread.
121+
*
122+
* @param sessionId the unique id of the specific session to acquire, a value {@code null} means acquire any free session.
123+
* @return A Mono that completes with the acquired session, the Mono can emit {@link AmqpException} if the acquirer
124+
* is already disposed or {@link TimeoutException} if session acquire timeouts and {@code timeoutRetryDisabled} set to true.
125+
*/
81126
private Mono<Session> acquireIntern(String sessionId) {
82-
return Mono
83-
.defer(() -> createSessionReceiveLink(sessionId).flatMap(sessionLink -> sessionLink.getSessionProperties() // Await for sessionLink to "ACTIVE" then reads its properties
84-
.flatMap(sessionProperties -> {
85-
return Mono.just(new Session(sessionLink, sessionProperties, sessionManagement));
86-
})))
87-
.timeout(sessionActiveTimeout)
88-
.retryWhen(Retry.from(retrySignals -> retrySignals.flatMap(signal -> {
89-
final Throwable failure = signal.failure();
90-
logger.atInfo()
91-
.addKeyValue(ENTITY_PATH_KEY, entityPath)
92-
.addKeyValue("attempt", signal.totalRetriesInARow())
93-
.log(sessionId == null
94-
? "Error occurred while getting unnamed session."
95-
: "Error occurred while getting session " + sessionId, failure);
96-
97-
if (failure instanceof TimeoutException) {
98-
return Mono.delay(Duration.ZERO);
99-
} else if (failure instanceof AmqpException
100-
&& ((AmqpException) failure).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) {
101-
// The link closed remotely with 'Detach {errorCondition:com.microsoft:timeout}' frame because
102-
// the broker waited for N seconds (60 sec hard limit today) but there was no free or new session.
103-
//
104-
// Given N seconds elapsed since the last session acquire attempt, request for a session on
105-
// the 'parallel' Scheduler and free the QPid thread for other IO.
106-
//
107-
return Mono.delay(Duration.ZERO);
108-
} else {
109-
final long id = System.nanoTime();
110-
logger.atInfo().addKeyValue(TRACKING_ID_KEY, id).log("Unable to acquire a session.", failure);
111-
// The link-endpoint-state publisher will emit error on the QPid Thread, that is a non-blocking Thread,
112-
// publish the error on the (blockable) bounded-elastic thread to free QPid thread and to allow
113-
// any blocking operation that downstream may do.
114-
return Mono.<Long>error(failure)
115-
.publishOn(Schedulers.boundedElastic())
116-
.doOnError(e -> logger.atInfo()
117-
.addKeyValue(TRACKING_ID_KEY, id)
118-
.log("Emitting the error signal received for session acquire attempt.", e));
127+
if (timeoutRetryDisabled) {
128+
return acquireSession(sessionId).onErrorResume(t -> {
129+
if (isBrokerTimeoutError(t)) {
130+
// map the broker timeout to application-friendly TimeoutException.
131+
final Throwable e = new TimeoutException("com.microsoft:timeout").initCause(t);
132+
return publishError(sessionId, e, false);
119133
}
120-
})));
134+
return publishError(sessionId, t, true);
135+
});
136+
} else {
137+
return acquireSession(sessionId).timeout(tryTimeout)
138+
.retryWhen(Retry.from(signals -> signals.flatMap(signal -> {
139+
final Throwable t = signal.failure();
140+
if (isTimeoutError(t)) {
141+
logger.atVerbose()
142+
.addKeyValue(ENTITY_PATH_KEY, entityPath)
143+
.addKeyValue("attempt", signal.totalRetriesInARow())
144+
.log("Timeout while acquiring session '{}'.", sessionName(sessionId), t);
145+
// retry session acquire using Schedulers.parallel() and free the QPid thread.
146+
return Mono.delay(Duration.ZERO);
147+
}
148+
return publishError(sessionId, t, true);
149+
})));
150+
}
151+
}
152+
153+
/**
154+
* Tries to acquire a session from the broker by opening an AMQP receive link.
155+
*
156+
* @param sessionId the unique id of the session to acquire, a value {@code null} means acquire any free session.
157+
*
158+
* @return the acquired session.
159+
*/
160+
private Mono<Session> acquireSession(String sessionId) {
161+
return Mono.defer(() -> {
162+
final Mono<ServiceBusReceiveLink> createLink = connectionCacheWrapper.getConnection()
163+
.flatMap(connection -> connection.createReceiveLink(linkName(sessionId), entityPath, receiveMode, null,
164+
entityType, identifier, sessionId));
165+
return createLink.flatMap(link -> {
166+
// ServiceBusReceiveLink::getSessionProperties() await for link to "ACTIVE" then reads its properties.
167+
return link.getSessionProperties()
168+
.flatMap(sessionProperties -> Mono.just(new Session(link, sessionProperties, sessionManagement)));
169+
});
170+
});
171+
}
172+
173+
/**
174+
* Publish the session acquire error using a bounded-elastic Thread.
175+
* <p>
176+
* The link-endpoint-state publisher ({@link ReceiveLinkHandler2#getEndpointStates()}) will emit error on the QPid
177+
* Thread, which is a non-block-able Thread. Publishing the error on the (block-able) bounded-elastic Thread will free
178+
* QPid Thread and to allow any blocking operation that downstream may do. If library do not publish in bounded-elastic
179+
* Thread and downstream happens to make a blocking call on non-block-able QPid Thread then reactor-core will error
180+
* - 'IllegalStateException(..*operation* are blocking, which is not supported in thread ...').
181+
* </p>
182+
*
183+
* @param sessionId the session id.
184+
* @param t the error to publish.
185+
* @param logAtInfo indicates if session acquire error should be logged at "info" level from the current thread, most of
186+
* the time, the broker timeout is the reason for session acquisition failure, in case, the acquire fails
187+
* due to any other reasons, that least expected error is logged in the "info" level.
188+
* @return a Mono that publishes the given error using a bounded-elastic Thread.
189+
* @param <T> the type
190+
*/
191+
private <T> Mono<T> publishError(String sessionId, Throwable t, boolean logAtInfo) {
192+
final long id = System.nanoTime();
193+
if (logAtInfo) {
194+
logger.atInfo()
195+
.addKeyValue(ENTITY_PATH_KEY, entityPath)
196+
.addKeyValue(TRACKING_ID_KEY, id)
197+
.log("Unable to acquire session '{}'.", sessionName(sessionId), t);
198+
}
199+
return Mono.<T>error(t)
200+
.publishOn(Schedulers.boundedElastic())
201+
.doOnError(ignored -> logger.atVerbose()
202+
.addKeyValue(TRACKING_ID_KEY, id)
203+
.log("Emitting session acquire error" + (logAtInfo ? "." : ": " + t.getMessage())));
121204
}
122205

123-
private Mono<ServiceBusReceiveLink> createSessionReceiveLink(String sessionId) {
124-
final String linkName = (sessionId != null) ? sessionId : StringUtil.getRandomString("session-");
125-
return connectionCacheWrapper.getConnection()
126-
.flatMap(connection -> connection.createReceiveLink(linkName, entityPath, receiveMode, null, entityType,
127-
identifier, sessionId));
206+
/**
207+
* Check if the given error is a remote link detach with '{errorCondition:com.microsoft:timeout}' indicating the broker
208+
* waited for N seconds (60 sec default) but there was no free or new session.
209+
*
210+
* @param t the error to test.
211+
* @return {@code true} if the error represents broker timeout.
212+
*/
213+
private static boolean isBrokerTimeoutError(Throwable t) {
214+
return t instanceof AmqpException
215+
&& ((AmqpException) t).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR;
216+
}
217+
218+
/**
219+
* Checks if the given error is a timeout error.
220+
*
221+
* @param t the error to test.
222+
* @return {@code true} if the error represents timeout.
223+
*/
224+
private static boolean isTimeoutError(Throwable t) {
225+
return t instanceof TimeoutException || isBrokerTimeoutError(t);
226+
}
227+
228+
/**
229+
* Obtain the name for the AMQP link that channels messages from a session.
230+
*
231+
* @param sessionId the session to channel messages from.
232+
* @return name for the AMQP link.
233+
*/
234+
private static String linkName(String sessionId) {
235+
return (sessionId != null) ? sessionId : StringUtil.getRandomString("session-");
236+
}
237+
238+
/**
239+
* Get the session name for simple local logging purpose.
240+
*
241+
* @param sessionId the unique id of the session or {@code null}, if session id is unknown.
242+
* @return the session name.
243+
*/
244+
private static String sessionName(String sessionId) {
245+
return sessionId == null ? "unnamed" : sessionId;
128246
}
129247

130248
/**

0 commit comments

Comments
 (0)