Skip to content

Commit 64564b2

Browse files
authored
Implement the Environment class to manage the connections (#36)
* Implement the Environment Closes #35 --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 173387d commit 64564b2

File tree

13 files changed

+259
-37
lines changed

13 files changed

+259
-37
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ The client is distributed via [NuGet](https://www.nuget.org/packages/RabbitMQ.AM
2929
- [x] Recovery queues on connection lost
3030
- [x] Recovery publishers on connection lost
3131
- [x] Recovery consumers on connection lost
32+
- [x] Implement Environment to manage the connections
3233
- [ ] Complete the consumer part with `pause` and `unpause`
3334
- [ ] Complete the binding/unbinding with the special characters
3435
- [ ] Complete the queues/exchanges name with the special characters

RabbitMQ.AMQP.Client/IConnection.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,8 @@ public interface IConnection : ILifeCycle
2222
IConsumerBuilder ConsumerBuilder();
2323

2424
public ReadOnlyCollection<IPublisher> GetPublishers();
25+
26+
public ReadOnlyCollection<IConsumer> GetConsumers();
27+
28+
public long Id { get; set; }
2529
}

RabbitMQ.AMQP.Client/IConnectionSettings.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ public interface IConnectionSettings : IEquatable<IConnectionSettings>
1717
bool UseSsl { get; }
1818
SaslMechanism SaslMechanism { get; }
1919
ITlsSettings? TlsSettings { get; }
20+
21+
IRecoveryConfiguration Recovery { get; }
22+
2023
}
2124

2225
/// <summary>
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System.Collections.ObjectModel;
2+
3+
namespace RabbitMQ.AMQP.Client;
4+
5+
6+
/// <summary>
7+
/// Interface to create IConnections and manage them.
8+
/// </summary>
9+
public interface IEnvironment
10+
{
11+
/// <summary>
12+
/// Create a new connection with the given connection settings.
13+
/// </summary>
14+
/// <param name="connectionSettings"></param>
15+
/// <returns>IConnection</returns>
16+
public Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings);
17+
18+
19+
/// <summary>
20+
/// Create a new connection with the default connection settings.
21+
/// </summary>
22+
/// <returns>IConnection</returns>
23+
24+
public Task<IConnection> CreateConnectionAsync();
25+
26+
27+
public ReadOnlyCollection<IConnection> GetConnections();
28+
29+
/// <summary>
30+
/// Close all connections.
31+
/// </summary>
32+
/// <returns></returns>
33+
34+
Task CloseAsync();
35+
}

RabbitMQ.AMQP.Client/IRecoveryConfiguration.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public interface IRecoveryConfiguration
2424
/// <returns></returns>
2525
IRecoveryConfiguration BackOffDelayPolicy(IBackOffDelayPolicy backOffDelayPolicy);
2626

27+
IBackOffDelayPolicy GetBackOffDelayPolicy();
28+
2729
/// <summary>
2830
/// Define if the recovery of the topology is activated.
2931
/// When Activated the connection will try to recover the topology after a reconnection.

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ private async Task ReconnectConsumers()
9595
}
9696
}
9797

98-
private readonly ConnectionSettings _connectionSettings;
98+
private readonly IConnectionSettings _connectionSettings;
9999
internal readonly AmqpSessionManagement _nativePubSubSessions;
100100

101101
// TODO: Implement the semaphore to avoid multiple connections
@@ -116,6 +116,13 @@ public ReadOnlyCollection<IPublisher> GetPublishers()
116116
return Publishers.Values.ToList().AsReadOnly();
117117
}
118118

119+
public ReadOnlyCollection<IConsumer> GetConsumers()
120+
{
121+
return Consumers.Values.ToList().AsReadOnly();
122+
}
123+
124+
public long Id { get; set; }
125+
119126
/// <summary>
120127
/// Creates a new instance of <see cref="AmqpConnection"/>
121128
/// Through the Connection is possible to create:
@@ -124,7 +131,7 @@ public ReadOnlyCollection<IPublisher> GetPublishers()
124131
/// </summary>
125132
/// <param name="connectionSettings"></param>
126133
/// <returns></returns>
127-
public static async Task<IConnection> CreateAsync(ConnectionSettings connectionSettings)
134+
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings)
128135
{
129136
var connection = new AmqpConnection(connectionSettings);
130137
await connection.OpenAsync()
@@ -158,7 +165,7 @@ await consumer.CloseAsync()
158165
}
159166
}
160167

