Skip to content

Commit 2271e49

Browse files
Make the accept session concurrency dynamic (Azure#23356)
* Make the accept session concurrency dynamic * PR FB
1 parent 31d7bde commit 2271e49

File tree

13 files changed

+71
-88
lines changed

13 files changed

+71
-88
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpClient.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ internal AmqpClient(
9999
credential,
100100
options.TransportType,
101101
options.WebProxy,
102-
options.EnableCrossEntityTransactions);
102+
options.EnableCrossEntityTransactions,
103+
options.RetryOptions.TryTimeout);
103104
}
104105

105106
/// <summary>

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpConnectionScope.cs

Lines changed: 12 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -167,28 +167,30 @@ public override bool IsDisposed
167167
private string _sendViaReceiverEntityPath;
168168

169169
private readonly object _syncLock = new();
170+
private readonly TimeSpan _operationTimeout;
170171

171172
/// <summary>
172173
/// Initializes a new instance of the <see cref="AmqpConnectionScope"/> class.
173174
/// </summary>
174-
///
175175
/// <param name="serviceEndpoint">Endpoint for the Service Bus service to which the scope is associated.</param>
176176
/// <param name="credential">The credential to use for authorization with the Service Bus service.</param>
177177
/// <param name="transport">The transport to use for communication.</param>
178178
/// <param name="proxy">The proxy, if any, to use for communication.</param>
179179
/// <param name="useSingleSession">If true, all links will use a single session.</param>
180-
///
180+
/// <param name="operationTimeout">The timeout for operations associated with the connection.</param>
181181
public AmqpConnectionScope(
182182
Uri serviceEndpoint,
183183
ServiceBusTokenCredential credential,
184184
ServiceBusTransportType transport,
185185
IWebProxy proxy,
186-
bool useSingleSession)
186+
bool useSingleSession,
187+
TimeSpan operationTimeout)
187188
{
188189
Argument.AssertNotNull(serviceEndpoint, nameof(serviceEndpoint));
189190
Argument.AssertNotNull(credential, nameof(credential));
190191
ValidateTransport(transport);
191192

193+
_operationTimeout = operationTimeout;
192194
ServiceEndpoint = serviceEndpoint;
193195
Transport = transport;
194196
Proxy = proxy;
@@ -528,6 +530,7 @@ protected virtual async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(
528530
var linkSettings = new AmqpLinkSettings();
529531
linkSettings.AddProperty(AmqpClientConstants.TimeoutName, (uint)timeout.CalculateRemaining(stopWatch.GetElapsedTime()).TotalMilliseconds);
530532
linkSettings.AddProperty(AmqpClientConstants.EntityTypeName, AmqpClientConstants.EntityTypeManagement);
533+
linkSettings.OperationTimeout = _operationTimeout;
531534
entityPath += '/' + AmqpClientConstants.ManagementAddress;
532535

533536
// Perform the initial authorization for the link.
@@ -661,7 +664,8 @@ protected virtual async Task<ReceivingAmqpLink> CreateReceivingLinkAsync(
661664
AutoSendFlow = prefetchCount > 0,
662665
SettleType = (receiveMode == ServiceBusReceiveMode.PeekLock) ? SettleMode.SettleOnDispose : SettleMode.SettleOnSend,
663666
Source = new Source { Address = endpoint.AbsolutePath, FilterSet = filters },
664-
Target = new Target { Address = Guid.NewGuid().ToString() }
667+
Target = new Target { Address = Guid.NewGuid().ToString() },
668+
OperationTimeout = _operationTimeout
665669
};
666670

667671
var link = new ReceivingAmqpLink(linkSettings);
@@ -799,7 +803,8 @@ protected virtual async Task<SendingAmqpLink> CreateSendingLinkAsync(
799803
Role = false,
800804
InitialDeliveryCount = 0,
801805
Source = new Source { Address = Guid.NewGuid().ToString() },
802-
Target = new Target { Address = destinationEndpoint.AbsolutePath }
806+
Target = new Target { Address = destinationEndpoint.AbsolutePath },
807+
OperationTimeout = _operationTimeout
803808
};
804809

805810
linkSettings.AddProperty(AmqpClientConstants.TimeoutName, (uint)timeout.CalculateRemaining(stopWatch.GetElapsedTime()).TotalMilliseconds);
@@ -1076,40 +1081,9 @@ protected virtual async Task OpenAmqpObjectAsync(
10761081
string entityPath = default,
10771082
bool isProcessor = default)
10781083
{
1079-
CancellationTokenRegistration registration;
10801084
try
10811085
{
1082-
var openObjectCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
1083-
// Only allow cancelling in-flight opens when it is from a processor.
1084-
// This would occur when the processor is stopped or closed by the user.
1085-
if (isProcessor)
1086-
{
1087-
// use a static delegate with tuple state to avoid allocating a closure
1088-
registration = cancellationToken.Register(static state =>
1089-
{
1090-
var (tcs, target) = ((TaskCompletionSource<object>, AmqpObject))state;
1091-
if (tcs.TrySetCanceled())
1092-
{
1093-
target.SafeClose();
1094-
}
1095-
}, (openObjectCompletionSource, target), useSynchronizationContext: false);
1096-
}
1097-
1098-
static async Task Open(AmqpObject target, TimeSpan timeout, TaskCompletionSource<object> openObjectCompletionSource)
1099-
{
1100-
try
1101-
{
1102-
await target.OpenAsync(timeout).ConfigureAwait(false);
1103-
openObjectCompletionSource.TrySetResult(null);
1104-
}
1105-
catch (Exception ex)
1106-
{
1107-
openObjectCompletionSource.TrySetException(ex);
1108-
}
1109-
}
1110-
1111-
_ = Open(target, timeout, openObjectCompletionSource);
1112-
await openObjectCompletionSource.Task.ConfigureAwait(false);
1086+
await target.OpenAsync(cancellationToken).ConfigureAwait(false);
11131087
}
11141088
catch (Exception ex)
11151089
{
@@ -1140,13 +1114,6 @@ static async Task Open(AmqpObject target, TimeSpan timeout, TaskCompletionSource
11401114
throw;
11411115
}
11421116
}
1143-
finally
1144-
{
1145-
if (isProcessor)
1146-
{
1147-
registration.Dispose();
1148-
}
1149-
}
11501117
}
11511118

11521119
/// <summary>
@@ -1321,4 +1288,4 @@ private static void ValidateTransport(ServiceBusTransportType transport)
13211288
}
13221289
}
13231290
}
1324-
}
1291+
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,7 @@ public void ProcessorClientClosedException(string identifier)
881881
}
882882
}
883883

