Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;
using Microsoft.Azure.ServiceBus.Core;

namespace Microsoft.Azure.ServiceBus
{
using System;
Expand All @@ -25,19 +28,40 @@ public sealed class MessageHandlerOptions
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler) : this(exceptionReceivedHandler, null)
{
}

/// <summary>Initializes a new instance of the <see cref="MessageHandlerOptions" /> class.
/// Default Values:
/// <see cref="MaxConcurrentCalls"/> = 1
/// <see cref="AutoComplete"/> = true
/// <see cref="ReceiveTimeOut"/> = 1 minute
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
/// <param name="propertiesToModifyOnExceptionHandler">An optional <see cref="Func{T1, TResult}"/> that is invoked before a message is abandoned because of an exception that happened within the message handler callback;
/// The returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler, Func<ExceptionReceivedEventArgs, IDictionary<string, object>> propertiesToModifyOnExceptionHandler)
{
this.MaxConcurrentCalls = 1;
this.AutoComplete = true;
this.ReceiveTimeOut = Constants.DefaultOperationTimeout;
this.MaxAutoRenewDuration = Constants.ClientPumpRenewLockTimeout;
this.ExceptionReceivedHandler = exceptionReceivedHandler ?? throw new ArgumentNullException(nameof(exceptionReceivedHandler));
this.PropertiesToModifyOnExceptionHandler = propertiesToModifyOnExceptionHandler;
}

/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the message pump.
/// When errors are received calls will automatically be retried, so this is informational. </summary>
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }

/// <summary>Occurs when a message is about to be abandoned because of an exception that happened within the message handler callback;
/// The returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.</summary>
public Func<ExceptionReceivedEventArgs, IDictionary<string, object>> PropertiesToModifyOnExceptionHandler { get; }

/// <summary>Gets or sets the maximum number of concurrent calls to the callback the message pump should initiate.</summary>
/// <value>The maximum number of concurrent calls to the callback.</value>
public int MaxConcurrentCalls
Expand Down
21 changes: 18 additions & 3 deletions src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;

namespace Microsoft.Azure.ServiceBus
{
using System;
Expand Down Expand Up @@ -152,7 +154,7 @@ async Task MessageDispatchTask(Message message)
// Nothing much to do if UserCallback throws, Abandon message and Release semaphore.
if (!(exception is MessageLockLostException))
{
await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false);
await this.AbandonMessageIfNeededAsync(message, exception).ConfigureAwait(false);
}

if (ServiceBusDiagnosticSource.IsEnabled())
Expand Down Expand Up @@ -191,13 +193,26 @@ void CancelAutoRenewLock(object state)
}
}

