Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Core;
using Primitives;

/// <summary>Provides options associated with message pump processing using
Expand All @@ -26,6 +28,22 @@ public sealed class MessageHandlerOptions
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
: this(async args => { await exceptionReceivedHandler(args); return null; })
{
}

/// <summary>Initializes a new instance of the <see cref="MessageHandlerOptions" /> class.
/// Default Values:
/// <see cref="MaxConcurrentCalls"/> = 1
/// <see cref="AutoComplete"/> = true
/// <see cref="ReceiveTimeOut"/> = 1 minute
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// When the exception happens during user callback, the returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
/// For other actions, the returned dictionary is ignored.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> exceptionReceivedHandler)
{
this.MaxConcurrentCalls = 1;
this.AutoComplete = true;
Expand All @@ -36,7 +54,7 @@ public MessageHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionRec

/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the message pump.
/// When errors are received calls will automatically be retried, so this is informational. </summary>
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }
public Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> ExceptionReceivedHandler { get; }

/// <summary>Gets or sets the maximum number of concurrent calls to the callback the message pump should initiate.</summary>
/// <value>The maximum number of concurrent calls to the callback.</value>
Expand Down Expand Up @@ -79,15 +97,16 @@ public TimeSpan MaxAutoRenewDuration

internal TimeSpan ReceiveTimeOut { get; }

internal async Task RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
internal async Task<IDictionary<string, object>> RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
{
try
{
await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
return await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
}
catch (Exception exception)
{
MessagingEventSource.Log.ExceptionReceivedHandlerThrewException(exception);
return null;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -47,7 +48,7 @@ bool ShouldRenewLock()
this.registerHandlerOptions.AutoRenewLock;
}

Task RaiseExceptionReceived(Exception e, string action)
Task<IDictionary<string, object>> RaiseExceptionReceived(Exception e, string action)
{
var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.messageReceiver.Path, this.messageReceiver.ClientId);
return this.registerHandlerOptions.RaiseExceptionReceived(eventArgs);
Expand Down Expand Up @@ -147,12 +148,12 @@ async Task MessageDispatchTask(Message message)
catch (Exception exception)
{
MessagingEventSource.Log.MessageReceiverPumpUserCallbackException(this.messageReceiver.ClientId, message, exception);
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
var propertiesToModify = await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);

// Nothing much to do if UserCallback throws, Abandon message and Release semaphore.
if (!(exception is MessageLockLostException))
{
await this.AbandonMessageIfNeededAsync(message).ConfigureAwait(false);
await this.AbandonMessageIfNeededAsync(message, propertiesToModify).ConfigureAwait(false);
}

if (ServiceBusDiagnosticSource.IsEnabled())
Expand Down Expand Up @@ -191,13 +192,13 @@ void CancelAutoRenewLock(object state)
}
}

