Skip to content

Commit 4a8c9c6

Browse files
committed
Ensures connection recovery does not keep going after the connection has been manually closed.
1 parent 274d27a commit 4a8c9c6

File tree

2 files changed

+59
-28
lines changed

2 files changed

+59
-28
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -397,14 +397,14 @@ public void BeginAutomaticRecovery()
397397

398398
recoveryTaskFactory.StartNew(() =>
399399
{
400-
if(!self.ManuallyClosed)
400+
if (!self.ManuallyClosed)
401401
{
402402
try
403403
{
404404
#if NETFX_CORE
405-
System.Threading.Tasks.Task.Delay(m_factory.NetworkRecoveryInterval).Wait();
405+
System.Threading.Tasks.Task.Delay(m_factory.NetworkRecoveryInterval).Wait();
406406
#else
407-
Thread.Sleep(m_factory.NetworkRecoveryInterval);
407+
Thread.Sleep(m_factory.NetworkRecoveryInterval);
408408
#endif
409409
self.PerformAutomaticRecovery();
410410
}
@@ -422,19 +422,21 @@ protected void PerformAutomaticRecovery()
422422
{
423423
lock (recoveryLockTarget)
424424
{
425-
RecoverConnectionDelegate();
426-
RecoverConnectionShutdownHandlers();
427-
RecoverConnectionBlockedHandlers();
428-
RecoverConnectionUnblockedHandlers();
429-
430-
RecoverModels();
431-
if (m_factory.TopologyRecoveryEnabled)
425+
if (RecoverConnectionDelegate())
432426
{
433-
RecoverEntities();
434-
RecoverConsumers();
435-
}
427+
RecoverConnectionShutdownHandlers();
428+
RecoverConnectionBlockedHandlers();
429+
RecoverConnectionUnblockedHandlers();
430+
431+
RecoverModels();
432+
if (m_factory.TopologyRecoveryEnabled)
433+
{
434+
RecoverEntities();
435+
RecoverConsumers();
436+
}
436437

437-
RunRecoveryEventHandlers();
438+
RunRecoveryEventHandlers();
439+
}
438440
}
439441
}
440442

@@ -659,56 +661,64 @@ private void Init(IFrameHandler fh)
659661
public void Abort()
660662
{
661663
this.ManuallyClosed = true;
662-
m_delegate.Abort();
664+
if(m_delegate.IsOpen)
665+
m_delegate.Abort();
663666
}
664667

665668
///<summary>API-side invocation of connection abort.</summary>
666669
public void Abort(ushort reasonCode, string reasonText)
667670
{
668671
this.ManuallyClosed = true;
669-
m_delegate.Abort(reasonCode, reasonText);
672+
if (m_delegate.IsOpen)
673+
m_delegate.Abort(reasonCode, reasonText);
670674
}
671675

672676
///<summary>API-side invocation of connection abort with timeout.</summary>
673677
public void Abort(int timeout)
674678
{
675679
this.ManuallyClosed = true;
676-
m_delegate.Abort(timeout);
680+
if (m_delegate.IsOpen)
681+
m_delegate.Abort(timeout);
677682
}
678683

679684
///<summary>API-side invocation of connection abort with timeout.</summary>
680685
public void Abort(ushort reasonCode, string reasonText, int timeout)
681686
{
682687
this.ManuallyClosed = true;
683-
m_delegate.Abort(reasonCode, reasonText, timeout);
688+
if (m_delegate.IsOpen)
689+
m_delegate.Abort(reasonCode, reasonText, timeout);
684690
}
685691

686692
///<summary>API-side invocation of connection.close.</summary>
687693
public void Close()
688694
{
689695
this.ManuallyClosed = true;
690-
m_delegate.Close();
696+
if (m_delegate.IsOpen)
697+
m_delegate.Close();
691698
}
692699

693700
///<summary>API-side invocation of connection.close.</summary>
694701
public void Close(ushort reasonCode, string reasonText)
695702
{
696703
this.ManuallyClosed = true;
697-
m_delegate.Close(reasonCode, reasonText);
704+
if (m_delegate.IsOpen)
705+
m_delegate.Close(reasonCode, reasonText);
698706
}
699707

700708
///<summary>API-side invocation of connection.close with timeout.</summary>
701709
public void Close(int timeout)
702710
{
703711
this.ManuallyClosed = true;
704-
m_delegate.Close(timeout);
712+
if (m_delegate.IsOpen)
713+
m_delegate.Close(timeout);
705714
}
706715

707716
///<summary>API-side invocation of connection.close with timeout.</summary>
708717
public void Close(ushort reasonCode, string reasonText, int timeout)
709718
{
710719
this.ManuallyClosed = true;
711-
m_delegate.Close(reasonCode, reasonText, timeout);
720+
if (m_delegate.IsOpen)
721+
m_delegate.Close(reasonCode, reasonText, timeout);
712722
}
713723

714724
public IModel CreateModel()
@@ -736,7 +746,8 @@ public void HandleConnectionUnblocked()
736746

737747
void IDisposable.Dispose()
738748
{
739-
try {
749+
try
750+
{
740751
Abort();
741752
}
742753
finally
@@ -752,6 +763,7 @@ void IDisposable.Dispose()
752763
throw entry.Exception;
753764
}
754765
}
766+
755767
throw new OperationInterruptedException(null);
756768
}
757769
}
@@ -826,16 +838,15 @@ protected void RecoverConnectionBlockedHandlers()
826838
}
827839
}
828840

