Skip to content

Commit d161b4b

Browse files
authored
Auto-reconnect publishers and consumers (#26)
* Autoreconnect publishers and consumers --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent ee89e10 commit d161b4b

15 files changed

+580
-30
lines changed

RabbitMQ.AMQP.Client/ILifeCycle.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ public enum State
99
Closed,
1010
}
1111

12-
1312
public class Error(string? errorCode, string? description)
1413
{
1514
public string? Description { get; } = description;
@@ -21,15 +20,13 @@ public override string ToString()
2120
}
2221
}
2322

24-
25-
2623
public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);
2724

2825
public interface ILifeCycle
2926
{
3027
Task CloseAsync();
31-
28+
3229
public State State { get; }
33-
34-
event LifeCycleCallBack ChangeState;
30+
31+
public event LifeCycleCallBack ChangeState;
3532
}

RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,5 @@ protected void OnNewStatus(State newState, Error? error)
4040

4141
public event LifeCycleCallBack? ChangeState;
4242
}
43+
44+

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,55 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
4646
private readonly AmqpManagement _management;
4747
private readonly RecordingTopologyListener _recordingTopologyListener = new();
4848

49+
private void ChangeEntitiesStatus(State state, Error? error)
50+
{
51+
ChangePublishersStatus(state, error);
52+
ChangeConsumersStatus(state, error);
53+
_management.ChangeStatus(state, error);
54+
}
55+
56+
private void ChangePublishersStatus(State state, Error? error)
57+
{
58+
foreach (var publisher1 in Publishers.Values)
59+
{
60+
var publisher = (AmqpPublisher)publisher1;
61+
publisher.ChangeStatus(state, error);
62+
}
63+
}
64+
65+
private void ChangeConsumersStatus(State state, Error? error)
66+
{
67+
foreach (var consumer1 in Consumers.Values)
68+
{
69+
var consumer = (AmqpConsumer)consumer1;
70+
consumer.ChangeStatus(state, error);
71+
}
72+
}
73+
74+
75+
private async Task ReconnectEntities()
76+
{
77+
await ReconnectPublishers().ConfigureAwait(false);
78+
await ReconnectConsumers().ConfigureAwait(false);
79+
}
80+
81+
private async Task ReconnectPublishers()
82+
{
83+
foreach (var publisher1 in Publishers.Values)
84+
{
85+
var publisher = (AmqpPublisher)publisher1;
86+
await publisher.Reconnect().ConfigureAwait(false);
87+
}
88+
}
89+
90+
private async Task ReconnectConsumers()
91+
{
92+
foreach (var consumer1 in Consumers.Values)
93+
{
94+
var consumer = (AmqpConsumer)consumer1;
95+
await consumer.Reconnect().ConfigureAwait(false);
96+
}
97+
}
4998

5099
private readonly ConnectionSettings _connectionSettings;
51100
internal readonly AmqpSessionManagement _nativePubSubSessions;
@@ -138,7 +187,6 @@ protected override Task OpenAsync()
138187

139188
private void EnsureConnection()
140189
{
141-
// await _semaphore.WaitAsync();
142190
try
143191
{
144192
if (_nativeConnection is { IsClosed: false })
@@ -180,14 +228,6 @@ [new Symbol("connection_name")] = _connectionSettings.ConnectionName(),
180228
Trace.WriteLine(TraceLevel.Error, $"Error trying to connect. Info: {ToString()}, error: {e}");
181229
throw new ConnectionException($"Error trying to connect. Info: {ToString()}, error: {e}");
182230
}
183-
184-
185-
finally
186-
{
187-
// _semaphore.Release();
188-
}
189-
190-
// return Task.CompletedTask;
191231
}
192232

193233
/// <summary>
@@ -205,8 +245,12 @@ private ClosedCallback MaybeRecoverConnection()
205245

