Skip to content

Commit fe197d0

Browse files
authored
Merge branch 'main' into rabbitmq-amqp-dotnet-client-17
2 parents a6b990d + d161b4b commit fe197d0

15 files changed

+579
-24
lines changed

RabbitMQ.AMQP.Client/ILifeCycle.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,5 @@ public interface ILifeCycle
2828

2929
public State State { get; }
3030

31-
event LifeCycleCallBack ChangeState;
31+
public event LifeCycleCallBack ChangeState;
3232
}

RabbitMQ.AMQP.Client/Impl/AbstractLifeCycle.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,5 @@ protected void OnNewStatus(State newState, Error? error)
4040

4141
public event LifeCycleCallBack? ChangeState;
4242
}
43+
44+

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 68 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,56 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
4545
private readonly AmqpManagement _management;
4646
private readonly RecordingTopologyListener _recordingTopologyListener = new();
4747

48+
private void ChangeEntitiesStatus(State state, Error? error)
49+
{
50+
ChangePublishersStatus(state, error);
51+
ChangeConsumersStatus(state, error);
52+
_management.ChangeStatus(state, error);
53+
}
54+
55+
private void ChangePublishersStatus(State state, Error? error)
56+
{
57+
foreach (var publisher1 in Publishers.Values)
58+
{
59+
var publisher = (AmqpPublisher)publisher1;
60+
publisher.ChangeStatus(state, error);
61+
}
62+
}
63+
64+
private void ChangeConsumersStatus(State state, Error? error)
65+
{
66+
foreach (var consumer1 in Consumers.Values)
67+
{
68+
var consumer = (AmqpConsumer)consumer1;
69+
consumer.ChangeStatus(state, error);
70+
}
71+
}
72+
73+
74+
private async Task ReconnectEntities()
75+
{
76+
await ReconnectPublishers().ConfigureAwait(false);
77+
await ReconnectConsumers().ConfigureAwait(false);
78+
}
79+
80+
private async Task ReconnectPublishers()
81+
{
82+
foreach (var publisher1 in Publishers.Values)
83+
{
84+
var publisher = (AmqpPublisher)publisher1;
85+
await publisher.Reconnect().ConfigureAwait(false);
86+
}
87+
}
88+
89+
private async Task ReconnectConsumers()
90+
{
91+
foreach (var consumer1 in Consumers.Values)
92+
{
93+
var consumer = (AmqpConsumer)consumer1;
94+
await consumer.Reconnect().ConfigureAwait(false);
95+
}
96+
}
97+
4898
private readonly ConnectionSettings _connectionSettings;
4999
internal readonly AmqpSessionManagement _nativePubSubSessions;
50100

@@ -210,14 +260,6 @@ void onOpened(Amqp.IConnection connection, Open open1)
210260
Trace.WriteLine(TraceLevel.Error, $"Error trying to connect. Info: {ToString()}, error: {e}");
211261
throw new ConnectionException($"Error trying to connect. Info: {ToString()}, error: {e}");
212262
}
213-
214-
215-
finally
216-
{
217-
// _semaphore.Release();
218-
}
219-
220-
// return Task.CompletedTask;
221263
}
222264

223265
/// <summary>
@@ -235,8 +277,12 @@ private ClosedCallback MaybeRecoverConnection()
235277