829-
protected void RecoverConnectionDelegate()
841+
protected bool RecoverConnectionDelegate()
830842
{
831-
bool recovering = true;
832-
while (recovering)
843+
while (!ManuallyClosed)
833844
{
834845
try
835846
{
836847
var fh = endpoints.SelectOne(m_factory.CreateFrameHandler);
837848
m_delegate = new Connection(m_factory, false, fh, this.ClientProvidedName);
838-
recovering = false;
849+
return true;
839850
}
840851
catch (Exception e)
841852
{
@@ -866,6 +877,8 @@ protected void RecoverConnectionDelegate()
866877
// TODO: provide a way to handle these exceptions
867878
}
868879
}
880+
881+
return false;
869882
}
870883

871884
protected void RecoverConnectionShutdownHandlers()

projects/client/Unit/src/unit/TestConnectionRecovery.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public void TestBasicConnectionRecoveryWithHostnameList()
151151
public void TestBasicConnectionRecoveryWithHostnameListAndUnreachableHosts()
152152
{
153153
using(var c = CreateAutorecoveringConnection(new List<string> { "191.72.44.22", "127.0.0.1", "localhost" }))
154-
{
154+
{
155155
Assert.IsTrue(c.IsOpen);
156156
CloseAndWaitForRecovery(c);
157157
Assert.IsTrue(c.IsOpen);
@@ -189,6 +189,24 @@ public void TestBasicConnectionRecoveryErrorEvent()
189189
}
190190
}
191191

192+
[Test]
193+
public void TestBasicConnectionRecoveryStopsAfterManualClose()
194+
{
195+
Assert.IsTrue(Conn.IsOpen);
196+
var c = CreateAutorecoveringConnection();
197+
var latch = new AutoResetEvent(false);
198+
c.ConnectionRecoveryError += (o, args) => latch.Set();
199+
StopRabbitMQ();
200+
latch.WaitOne(30000); // we got the failed reconnection event.
201+
var triedRecoveryAfterClose = false;
202+
c.Close();
203+
Thread.Sleep(5000);
204+
c.ConnectionRecoveryError += (o, args) => triedRecoveryAfterClose = true;
205+
Thread.Sleep(10000);
206+
Assert.IsFalse(triedRecoveryAfterClose);
207+
StartRabbitMQ();
208+
}
209+
192210
[Test]
193211
public void TestBasicConnectionRecoveryWithEndpointListAndUnreachableHosts()
194212
{

0 commit comments

Comments
 (0)