Skip to content

Commit bb44982

Browse files
Log retries as Verbose when attempting to accept sessions (Azure#28302)
* Log retries as Verbose when attempting to accept sessions * Fix property name in exception * Fix flaky test * Revert to warning for other retries * Add identifier/entity path filters to tests * Add identifier to error handler event * fix
1 parent ccfcd28 commit bb44982

File tree

9 files changed

+99
-42
lines changed

9 files changed

+99
-42
lines changed

sdk/servicebus/Azure.Messaging.ServiceBus/src/Administration/CreateSubscriptionOptions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public int MaxDeliveryCount
182182
Argument.AssertAtLeast(
183183
value,
184184
AdministrationClientConstants.MinAllowedMaxDeliveryCount,
185-
nameof(AutoDeleteOnIdle));
185+
nameof(MaxDeliveryCount));
186186

187187
_maxDeliveryCount = value;
188188
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1351,7 +1351,7 @@ private static TimeSpan UseMinimum(
13511351
(firstOption < secondOption) ? firstOption : secondOption;
13521352

13531353
/// <summary>
1354-
/// Opens an AMQP link for use with receiver operations.
1354+
/// Opens an AMQP link for use with session receiver operations.
13551355
/// </summary>
13561356
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
13571357
/// <returns>A task to be resolved on when the operation has completed.</returns>
@@ -1362,7 +1362,8 @@ static async (receiveLink, timeout, token) =>
13621362
await receiveLink.GetOrCreateAsync(timeout, token).ConfigureAwait(false),
13631363
_receiveLink,
13641364
_connectionScope,
1365-
cancellationToken).ConfigureAwait(false);
1365+
cancellationToken,
1366+
true).ConfigureAwait(false);
13661367
}
13671368

13681369
private bool HasLinkCommunicationError(ReceivingAmqpLink link) =>

sdk/servicebus/Azure.Messaging.ServiceBus/src/Diagnostics/ServiceBusEventSource.cs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ protected ServiceBusEventSource() : base(EventSourceName)
194194

195195
internal const int ProcessorStoppingCancellationWarningEvent = 113;
196196

197+
internal const int RunOperationExceptionVerboseEvent = 114;
198+
197199
#endregion
198200
// add new event numbers here incrementing from previous
199201

@@ -844,12 +846,12 @@ public virtual void ProcessorRenewSessionLockException(string identifier, string
844846
}
845847
}
846848

847-
[Event(ProcessorErrorHandlerThrewExceptionEvent, Level = EventLevel.Error, Message = "ExceptionReceivedHandler threw exception. Exception:{0}")]
848-
public void ProcessorErrorHandlerThrewException(string exception)
849+
[Event(ProcessorErrorHandlerThrewExceptionEvent, Level = EventLevel.Error, Message = "{1}: ExceptionReceivedHandler threw exception. Exception:{0}")]
850+
public void ProcessorErrorHandlerThrewException(string exception, string identifier)
849851
{
850852
if (IsEnabled())
851853
{
852-
WriteEvent(ProcessorErrorHandlerThrewExceptionEvent, exception);
854+
WriteEvent(ProcessorErrorHandlerThrewExceptionEvent, exception, identifier);
853855
}
854856
}
855857