161-
private AmqpConnection(ConnectionSettings connectionSettings)
168+
private AmqpConnection(IConnectionSettings connectionSettings)
162169
{
163170
_connectionSettings = connectionSettings;
164171
_nativePubSubSessions = new AmqpSessionManagement(this, 1);
@@ -198,10 +205,7 @@ private async Task EnsureConnection()
198205
var open = new Open
199206
{
200207
HostName = $"vhost:{_connectionSettings.VirtualHost}",
201-
Properties = new Fields()
202-
{
203-
[new Symbol("connection_name")] = _connectionSettings.ConnectionName,
204-
}
208+
Properties = new Fields() { [new Symbol("connection_name")] = _connectionSettings.ConnectionName, }
205209
};
206210

207211
void onOpened(Amqp.IConnection connection, Open open1)
@@ -224,12 +228,14 @@ void onOpened(Amqp.IConnection connection, Open open1)
224228

225229
if (_connectionSettings.TlsSettings.LocalCertificateSelectionCallback is not null)
226230
{
227-
cf.SSL.LocalCertificateSelectionCallback = _connectionSettings.TlsSettings.LocalCertificateSelectionCallback;
231+
cf.SSL.LocalCertificateSelectionCallback =
232+
_connectionSettings.TlsSettings.LocalCertificateSelectionCallback;
228233
}
229234

230235
if (_connectionSettings.TlsSettings.RemoteCertificateValidationCallback is not null)
231236
{
232-
cf.SSL.RemoteCertificateValidationCallback = _connectionSettings.TlsSettings.RemoteCertificateValidationCallback;
237+
cf.SSL.RemoteCertificateValidationCallback =
238+
_connectionSettings.TlsSettings.RemoteCertificateValidationCallback;
233239
}
234240
}
235241

@@ -240,7 +246,7 @@ void onOpened(Amqp.IConnection connection, Open open1)
240246

241247
try
242248
{
243-
_nativeConnection = await cf.CreateAsync(_connectionSettings.Address, open: open, onOpened: onOpened)
249+
_nativeConnection = await cf.CreateAsync((_connectionSettings as ConnectionSettings)?.Address, open: open, onOpened: onOpened)
244250
.ConfigureAwait(false);
245251
}
246252
catch (Exception ex)
@@ -294,7 +300,7 @@ private ClosedCallback MaybeRecoverConnection()
294300
// we have to check if the recovery is active.
295301
// The user may want to disable the recovery mechanism
296302
// the user can use the lifecycle callback to handle the error
297-
if (!_connectionSettings.RecoveryConfiguration.IsActivate())
303+
if (!_connectionSettings.Recovery.IsActivate())
298304
{
299305
OnNewStatus(State.Closed, Utils.ConvertError(error));
300306
ChangeEntitiesStatus(State.Closed, Utils.ConvertError(error));
@@ -317,19 +323,19 @@ await Task.Run(async () =>
317323
// the user may want to disable the backoff policy or
318324
// the backoff policy is not active due of some condition
319325
// for example: Reaching the maximum number of retries and avoid the forever loop
320-
_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().IsActive() &&
326+
_connectionSettings.Recovery.GetBackOffDelayPolicy().IsActive() &&
321327

322328
// even we set the status to reconnecting up, we need to check if the connection is still in the
323329
// reconnecting status. The user may close the connection in the meanwhile
324330
State == State.Reconnecting)
325331
{
326332
try
327333
{
328-
int next = _connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Delay();
334+
int next = _connectionSettings.Recovery.GetBackOffDelayPolicy().Delay();
329335

330336
Trace.WriteLine(TraceLevel.Information,
331337
$"Trying Recovering connection in {next} milliseconds, " +
332-
$"attempt: {_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().CurrentAttempt}. " +
338+
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. " +
333339
$"Info: {ToString()})");
334340

335341
await Task.Delay(TimeSpan.FromMilliseconds(next))
@@ -346,7 +352,7 @@ await EnsureConnection()
346352
}
347353
}
348354

349-
_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Reset();
355+
_connectionSettings.Recovery.GetBackOffDelayPolicy().Reset();
350356
string connectionDescription = connected ? "recovered" : "not recovered";
351357
Trace.WriteLine(TraceLevel.Information,
352358
$"Connection {connectionDescription}. Info: {ToString()}");
@@ -356,15 +362,15 @@ await EnsureConnection()
356362
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
357363
OnNewStatus(State.Closed,
358364
new Error(ConnectionNotRecoveredCode,
359-
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}"));
365+
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.Recovery}"));
360366

361367
ChangeEntitiesStatus(State.Closed, new Error(ConnectionNotRecoveredCode,
362-
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}"));
368+
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.Recovery}"));
363369

364370
return;
365371
}
366372

