Skip to content

Commit 7a43277

Browse files
authored
back off recovery (#8)
* back off recovery --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent d3e16d7 commit 7a43277

15 files changed

+600
-144
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
# RabbitMQ Amqp1.0 DotNet Client
1+
# RabbitMQ AMQP 1.0 DotNet Client
22

3-
See the [internal documentation](https://docs.google.com/document/d/1afO2ugGpTIZYUeXH_0GtMxedV51ZzmsbC3-mRdoSI_o/edit#heading=h.kqd38uu4iku)
3+
4+
This library is in the early stages of development.
5+
It is meant to be used with RabbitMQ 4.0.
46

57

68

RabbitMQ.AMQP.Client/IClosable.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
namespace RabbitMQ.AMQP.Client;
22

3-
public enum Status
3+
public enum State
44
{
5-
Closed,
6-
Reconneting,
5+
// Opening,
76
Open,
7+
Reconnecting,
8+
Closing,
9+
Closed,
810
}
911

1012
public class Error
@@ -15,13 +17,11 @@ public class Error
1517

1618
public interface IClosable
1719
{
18-
public Status Status { get; }
20+
public State State { get; }
1921

2022
Task CloseAsync();
2123

22-
public delegate void ChangeStatusCallBack(object sender, Status from, Status to, Error? error);
24+
public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);
2325

24-
event ChangeStatusCallBack ChangeStatus;
25-
26-
26+
event LifeCycleCallBack ChangeState;
2727
}

RabbitMQ.AMQP.Client/IEntities.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ public interface IEntityInfo
44
{
55
}
66

7+
8+
/// <summary>
9+
/// Generic interface for declaring entities
10+
/// </summary>
11+
/// <typeparam name="T"></typeparam>
712
public interface IEntityDeclaration<T> where T : IEntityInfo
813
{
914
Task<T> Declare();

RabbitMQ.AMQP.Client/IManagement.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ namespace RabbitMQ.AMQP.Client;
22

33
public class ModelException(string message) : Exception(message);
44

5-
public class PreconditionFailException(string message) : Exception(message);
5+
public class PreconditionFailedException(string message) : Exception(message);
66

77
public interface IManagement : IClosable
88
{
Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,63 @@
11
namespace RabbitMQ.AMQP.Client;
22

3+
4+
/// <summary>
5+
/// Interface for the recovery configuration.
6+
/// </summary>
37
public interface IRecoveryConfiguration
48
{
9+
/// <summary>
10+
/// Define if the recovery is activated.
11+
/// If it is not activated, the connection will not try to reconnect.
12+
/// </summary>
13+
/// <param name="activated"></param>
14+
/// <returns></returns>
515
IRecoveryConfiguration Activated(bool activated);
616

717
bool IsActivate();
818

9-
// IRecoveryConfiguration BackOffDelayPolicy(BackOffDelayPolicy backOffDelayPolicy);
19+
/// <summary>
20+
/// Define the backoff delay policy.
21+
/// It is used when the connection is trying to reconnect.
22+
/// </summary>
23+
/// <param name="backOffDelayPolicy"></param>
24+
/// <returns></returns>
25+
IRecoveryConfiguration BackOffDelayPolicy(IBackOffDelayPolicy backOffDelayPolicy);
1026

27+
/// <summary>
28+
/// Define if the recovery of the topology is activated.
29+
/// When activated, the connection will try to recover the topology after a reconnection.
30+
/// It is valid only if the recovery is activated.
31+
/// </summary>
32+
/// <param name="activated"></param>
33+
/// <returns></returns>
1134
IRecoveryConfiguration Topology(bool activated);
1235

1336
bool IsTopologyActive();
1437

38+
}
39+
40+
/// <summary>
41+
/// Interface for the backoff delay policy.
42+
/// Used during the recovery of the connection.
43+
/// </summary>
44+
public interface IBackOffDelayPolicy
45+
{
46+
/// <summary>
47+
/// Get the next delay in milliseconds.
48+
/// </summary>
49+
/// <returns></returns>
50+
int Delay();
51+
52+
/// <summary>
53+
/// Reset the backoff delay policy.
54+
/// </summary>
55+
void Reset();
56+
57+
/// <summary>
58+
/// Define if the backoff delay policy is active.
59+
/// Can be used to disable the backoff delay policy after a certain number of retries,
60+
/// or when the user wants to disable the backoff delay policy.
61+
/// </summary>
62+
bool IsActive { get; }
1563
}

RabbitMQ.AMQP.Client/ITopologyListener.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,8 @@ public interface ITopologyListener
55
void QueueDeclared(IQueueSpecification specification);
66

77
void QueueDeleted(string name);
8+
9+
void Clear();
10+
11+
int QueueCount();
812
}

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 94 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,47 @@ internal class Visitor(AmqpManagement management) : IVisitor
88
{
99
private AmqpManagement Management { get; } = management;
1010

11-
12-
public void VisitQueues(List<QueueSpec> queueSpec)
11+
public async Task VisitQueues(List<QueueSpec> queueSpec)
1312
{
1413
foreach (var spec in queueSpec)
1514
{
1615
Trace.WriteLine(TraceLevel.Information, $"Recovering queue {spec.Name}");
17-
Management.Queue(spec).Declare();
16+
await Management.Queue(spec).Declare();
1817
}
1918
}
2019
}
2120

22-
2321
/// <summary>
2422
/// AmqpConnection is the concrete implementation of <see cref="IConnection"/>
2523
/// It is a wrapper around the AMQP.Net Lite <see cref="Connection"/> class
2624
/// </summary>
2725
public class AmqpConnection : IConnection
2826
{
27+
private const string ConnectionNotRecoveredCode = "CONNECTION_NOT_RECOVERED";
28+
private const string ConnectionNotRecoveredMessage = "Connection not recovered";
29+
2930
// The native AMQP.Net Lite connection
3031
private Connection? _nativeConnection;
3132
private readonly AmqpManagement _management = new();
33+
34+
3235
private readonly RecordingTopologyListener _recordingTopologyListener = new();
36+
3337
private readonly ConnectionSettings _connectionSettings;
3438

3539
/// <summary>
3640
/// Creates a new instance of <see cref="AmqpConnection"/>
41+
/// Through the connection it is possible to create:
42+
/// - Management. See <see cref="AmqpManagement"/>
43+
/// - Publishers and Consumers: TODO: Implement
3744
/// </summary>
3845
/// <param name="connectionSettings"></param>
3946
/// <returns></returns>
4047
public static async Task<AmqpConnection> CreateAsync(ConnectionSettings connectionSettings)
4148
{
4249
var connection = new AmqpConnection(connectionSettings);
4350
await connection.EnsureConnectionAsync();
51+
4452
return connection;
4553
}
4654

@@ -86,75 +94,116 @@ [new Symbol("connection_name")] = _connectionSettings.ConnectionName(),
8694
_nativeConnection.AddClosedCallback(MaybeRecoverConnection());
8795
}
8896

89-
OnNewStatus(Status.Open, null);
97+
OnNewStatus(State.Open, null);
9098
}
9199
catch (AmqpException e)
92100
{
93-
throw new ConnectionException("AmqpException: Connection failed", e);
101+
throw new ConnectionException($"AmqpException: Connection failed. Info: {ToString()} ", e);
94102
}
95103
catch (OperationCanceledException e)
96104
{
97105
// wrong virtual host
98-
throw new ConnectionException("OperationCanceledException: Connection failed", e);
106+
throw new ConnectionException($"OperationCanceledException: Connection failed. Info: {ToString()}", e);
99107
}
100108

101109
catch (NotSupportedException e)
102110
{
103111
// wrong schema
104-
throw new ConnectionException("NotSupportedException: Connection failed", e);
112+
throw new ConnectionException($"NotSupportedException: Connection failed. Info: {ToString()}", e);
105113
}
106114
}
107115

