Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions projects/RabbitMQ.Client/client/api/ShutdownEventArgs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ namespace RabbitMQ.Client
/// </remarks>
public class ShutdownEventArgs : EventArgs
{
private readonly Exception _exception;

/// <summary>
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters and
/// 0 for <see cref="ClassId"/> and <see cref="MethodId"/>.
/// </summary>
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, object cause = null)
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
object cause = null)
: this(initiator, replyCode, replyText, 0, 0, cause)
{
}
Expand All @@ -64,6 +67,26 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r
Cause = cause;
}

/// <summary>
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters.
/// </summary>
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception)
: this(initiator, replyCode, replyText, 0, 0)
{
_exception = exception ?? throw new ArgumentNullException(nameof(exception));
}

/// <summary>
/// Exception causing the shutdown, or null if none.
/// </summary>
public Exception Exception
{
get
{
return _exception;
}
}

/// <summary>
/// Object causing the shutdown, or null if none.
/// </summary>
Expand Down Expand Up @@ -104,7 +127,8 @@ public override string ToString()
+ (ReplyText != null ? $", text='{ReplyText}'" : string.Empty)
+ $", classId={ClassId}"
+ $", methodId={MethodId}"
+ (Cause != null ? $", cause={Cause}" : string.Empty);
+ (Cause != null ? $", cause={Cause}" : string.Empty)
+ (_exception != null ? $", exception={_exception}" : string.Empty);
}
}
}
11 changes: 11 additions & 0 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ internal abstract class ChannelBase : IChannel, IRecoverable
///<summary>Only used to kick-start a connection open
///sequence. See <see cref="Connection.OpenAsync"/> </summary>
internal TaskCompletionSource<ConnectionStartDetails> m_connectionStartCell;
private Exception m_connectionStartException = null;

// AMQP only allows one RPC operation to be active at a time.
protected readonly SemaphoreSlim _rpcSemaphore = new SemaphoreSlim(1, 1);
Expand Down Expand Up @@ -171,6 +172,16 @@ public IBasicConsumer DefaultConsumer

public ISession Session { get; private set; }

public Exception ConnectionStartException => m_connectionStartException;

public void MaybeSetConnectionStartException(Exception ex)
{
if (m_connectionStartCell != null)
{
m_connectionStartException = ex;
}
}

protected void TakeOver(ChannelBase other)
{
_basicAcksWrapper.Takeover(other._basicAcksWrapper);
Expand Down
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.Commands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ await _frameHandler.SendProtocolHeaderAsync()

if (connectionStart is null)
{
throw new IOException("connection.start was never received, likely due to a network timeout");
const string msg = "connection.start was never received, likely due to a network timeout";
throw new IOException(msg, _channel0.ConnectionStartException);
}

ServerProperties = connectionStart.m_serverProperties;
Expand Down
30 changes: 25 additions & 5 deletions projects/RabbitMQ.Client/client/impl/Connection.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,34 @@ await ReceiveLoop()
catch (EndOfStreamException eose)
{
// Possible heartbeat exception
HandleMainLoopException(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose));
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
0, "End of stream",
exception: eose);
HandleMainLoopException(ea);
}
catch (HardProtocolException hpe)
{
await HardProtocolExceptionHandler(hpe)
.ConfigureAwait(false);
}
catch (FileLoadException fileLoadException)
{
/*
* https://github.com/rabbitmq/rabbitmq-dotnet-client/issues/1434
* Ensure that these exceptions eventually make it to application code
*/
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError, fileLoadException.Message,
exception: fileLoadException);
HandleMainLoopException(ea);
}
catch (Exception ex)
{
HandleMainLoopException(new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, "Unexpected Exception", ex));
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
Constants.InternalError,
$"Unexpected Exception: {ex.Message}",
exception: ex);
HandleMainLoopException(ea);
}

FinishClose();
Expand Down Expand Up @@ -146,15 +164,17 @@ private void HandleMainLoopException(ShutdownEventArgs reason)
{
if (!SetCloseReason(reason))
{
// TODO reason.Cause could be an Exception, should we use that?
LogCloseError("Unexpected Main Loop Exception while closing: " + reason, new Exception(reason.ToString()));
LogCloseError($"Unexpected Main Loop Exception while closing: {reason}", reason.Exception);
return;
}

_channel0.MaybeSetConnectionStartException(reason.Exception);

OnShutdown(reason);
LogCloseError($"Unexpected connection closure: {reason}", new Exception(reason.ToString()));
LogCloseError($"Unexpected connection closure: {reason}", reason.Exception);
}

// TODO rename Async, add cancellation token?
private async Task HardProtocolExceptionHandler(HardProtocolException hpe)
{
if (SetCloseReason(hpe.ShutdownReason))
Expand Down
2 changes: 2 additions & 0 deletions projects/Test/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -787,10 +787,12 @@ namespace RabbitMQ.Client
}
public class ShutdownEventArgs : System.EventArgs
{
public ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string replyText, System.Exception exception) { }
public ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string replyText, object cause = null) { }
public ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string replyText, ushort classId, ushort methodId, object cause = null) { }
public object Cause { get; }
public ushort ClassId { get; }
public System.Exception Exception { get; }
public RabbitMQ.Client.ShutdownInitiator Initiator { get; }
public ushort MethodId { get; }
public ushort ReplyCode { get; }
Expand Down