206246
try
207247
{
248+
// close all the sessions, if the connection is closed the sessions are not valid anymore
249+
_nativePubSubSessions.ClearSessions();
250+
208251
if (error != null)
209252
{
253+
// we assume here that the connection is closed unexpectedly, since the error is not null
210254
Trace.WriteLine(TraceLevel.Warning, $"connection is closed unexpectedly. " +
211255
$"Info: {ToString()}");
212256

@@ -216,11 +260,16 @@ private ClosedCallback MaybeRecoverConnection()
216260
if (!_connectionSettings.RecoveryConfiguration.IsActivate())
217261
{
218262
OnNewStatus(State.Closed, Utils.ConvertError(error));
263+
ChangeEntitiesStatus(State.Closed, Utils.ConvertError(error));
219264
return;
220265
}
221266

222-
// TODO: Block the publishers and consumers
267+
// change the status for the connection and all the entities
268+
// to reconnecting and all the events are fired
223269
OnNewStatus(State.Reconnecting, Utils.ConvertError(error));
270+
ChangeEntitiesStatus(State.Reconnecting, Utils.ConvertError(error));
271+
272+
224273
await Task.Run(async () =>
225274
{
226275
bool connected = false;
@@ -266,6 +315,10 @@ await Task.Delay(TimeSpan.FromMilliseconds(next))
266315
OnNewStatus(State.Closed,
267316
new Error(ConnectionNotRecoveredCode,
268317
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}"));
318+
319+
ChangeEntitiesStatus(State.Closed, new Error(ConnectionNotRecoveredCode,
320+
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}"));
321+
269322
return;
270323
}
271324

@@ -278,9 +331,10 @@ await _recordingTopologyListener.Accept(visitor)
278331
}
279332

280333
OnNewStatus(State.Open, null);
281-
}).ConfigureAwait(false);
282-
334+
// after the connection is recovered we have to reconnect all the publishers and consumers
283335

336+
await ReconnectEntities().ConfigureAwait(false);
337+
}).ConfigureAwait(false);
284338
return;
285339
}
286340

@@ -320,6 +374,7 @@ await _semaphoreClose.WaitAsync()
320374
await CloseAllConsumers().ConfigureAwait(false);
321375

322376
_recordingTopologyListener.Clear();
377+
_nativePubSubSessions.ClearSessions();
323378

324379
if (State == State.Closed)
325380
{

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ protected sealed override Task OpenAsync()
5454

5555
private void ProcessMessages()
5656
{
57+
// TODO: Check the performance during the download messages
58+
// The publisher is faster than the consumer
5759
_receiverLink?.Start(_initialCredits,
5860
(link, message) =>
5961
{
@@ -88,6 +90,30 @@ public override async Task CloseAsync()
8890
return;
8991
}
9092

93+
OnNewStatus(State.Closing, null);
9194
await (_receiverLink.CloseAsync()).ConfigureAwait(false);
95+
_receiverLink = null;
96+
OnNewStatus(State.Closed, null);
97+
_connection.Consumers.TryRemove(Id, out _);
98+
}
99+
100+
101+
internal void ChangeStatus(State newState, Error? error)
102+
{
103+
OnNewStatus(newState, error);
104+
}
105+
106+
internal async Task Reconnect()
107+
{
108+
int randomWait = Random.Shared.Next(200, 800);
109+
Trace.WriteLine(TraceLevel.Information, $"Consumer: {ToString()} is reconnecting in {randomWait} ms");
110+
await Task.Delay(randomWait).ConfigureAwait(false);
111+
112+
if (_receiverLink != null)
113+
{
114+
await _receiverLink.DetachAsync().ConfigureAwait(false)!;
115+
}
116+
117+
await OpenAsync().ConfigureAwait(false);
92118
}
93119
}

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,12 @@ public override string ToString()
388388

389389
return info;
390390
}
391+
392+
393+
internal void ChangeStatus(State newState, Error? error)
394+
{
395+
OnNewStatus(newState, error);
396+
}
391397
}
392398

393399
public class InvalidCodeException(string message) : Exception(message);

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ protected sealed override Task OpenAsync()
4141
throw new PublisherException("Failed to create sender link. Link state is not attached, error: " +
4242
_senderLink.Error?.ToString() ?? "Unknown error");
4343
}
44+
4445
return base.OpenAsync();
4546
}
4647
catch (Exception e)
@@ -52,7 +53,7 @@ protected sealed override Task OpenAsync()
5253
private string Id { get; } = Guid.NewGuid().ToString();
5354

