Skip to content

Commit 3fe6446

Browse files
Merge pull request #295 from rabbitmq/rabbitmq-dotnet-client-294
Ensures connection recovery does not keep going after the connection …
2 parents 274d27a + e7bff15 commit 3fe6446

File tree

4 files changed

+68
-45
lines changed

4 files changed

+68
-45
lines changed

projects/client/RabbitMQ.Client/src/client/api/IConnection.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public interface IConnection : NetworkConnection, IDisposable
194194
/// <remarks>
195195
/// Note that all active channels, sessions, and models will be closed if this method is called.
196196
/// In comparison to normal <see cref="Close()"/> method, <see cref="Abort()"/> will not throw
197-
/// <see cref="AlreadyClosedException"/> or <see cref="IOException"/> during closing connection.
197+
/// <see cref="IOException"/> during closing connection.
198198
///This method waits infinitely for the in-progress close operation to complete.
199199
/// </remarks>
200200
void Abort();
@@ -206,7 +206,7 @@ public interface IConnection : NetworkConnection, IDisposable
206206
/// The method behaves in the same way as <see cref="Abort()"/>, with the only
207207
/// difference that the connection is closed with the given connection close code and message.
208208
/// <para>
209-
/// The close code (See under "Reply Codes" in the AMQP specification)
209+
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification)
210210
/// </para>
211211
/// <para>
212212
/// A message indicating the reason for closing the connection
@@ -253,7 +253,7 @@ public interface IConnection : NetworkConnection, IDisposable
253253
/// closed if this method is called. It will wait for the in-progress
254254
/// close operation to complete. This method will not return to the caller
255255
/// until the shutdown is complete. If the connection is already closed
256-
/// (or closing), then this method will throw <see cref="AlreadyClosedException"/>.
256+
/// (or closing), then this method will do nothing.
257257
/// It can also throw <see cref="IOException"/> when socket was closed unexpectedly.
258258
/// </remarks>
259259
void Close();
@@ -281,7 +281,7 @@ public interface IConnection : NetworkConnection, IDisposable
281281
/// Note that all active channels, sessions, and models will be
282282
/// closed if this method is called. It will wait for the in-progress
283283
/// close operation to complete with a timeout. If the connection is
284-
/// already closed (or closing), then this method will throw <see cref="AlreadyClosedException"/>.
284+
/// already closed (or closing), then this method will do nothing.
285285
/// It can also throw <see cref="IOException"/> when socket was closed unexpectedly.
286286
/// If timeout is reached and the close operations haven't finished, then socket is forced to close.
287287
/// <para>
@@ -298,7 +298,7 @@ public interface IConnection : NetworkConnection, IDisposable
298298
/// The method behaves in the same way as <see cref="Close(int)"/>, with the only
299299
/// difference that the connection is closed with the given connection close code and message.
300300
/// <para>
301-
/// The close code (See under "Reply Codes" in the AMQP specification).
301+
/// The close code (See under "Reply Codes" in the AMQP 0-9-1 specification).
302302
/// </para>
303303
/// <para>
304304
/// A message indicating the reason for closing the connection.

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 43 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -397,14 +397,14 @@ public void BeginAutomaticRecovery()
397397

