Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AM
- [x] Recovery queues on connection lost
- [x] Recovery publishers on connection lost
- [x] Recovery consumers on connection lost
- [x] Implement Environment to manage the connections
- [ ] Complete the consumer part with `pause` and `unpause`
- [ ] Complete the binding/unbinding with the special characters
- [ ] Complete the queues/exchanges name with the special characters
Expand Down
4 changes: 4 additions & 0 deletions RabbitMQ.AMQP.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ public interface IConnection : ILifeCycle
IConsumerBuilder ConsumerBuilder();

public ReadOnlyCollection<IPublisher> GetPublishers();

public ReadOnlyCollection<IConsumer> GetConsumers();

public long Id { get; set; }
}
3 changes: 3 additions & 0 deletions RabbitMQ.AMQP.Client/IConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public interface IConnectionSettings : IEquatable<IConnectionSettings>
bool UseSsl { get; }
SaslMechanism SaslMechanism { get; }
ITlsSettings? TlsSettings { get; }

IRecoveryConfiguration Recovery { get; }

}

/// <summary>
Expand Down
35 changes: 35 additions & 0 deletions RabbitMQ.AMQP.Client/IEnvironment.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System.Collections.ObjectModel;

namespace RabbitMQ.AMQP.Client;


/// <summary>
/// Interface to create IConnections and manage them.
/// </summary>
public interface IEnvironment
{
/// <summary>
/// Create a new connection with the given connection settings.
/// </summary>
/// <param name="connectionSettings"></param>
/// <returns>IConnection</returns>
public Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings);


/// <summary>
/// Create a new connection with the default connection settings.
/// </summary>
/// <returns>IConnection</returns>

public Task<IConnection> CreateConnectionAsync();


public ReadOnlyCollection<IConnection> GetConnections();

/// <summary>
/// Close all connections.
/// </summary>
/// <returns></returns>

Task CloseAsync();
}
2 changes: 2 additions & 0 deletions RabbitMQ.AMQP.Client/IRecoveryConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public interface IRecoveryConfiguration
/// <returns></returns>
IRecoveryConfiguration BackOffDelayPolicy(IBackOffDelayPolicy backOffDelayPolicy);

IBackOffDelayPolicy GetBackOffDelayPolicy();

/// <summary>
/// Define if the recovery of the topology is activated.
/// When Activated the connection will try to recover the topology after a reconnection.
Expand Down
42 changes: 24 additions & 18 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private async Task ReconnectConsumers()
}
}

private readonly ConnectionSettings _connectionSettings;
private readonly IConnectionSettings _connectionSettings;
internal readonly AmqpSessionManagement _nativePubSubSessions;

// TODO: Implement the semaphore to avoid multiple connections
Expand All @@ -116,6 +116,13 @@ public ReadOnlyCollection<IPublisher> GetPublishers()
return Publishers.Values.ToList().AsReadOnly();
}

public ReadOnlyCollection<IConsumer> GetConsumers()
{
return Consumers.Values.ToList().AsReadOnly();
}

public long Id { get; set; }

/// <summary>
/// Creates a new instance of <see cref="AmqpConnection"/>
/// Through the Connection is possible to create:
Expand All @@ -124,7 +131,7 @@ public ReadOnlyCollection<IPublisher> GetPublishers()
/// </summary>
/// <param name="connectionSettings"></param>
/// <returns></returns>
public static async Task<IConnection> CreateAsync(ConnectionSettings connectionSettings)
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings)
{
var connection = new AmqpConnection(connectionSettings);
await connection.OpenAsync()
Expand Down Expand Up @@ -158,7 +165,7 @@ await consumer.CloseAsync()
}
}

private AmqpConnection(ConnectionSettings connectionSettings)
private AmqpConnection(IConnectionSettings connectionSettings)
{
_connectionSettings = connectionSettings;
_nativePubSubSessions = new AmqpSessionManagement(this, 1);
Expand Down Expand Up @@ -198,10 +205,7 @@ private async Task EnsureConnection()
var open = new Open
{
HostName = $"vhost:{_connectionSettings.VirtualHost}",
Properties = new Fields()
{
[new Symbol("connection_name")] = _connectionSettings.ConnectionName,
}
Properties = new Fields() { [new Symbol("connection_name")] = _connectionSettings.ConnectionName, }
};

void onOpened(Amqp.IConnection connection, Open open1)
Expand All @@ -224,12 +228,14 @@ void onOpened(Amqp.IConnection connection, Open open1)

if (_connectionSettings.TlsSettings.LocalCertificateSelectionCallback is not null)
{
cf.SSL.LocalCertificateSelectionCallback = _connectionSettings.TlsSettings.LocalCertificateSelectionCallback;
cf.SSL.LocalCertificateSelectionCallback =
_connectionSettings.TlsSettings.LocalCertificateSelectionCallback;
}

if (_connectionSettings.TlsSettings.RemoteCertificateValidationCallback is not null)
{
cf.SSL.RemoteCertificateValidationCallback = _connectionSettings.TlsSettings.RemoteCertificateValidationCallback;
cf.SSL.RemoteCertificateValidationCallback =
_connectionSettings.TlsSettings.RemoteCertificateValidationCallback;
}
}

Expand All @@ -240,7 +246,7 @@ void onOpened(Amqp.IConnection connection, Open open1)