108-
109-
private void OnNewStatus(Status newStatus, Error? error)
116+
117+
private void OnNewStatus(State newState, Error? error)
110118
{
111-
if (Status == newStatus) return;
112-
var oldStatus = Status;
113-
Status = newStatus;
114-
ChangeStatus?.Invoke(this, oldStatus, newStatus, error);
119+
if (State == newState) return;
120+
var oldStatus = State;
121+
State = newState;
122+
ChangeState?.Invoke(this, oldStatus, newState, error);
115123
}
116124

117125
private ClosedCallback MaybeRecoverConnection()
118126
{
119-
return (sender, error) =>
127+
return async (sender, error) =>
120128
{
121129
if (error != null)
122130
{
123-
// TODO: Implement Dump Interface
124-
Trace.WriteLine(TraceLevel.Warning, $"connection is closed unexpected" +
125-
$"{sender} {error} {Status} " +
126-
$"{_nativeConnection!.IsClosed}");
131+
Trace.WriteLine(TraceLevel.Warning, $"connection is closed unexpectedly. " +
132+
$"Info: {ToString()}");
127133

128134
if (!_connectionSettings.RecoveryConfiguration.IsActivate())
129135
{
130-
OnNewStatus(Status.Closed, Utils.ConvertError(error));
136+
OnNewStatus(State.Closed, Utils.ConvertError(error));
131137
return;
132138
}
133139

140+
// TODO: Block the publishers and consumers
141+
OnNewStatus(State.Reconnecting, Utils.ConvertError(error));
134142

135-
OnNewStatus(Status.Reconneting, Utils.ConvertError(error));
136-
137-
Thread.Sleep(1000);
138-
// TODO: Replace with Backoff pattern
139-
var t = Task.Run(async () =>
143+
await Task.Run(async () =>
140144
{
141-
Trace.WriteLine(TraceLevel.Information, "Recovering connection");
142-
await EnsureConnectionAsync();
143-
Trace.WriteLine(TraceLevel.Information, "Recovering topology");
145+
var connected = false;
146+
// as a first step we try to recover the connection
147+
// so the connected flag is false
148+
while (!connected &&
149+
// we have to check if the recovery is active.
150+
// The user may want to disable the recovery mechanism
151+
// the user can use the lifecycle callback to handle the error
152+
_connectionSettings.RecoveryConfiguration.IsActivate() &&
153+
// we have to check if the backoff policy is active
154+
// the user may want to disable the backoff policy or
155+
// the backoff policy is not active due to some condition
156+
// for example: reaching the maximum number of retries, to avoid looping forever
157+
_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().IsActive)
158+
{
159+
try
160+
{
161+
var next = _connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Delay();
162+
Trace.WriteLine(TraceLevel.Information,
163+
$"Trying Recovering connection in {next} milliseconds. Info: {ToString()})");
164+
await Task.Delay(
165+
TimeSpan.FromMilliseconds(next));
166+
167+
await EnsureConnectionAsync();
168+
connected = true;
169+
}
170+
catch (Exception e)
171+
{
172+
Trace.WriteLine(TraceLevel.Warning,
173+
$"Error trying to recover connection {e}. Info: {this}");
174+
}
175+
}
176+
177+
_connectionSettings.RecoveryConfiguration.GetBackOffDelayPolicy().Reset();
178+
var connectionDescription = connected ? "recovered" : "not recovered";
179+
Trace.WriteLine(TraceLevel.Information,
180+
$"Connection {connectionDescription}. Info: {ToString()}");
181+
182+
if (!connected)
183+
{
184+
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
185+
OnNewStatus(State.Closed, new Error()
186+
{
187+
Description =
188+
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}",
189+
ErrorCode = ConnectionNotRecoveredCode
190+
});
191+
return;
192+
}
193+
194+
144195
if (_connectionSettings.RecoveryConfiguration.IsTopologyActive())
145196
{
146-
_recordingTopologyListener.Accept(new Visitor(_management));
197+
Trace.WriteLine(TraceLevel.Information, $"Recovering topology. Info: {ToString()}");
198+
await _recordingTopologyListener.Accept(new Visitor(_management));
147199
}
148200
});
149-
t.WaitAsync(TimeSpan.FromSeconds(10));
150201
return;
151202
}
152203

153204

154-
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed" +
155-
$"{sender} {error} {Status} " +
156-
$"{_nativeConnection!.IsClosed}");
157-
OnNewStatus(Status.Closed, Utils.ConvertError(error));
205+
Trace.WriteLine(TraceLevel.Verbose, $"connection is closed. Info: {ToString()}");
206+
OnNewStatus(State.Closed, Utils.ConvertError(error));
158207
};
159208
}
160209

@@ -166,12 +215,20 @@ private ClosedCallback MaybeRecoverConnection()
166215

167216
public async Task CloseAsync()
168217
{
169-
OnNewStatus(Status.Closed, null);
218+
_recordingTopologyListener.Clear();
219+
if (State == State.Closed) return;
220+
OnNewStatus(State.Closing, null);
170221
if (_nativeConnection is { IsClosed: false }) await _nativeConnection.CloseAsync();
171222
await _management.CloseAsync();
172223
}
173224

174-
public event IClosable.ChangeStatusCallBack? ChangeStatus;
225+
public event IClosable.LifeCycleCallBack? ChangeState;
226+
227+
public State State { get; private set; } = State.Closed;
175228

176-
public Status Status { get; private set; } = Status.Closed;
229+
public override string ToString()
230+
{
231+
var info = $"AmqpConnection{{ConnectionSettings='{_connectionSettings}', Status='{State.ToString()}'}}";
232+
return info;
233+
}
177234
}

0 commit comments

Comments
 (0)