@@ -1357,14 +1359,23 @@ public virtual void RequestAuthorizationException(string identifier, string endp
13571359

13581360
#region Retries
13591361

1360-
[Event(RunOperationExceptionEvent, Level = EventLevel.Informational, Message = "RunOperation encountered an exception and will retry. Exception: {0}")]
1362+
[Event(RunOperationExceptionEvent, Level = EventLevel.Warning, Message = "RunOperation encountered an exception and will retry. Exception: {0}")]
13611363
public virtual void RunOperationExceptionEncountered(string exception)
13621364
{
13631365
if (IsEnabled())
13641366
{
13651367
WriteEvent(RunOperationExceptionEvent, exception);
13661368
}
13671369
}
1370+
1371+
[Event(RunOperationExceptionVerboseEvent, Level = EventLevel.Verbose, Message = "RunOperation encountered an exception and will retry. Exception: {0}")]
1372+
public virtual void RunOperationExceptionEncounteredVerbose(string exception)
1373+
{
1374+
if (IsEnabled())
1375+
{
1376+
WriteEvent(RunOperationExceptionVerboseEvent, exception);
1377+
}
1378+
}
13681379
#endregion
13691380

13701381
#region Client lifecycle

sdk/servicebus/Azure.Messaging.ServiceBus/src/Primitives/ServiceBusRetryPolicy.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ internal async ValueTask<TResult> RunOperation<T1, TResult>(
118118
Func<T1, TimeSpan, CancellationToken, ValueTask<TResult>> operation,
119119
T1 t1,
120120
TransportConnectionScope scope,
121-
CancellationToken cancellationToken)
121+
CancellationToken cancellationToken,
122+
bool logRetriesAsVerbose = false)
122123
{
123124
var failedAttemptCount = 0;
124125

@@ -160,7 +161,15 @@ internal async ValueTask<TResult> RunOperation<T1, TResult>(
160161
TimeSpan? retryDelay = CalculateRetryDelay(activeEx, failedAttemptCount);
161162
if (retryDelay.HasValue && !scope.IsDisposed && !cancellationToken.IsCancellationRequested)
162163
{
163-
Logger.RunOperationExceptionEncountered(activeEx.ToString());
164+
if (logRetriesAsVerbose)
165+
{
166+
Logger.RunOperationExceptionEncounteredVerbose(activeEx.ToString());
167+
}
168+
else
169+
{
170+
Logger.RunOperationExceptionEncountered(activeEx.ToString());
171+
}
172+
164173
await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);
165174
tryTimeout = CalculateTryTimeout(failedAttemptCount);
166175
}

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ReceiverManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ protected virtual async Task RaiseExceptionReceived(ProcessErrorEventArgs eventA
346346
catch (Exception exception)
347347
{
348348
// don't bubble up exceptions raised from customer exception handler
349-
ServiceBusEventSource.Log.ProcessorErrorHandlerThrewException(exception.ToString());
349+
ServiceBusEventSource.Log.ProcessorErrorHandlerThrewException(exception.ToString(), Processor.Identifier);
350350
}
351351
}
352352

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/ServiceBusProcessor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@ await OnProcessErrorAsync(
892892
catch (Exception ex)
893893
{
894894
// Don't bubble up exceptions raised from customer exception handler.
895-
Logger.ProcessorErrorHandlerThrewException(ex.ToString());
895+
Logger.ProcessorErrorHandlerThrewException(ex.ToString(), Identifier);
896896
}
897897

898898
// This call will deadlock if awaited, as StopProcessingAsync awaits

sdk/servicebus/Azure.Messaging.ServiceBus/src/Processor/SessionReceiverManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,7 @@ protected override async Task RaiseExceptionReceived(ProcessErrorEventArgs event
411411
catch (Exception exception)
412412
{
413413
// don't bubble up exceptions raised from customer exception handler
414-
ServiceBusEventSource.Log.ProcessorErrorHandlerThrewException(exception.ToString());
414+
ServiceBusEventSource.Log.ProcessorErrorHandlerThrewException(exception.ToString(), Processor.Identifier);
415415
}
416416
}
417417

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Diagnostics/EventSourceLiveTests.cs

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -315,18 +315,62 @@ Task ProcessMessage(ProcessMessageEventArgs args)
315315

316316
Task ExceptionHandler(ProcessErrorEventArgs args)
317317
{
318+
if (args.ErrorSource == ServiceBusErrorSource.ProcessMessageCallback)
319+
{
320+
throw new Exception();
321+
}
322+
323+
return Task.CompletedTask;
324+
}
325+
326+
processor.ProcessMessageAsync += ProcessMessage;
327+
processor.ProcessErrorAsync += ExceptionHandler;
328+
329+
await processor.StartProcessingAsync();
330+
await tcs.Task;
331+
await processor.StopProcessingAsync();
332+
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerStartEvent, args => args.Payload.Contains(processor.Identifier));
333+
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerExceptionEvent, args => args.Payload.Contains(processor.Identifier));
334+
_listener.SingleEventById(ServiceBusEventSource.ProcessorErrorHandlerThrewExceptionEvent, args => args.Payload.Contains(processor.Identifier));
335+
}
336+
}
337+
338+
[Test]
339+
public async Task LogsSessionProcessorExceptionEvent()
340+
{
341+
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
342+
{
343+
await using var client = CreateClient();
344+
var sender = client.CreateSender(scope.QueueName);
345+
await sender.SendMessageAsync(ServiceBusTestUtilities.GetMessage("sessionId"));
346+
await using var processor = client.CreateSessionProcessor(scope.QueueName);
347+
var tcs = new TaskCompletionSource<bool>();
348+
349+
Task ProcessMessage(ProcessSessionMessageEventArgs args)
350+
{
351+
tcs.SetResult(true);
318352
throw new Exception();
319353
}
320354

355+
Task ExceptionHandler(ProcessErrorEventArgs args)
356+
{
357+
if (args.ErrorSource == ServiceBusErrorSource.ProcessMessageCallback)
358+
{
359+
throw new Exception();
360+
}
361+
362+
return Task.CompletedTask;
363+
}
364+
321365
processor.ProcessMessageAsync += ProcessMessage;
322366
processor.ProcessErrorAsync += ExceptionHandler;
323367

324368
await processor.StartProcessingAsync();
325369
await tcs.Task;
326370
await processor.StopProcessingAsync();
327-
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerStartEvent);
328-
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerExceptionEvent);
329-
_listener.SingleEventById(ServiceBusEventSource.ProcessorErrorHandlerThrewExceptionEvent);
371+
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerStartEvent, args => args.Payload.Contains(processor.Identifier));
372+
_listener.SingleEventById(ServiceBusEventSource.ProcessorMessageHandlerExceptionEvent, args => args.Payload.Contains(processor.Identifier));
373+
_listener.SingleEventById(ServiceBusEventSource.ProcessorErrorHandlerThrewExceptionEvent, args => args.Payload.Contains(processor.Identifier));
330374
}
331375
}
332376