try
{
_nativeConnection = await cf.CreateAsync(_connectionSettings.Address, open: open, onOpened: onOpened)
_nativeConnection = await cf.CreateAsync((_connectionSettings as ConnectionSettings)?.Address, open: open, onOpened: onOpened)
.ConfigureAwait(false);
}
catch (Exception ex)
Expand Down Expand Up @@ -294,7 +300,7 @@ private ClosedCallback MaybeRecoverConnection()
// we have to check if the recovery is active.
// The user may want to disable the recovery mechanism
// the user can use the lifecycle callback to handle the error
if (!_connectionSettings.RecoveryConfiguration.IsActivate())
if (!_connectionSettings.Recovery.IsActivate())
{
OnNewStatus(State.Closed, Utils.ConvertError(error));
ChangeEntitiesStatus(State.Closed, Utils.ConvertError(error));
Expand All @@ -317,19 +323,19 @@ await Task.Run(async () =>
// the user may want to disable the backoff policy or
// the backoff policy is not active due of some condition
// for example: Reaching the maximum number of retries and avoid the forever loop
_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().IsActive() &&
_connectionSettings.Recovery.GetBackOffDelayPolicy().IsActive() &&

// even we set the status to reconnecting up, we need to check if the connection is still in the
// reconnecting status. The user may close the connection in the meanwhile
State == State.Reconnecting)
{
try
{
int next = _connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Delay();
int next = _connectionSettings.Recovery.GetBackOffDelayPolicy().Delay();

Trace.WriteLine(TraceLevel.Information,
$"Trying Recovering connection in {next} milliseconds, " +
$"attempt: {_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().CurrentAttempt}. " +
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. " +
$"Info: {ToString()})");

await Task.Delay(TimeSpan.FromMilliseconds(next))
Expand All @@ -346,7 +352,7 @@ await EnsureConnection()
}
}

_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Reset();
_connectionSettings.Recovery.GetBackOffDelayPolicy().Reset();
string connectionDescription = connected ? "recovered" : "not recovered";
Trace.WriteLine(TraceLevel.Information,
$"Connection {connectionDescription}. Info: {ToString()}");
Expand All @@ -356,15 +362,15 @@ await EnsureConnection()
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
OnNewStatus(State.Closed,
new Error(ConnectionNotRecoveredCode,
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}"));
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.Recovery}"));

ChangeEntitiesStatus(State.Closed, new Error(ConnectionNotRecoveredCode,
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}"));
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.Recovery}"));

return;
}

if (_connectionSettings.RecoveryConfiguration.IsTopologyActive())
if (_connectionSettings.Recovery.IsTopologyActive())
{
Trace.WriteLine(TraceLevel.Information, $"Recovering topology. Info: {ToString()}");
var visitor = new Visitor(_management);
Expand Down
56 changes: 56 additions & 0 deletions RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using System.Collections.Concurrent;
using System.Collections.ObjectModel;

namespace RabbitMQ.AMQP.Client.Impl;

public class AmqpEnvironment : IEnvironment
{
private IConnectionSettings? ConnectionSettings { get; }
private long _sequentialId = 0;
private readonly ConcurrentDictionary<long, IConnection> _connections = [];

private AmqpEnvironment(IConnectionSettings connectionSettings)
{
ConnectionSettings = connectionSettings;
}

public static Task<IEnvironment> CreateAsync(IConnectionSettings connectionSettings)
{
return Task.FromResult<IEnvironment>(new AmqpEnvironment(connectionSettings));
}

public async Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings)
{
IConnection c = await AmqpConnection.CreateAsync(connectionSettings).ConfigureAwait(false);
c.Id = Interlocked.Increment(ref _sequentialId);
_connections.TryAdd(c.Id, c);
c.ChangeState += (sender, previousState, currentState, failureCause) =>
{
if (currentState != State.Closed)
{
return;
}

if (sender is IConnection connection)
{
_connections.TryRemove(connection.Id, out _);
}
};
return c;
}

public async Task<IConnection> CreateConnectionAsync()
{
if (ConnectionSettings != null)
{
return await CreateConnectionAsync(ConnectionSettings).ConfigureAwait(false);
}

throw new ConnectionException("Connection settings are not set");
}

public ReadOnlyCollection<IConnection> GetConnections() =>
new(_connections.Values.ToList());

public Task CloseAsync() => Task.WhenAll(_connections.Values.Select(c => c.CloseAsync()));
}
6 changes: 4 additions & 2 deletions RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public ConnectionSettings Build()
_password, _virtualHost,
_scheme, _connectionName, _saslMechanism)
{
RecoveryConfiguration = (RecoveryConfiguration)_recoveryConfiguration
Recovery = (RecoveryConfiguration)_recoveryConfiguration
};

return c;
Expand Down Expand Up @@ -153,6 +153,8 @@ public ConnectionSettings(string host, int port,
public SaslMechanism SaslMechanism => _saslMechanism;

public ITlsSettings? TlsSettings => _tlsSettings;
public IRecoveryConfiguration Recovery { get; init; } = RecoveryConfiguration.Create();


public override string ToString()
{
Expand Down Expand Up @@ -220,7 +222,7 @@ public bool Equals(IConnectionSettings? other)

internal Address Address => _address;

public RecoveryConfiguration RecoveryConfiguration { get; set; } = RecoveryConfiguration.Create();
// public RecoveryConfiguration RecoveryConfiguration { get; set; } = RecoveryConfiguration.Create();
}

/// <summary>
Expand Down
Loading