884-
[Event(ProcessorStoppingReceiveCanceledEvent, Level = EventLevel.Verbose, Message = "A receive operation was cancelled while stopping the processor. (Identifier '{0}'). Error Message: '{1}'")]
884+
[Event(ProcessorStoppingReceiveCanceledEvent, Level = EventLevel.Verbose, Message = "A receive operation was cancelled while stopping the processor or scaling down concurrency. (Identifier '{0}'). Error Message: '{1}'")]
885885
public void ProcessorStoppingReceiveCanceled(string identifier, string exception)
886886
{
887887
if (IsEnabled())
@@ -890,7 +890,7 @@ public void ProcessorStoppingReceiveCanceled(string identifier, string exception
890890
}
891891
}
892892

893-
[Event(ProcessorStoppingAcceptSessionCanceledEvent, Level = EventLevel.Verbose, Message = "An accept session operation was cancelled while stopping the processor. (Namespace '{0}', Entity path '{1}'). Error Message: '{2}'")]
893+
[Event(ProcessorStoppingAcceptSessionCanceledEvent, Level = EventLevel.Verbose, Message = "An accept session operation was cancelled while stopping the processor or scaling down concurrency. (Namespace '{0}', Entity path '{1}'). Error Message: '{2}'")]
894894
public void ProcessorStoppingAcceptSessionCanceled(string fullyQualifiedNamespace, string entityPath, string exception)
895895
{
896896
if (IsEnabled())

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ public class ServiceBusProcessor : IAsyncDisposable
4646
/// The primitive for ensuring that the service is not overloaded with
4747
/// accept session requests.
4848
/// </summary>
49-
private SemaphoreSlim MaxConcurrentAcceptSessionsSemaphore { get; }
49+
private readonly SemaphoreSlim _maxConcurrentAcceptSessionsSemaphore = new(0, int.MaxValue);
5050

5151
/// <summary>The primitive for synchronizing access during start and close operations.</summary>
52-
private readonly SemaphoreSlim _processingStartStopSemaphore = new SemaphoreSlim(1, 1);
52+
private readonly SemaphoreSlim _processingStartStopSemaphore = new(1, 1);
5353

5454
private CancellationTokenSource RunningTaskTokenSource { get; set; }
5555

@@ -123,6 +123,8 @@ public class ServiceBusProcessor : IAsyncDisposable
123123
internal int MaxConcurrentCallsPerSession => _maxConcurrentCallsPerSession;
124124
private volatile int _maxConcurrentCallsPerSession;
125125

126+
private int _currentAcceptSessions;
127+
126128
internal TimeSpan? MaxReceiveWaitTime { get; }
127129

128130
/// <summary>
@@ -184,6 +186,7 @@ public virtual bool IsClosed
184186
internal readonly List<(Task Task, CancellationTokenSource Cts, ReceiverManager ReceiverManager)> _tasks = new();
185187
private readonly List<ReceiverManager> _orphanedReceiverManagers = new();
186188
private CancellationTokenSource _handlerCts = new();
189+
private readonly int _processorCount = Environment.ProcessorCount;
187190

188191
/// <summary>
189192
/// Initializes a new instance of the <see cref="ServiceBusProcessor"/> class.
@@ -237,11 +240,6 @@ internal ServiceBusProcessor(
237240
: _maxConcurrentSessions * _maxConcurrentCallsPerSession;
238241
}
239242

240-
var maxAcceptSessions = Math.Min(_maxConcurrentCalls, 2 * Environment.ProcessorCount);
241-
MaxConcurrentAcceptSessionsSemaphore = new SemaphoreSlim(
242-
maxAcceptSessions,
243-
maxAcceptSessions);
244-
245243
AutoCompleteMessages = Options.AutoCompleteMessages;
246244

247245
IsSessionProcessor = isSessionEntity;
@@ -607,7 +605,7 @@ private void ReconcileReceiverManagers(int maxConcurrentSessions)
607605
new SessionReceiverManager(
608606
_sessionProcessor,
609607
sessionId,
610-
MaxConcurrentAcceptSessionsSemaphore,
608+
_maxConcurrentAcceptSessionsSemaphore,
611609
_scopeFactory,
612610
KeepOpenOnReceiveTimeout));
613611
}
@@ -634,7 +632,7 @@ private void ReconcileReceiverManagers(int maxConcurrentSessions)
634632
new SessionReceiverManager(
635633
_sessionProcessor,
636634
null,
637-
MaxConcurrentAcceptSessionsSemaphore,
635+
_maxConcurrentAcceptSessionsSemaphore,
638636
_scopeFactory,
639637
KeepOpenOnReceiveTimeout));
640638
}
@@ -1031,6 +1029,25 @@ private async Task ReconcileConcurrencyAsync()
10311029
}
10321030
}
10331031