async Task AbandonMessageIfNeededAsync(Message message)
async Task AbandonMessageIfNeededAsync(Message message, Exception callbackException)
{
try
{
if (this.messageReceiver.ReceiveMode == ReceiveMode.PeekLock)
{
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
IDictionary<string, object> propertiesToModify = null;
if (this.registerHandlerOptions.PropertiesToModifyOnExceptionHandler != null)
{
var eventArgs = new ExceptionReceivedEventArgs(callbackException, ExceptionReceivedEventArgsAction.UserCallback, this.endpoint, this.messageReceiver.Path, this.messageReceiver.ClientId);
try
{
propertiesToModify = this.registerHandlerOptions.PropertiesToModifyOnExceptionHandler(eventArgs);
}
catch (Exception exception)
{
MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.messageReceiver.ClientId, message, exception);
}
}
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down
26 changes: 25 additions & 1 deletion src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;
using Microsoft.Azure.ServiceBus.Core;

namespace Microsoft.Azure.ServiceBus
{
using System;
Expand All @@ -26,20 +29,41 @@ public sealed class SessionHandlerOptions
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler) : this(exceptionReceivedHandler, null)
{
}

/// <summary>Initializes a new instance of the <see cref="SessionHandlerOptions" /> class.
/// Default Values:
/// <see cref="MaxConcurrentSessions"/> = 2000
/// <see cref="AutoComplete"/> = true
/// <see cref="MessageWaitTimeout"/> = 1 minute
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
/// <param name="propertiesToModifyOnExceptionHandler">An optional <see cref="Func{T1, TResult}"/> that is invoked before a message is abandoned because of an exception that happened within the session handler callback;
/// The returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler, Func<ExceptionReceivedEventArgs, IDictionary<string, object>> propertiesToModifyOnExceptionHandler)
{
// These are default values
this.AutoComplete = true;
this.MaxConcurrentSessions = 2000;
this.MessageWaitTimeout = TimeSpan.FromMinutes(1);
this.MaxAutoRenewDuration = Constants.ClientPumpRenewLockTimeout;
this.ExceptionReceivedHandler = exceptionReceivedHandler ?? throw new ArgumentNullException(nameof(exceptionReceivedHandler));
this.PropertiesToModifyOnExceptionHandler = propertiesToModifyOnExceptionHandler;
}

/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the session pump.
/// When errors are received calls will automatically be retried, so this is informational. </summary>
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }

/// <summary>Occurs when a message is about to be abandoned because of an exception that happened within the session handler callback;
/// The returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.</summary>
public Func<ExceptionReceivedEventArgs, IDictionary<string, object>> PropertiesToModifyOnExceptionHandler { get; }

/// <summary>Gets or sets the duration for which the session lock will be renewed automatically.</summary>
/// <value>The duration for which the session renew its state.</value>
public TimeSpan MaxAutoRenewDuration
Expand Down
21 changes: 18 additions & 3 deletions src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;

namespace Microsoft.Azure.ServiceBus
{
using System;
Expand Down Expand Up @@ -96,13 +98,26 @@ async Task CompleteMessageIfNeededAsync(IMessageSession session, Message message
}
}

async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message)
async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message, Exception callbackException)
{
try
{
if (session.ReceiveMode == ReceiveMode.PeekLock)
{
await session.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
IDictionary<string, object> propertiesToModify = null;
if (this.sessionHandlerOptions.PropertiesToModifyOnExceptionHandler != null)
{
var eventArgs = new ExceptionReceivedEventArgs(callbackException, ExceptionReceivedEventArgsAction.UserCallback, this.endpoint, this.entityPath, this.clientId);
try
{
propertiesToModify = this.sessionHandlerOptions.PropertiesToModifyOnExceptionHandler(eventArgs);
}
catch (Exception exception)
{
MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.clientId, message, exception);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What this exception will look like? Will it contain enough info to understand that it's the user provided callback that has failed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point, the user callback threw an exception so the pump is abandoning the message. We let the user execute a second callback to return a dictionary of properties to modify on the message being abandoned, most likely to store details about the exception on the message itself. If that second callback fails, we don't want to fail abandoning the message so we just ignore the properties to modify and pass null (the default value). I'm not sure what else could be done, did you have something in mind?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging that exception? My thought is the following:
Processing callback fails, message is going to be abandoned. Properties modifier callback is failing, the same log message will be made with a different exception, using MessagingEventSource.Log.MessageReceiverPumpUserCallbackException() variant. Perhaps it should be a more specific Log method to distinguish between callbacks? That would suffice imo.

}
}
await session.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down Expand Up @@ -243,7 +258,7 @@ async Task MessagePumpTaskAsync(IMessageSession session)
callbackExceptionOccurred = true;
if (!(exception is MessageLockLostException || exception is SessionLockLostException))
{
await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false);
await this.AbandonMessageIfNeededAsync(session, message, exception).ConfigureAwait(false);
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,12 @@ namespace Microsoft.Azure.ServiceBus
public sealed class MessageHandlerOptions
{
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler, System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Collections.Generic.IDictionary<string, object>> propertiesToModifyOnExceptionHandler) { }
public bool AutoComplete { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
public System.TimeSpan MaxAutoRenewDuration { get; set; }
public int MaxConcurrentCalls { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Collections.Generic.IDictionary<string, object>> PropertiesToModifyOnExceptionHandler { get; }
}
public sealed class MessageLockLostException : Microsoft.Azure.ServiceBus.ServiceBusException
{
Expand Down Expand Up @@ -379,11 +381,13 @@ namespace Microsoft.Azure.ServiceBus
public sealed class SessionHandlerOptions
{
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler, System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Collections.Generic.IDictionary<string, object>> propertiesToModifyOnExceptionHandler) { }
public bool AutoComplete { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
public System.TimeSpan MaxAutoRenewDuration { get; set; }
public int MaxConcurrentSessions { get; set; }
public System.TimeSpan MessageWaitTimeout { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Collections.Generic.IDictionary<string, object>> PropertiesToModifyOnExceptionHandler { get; }
}
public sealed class SessionLockLostException : Microsoft.Azure.ServiceBus.ServiceBusException
{
Expand Down