236278
try
237279
{
280+
// close all the sessions, if the connection is closed the sessions are not valid anymore
281+
_nativePubSubSessions.ClearSessions();
282+
238283
if (error != null)
239284
{
285+
// we assume here that the connection is closed unexpectedly, since the error is not null
240286
Trace.WriteLine(TraceLevel.Warning, $"connection is closed unexpectedly. " +
241287
$"Info: {ToString()}");
242288

@@ -246,11 +292,16 @@ private ClosedCallback MaybeRecoverConnection()
246292
if (!_connectionSettings.RecoveryConfiguration.IsActivate())
247293
{
248294
OnNewStatus(State.Closed, Utils.ConvertError(error));
295+
ChangeEntitiesStatus(State.Closed, Utils.ConvertError(error));
249296
return;
250297
}
251298

252-
// TODO: Block the publishers and consumers
299+
// change the status for the connection and all the entities
300+
// to reconnecting and all the events are fired
253301
OnNewStatus(State.Reconnecting, Utils.ConvertError(error));
302+
ChangeEntitiesStatus(State.Reconnecting, Utils.ConvertError(error));
303+
304+
254305
await Task.Run(async () =>
255306
{
256307
bool connected = false;
@@ -297,6 +348,10 @@ await EnsureConnection()
297348
OnNewStatus(State.Closed,
298349
new Error(ConnectionNotRecoveredCode,
299350
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}"));
351+
352+
ChangeEntitiesStatus(State.Closed, new Error(ConnectionNotRecoveredCode,
353+
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.RecoveryConfiguration}"));
354+
300355
return;
301356
}
302357

@@ -309,9 +364,10 @@ await _recordingTopologyListener.Accept(visitor)
309364
}
310365

311366
OnNewStatus(State.Open, null);
312-
}).ConfigureAwait(false);
313-
367+
// after the connection is recovered we have to reconnect all the publishers and consumers
314368

369+
await ReconnectEntities().ConfigureAwait(false);
370+
}).ConfigureAwait(false);
315371
return;
316372
}
317373

@@ -351,6 +407,7 @@ await _semaphoreClose.WaitAsync()
351407
await CloseAllConsumers().ConfigureAwait(false);
352408

353409
_recordingTopologyListener.Clear();
410+
_nativePubSubSessions.ClearSessions();
354411

355412
if (State == State.Closed)
356413
{

RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ protected sealed override Task OpenAsync()
5454

5555
private void ProcessMessages()
5656
{
57+
// TODO: Check the performance during the download messages
58+
// The publisher is faster than the consumer
5759
_receiverLink?.Start(_initialCredits,
5860
(link, message) =>
5961
{
@@ -88,6 +90,30 @@ public override async Task CloseAsync()
8890
return;
8991
}
9092

93+
OnNewStatus(State.Closing, null);
9194
await (_receiverLink.CloseAsync()).ConfigureAwait(false);
95+
_receiverLink = null;
96+
OnNewStatus(State.Closed, null);
97+
_connection.Consumers.TryRemove(Id, out _);
98+
}
99+
100+
101+
internal void ChangeStatus(State newState, Error? error)
102+
{
103+
OnNewStatus(newState, error);
104+
}
105+
106+
internal async Task Reconnect()
107+
{
108+
int randomWait = Random.Shared.Next(200, 800);
109+
Trace.WriteLine(TraceLevel.Information, $"Consumer: {ToString()} is reconnecting in {randomWait} ms");
110+
await Task.Delay(randomWait).ConfigureAwait(false);
111+
112+
if (_receiverLink != null)
113+
{
114+
await _receiverLink.DetachAsync().ConfigureAwait(false)!;
115+
}
116+
117+
await OpenAsync().ConfigureAwait(false);
92118
}
93119
}

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,12 @@ public override string ToString()
388388

389389
return info;
390390
}
391+
392+
393+
internal void ChangeStatus(State newState, Error? error)
394+
{
395+
OnNewStatus(newState, error);
396+
}
391397
}
392398

393399
public class InvalidCodeException(string message) : Exception(message);

RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ protected sealed override Task OpenAsync()
4141
throw new PublisherException("Failed to create sender link. Link state is not attached, error: " +
4242
_senderLink.Error?.ToString() ?? "Unknown error");
4343
}
44+
4445
return base.OpenAsync();
4546
}
4647
catch (Exception e)
@@ -52,7 +53,7 @@ protected sealed override Task OpenAsync()
5253
private string Id { get; } = Guid.NewGuid().ToString();
5354

5455