1032+
if (IsSessionProcessor)
1033+
{
1034+
int maxAcceptSessions = Math.Min(maxConcurrentCalls, 2 * _processorCount);
1035+
int diffAcceptSessions = maxAcceptSessions - _currentAcceptSessions;
1036+
if (diffAcceptSessions > 0)
1037+
{
1038+
_maxConcurrentAcceptSessionsSemaphore.Release(diffAcceptSessions);
1039+
}
1040+
else
1041+
{
1042+
int diffAcceptLimit = Math.Abs(diffAcceptSessions);
1043+
for (int i = 0; i < diffAcceptLimit; i++)
1044+
{
1045+
await _maxConcurrentAcceptSessionsSemaphore.WaitAsync().ConfigureAwait(false);
1046+
}
1047+
}
1048+
_currentAcceptSessions = maxAcceptSessions;
1049+
}
1050+
10341051
ReconcileReceiverManagers(maxConcurrentSessions);
10351052

10361053
_currentConcurrentCalls = maxConcurrentCalls;

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,7 @@ await ProcessOneMessageWithinScopeAsync(
295295
}
296296
}
297297
catch (Exception ex)
298-
when (ex is not TaskCanceledException ||
299-
// If the user manually throws a TCE, then we should log it.
300-
(!_sessionCancellationSource.IsCancellationRequested &&
301-
!processorCancellationToken.IsCancellationRequested))
298+
when (ex is not TaskCanceledException)
302299
{
303300
if (ex is ServiceBusException sbException)
304301
{

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Amqp/AmqpConnectionScopeTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public MockConnectionScope(
214214
Uri serviceEndpoint,
215215
ServiceBusTokenCredential credential,
216216
ServiceBusTransportType transport,
217-
IWebProxy proxy) : base(serviceEndpoint, credential, transport, proxy, false)
217+
IWebProxy proxy) : base(serviceEndpoint, credential, transport, proxy, false, default)
218218
{
219219
MockConnection = new Mock<AmqpConnection>(new MockTransport(), CreateMockAmqpSettings(), new AmqpConnectionSettings());
220220
}

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Client/ServiceBusClientLiveTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ public async Task GetChildClientFromParentSucceedsOnOpenConnection(bool useSessi
198198
}
199199

200200
[Test]
201-
[Ignore("reverted cancellation support outside of processor")]
202201
public async Task AcceptNextSessionRespectsCancellation()
203202
{
204203
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ public async Task LogsProcessorClientClosedExceptionEvent()
360360
}
361361

362362
[Test]
363+
[Ignore("issue with AMQP lib not using link level timeout")]
363364
public async Task DoesNotLogAcceptSessionTimeoutAsError()
364365
{
365366
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorLiveTests.cs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -853,8 +853,8 @@ public async Task AutoLockRenewalContinuesUntilProcessingCompletes()
853853

854854
async Task ProcessMessage(ProcessMessageEventArgs args)
855855
{
856-
var ct = Interlocked.Increment(ref receivedCount);
857-
if (ct == messageCount)
856+
var count = Interlocked.Increment(ref receivedCount);
857+
if (count == messageCount)
858858
{
859859
tcs.SetResult(true);
860860
}
@@ -896,27 +896,27 @@ async Task ProcessMessage(ProcessMessageEventArgs args)
896896
await args.AbandonMessageAsync(args.Message);
897897
}
898898

899-
var ct = Interlocked.Increment(ref receivedCount);
900-
if (ct == messageCount)
899+
var count = Interlocked.Increment(ref receivedCount);
900+
if (count == messageCount)
901901
{
902902
tcs.SetResult(true);
903903
}
904904

905905
// decrease concurrency
906-
if (ct == 100)
906+
if (count == 100)
907907
{
908908
processor.UpdateConcurrency(1);
909909
Assert.AreEqual(1, processor.MaxConcurrentCalls);
910910
}
911911

912912
// increase concurrency
913-
if (ct == 150)
913+
if (count == 150)
914914
{
915915
Assert.LessOrEqual(processor._tasks.Where(t => !t.Task.IsCompleted).Count(), 1);
916916
processor.UpdateConcurrency(10);
917917
Assert.AreEqual(10, processor.MaxConcurrentCalls);
918918
}
919-
if (ct == 175)
919+
if (count == 175)
920920
{
921921
Assert.GreaterOrEqual(processor._tasks.Where(t => !t.Task.IsCompleted).Count(), 5);
922922
}

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Processor/ProcessorTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ public async Task CloseRespectsCancellationToken()
425425
var cts = new CancellationTokenSource();
426426

427427
// mutate the cancellation token to distinguish it from CancellationToken.None
428-
cts.CancelAfter(100);
428+
cts.CancelAfter(500);
429429

430430
await mockProcessor.Object.CloseAsync(cts.Token);
431431
mockProcessor.Verify(p => p.StopProcessingAsync(It.Is<CancellationToken>(ct => ct == cts.Token)));

0 commit comments

Comments
 (0)