Skip to content

Commit ee89e10

Browse files
authored
Refactor the AbstractLifeCycle (#25)
* Refactor the AbstractLifeCycle * Create a common class to deal with Open and Close functions and status --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent d61e84e commit ee89e10

File tree

11 files changed

+153
-131
lines changed

11 files changed

+153
-131
lines changed

RabbitMQ.AMQP.Client/IConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ namespace RabbitMQ.AMQP.Client;
44

55
public class ConnectionException(string? message) : Exception(message);
66

7-
public interface IConnection : IResourceStatus, IClosable
7+
public interface IConnection : ILifeCycle
88
{
99
IManagement Management();
1010

RabbitMQ.AMQP.Client/IConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace RabbitMQ.AMQP.Client;
66
public class ConsumerException(string message) : Exception(message);
77
public delegate void MessageHandler(IContext context, IMessage message);
88

9-
public interface IConsumer : IResourceStatus, IClosable
9+
public interface IConsumer : ILifeCycle
1010
{
1111
void Pause();
1212

RabbitMQ.AMQP.Client/IClosable.cs renamed to RabbitMQ.AMQP.Client/ILifeCycle.cs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public enum State
99
Closed,
1010
}
1111

12+
1213
public class Error(string? errorCode, string? description)
1314
{
1415
public string? Description { get; } = description;
@@ -20,19 +21,15 @@ public override string ToString()
2021
}
2122
}
2223

23-
public interface IClosable // TODO: Create an abstract class with the event and the State property
24-
{
25-
Task CloseAsync();
2624

27-
}
2825

2926
public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);
3027

31-
public interface IResourceStatus
28+
public interface ILifeCycle
3229
{
30+
Task CloseAsync();
31+
3332
public State State { get; }
34-
35-
33+
3634
event LifeCycleCallBack ChangeState;
37-
3835
}

RabbitMQ.AMQP.Client/IManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ public class ModelException(string message) : Exception(message);
44

55
public class PreconditionFailedException(string message) : Exception(message);
66