5455

55-
public void PausePublishing()
56+
private void PausePublishing()
5657
{
5758
_pausePublishing.Reset();
5859
}
@@ -150,11 +151,7 @@ public override async Task CloseAsync()
150151
{
151152
return;
152153
}
153-
154154
OnNewStatus(State.Closing, null);
155-
156-
_connection.Publishers.TryRemove(Id, out _);
157-
158155
try
159156
{
160157
if (_senderLink != null)
@@ -172,10 +169,26 @@ await _senderLink.CloseAsync()
172169
}
173170

174171
OnNewStatus(State.Closed, null);
172+
_connection.Publishers.TryRemove(Id, out _);
173+
}
174+
175+
176+
internal void ChangeStatus(State newState, Error? error)
177+
{
178+
OnNewStatus(newState, error);
175179
}
176180

177-
public void ResumePublishing()
181+
internal async Task Reconnect()
178182
{
179-
MaybeResumePublishing();
183+
int randomWait = Random.Shared.Next(200, 800);
184+
Trace.WriteLine(TraceLevel.Information, $"Publisher: {ToString()} is reconnecting in {randomWait} ms");
185+
await Task.Delay(randomWait).ConfigureAwait(false);
186+
187+
if (_senderLink != null)
188+
{
189+
await _senderLink.DetachAsync().ConfigureAwait(false)!;
190+
}
191+
192+
await OpenAsync().ConfigureAwait(false);
180193
}
181194
}

RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,9 @@ public Session GetOrCreateSession()
1818
Sessions.Add(session);
1919
return session;
2020
}
21+
22+
public void ClearSessions()
23+
{
24+
Sessions.Clear();
25+
}
2126
}

RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,31 @@ public class DeliveryContext(IReceiverLink link, Message message) : IContext
66
{
77
public void Accept()
88
{
9+
if (link.IsClosed)
10+
{
11+
throw new ConsumerException("Link is closed");
12+
}
13+
914
link.Accept(message);
1015
}
1116

1217
public void Discard()
1318
{
19+
if (link.IsClosed)
20+
{
21+
throw new ConsumerException("Link is closed");
22+
}
23+
1424
link.Reject(message);
1525
}
1626

1727
public void Requeue()
1828
{
29+
if (!link.IsClosed)
30+
{
31+
throw new ConsumerException("Link is closed");
32+
}
33+
1934
link.Release(message);
2035
}
2136
}

Tests/ConnectionRecoverTests.cs renamed to Tests/ConnectionRecoveryTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ public void Reset()
3838
public bool IsActive() => true;
3939
}
4040

41-
public class ConnectionRecoverTests(ITestOutputHelper testOutputHelper)
41+
public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper)
4242
{
43-
public ITestOutputHelper TestOutputHelper { get; } = testOutputHelper;
43+
private ITestOutputHelper TestOutputHelper { get; } = testOutputHelper;
4444

4545
/// <summary>
4646
/// The normal close the status should be correct and error null
@@ -53,8 +53,8 @@ public class ConnectionRecoverTests(ITestOutputHelper testOutputHelper)
5353
[InlineData(false)]
5454
public async Task NormalCloseTheStatusShouldBeCorrectAndErrorNull(bool activeRecovery)
5555
{
56-
var connectionName = Guid.NewGuid().ToString();
57-
var connection = await AmqpConnection.CreateAsync(
56+
string connectionName = Guid.NewGuid().ToString();
57+
IConnection connection = await AmqpConnection.CreateAsync(
5858
ConnectionSettingBuilder.Create().ConnectionName(connectionName).RecoveryConfiguration(
5959
RecoveryConfiguration.Create().Activated(activeRecovery).Topology(false)).Build());
6060

Tests/ManagementTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public async Task RaiseInvalidCodeException()
9797
{
9898
Properties = new Properties()
9999
{
100-
CorrelationId = messageId, Subject = "506", // 506 is not a valid code
100+
CorrelationId = messageId,
101+
Subject = "506", // 506 is not a valid code
101102
}
102103
});
103104
});

0 commit comments

Comments
 (0)