55-
public void PausePublishing()
56+
private void PausePublishing()
5657
{
5758
_pausePublishing.Reset();
5859
}
@@ -150,11 +151,7 @@ public override async Task CloseAsync()
150151
{
151152
return;
152153
}
153-
154154
OnNewStatus(State.Closing, null);
155-
156-
_connection.Publishers.TryRemove(Id, out _);
157-
158155
try
159156
{
160157
if (_senderLink != null)
@@ -172,10 +169,26 @@ await _senderLink.CloseAsync()
172169
}
173170

174171
OnNewStatus(State.Closed, null);
172+
_connection.Publishers.TryRemove(Id, out _);
173+
}
174+
175+
176+
internal void ChangeStatus(State newState, Error? error)
177+
{
178+
OnNewStatus(newState, error);
175179
}
176180

177-
public void ResumePublishing()
181+
internal async Task Reconnect()
178182
{
179-
MaybeResumePublishing();
183+
int randomWait = Random.Shared.Next(200, 800);
184+
Trace.WriteLine(TraceLevel.Information, $"Publisher: {ToString()} is reconnecting in {randomWait} ms");
185+
await Task.Delay(randomWait).ConfigureAwait(false);
186+
187+
if (_senderLink != null)
188+
{
189+
await _senderLink.DetachAsync().ConfigureAwait(false)!;
190+
}
191+
192+
await OpenAsync().ConfigureAwait(false);
180193
}
181194
}

RabbitMQ.AMQP.Client/Impl/AmqpSessionManagement.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,9 @@ public Session GetOrCreateSession()
1818
Sessions.Add(session);
1919
return session;
2020
}
21+
22+
public void ClearSessions()
23+
{
24+
Sessions.Clear();
25+
}
2126
}

RabbitMQ.AMQP.Client/Impl/DeliveryContext.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,31 @@ public class DeliveryContext(IReceiverLink link, Message message) : IContext
66
{
77
public void Accept()
88
{
9+
if (link.IsClosed)
10+
{
11+
throw new ConsumerException("Link is closed");
12+
}
13+
914
link.Accept(message);
1015
}
1116

1217
public void Discard()
1318
{
19+
if (link.IsClosed)
20+
{
21+
throw new ConsumerException("Link is closed");
22+
}
23+
1424
link.Reject(message);
1525
}
1626

1727
public void Requeue()
1828
{
29+
if (!link.IsClosed)
30+
{
31+
throw new ConsumerException("Link is closed");
32+
}
33+
1934
link.Release(message);
2035
}
2136
}

Tests/ConnectionRecoverTests.cs renamed to Tests/ConnectionRecoveryTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ public void Reset()
3838
public bool IsActive() => true;
3939
}
4040

41-
public class ConnectionRecoverTests(ITestOutputHelper testOutputHelper)
41+
public class ConnectionRecoveryTests(ITestOutputHelper testOutputHelper)
4242
{
43-
public ITestOutputHelper TestOutputHelper { get; } = testOutputHelper;
43+
private ITestOutputHelper TestOutputHelper { get; } = testOutputHelper;
4444

4545
/// <summary>
4646
/// The normal close the status should be correct and error null
@@ -53,8 +53,8 @@ public class ConnectionRecoverTests(ITestOutputHelper testOutputHelper)
5353
[InlineData(false)]
5454
public async Task NormalCloseTheStatusShouldBeCorrectAndErrorNull(bool activeRecovery)
5555
{
56-
var connectionName = Guid.NewGuid().ToString();
57-
var connection = await AmqpConnection.CreateAsync(
56+
string connectionName = Guid.NewGuid().ToString();
57+
IConnection connection = await AmqpConnection.CreateAsync(
5858
ConnectionSettingBuilder.Create().ConnectionName(connectionName).RecoveryConfiguration(
5959
RecoveryConfiguration.Create().Activated(activeRecovery).Topology(false)).Build());
6060

Tests/ManagementTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public async Task RaiseInvalidCodeException()
9797
{
9898
Properties = new Properties()
9999
{
100-
CorrelationId = messageId, Subject = "506", // 506 is not a valid code
100+
CorrelationId = messageId,
101+
Subject = "506", // 506 is not a valid code
101102
}
102103
});
103104
});

0 commit comments

Comments
 (0)