Skip to content
This repository was archived by the owner on Oct 12, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/Microsoft.Azure.ServiceBus/MessageHandlerOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;
using Microsoft.Azure.ServiceBus.Core;

namespace Microsoft.Azure.ServiceBus
{
using System;
Expand All @@ -25,19 +28,40 @@ public sealed class MessageHandlerOptions
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler) : this(exceptionReceivedHandler, null)
{
}

/// <summary>Initializes a new instance of the <see cref="MessageHandlerOptions" /> class.
/// Default Values:
/// <see cref="MaxConcurrentCalls"/> = 1
/// <see cref="AutoComplete"/> = true
/// <see cref="ReceiveTimeOut"/> = 1 minute
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
/// <param name="propertiesToModifyOnExceptionHandler">An optional <see cref="Func{T1, TResult}"/> that is invoked before a message is abandoned because of an exception that happened within the message handler callback;
/// The returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler, Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> propertiesToModifyOnExceptionHandler)
{
this.MaxConcurrentCalls = 1;
this.AutoComplete = true;
this.ReceiveTimeOut = Constants.DefaultOperationTimeout;
this.MaxAutoRenewDuration = Constants.ClientPumpRenewLockTimeout;
this.ExceptionReceivedHandler = exceptionReceivedHandler ?? throw new ArgumentNullException(nameof(exceptionReceivedHandler));
this.PropertiesToModifyOnExceptionHandler = propertiesToModifyOnExceptionHandler;
}

/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the message pump.
/// When errors are received calls will automatically be retried, so this is informational. </summary>
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }

/// <summary>Occurs when a message is about to be abandoned because of an exception that happened within the message handler callback;
/// The returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.</summary>
public Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> PropertiesToModifyOnExceptionHandler { get; }

/// <summary>Gets or sets the maximum number of concurrent calls to the callback the message pump should initiate.</summary>
/// <value>The maximum number of concurrent calls to the callback.</value>
public int MaxConcurrentCalls
Expand Down
14 changes: 11 additions & 3 deletions src/Microsoft.Azure.ServiceBus/MessageReceivePump.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;

namespace Microsoft.Azure.ServiceBus
{
using System;
Expand Down Expand Up @@ -152,7 +154,7 @@ async Task MessageDispatchTask(Message message)
// Nothing much to do if UserCallback throws, Abandon message and Release semaphore.
if (!(exception is MessageLockLostException))
{
await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false);
await this.AbandonMessageIfNeededAsync(message, exception).ConfigureAwait(false);
}

if (ServiceBusDiagnosticSource.IsEnabled())
Expand Down Expand Up @@ -191,13 +193,19 @@ void CancelAutoRenewLock(object state)
}
}