398398
recoveryTaskFactory.StartNew(() =>
399399
{
400-
if(!self.ManuallyClosed)
400+
if (!self.ManuallyClosed)
401401
{
402402
try
403403
{
404404
#if NETFX_CORE
405-
System.Threading.Tasks.Task.Delay(m_factory.NetworkRecoveryInterval).Wait();
405+
System.Threading.Tasks.Task.Delay(m_factory.NetworkRecoveryInterval).Wait();
406406
#else
407-
Thread.Sleep(m_factory.NetworkRecoveryInterval);
407+
Thread.Sleep(m_factory.NetworkRecoveryInterval);
408408
#endif
409409
self.PerformAutomaticRecovery();
410410
}
@@ -422,19 +422,21 @@ protected void PerformAutomaticRecovery()
422422
{
423423
lock (recoveryLockTarget)
424424
{
425-
RecoverConnectionDelegate();
426-
RecoverConnectionShutdownHandlers();
427-
RecoverConnectionBlockedHandlers();
428-
RecoverConnectionUnblockedHandlers();
429-
430-
RecoverModels();
431-
if (m_factory.TopologyRecoveryEnabled)
425+
if (RecoverConnectionDelegate())
432426
{
433-
RecoverEntities();
434-
RecoverConsumers();
435-
}
427+
RecoverConnectionShutdownHandlers();
428+
RecoverConnectionBlockedHandlers();
429+
RecoverConnectionUnblockedHandlers();
430+
431+
RecoverModels();
432+
if (m_factory.TopologyRecoveryEnabled)
433+
{
434+
RecoverEntities();
435+
RecoverConsumers();
436+
}
436437

437-
RunRecoveryEventHandlers();
438+
RunRecoveryEventHandlers();
439+
}
438440
}
439441
}
440442

@@ -659,56 +661,64 @@ private void Init(IFrameHandler fh)
659661
public void Abort()
660662
{
661663
this.ManuallyClosed = true;
662-
m_delegate.Abort();
664+
if(m_delegate.IsOpen)
665+
m_delegate.Abort();
663666
}
664667

665668
///<summary>API-side invocation of connection abort.</summary>
666669
public void Abort(ushort reasonCode, string reasonText)
667670
{
668671
this.ManuallyClosed = true;
669-
m_delegate.Abort(reasonCode, reasonText);
672+
if (m_delegate.IsOpen)
673+
m_delegate.Abort(reasonCode, reasonText);
670674
}
671675

672676
///<summary>API-side invocation of connection abort with timeout.</summary>
673677
public void Abort(int timeout)
674678
{
675679
this.ManuallyClosed = true;
676-
m_delegate.Abort(timeout);
680+
if (m_delegate.IsOpen)
681+
m_delegate.Abort(timeout);
677682
}
678683

679684
///<summary>API-side invocation of connection abort with timeout.</summary>
680685
public void Abort(ushort reasonCode, string reasonText, int timeout)
681686
{
682687
this.ManuallyClosed = true;
683-
m_delegate.Abort(reasonCode, reasonText, timeout);
688+
if (m_delegate.IsOpen)
689+
m_delegate.Abort(reasonCode, reasonText, timeout);
684690
}
685691

686692
///<summary>API-side invocation of connection.close.</summary>
687693
public void Close()
688694
{
689695
this.ManuallyClosed = true;
690-
m_delegate.Close();
696+
if (m_delegate.IsOpen)
697+
m_delegate.Close();
691698
}
692699

693700
///<summary>API-side invocation of connection.close.</summary>
694701
public void Close(ushort reasonCode, string reasonText)
695702
{
696703
this.ManuallyClosed = true;
697-
m_delegate.Close(reasonCode, reasonText);
704+
if (m_delegate.IsOpen)
705+
m_delegate.Close(reasonCode, reasonText);
698706
}
699707

700708
///<summary>API-side invocation of connection.close with timeout.</summary>
701709
public void Close(int timeout)
702710
{
703711
this.ManuallyClosed = true;
704-
m_delegate.Close(timeout);
712+
if (m_delegate.IsOpen)
713+
m_delegate.Close(timeout);
705714
}
706715

707716
///<summary>API-side invocation of connection.close with timeout.</summary>
708717
public void Close(ushort reasonCode, string reasonText, int timeout)
709718
{
710719
this.ManuallyClosed = true;
711-
m_delegate.Close(reasonCode, reasonText, timeout);
720+
if (m_delegate.IsOpen)
721+
m_delegate.Close(reasonCode, reasonText, timeout);
712722
}
713723

714724
public IModel CreateModel()
@@ -736,23 +746,17 @@ public void HandleConnectionUnblocked()
736746

737747
void IDisposable.Dispose()
738748
{
739-
try {
749+
try
750+
{
740751
Abort();
741752
}
742-
finally
753+
catch(Exception)
743754
{
744-
m_models.Clear();
755+
// TODO: log
745756
}
746-
if (ShutdownReport.Count > 0)
757+
finally
747758
{
748-
foreach (ShutdownReportEntry entry in ShutdownReport)
749-
{
750-
if (entry.Exception != null)
751-
{
752-
throw entry.Exception;
753-
}
754-
}
755-
throw new OperationInterruptedException(null);
759+
m_models.Clear();
756760
}
757761
}
758762

@@ -826,16 +830,15 @@ protected void RecoverConnectionBlockedHandlers()
826830
}
827831
}
828832

829-
protected void RecoverConnectionDelegate()
833+
protected bool RecoverConnectionDelegate()
830834
{
831-
bool recovering = true;
832-
while (recovering)
835+
while (!ManuallyClosed)
833836
{
834837
try
835838
{
836839
var fh = endpoints.SelectOne(m_factory.CreateFrameHandler);
837840
m_delegate = new Connection(m_factory, false, fh, this.ClientProvidedName);
838-
recovering = false;
841+
return true;
839842
}
840843
catch (Exception e)
841844
{
@@ -866,6 +869,8 @@ protected void RecoverConnectionDelegate()
866869
// TODO: provide a way to handle these exceptions
867870
}
868871
}
872+
873+
return false;
869874
}
870875

871876
protected void RecoverConnectionShutdownHandlers()

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1240,9 +1240,9 @@ public void HandleConnectionUnblocked()
12401240

12411241
void IDisposable.Dispose()
12421242
{
1243-
MaybeStopHeartbeatTimers();
12441243
try
12451244
{
1245+
MaybeStopHeartbeatTimers();
12461246
Abort();
12471247
}
12481248
catch (OperationInterruptedException)

projects/client/Unit/src/unit/TestConnectionRecovery.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public void TestBasicConnectionRecoveryWithHostnameList()
151151
public void TestBasicConnectionRecoveryWithHostnameListAndUnreachableHosts()
152152
{
153153
using(var c = CreateAutorecoveringConnection(new List<string> { "191.72.44.22", "127.0.0.1", "localhost" }))
154-
{
154+
{
155155
Assert.IsTrue(c.IsOpen);
156156
CloseAndWaitForRecovery(c);
157157
Assert.IsTrue(c.IsOpen);
@@ -189,6 +189,24 @@ public void TestBasicConnectionRecoveryErrorEvent()
189189
}
190190
}
191191

192+
[Test]
193+
public void TestBasicConnectionRecoveryStopsAfterManualClose()
194+
{
195+
Assert.IsTrue(Conn.IsOpen);
196+
var c = CreateAutorecoveringConnection();
197+
var latch = new AutoResetEvent(false);
198+
c.ConnectionRecoveryError += (o, args) => latch.Set();
199+
StopRabbitMQ();
200+
latch.WaitOne(30000); // we got the failed reconnection event.
201+
var triedRecoveryAfterClose = false;
202+
c.Close();
203+
Thread.Sleep(5000);
204+
c.ConnectionRecoveryError += (o, args) => triedRecoveryAfterClose = true;
205+
Thread.Sleep(10000);
206+
Assert.IsFalse(triedRecoveryAfterClose);
207+
StartRabbitMQ();
208+
}
209+
192210
[Test]
193211
public void TestBasicConnectionRecoveryWithEndpointListAndUnreachableHosts()
194212
{

0 commit comments

Comments
 (0)