async Task AbandonMessageIfNeededAsync(Message message)
async Task AbandonMessageIfNeededAsync(Message message, IDictionary<string, object> propertiesToModify)
{
try
{
if (this.messageReceiver.ReceiveMode == ReceiveMode.PeekLock)
{
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
await this.messageReceiver.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Core;
using Primitives;

/// <summary>Provides options associated with session pump processing using
Expand All @@ -27,6 +29,22 @@ public sealed class SessionHandlerOptions
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionReceivedHandler)
: this(async args => { await exceptionReceivedHandler(args); return null; })
{
}

/// <summary>Initializes a new instance of the <see cref="SessionHandlerOptions" /> class.
/// Default Values:
/// <see cref="MaxConcurrentSessions"/> = 2000
/// <see cref="AutoComplete"/> = true
/// <see cref="MessageWaitTimeout"/> = 1 minute
/// <see cref="MaxAutoRenewDuration"/> = 5 minutes
/// </summary>
/// <param name="exceptionReceivedHandler">A <see cref="Func{T1, TResult}"/> that is invoked during exceptions.
/// When the exception happens during user callback, the returned dictionary is passed to <see cref="IReceiverClient.AbandonAsync"/>.
/// For other actions, the returned dictionary is ignored.
/// <see cref="ExceptionReceivedEventArgs"/> contains contextual information regarding the exception.</param>
public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> exceptionReceivedHandler)
{
// These are default values
this.AutoComplete = true;
Expand All @@ -38,7 +56,7 @@ public SessionHandlerOptions(Func<ExceptionReceivedEventArgs, Task> exceptionRec

/// <summary>Occurs when an exception is received. Enables you to be notified of any errors encountered by the session pump.
/// When errors are received calls will automatically be retried, so this is informational. </summary>
public Func<ExceptionReceivedEventArgs, Task> ExceptionReceivedHandler { get; }
public Func<ExceptionReceivedEventArgs, Task<IDictionary<string, object>>> ExceptionReceivedHandler { get; }

/// <summary>Gets or sets the duration for which the session lock will be renewed automatically.</summary>
/// <value>The duration for which the session renew its state.</value>
Expand Down Expand Up @@ -92,15 +110,16 @@ public int MaxConcurrentSessions

internal int MaxConcurrentAcceptSessionCalls { get; set; }

internal async Task RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
internal async Task<IDictionary<string, object>> RaiseExceptionReceived(ExceptionReceivedEventArgs eventArgs)
{
try
{
await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
return await this.ExceptionReceivedHandler(eventArgs).ConfigureAwait(false);
}
catch (Exception exception)
{
MessagingEventSource.Log.ExceptionReceivedHandlerThrewException(exception);
return null;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
namespace Microsoft.Azure.ServiceBus
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -74,7 +75,7 @@ bool ShouldRenewSessionLock()
this.sessionHandlerOptions.AutoRenewLock;
}

Task RaiseExceptionReceived(Exception e, string action)
Task<IDictionary<string, object>> RaiseExceptionReceived(Exception e, string action)
{
var eventArgs = new ExceptionReceivedEventArgs(e, action, this.endpoint, this.entityPath, this.clientId);
return this.sessionHandlerOptions.RaiseExceptionReceived(eventArgs);
Expand All @@ -96,13 +97,13 @@ async Task CompleteMessageIfNeededAsync(IMessageSession session, Message message
}
}

async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message)
async Task AbandonMessageIfNeededAsync(IMessageSession session, Message message, IDictionary<string, object> propertiesToModify)
{
try
{
if (session.ReceiveMode == ReceiveMode.PeekLock)
{
await session.AbandonAsync(message.SystemProperties.LockToken).ConfigureAwait(false);
await session.AbandonAsync(message.SystemProperties.LockToken, propertiesToModify).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down Expand Up @@ -239,11 +240,11 @@ async Task MessagePumpTaskAsync(IMessageSession session)
}

MessagingEventSource.Log.MessageReceivePumpTaskException(this.clientId, session.SessionId, exception);
await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
var propertiesToModify = await this.RaiseExceptionReceived(exception, ExceptionReceivedEventArgsAction.UserCallback).ConfigureAwait(false);
callbackExceptionOccurred = true;
if (!(exception is MessageLockLostException || exception is SessionLockLostException))
{
await this.AbandonMessageIfNeededAsync(session, message).ConfigureAwait(false);
await this.AbandonMessageIfNeededAsync(session, message, propertiesToModify).ConfigureAwait(false);
}
}
finally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,9 @@ namespace Microsoft.Azure.ServiceBus
public sealed class MessageHandlerOptions
{
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public MessageHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> exceptionReceivedHandler) { }
public bool AutoComplete { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> ExceptionReceivedHandler { get; }
public System.TimeSpan MaxAutoRenewDuration { get; set; }
public int MaxConcurrentCalls { get; set; }
}
Expand Down Expand Up @@ -389,8 +390,9 @@ namespace Microsoft.Azure.ServiceBus
public sealed class SessionHandlerOptions
{
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> exceptionReceivedHandler) { }
public SessionHandlerOptions(System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> exceptionReceivedHandler) { }
public bool AutoComplete { get; set; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task> ExceptionReceivedHandler { get; }
public System.Func<Microsoft.Azure.ServiceBus.ExceptionReceivedEventArgs, System.Threading.Tasks.Task<System.Collections.Generic.IDictionary<string, object>>> ExceptionReceivedHandler { get; }
public System.TimeSpan MaxAutoRenewDuration { get; set; }
public int MaxConcurrentSessions { get; set; }
public System.TimeSpan MessageWaitTimeout { get; set; }
Expand Down