@@ -366,7 +410,7 @@ public async Task LogsProcessorClientClosedExceptionEvent()
366410
await Task.Delay(500, cancellationSource.Token);
367411
}
368412

369-
_listener.SingleEventById(ServiceBusEventSource.ProcessorClientClosedExceptionEvent);
413+
_listener.SingleEventById(ServiceBusEventSource.ProcessorClientClosedExceptionEvent, args => args.Payload.Contains(processor.Identifier));
370414
}
371415
}
372416

@@ -375,22 +419,26 @@ public async Task DoesNotLogAcceptSessionTimeoutAsError()
375419
{
376420
await using (var scope = await ServiceBusScope.CreateWithQueue(enablePartitioning: false, enableSession: true))
377421
{
378-
await using var client = CreateNoRetryClient(5);
422+
await using var client = CreateClient(tryTimeout: 5, maxRetries: 1);
379423
await using var processor = client.CreateSessionProcessor(scope.QueueName);
380424

381425
processor.ProcessMessageAsync += args => Task.CompletedTask;
382426
processor.ProcessErrorAsync += args => Task.CompletedTask;
383427

384428
await processor.StartProcessingAsync();
385429

386-
// wait twice as long as the try timeout to ensure that the Accept session will timeout
387-
await Task.Delay(TimeSpan.FromSeconds(10));
430+
// wait long enough to ensure that the Accept session will timeout accounting for retries
431+
await Task.Delay(TimeSpan.FromSeconds(20));
388432

389433
await processor.StopProcessingAsync();
390434

391-
Assert.False(_listener.EventsById(ServiceBusEventSource.CreateReceiveLinkExceptionEvent).Any());
392-
Assert.False(_listener.EventsById(ServiceBusEventSource.ClientCreateExceptionEvent).Any());
393-
Assert.True(_listener.EventsById(ServiceBusEventSource.ProcessorAcceptSessionTimeoutEvent).Any(e => e.Level == EventLevel.Verbose));
435+
bool ContainsEntityPath(EventWrittenEventArgs args) => args.Payload.Contains(processor.EntityPath);
436+
437+
Assert.False(_listener.EventsById(ServiceBusEventSource.CreateReceiveLinkExceptionEvent).Any(ContainsEntityPath));
438+
Assert.False(_listener.EventsById(ServiceBusEventSource.ClientCreateExceptionEvent).Any(ContainsEntityPath));
439+
Assert.True(_listener.EventsById(ServiceBusEventSource.ProcessorAcceptSessionTimeoutEvent).Any(e => e.Level == EventLevel.Verbose && ContainsEntityPath(e)));
440+
Assert.True(_listener.EventsById(ServiceBusEventSource.RunOperationExceptionVerboseEvent).Any(e => e.Level == EventLevel.Verbose));
441+
Assert.False(_listener.EventsById(ServiceBusEventSource.RunOperationExceptionEvent).Any(ContainsEntityPath));
394442
}
395443
}
396444