7-
public interface IManagement : IResourceStatus, IClosable
7+
public interface IManagement : ILifeCycle
88
{
99
IQueueSpecification Queue();
1010
IQueueSpecification Queue(string name);

RabbitMQ.AMQP.Client/IPublisher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class OutcomeDescriptor(ulong code, string description, OutcomeState stat
1919

2020
public delegate void OutcomeDescriptorCallback(IMessage message, OutcomeDescriptor outcomeDescriptor);
2121

22-
public interface IPublisher : IResourceStatus, IClosable
22+
public interface IPublisher : ILifeCycle
2323
{
2424
Task Publish(IMessage message,
2525
OutcomeDescriptorCallback outcomeCallback); // TODO: Add CancellationToken and callBack

RabbitMQ.AMQP.Client/Impl/AbstractResourceStatus.cs renamed to RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,18 @@ namespace RabbitMQ.AMQP.Client.Impl;
22

33
public class AmqpClosedException(string message) : Exception(message);
44

5-
public abstract class AbstractResourceStatus : IResourceStatus
5+
public abstract class AbstractLifeCycle : ILifeCycle
66
{
7+
protected virtual Task OpenAsync()
8+
{
9+
OnNewStatus(State.Open, null);
10+
return Task.CompletedTask;
11+
}
12+
13+
public abstract Task CloseAsync();
14+
715
public State State { get; internal set; } = State.Closed;
16+
817
protected void ThrowIfClosed()
918
{
1019
if (State == State.Closed)
@@ -13,6 +22,9 @@ protected void ThrowIfClosed()
1322
}
1423
}
1524

25+
// wait until the close operation is completed
26+
protected readonly TaskCompletionSource<bool> ConnectionCloseTaskCompletionSource =
27+
new(TaskCreationOptions.RunContinuationsAsynchronously);
1628

1729
protected void OnNewStatus(State newState, Error? error)
1830
{

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 40 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ await Management.Queue(spec).Declare()
3333
/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
3434
/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
3535
/// </summary>
36-
public class AmqpConnection : AbstractResourceStatus, IConnection
36+
public class AmqpConnection : AbstractLifeCycle, IConnection
3737
{
3838
private const string ConnectionNotRecoveredCode = "CONNECTION_NOT_RECOVERED";
3939
private const string ConnectionNotRecoveredMessage = "Connection not recovered";
@@ -43,14 +43,12 @@ public class AmqpConnection : AbstractResourceStatus, IConnection
4343
// The native AMQP.Net Lite connection
4444
private Connection? _nativeConnection;
4545

46-
private readonly AmqpManagement _management = new();
46+
private readonly AmqpManagement _management;
4747
private readonly RecordingTopologyListener _recordingTopologyListener = new();
4848

49-
private readonly TaskCompletionSource<bool> _connectionCloseTaskCompletionSource =
50-
new(TaskCreationOptions.RunContinuationsAsynchronously);
5149

5250
private readonly ConnectionSettings _connectionSettings;
53-
internal readonly AmqpSessionManagement NativePubSubSessions;
51+
internal readonly AmqpSessionManagement _nativePubSubSessions;
5452

5553
// TODO: Implement the semaphore to avoid multiple connections
5654
// private readonly SemaphoreSlim _semaphore = new(1, 1);
@@ -76,34 +74,18 @@ public ReadOnlyCollection<IPublisher> GetPublishers()
7674
/// Creates a new instance of <see cref="AmqpConnection"/>
7775
/// Through the Connection is possible to create:
7876
/// - Management. See <see cref="AmqpManagement"/>
79-
/// - Publishers and Consumers: TODO: Implement
77+
/// - Publishers and Consumers: See <see cref="AmqpPublisherBuilder"/> and <see cref="AmqpConsumerBuilder"/>
8078
/// </summary>
8179
/// <param name="connectionSettings"></param>
8280
/// <returns></returns>
8381
public static async Task<IConnection> CreateAsync(ConnectionSettings connectionSettings)
8482
{
8583
var connection = new AmqpConnection(connectionSettings);
86-
await connection.ConnectAsync()
84+
await connection.OpenAsync()
8785
.ConfigureAwait(false);
8886
return connection;
8987
}
9088

91-
private void PauseAllPublishers()
92-
{
93-
foreach (AmqpPublisher publisher in Publishers.Values)
94-
{
95-
publisher.PausePublishing();
96-
}
97-
}
98-
99-
private void ResumeAllPublishers()
100-
{
101-
foreach (AmqpPublisher publisher in Publishers.Values)
102-
{
103-
publisher.ResumePublishing();
104-
}
105-
}
106-
10789

10890
/// <summary>
10991
/// Closes all the publishers. It is called when the connection is closed.
@@ -133,7 +115,9 @@ await consumer.CloseAsync()
133115
private AmqpConnection(ConnectionSettings connectionSettings)
134116
{
135117
_connectionSettings = connectionSettings;
136-
NativePubSubSessions = new AmqpSessionManagement(this, 1);
118+
_nativePubSubSessions = new AmqpSessionManagement(this, 1);
119+
_management =
120+
new AmqpManagement(new AmqpManagementParameters(this).TopologyListener(_recordingTopologyListener));
137121
}
138122

139123
public IManagement Management()
@@ -146,50 +130,49 @@ public IConsumerBuilder ConsumerBuilder()
146130
return new AmqpConsumerBuilder(this);
147131
}
148132

149-
private Task ConnectAsync()
133+
protected override Task OpenAsync()
150134
{
151135
EnsureConnection();
152-
OnNewStatus(State.Open, null);
153-
return Task.CompletedTask;
136+
return base.OpenAsync();
154137
}
155138

156139
private void EnsureConnection()
157140
{
158141
// await _semaphore.WaitAsync();
159142
try
160143
{
161-
if (_nativeConnection == null || _nativeConnection.IsClosed)
144+
if (_nativeConnection is { IsClosed: false })
162145
{
163-
var open = new Open
164-
{
165-
HostName = $"vhost:{_connectionSettings.VirtualHost()}",
166-
Properties = new Fields()
167-
{
168-
[new Symbol("connection_name")] = _connectionSettings.ConnectionName(),
169-
}
170-
};
171-
172-
var manualReset = new ManualResetEvent(false);
173-
// TODO ConnectionFactory.CreateAsync
174-
_nativeConnection = new Connection(_connectionSettings.Address, null, open, (connection, open1) =>
175-
{
176-
manualReset.Set();
177-
Trace.WriteLine(TraceLevel.Information, $"Connection opened. Info: {ToString()}");
178-
OnNewStatus(State.Open, null);
179-
});
146+
return;
147+
}
180148

181-
manualReset.WaitOne(TimeSpan.FromSeconds(5));
182-
if (_nativeConnection.IsClosed)
149+
var open = new Open
150+
{
151+
HostName = $"vhost:{_connectionSettings.VirtualHost()}",
152+
Properties = new Fields()
183153
{
184-
throw new ConnectionException(
185-
$"Connection failed. Info: {ToString()}, error: {_nativeConnection.Error}");
154+
[new Symbol("connection_name")] = _connectionSettings.ConnectionName(),
186155
}
156+
};
187157

188-
_management.Init(
189-
new AmqpManagementParameters(this).TopologyListener(_recordingTopologyListener));
158+
var manualReset = new ManualResetEvent(false);
159+
_nativeConnection = new Connection(_connectionSettings.Address, null, open, (connection, open1) =>
160+
{
161+
manualReset.Set();
162+
Trace.WriteLine(TraceLevel.Verbose, $"Connection opened. Info: {ToString()}");
163+
OnNewStatus(State.Open, null);
164+
});
190165

191-
_nativeConnection.Closed += MaybeRecoverConnection();
166+
manualReset.WaitOne(TimeSpan.FromSeconds(5));
167+
if (_nativeConnection.IsClosed)
168+
{
169+
throw new ConnectionException(
170+
$"Connection failed. Info: {ToString()}, error: {_nativeConnection.Error}");
192171
}
172+
173+
_management.Init();
174+
175+
_nativeConnection.Closed += MaybeRecoverConnection();
193176
}
194177

195178
catch (AmqpException e)
@@ -310,7 +293,7 @@ await _recordingTopologyListener.Accept(visitor)
310293
_semaphoreClose.Release();
311294
}
312295

313-
_connectionCloseTaskCompletionSource.SetResult(true);
296+
ConnectionCloseTaskCompletionSource.SetResult(true);
314297
};
315298
}
316299

@@ -327,7 +310,7 @@ public IPublisherBuilder PublisherBuilder()
327310
}
328311

329312

330-
public async Task CloseAsync()
313+
public override async Task CloseAsync()
331314
{
332315
await _semaphoreClose.WaitAsync()
333316
.ConfigureAwait(false);
@@ -347,21 +330,19 @@ await _semaphoreClose.WaitAsync()
347330

348331
await _management.CloseAsync()
349332
.ConfigureAwait(false);
350-
333+
351334
if (_nativeConnection is { IsClosed: false })
352335
{
353336
await _nativeConnection.CloseAsync()
354337
.ConfigureAwait(false);
355338
}
356-
357-
358339
}
359340
finally
360341
{
361342
_semaphoreClose.Release();
362343
}
363344

364-
await _connectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10))
345+
await ConnectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(10))
365346
.ConfigureAwait(false);
366347

