diff --git a/src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs b/src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs index 07e5ee08..8e8d1c15 100644 --- a/src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs +++ b/src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs @@ -1,6 +1,9 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Collections.Generic; +using Microsoft.Azure.ServiceBus.Core; + namespace Microsoft.Azure.ServiceBus { using System; @@ -25,19 +28,40 @@ public sealed class MessageHandlerOptions /// /// A that is invoked during exceptions. /// contains contextual information regarding the exception. - public MessageHandlerOptions(Func exceptionReceivedHandler) + public MessageHandlerOptions(Func exceptionReceivedHandler) : this(exceptionReceivedHandler, null) + { + } + + /// Initializes a new instance of the class. + /// Default Values: + /// = 1 + /// = true + /// = 1 minute + /// = 5 minutes + /// + /// A that is invoked during exceptions. + /// contains contextual information regarding the exception. + /// An optional that is invoked before a message is abandoned because of an exception that happened within the message handler callback; + /// The returned dictionary is passed to . + /// contains contextual information regarding the exception. + public MessageHandlerOptions(Func exceptionReceivedHandler, Func> propertiesToModifyOnExceptionHandler) { this.MaxConcurrentCalls = 1; this.AutoComplete = true; this.ReceiveTimeOut = Constants.DefaultOperationTimeout; this.MaxAutoRenewDuration = Constants.ClientPumpRenewLockTimeout; this.ExceptionReceivedHandler = exceptionReceivedHandler ?? throw new ArgumentNullException(nameof(exceptionReceivedHandler)); + this.PropertiesToModifyOnExceptionHandler = propertiesToModifyOnExceptionHandler; } /// Occurs when an exception is received. Enables you to be notified of any errors encountered by the message pump. /// When errors are received calls will automatically be retried, so this is informational. public Func ExceptionReceivedHandler { get; } + /// Occurs when a message is about to be abandoned because of an exception that happened within the message handler callback; + /// The returned dictionary is passed to . + public Func> PropertiesToModifyOnExceptionHandler { get; } + /// Gets or sets the maximum number of concurrent calls to the callback the message pump should initiate. /// The maximum number of concurrent calls to the callback. public int MaxConcurrentCalls diff --git a/src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs b/src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs index fa517e95..142cc255 100644 --- a/src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs +++ b/src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Collections.Generic; + namespace Microsoft.Azure.ServiceBus { using System; @@ -152,7 +154,7 @@ async Task MessageDispatchTask(Message message) // Nothing much to do if UserCallback throws, Abandon message and Release semaphore. if (!(exception is MessageLockLostException)) { - await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false); + await this.AbandonMessageIfNeededAsync(message, exception).ConfigureAwait(false); } if (ServiceBusDiagnosticSource.IsEnabled()) @@ -191,13 +193,26 @@ void CancelAutoRenewLock(object state) } } - async Task AbandonMessageIfNeededAsync(Message message) + async Task AbandonMessageIfNeededAsync(Message message, Exception callbackException) { try { if (this.messageReceiver.ReceiveMode == ReceiveMode.PeekLock) { - await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false); + IDictionary propertiesToModify = null; + if (this.registerHandlerOptions.PropertiesToModifyOnExceptionHandler != null) + { + var eventArgs = new ExceptionReceivedEventArgs(callbackException, ExceptionReceivedEventArgsAction.UserCallback, this.endpoint, this.messageReceiver.Path, this.messageReceiver.ClientId); + try + { + propertiesToModify = this.registerHandlerOptions.PropertiesToModifyOnExceptionHandler(eventArgs); + } + catch (Exception exception) + { + MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.messageReceiver.ClientId, message, exception); + } + } + await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false); } } catch (Exception exception) diff --git a/src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs b/src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs index 185a24a9..ae1f8c88 100644 --- a/src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs +++ b/src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs @@ -1,6 +1,9 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Collections.Generic; +using Microsoft.Azure.ServiceBus.Core; + namespace Microsoft.Azure.ServiceBus { using System; @@ -26,7 +29,23 @@ public sealed class SessionHandlerOptions /// /// A that is invoked during exceptions. /// contains contextual information regarding the exception. - public SessionHandlerOptions(Func exceptionReceivedHandler) + public SessionHandlerOptions(Func exceptionReceivedHandler) : this(exceptionReceivedHandler, null) + { + } + + /// Initializes a new instance of the class. + /// Default Values: + /// = 2000 + /// = true + /// = 1 minute + /// = 5 minutes + /// + /// A that is invoked during exceptions. + /// contains contextual information regarding the exception. + /// An optional that is invoked before a message is abandoned because of an exception that happened within the session handler callback; + /// The returned dictionary is passed to . + /// contains contextual information regarding the exception. + public SessionHandlerOptions(Func exceptionReceivedHandler, Func> propertiesToModifyOnExceptionHandler) { // These are default values this.AutoComplete = true; @@ -34,12 +53,17 @@ public SessionHandlerOptions(Func exceptionRec this.MessageWaitTimeout = TimeSpan.FromMinutes(1); this.MaxAutoRenewDuration = Constants.ClientPumpRenewLockTimeout; this.ExceptionReceivedHandler = exceptionReceivedHandler ?? throw new ArgumentNullException(nameof(exceptionReceivedHandler)); + this.PropertiesToModifyOnExceptionHandler = propertiesToModifyOnExceptionHandler; } /// Occurs when an exception is received. Enables you to be notified of any errors encountered by the session pump. /// When errors are received calls will automatically be retried, so this is informational. public Func ExceptionReceivedHandler { get; } + /// Occurs when a message is about to be abandoned because of an exception that happened within the session handler callback; + /// The returned dictionary is passed to . + public Func> PropertiesToModifyOnExceptionHandler { get; } + /// Gets or sets the duration for which the session lock will be renewed automatically. /// The duration for which the session renew its state. public TimeSpan MaxAutoRenewDuration diff --git a/src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs b/src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs index d611ae6c..234a5d60 100644 --- a/src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs +++ b/src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs @@ -1,6 +1,8 @@ // Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. +using System.Collections.Generic; + namespace Microsoft.Azure.ServiceBus { using System; @@ -96,13 +98,26 @@ async Task CompleteMessageIfNeededAsync(IMessageSession session, Message message } } - async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message) + async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message, Exception callbackException) { try { if (session.ReceiveMode == ReceiveMode.PeekLock) { - await session.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false); + IDictionary propertiesToModify = null; + if (this.sessionHandlerOptions.PropertiesToModifyOnExceptionHandler != null) + { + var eventArgs = new ExceptionReceivedEventArgs(callbackException, ExceptionReceivedEventArgsAction.UserCallback, this.endpoint, this.entityPath, this.clientId); + try + { + propertiesToModify = this.sessionHandlerOptions.PropertiesToModifyOnExceptionHandler(eventArgs); + } + catch (Exception exception) + { + MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.clientId, message, exception); + } + } + await session.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false); } } catch (Exception exception) @@ -243,7 +258,7 @@ async Task MessagePumpTaskAsync(IMessageSession session) callbackExceptionOccurred = true; if (!(exception is MessageLockLostException || exception is SessionLockLostException)) { - await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false); + await this.AbandonMessageIfNeededAsync(session, message, exception).ConfigureAwait(false); } } finally diff --git a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt index 19f35d01..82d0d178 100644 --- a/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt +++ b/test/Microsoft.Azure.ServiceBus.UnitTests/API/ApiApprovals.ApproveAzureServiceBus.approved.txt @@ -179,10 +179,12 @@ namespace Microsoft.Azure.ServiceBus public sealed class MessageHandlerOptions { public MessageHandlerOptions(System.Func exceptionReceivedHandler) { } + public MessageHandlerOptions(System.Func exceptionReceivedHandler, System.Func> propertiesToModifyOnExceptionHandler) { } public bool AutoComplete { get; set; } public System.Func ExceptionReceivedHandler { get; } public System.TimeSpan MaxAutoRenewDuration { get; set; } public int MaxConcurrentCalls { get; set; } + public System.Func> PropertiesToModifyOnExceptionHandler { get; } } public sealed class MessageLockLostException : Microsoft.Azure.ServiceBus.ServiceBusException { @@ -379,11 +381,13 @@ namespace Microsoft.Azure.ServiceBus public sealed class SessionHandlerOptions { public SessionHandlerOptions(System.Func exceptionReceivedHandler) { } + public SessionHandlerOptions(System.Func exceptionReceivedHandler, System.Func> propertiesToModifyOnExceptionHandler) { } public bool AutoComplete { get; set; } public System.Func ExceptionReceivedHandler { get; } public System.TimeSpan MaxAutoRenewDuration { get; set; } public int MaxConcurrentSessions { get; set; } public System.TimeSpan MessageWaitTimeout { get; set; } + public System.Func> PropertiesToModifyOnExceptionHandler { get; } } public sealed class SessionLockLostException : Microsoft.Azure.ServiceBus.ServiceBusException {