Skip to content

Commit 310b8c0

Browse files
Switch heartbeat implementation back to latches (reset events)
We're back to square one with approaches to heartbeat implementations. The socket-based solution has two issues: * Obscure TLS timeouts. * WinRT has an incompatible I/O API, which makes implementing the same socket timeout-based approach a lot more challenging (a couple of days were spent on an unsuccessful spike). This reset event-driven approach had bugs in 3.5.0 but is quite straightforward and I/O layer agnostic, so we're willing to go back. Note that we now have significantly better test coverage in this area compared to the 3.5.0 development cycle.
1 parent 067d222 commit 310b8c0

File tree

3 files changed

+192
-122
lines changed

3 files changed

+192
-122
lines changed

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 147 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,28 @@ public class Connection : IConnection
7777
public EventHandler<EventArgs> m_connectionUnblocked;
7878
public IConnectionFactory m_factory;
7979
public IFrameHandler m_frameHandler;
80-
public ushort m_heartbeat = 0;
81-
public TimeSpan m_heartbeatTimeSpan = TimeSpan.FromSeconds(0);
8280

8381
public Guid m_id = Guid.NewGuid();
84-
85-
public int m_missedHeartbeats = 0;
8682
public ModelBase m_model0;
8783
public volatile bool m_running = true;
8884
public MainSession m_session0;
8985
public SessionManager m_sessionManager;
9086

9187
public IList<ShutdownReportEntry> m_shutdownReport = new SynchronizedList<ShutdownReportEntry>(new List<ShutdownReportEntry>());
88+
89+
//
90+
// Heartbeats
91+
//
92+
93+
public ushort m_heartbeat = 0;
94+
public TimeSpan m_heartbeatTimeSpan = TimeSpan.FromSeconds(0);
95+
public int m_missedHeartbeats = 0;
96+
9297
private Timer _heartbeatWriteTimer;
98+
private Timer _heartbeatReadTimer;
99+
public AutoResetEvent m_heartbeatRead = new AutoResetEvent(false);
100+
public AutoResetEvent m_heartbeatWrite = new AutoResetEvent(false);
101+
93102

94103
// true if we haven't finished connection negotiation.
95104
// In this state socket exceptions are treated as fatal connection
@@ -113,7 +122,7 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
113122
StartMainLoop(factory.UseBackgroundThreadsForIO);
114123
Open(insist);
115124

116-
StartHeartbeatTimer();
125+
StartHeartbeatTimers();
117126
AppDomain.CurrentDomain.DomainUnload += HandleDomainUnload;
118127
}
119128

@@ -465,8 +474,10 @@ public void EnsureIsOpen()
465474
public void FinishClose()
466475
{
467476
// Notify heartbeat loops that they can leave
477+
m_heartbeatRead.Set();
478+
m_heartbeatWrite.Set();
468479
m_closed = true;
469-
StopHeartbeatTimer();
480+
StopHeartbeatTimers();
470481

471482
m_frameHandler.Close();
472483
m_model0.SetCloseReason(m_closeReason);
@@ -522,33 +533,6 @@ public bool HardProtocolExceptionHandler(HardProtocolException hpe)
522533
return false;
523534
}
524535