367-
if (_connectionSettings.RecoveryConfiguration.IsTopologyActive())
373+
if (_connectionSettings.Recovery.IsTopologyActive())
368374
{
369375
Trace.WriteLine(TraceLevel.Information, $"Recovering topology. Info: {ToString()}");
370376
var visitor = new Visitor(_management);
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
using System.Collections.Concurrent;
2+
using System.Collections.ObjectModel;
3+
4+
namespace RabbitMQ.AMQP.Client.Impl;
5+
6+
public class AmqpEnvironment : IEnvironment
7+
{
8+
private IConnectionSettings? ConnectionSettings { get; }
9+
private long _sequentialId = 0;
10+
private readonly ConcurrentDictionary<long, IConnection> _connections = [];
11+
12+
private AmqpEnvironment(IConnectionSettings connectionSettings)
13+
{
14+
ConnectionSettings = connectionSettings;
15+
}
16+
17+
public static Task<IEnvironment> CreateAsync(IConnectionSettings connectionSettings)
18+
{
19+
return Task.FromResult<IEnvironment>(new AmqpEnvironment(connectionSettings));
20+
}
21+
22+
public async Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings)
23+
{
24+
IConnection c = await AmqpConnection.CreateAsync(connectionSettings).ConfigureAwait(false);
25+
c.Id = Interlocked.Increment(ref _sequentialId);
26+
_connections.TryAdd(c.Id, c);
27+
c.ChangeState += (sender, previousState, currentState, failureCause) =>
28+
{
29+
if (currentState != State.Closed)
30+
{
31+
return;
32+
}
33+
34+
if (sender is IConnection connection)
35+
{
36+
_connections.TryRemove(connection.Id, out _);
37+
}
38+
};
39+
return c;
40+
}
41+
42+
public async Task<IConnection> CreateConnectionAsync()
43+
{
44+
if (ConnectionSettings != null)
45+
{
46+
return await CreateConnectionAsync(ConnectionSettings).ConfigureAwait(false);
47+
}
48+
49+
throw new ConnectionException("Connection settings are not set");
50+
}
51+
52+
public ReadOnlyCollection<IConnection> GetConnections() =>
53+
new(_connections.Values.ToList());
54+
55+
public Task CloseAsync() => Task.WhenAll(_connections.Values.Select(c => c.CloseAsync()));
56+
}

RabbitMQ.AMQP.Client/Impl/ConnectionSettings.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public ConnectionSettings Build()
9393
_password, _virtualHost,
9494
_scheme, _connectionName, _saslMechanism)
9595
{
96-
RecoveryConfiguration = (RecoveryConfiguration)_recoveryConfiguration
96+
Recovery = (RecoveryConfiguration)_recoveryConfiguration
9797
};
9898

9999
return c;
@@ -153,6 +153,8 @@ public ConnectionSettings(string host, int port,
153153
public SaslMechanism SaslMechanism => _saslMechanism;
154154

155155
public ITlsSettings? TlsSettings => _tlsSettings;
156+
public IRecoveryConfiguration Recovery { get; init; } = RecoveryConfiguration.Create();
157+
156158

157159
public override string ToString()
158160
{
@@ -220,7 +222,7 @@ public bool Equals(IConnectionSettings? other)
220222

221223
internal Address Address => _address;
222224

223-
public RecoveryConfiguration RecoveryConfiguration { get; set; } = RecoveryConfiguration.Create();
225+
// public RecoveryConfiguration RecoveryConfiguration { get; set; } = RecoveryConfiguration.Create();
224226
}
225227

226228
/// <summary>

0 commit comments

Comments
 (0)