async Task AbandonMessageIfNeededAsync(Message message)
async Task AbandonMessageIfNeededAsync(Message message, Exception callbackException)
{
try
{
if (this.messageReceiver.ReceiveMode == ReceiveMode.PeekLock)
{
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
IDictionary<string, object> propertiesToModify = null;
if (this.registerHandlerOptions.PropertiesToModifyOnExceptionHandler != null)
{
var eventArgs = new ExceptionReceivedEventArgs(callbackException, ExceptionReceivedEventArgsAction.UserCallback, this.endpoint, this.messageReceiver.Path, this.messageReceiver.ClientId);
propertiesToModify = await this.registerHandlerOptions.PropertiesToModifyOnExceptionHandler(eventArgs).ConfigureAwait(false);
}
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down
26 changes: 25 additions & 1 deletion src/Microsoft.Azure.ServiceBus/SessionHandlerOptions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;
using Microsoft.Azure.ServiceBus.Core;

namespace Microsoft.Azure.ServiceBus
{
using System;
Expand All @@ -26,20 +29,41 @@ public sealed class SessionHandlerOptions
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler) : this(exceptionReceivedHandler, null)
{
}

/// <summary>Initializes a new instance of the <see cref="SessionHandlerOptions" /> class.
/// Default Values:
/// <see cref="MaxConcurrentSessions"/> = 2000
/// <see cref="AutoComplete"/> = true
/// <see cref="MessageWaitTimeout"/> = 1 minute
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
/// <param name="propertiesToModifyOnExceptionHandler">An optional <see cref="Func{T1, TResult}"/> that is invoked before a message is abandoned because of an exception that happened within the session handler callback;
/// The returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler, Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> propertiesToModifyOnExceptionHandler)
{
// These are default values
this.AutoComplete = true;
this.MaxConcurrentSessions = 2000;
this.MessageWaitTimeout = TimeSpan.FromMinutes(1);
this.MaxAutoRenewDuration = Constants.ClientPumpRenewLockTimeout;
this.ExceptionReceivedHandler = exceptionReceivedHandler ?? throw new ArgumentNullException(nameof(exceptionReceivedHandler));
this.PropertiesToModifyOnExceptionHandler = propertiesToModifyOnExceptionHandler;
}

/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the session pump.
/// When errors are received calls will automatically be retried, so this is informational. </summary>
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }

/// <summary>Occurs when a message is about to be abandoned because of an exception that happened within the session handler callback;
/// The returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.</summary>
public Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> PropertiesToModifyOnExceptionHandler { get; }

/// <summary>Gets or sets the duration for which the session lock will be renewed automatically.</summary>
/// <value>The duration for which the session renew its state.</value>
public TimeSpan MaxAutoRenewDuration
Expand Down
14 changes: 11 additions & 3 deletions src/Microsoft.Azure.ServiceBus/SessionReceivePump.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System.Collections.Generic;

namespace Microsoft.Azure.ServiceBus
{
using System;
Expand Down Expand Up @@ -96,13 +98,19 @@ async Task CompleteMessageIfNeededAsync(IMessageSession session, Message message
}
}

async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message)
async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message, Exception callbackException)
{
try
{
if (session.ReceiveMode == ReceiveMode.PeekLock)
{
await session.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
IDictionary<string, object> propertiesToModify = null;
if (this.sessionHandlerOptions.PropertiesToModifyOnExceptionHandler != null)
{
var eventArgs = new ExceptionReceivedEventArgs(callbackException, ExceptionReceivedEventArgsAction.UserCallback, this.endpoint, this.entityPath, this.clientId);
propertiesToModify = await this.sessionHandlerOptions.PropertiesToModifyOnExceptionHandler(eventArgs).ConfigureAwait(false);
}
await session.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down Expand Up @@ -243,7 +251,7 @@ async Task MessagePumpTaskAsync(IMessageSession session)
callbackExceptionOccurred = true;
if (!(exception is MessageLockLostException || exception is SessionLockLostException))
{
await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false);
await this.AbandonMessageIfNeededAsync(session, message, exception).ConfigureAwait(false);
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,12 @@ namespace Microsoft.Azure.ServiceBus
public sealed class MessageHandlerOptions
{
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler, System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> propertiesToModifyOnExceptionHandler) { }
public bool AutoComplete { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
public System.TimeSpan MaxAutoRenewDuration { get; set; }
public int MaxConcurrentCalls { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> PropertiesToModifyOnExceptionHandler { get; }
}
public sealed class MessageLockLostException : Microsoft.Azure.ServiceBus.ServiceBusException
{
Expand Down Expand Up @@ -379,11 +381,13 @@ namespace Microsoft.Azure.ServiceBus
public sealed class SessionHandlerOptions
{
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler, System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> propertiesToModifyOnExceptionHandler) { }
public bool AutoComplete { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
public System.TimeSpan MaxAutoRenewDuration { get; set; }
public int MaxConcurrentSessions { get; set; }
public System.TimeSpan MessageWaitTimeout { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> PropertiesToModifyOnExceptionHandler { get; }
}
public sealed class SessionLockLostException : Microsoft.Azure.ServiceBus.ServiceBusException
{
Expand Down