367348
OnNewStatus(State.Closed, null);
@@ -370,7 +351,7 @@ await _connectionCloseTaskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(1
370351

371352
public override string ToString()
372353
{
373-
var info = $"AmqpConnection{{ConnectionSettings='{_connectionSettings}', Status='{State.ToString()}'}}";
354+
string info = $"AmqpConnection{{ConnectionSettings='{_connectionSettings}', Status='{State.ToString()}'}}";
374355
return info;
375356
}
376357
}

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
namespace RabbitMQ.AMQP.Client.Impl;
55

6-
public class AmqpConsumer : AbstractResourceStatus, IConsumer
6+
public class AmqpConsumer : AbstractLifeCycle, IConsumer
77
{
88
private readonly AmqpConnection _connection;
99
private readonly string _address;
@@ -20,17 +20,17 @@ public AmqpConsumer(AmqpConnection connection, string address,
2020
_messageHandler = messageHandler;
2121
_initialCredits = initialCredits;
2222
_filters = filters;
23-
Connect();
24-
_connection.Consumers.TryAdd(Id, this); // TODO: Close all consumers on connection close
23+
OpenAsync();
24+
_connection.Consumers.TryAdd(Id, this);
2525
}
2626

2727

28-
private void Connect()
28+
protected sealed override Task OpenAsync()
2929
{
3030
try
3131
{
3232
var attachCompleted = new ManualResetEvent(false);
33-
_receiverLink = new ReceiverLink(_connection.NativePubSubSessions.GetOrCreateSession(), Id,
33+
_receiverLink = new ReceiverLink(_connection._nativePubSubSessions.GetOrCreateSession(), Id,
3434
Utils.CreateAttach(_address, DeliveryMode.AtLeastOnce, Id, _filters),
3535
(link, attach) => { attachCompleted.Set(); });
3636

@@ -48,6 +48,8 @@ private void Connect()
4848
{
4949
throw new ConsumerException($"Failed to create receiver link, {e}");
5050
}
51+
52+
return Task.CompletedTask;
5153
}
5254

5355
private void ProcessMessages()
@@ -61,7 +63,7 @@ private void ProcessMessages()
6163
});
6264
}
6365

64-
public string Id { get; } = Guid.NewGuid().ToString();
66+
private string Id { get; } = Guid.NewGuid().ToString();
6567

6668
public void Pause()
6769
{
@@ -79,7 +81,7 @@ public void Unpause()
7981
}
8082

8183

82-
public async Task CloseAsync()
84+
public override async Task CloseAsync()
8385
{
8486
if (_receiverLink == null)
8587
{

0 commit comments

Comments
 (0)