@@ -413,8 +461,9 @@ public async Task DoesNotLogStoppingAcceptSessionCanceledAsError()
413461
await processor.StopProcessingAsync();
414462

415463
Assert.False(_listener.EventsById(ServiceBusEventSource.CreateReceiveLinkExceptionEvent).Any());
416-
Assert.False(_listener.EventsById(ServiceBusEventSource.ClientCreateExceptionEvent).Any());
417-
Assert.True(_listener.EventsById(ServiceBusEventSource.ProcessorStoppingAcceptSessionCanceledEvent).Any(e => e.Level == EventLevel.Verbose));
464+
Assert.False(_listener.EventsById(ServiceBusEventSource.ClientCreateExceptionEvent).Any(e => e.Payload.Contains(processor.EntityPath)));
465+
Assert.True(_listener.EventsById(ServiceBusEventSource.ProcessorStoppingAcceptSessionCanceledEvent)
466+
.Any(e => e.Level == EventLevel.Verbose));
418467
}
419468
}
420469

@@ -437,7 +486,7 @@ public async Task StoppingProcessorDoesNotLogTaskCanceledExceptions()
437486
await processor.StopProcessingAsync();
438487

439488
Assert.False(_listener.EventsById(ServiceBusEventSource.CreateReceiveLinkExceptionEvent).Any());
440-
Assert.False(_listener.EventsById(ServiceBusEventSource.ClientCreateExceptionEvent).Any());
489+
Assert.False(_listener.EventsById(ServiceBusEventSource.ClientCreateExceptionEvent).Any(e => e.Payload.Contains(processor.EntityPath)));
441490
Assert.True(_listener.EventsById(ServiceBusEventSource.ProcessorStoppingReceiveCanceledEvent).Any());
442491
}
443492
}
@@ -467,7 +516,7 @@ public async Task StoppingSessionProcessorDoesNotLogTaskCanceledExceptions()
467516
await processor.StopProcessingAsync();
468517

469518
Assert.False(_listener.EventsById(ServiceBusEventSource.CreateReceiveLinkExceptionEvent).Any());
470-
Assert.False(_listener.EventsById(ServiceBusEventSource.ClientCreateExceptionEvent).Any());
519+
Assert.False(_listener.EventsById(ServiceBusEventSource.ClientCreateExceptionEvent).Any(e => e.Payload.Contains(processor.EntityPath)));
471520
Assert.True(_listener.EventsById(ServiceBusEventSource.ProcessorStoppingReceiveCanceledEvent).Any());
472521
}
473522
}

sdk/servicebus/Azure.Messaging.ServiceBus/tests/Infrastructure/ServiceBusLiveTestBase.cs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,17 @@ public abstract class ServiceBusLiveTestBase : LiveTestBase<ServiceBusTestEnviro
1818

1919
protected TimeSpan ShortLockDuration = TimeSpan.FromSeconds(10);
2020

21-
protected ServiceBusClient CreateNoRetryClient(int tryTimeout = DefaultTryTimeout)
22-
{
23-
var options =
24-
new ServiceBusClientOptions
25-
{
26-
RetryOptions = new ServiceBusRetryOptions
27-
{
28-
TryTimeout = TimeSpan.FromSeconds(tryTimeout),
29-
MaxRetries = 0
30-
}
31-
};
32-
return new ServiceBusClient(
33-
TestEnvironment.ServiceBusConnectionString,
34-
options);
35-
}
21+
protected ServiceBusClient CreateNoRetryClient(int tryTimeout = DefaultTryTimeout) => CreateClient(tryTimeout, 0);
3622

37-
protected ServiceBusClient CreateClient(int tryTimeout = DefaultTryTimeout)
23+
protected ServiceBusClient CreateClient(int tryTimeout = DefaultTryTimeout, int maxRetries = 3)
3824
{
3925
var options =
4026
new ServiceBusClientOptions
4127
{
4228
RetryOptions = new ServiceBusRetryOptions
4329
{
4430
TryTimeout = TimeSpan.FromSeconds(tryTimeout),
31+
MaxRetries = maxRetries
4532
}
4633
};
4734
return new ServiceBusClient(

0 commit comments

Comments
 (0)