525-
public void HeartbeatWriteTimerCallback(object state)
526-
{
527-
bool shouldTerminate = false;
528-
try
529-
{
530-
if (!m_closed)
531-
{
532-
WriteFrame(m_heartbeatFrame);
533-
}
534-
}
535-
catch (Exception e)
536-
{
537-
HandleMainLoopException(new ShutdownEventArgs(
538-
ShutdownInitiator.Library,
539-
0,
540-
"End of stream",
541-
e));
542-
shouldTerminate = true;
543-
}
544-
545-
if (m_closed || shouldTerminate)
546-
{
547-
TerminateMainloop();
548-
FinishClose();
549-
}
550-
}
551-
552536
public void InternalClose(ShutdownEventArgs reason)
553537
{
554538
if (!SetCloseReason(reason))
@@ -639,96 +623,62 @@ public void MainLoop()
639623

640624
public void MainLoopIteration()
641625
{
642-
try
643-
{
644-
Frame frame = m_frameHandler.ReadFrame();
626+
Frame frame = m_frameHandler.ReadFrame();
645627

646-
// We have received an actual frame.
647-
m_missedHeartbeats = 0;
648-
if (frame.Type == Constants.FrameHeartbeat)
649-
{
650-
// Ignore it: we've already just reset the heartbeat
651-
// counter.
652-
return;
653-
}
628+
NotifyHeartbeatListener();
629+
// We have received an actual frame.
630+
if (frame.Type == Constants.FrameHeartbeat)
631+
{
632+
// Ignore it: we've already just reset the heartbeat
633+
// latch.
634+
return;
635+
}
654636

655-
if (frame.Channel == 0)
656-
{
657-
// In theory, we could get non-connection.close-ok
658-
// frames here while we're quiescing (m_closeReason !=
659-
// null). In practice, there's a limited number of
660-
// things the server can ask of us on channel 0 -
661-
// essentially, just connection.close. That, combined
662-
// with the restrictions on pipelining, mean that
663-
// we're OK here to handle channel 0 traffic in a
664-
// quiescing situation, even though technically we
665-
// should be ignoring everything except
666-
// connection.close-ok.
667-
m_session0.HandleFrame(frame);
668-
}
669-
else
637+
if (frame.Channel == 0)
638+
{
639+
// In theory, we could get non-connection.close-ok
640+
// frames here while we're quiescing (m_closeReason !=
641+
// null). In practice, there's a limited number of
642+
// things the server can ask of us on channel 0 -
643+
// essentially, just connection.close. That, combined
644+
// with the restrictions on pipelining, mean that
645+
// we're OK here to handle channel 0 traffic in a
646+
// quiescing situation, even though technically we
647+
// should be ignoring everything except
648+
// connection.close-ok.
649+
m_session0.HandleFrame(frame);
650+
}
651+
else
652+
{
653+
// If we're still m_running, but have a m_closeReason,
654+
// then we must be quiescing, which means any inbound
655+
// frames for non-zero channels (and any inbound
656+
// commands on channel zero that aren't
657+
// Connection.CloseOk) must be discarded.
658+
if (m_closeReason == null)
670659
{
671-
// If we're still m_running, but have a m_closeReason,
672-
// then we must be quiescing, which means any inbound
673-
// frames for non-zero channels (and any inbound
674-
// commands on channel zero that aren't
675-
// Connection.CloseOk) must be discarded.
676-
if (m_closeReason == null)
660+
// No close reason, not quiescing the
661+
// connection. Handle the frame. (Of course, the
662+
// Session itself may be quiescing this particular
663+
// channel, but that's none of our concern.)
664+
ISession session = m_sessionManager.Lookup(frame.Channel);
665+
if (session == null)
677666
{
678-
// No close reason, not quiescing the
679-
// connection. Handle the frame. (Of course, the
680-
// Session itself may be quiescing this particular
681-
// channel, but that's none of our concern.)
682-
ISession session = m_sessionManager.Lookup(frame.Channel);
683-
if (session == null)
684-
{
685-
throw new ChannelErrorException(frame.Channel);
686-
}
687-
else
688-
{
689-
session.HandleFrame(frame);
690-
}
667+
throw new ChannelErrorException(frame.Channel);
668+
}
669+
else
670+
{
671+
session.HandleFrame(frame);
691672
}
692673
}
693674
}
694-
catch (SocketException ioe)
695-
{
696-
HandleIOException(ioe);
697-
}
698-
catch (IOException ioe)
699-
{
700-
HandleIOException(ioe);
701-
}
702675
}
703676

704-
// socket receive timeout is configured to be 1/2 of the heartbeat timeout
705-
// and the peer must be considered dead after two subsequent missed heartbeats:
706-
// terminate after 4 socket timeouts
707-
private const int SOCKET_TIMEOUTS_TO_CONSIDER_PEER_UNRESPONSIVE = 4;
708-
709-
protected void HandleIOException(Exception e)
677+
public void NotifyHeartbeatListener()
710678
{
711-
// socket error when in negotiation, throw BrokerUnreachableException
712-
// immediately
713-
if (m_inConnectionNegotiation)
714-
{
715-
var cfe = new ConnectFailureException("I/O error before connection negotiation was completed", e);
716-
throw new BrokerUnreachableException(cfe);
717-
}
718-
719-
if (++m_missedHeartbeats >= SOCKET_TIMEOUTS_TO_CONSIDER_PEER_UNRESPONSIVE)
679+
if (m_heartbeat != 0)
720680
{
721-
var description =
722-
String.Format("Peer missed 2 heartbeats with heartbeat timeout set to {0} seconds",
723-
m_heartbeat);
724-
var eose = new EndOfStreamException(description);
725-
m_shutdownReport.Add(new ShutdownReportEntry(description, eose));
726-
HandleMainLoopException(new ShutdownEventArgs(ShutdownInitiator.Library,
727-
0,
728-
"End of stream",
729-
eose));
730-
TerminateMainloop();
731-
FinishClose();
681+
m_heartbeatRead.Set();
732682
}
733683
}
734684

@@ -946,12 +896,15 @@ public bool SetCloseReason(ShutdownEventArgs reason)
946896
}
947897
}
948898

949-
public void StartHeartbeatTimer()
899+
public void StartHeartbeatTimers()
950900
{
951901
if (Heartbeat != 0)
952902
{
953903
_heartbeatWriteTimer = new Timer(HeartbeatWriteTimerCallback);
954-
_heartbeatWriteTimer.Change(TimeSpan.FromMilliseconds(0), m_heartbeatTimeSpan);
904+
_heartbeatWriteTimer.Change(TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(-1));
905+
906+
_heartbeatReadTimer = new Timer(HeartbeatReadTimerCallback);
907+
_heartbeatReadTimer.Change(TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(-1));
955908
}
956909
}
957910

@@ -963,9 +916,84 @@ public void StartMainLoop(bool useBackgroundThread)
963916
mainLoopThread.Start();
964917
}
965918

966-
protected void StopHeartbeatTimer()
919+
public void HeartbeatReadTimerCallback(object state)
920+
{
921+
bool shouldTerminate = false;
922+
if (!m_closed)
923+
{
924+
if (!m_heartbeatRead.WaitOne(0, false))
925+
{
926+
m_missedHeartbeats++;
927+
}
928+
else
929+
{
930+
m_missedHeartbeats = 0;
931+
}
932+
933+
// Has to miss two full heartbeats to force socket close
934+
if (m_missedHeartbeats > 1)
935+
{
936+
String description = String.Format("Heartbeat missing with heartbeat == {0} seconds", m_heartbeat);
937+
var eose = new EndOfStreamException(description);
938+
m_shutdownReport.Add(new ShutdownReportEntry(description, eose));
939+
HandleMainLoopException(
940+
new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose));
941+
shouldTerminate = true;
942+
}
943+
}
944+
945+
if (shouldTerminate)
946+
{
947+
TerminateMainloop();
948+
FinishClose();
949+
}
950+
else
951+
{
952+
_heartbeatReadTimer.Change(Heartbeat * 1000, Timeout.Infinite);
953+
}
954+
}
955+
956+
public void HeartbeatWriteTimerCallback(object state)
967957
{
968-
if (_heartbeatWriteTimer != null)
958+
bool shouldTerminate = false;
959+
try
960+
{
961+
if (!m_closed)
962+
{
963+
if (!m_heartbeatWrite.WaitOne(0, false))
964+
{
965+
WriteFrame(m_heartbeatFrame);
966+
}
967+
}
968+
}
969+
catch (Exception e)
970+
{
971+
HandleMainLoopException(new ShutdownEventArgs(
972+
ShutdownInitiator.Library,
973+
0,
974+
"End of stream",
975+
e));
976+
shouldTerminate = true;
977+
}
978+
979+
if (m_closed || shouldTerminate)
980+
{
981+
TerminateMainloop();
982+
FinishClose();
983+
}
984+
else
985+
{
986+
_heartbeatWriteTimer.Change(Heartbeat * 1000, Timeout.Infinite);
987+
}
988+
}
989+
990+
protected void StopHeartbeatTimers()
991+
{
992+
if(_heartbeatReadTimer != null)
993+
{
994+
_heartbeatReadTimer.Dispose();
995+
}
996+
if(_heartbeatWriteTimer != null)
969997
{
970998
_heartbeatWriteTimer.Dispose();
971999
}
@@ -976,7 +1004,7 @@ protected void StopHeartbeatTimer()
9761004
///</remarks>
9771005
public void TerminateMainloop()
9781006
{
979-
StopHeartbeatTimer();
1007+
StopHeartbeatTimers();
9801008
m_running = false;
9811009
}
9821010

@@ -988,6 +1016,7 @@ public override string ToString()
9881016
public void WriteFrame(Frame f)
9891017
{
9901018
m_frameHandler.WriteFrame(f);
1019+
m_heartbeatWrite.Set();
9911020
}
9921021

9931022
///<summary>API-side invocation of connection abort.</summary>
@@ -1060,7 +1089,7 @@ public void HandleConnectionUnblocked()
10601089

10611090
void IDisposable.Dispose()
10621091
{
1063-
StopHeartbeatTimer();
1092+
StopHeartbeatTimers();
10641093
Abort();
10651094
if (ShutdownReport.Count > 0)
10661095
{

projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,9 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
9494
}
9595

9696
Stream netstream = m_socket.GetStream();
97-
netstream.ReadTimeout = timeout;
98-
netstream.WriteTimeout = timeout;
97+
// make sure the socket timeout is greater than heartbeat timeout
98+
netstream.ReadTimeout = timeout * 4;
99+
netstream.WriteTimeout = timeout * 4;
99100

100101
if (endpoint.Ssl.Enabled)
101102
{
@@ -143,7 +144,8 @@ public int Timeout
143144
{
144145
if (m_socket.Connected)
145146
{
146-
m_socket.ReceiveTimeout = value;
147+
// make sure the socket timeout is greater than heartbeat interval
148+
m_socket.ReceiveTimeout = value * 4;
147149
}
148150
}
149151
#pragma warning disable 0168
@@ -163,7 +165,13 @@ public void Close()
163165
{
164166
try
165167
{
166-
m_socket.LingerState = new LingerOption(true, SOCKET_CLOSING_TIMEOUT);
168+
try
169+
{
170+
171+
} catch (ArgumentException _)
172+
{
173+
// ignore, we are closing anyway
174+
};
167175
m_socket.Close();
168176
}
169177
catch (Exception _)

0 commit comments

Comments
 (0)