Skip to content

Commit ca0c9ba

Browse files
Merge pull request #99 from rabbitmq/rabbitmq-dotnet-client-82
Move heartbeat implementation back to timers and reset events
2 parents ce443b5 + 991bfe1 commit ca0c9ba

File tree

6 files changed

+236
-149
lines changed

6 files changed

+236
-149
lines changed

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 148 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,28 @@ public class Connection : IConnection
7777
public EventHandler<EventArgs> m_connectionUnblocked;
7878
public IConnectionFactory m_factory;
7979
public IFrameHandler m_frameHandler;
80-
public ushort m_heartbeat = 0;
81-
public TimeSpan m_heartbeatTimeSpan = TimeSpan.FromSeconds(0);
8280

8381
public Guid m_id = Guid.NewGuid();
84-
85-
public int m_missedHeartbeats = 0;
8682
public ModelBase m_model0;
8783
public volatile bool m_running = true;
8884
public MainSession m_session0;
8985
public SessionManager m_sessionManager;
9086

9187
public IList<ShutdownReportEntry> m_shutdownReport = new SynchronizedList<ShutdownReportEntry>(new List<ShutdownReportEntry>());
88+
89+
//
90+
// Heartbeats
91+
//
92+
93+
public ushort m_heartbeat = 0;
94+
public TimeSpan m_heartbeatTimeSpan = TimeSpan.FromSeconds(0);
95+
public int m_missedHeartbeats = 0;
96+
9297
private Timer _heartbeatWriteTimer;
98+
private Timer _heartbeatReadTimer;
99+
public AutoResetEvent m_heartbeatRead = new AutoResetEvent(false);
100+
public AutoResetEvent m_heartbeatWrite = new AutoResetEvent(false);
101+
93102

94103
// true if we haven't finished connection negotiation.
95104
// In this state socket exceptions are treated as fatal connection
@@ -113,7 +122,6 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
113122
StartMainLoop(factory.UseBackgroundThreadsForIO);
114123
Open(insist);
115124

116-
StartHeartbeatTimer();
117125
AppDomain.CurrentDomain.DomainUnload += HandleDomainUnload;
118126
}
119127

@@ -233,10 +241,10 @@ public ushort Heartbeat
233241
set
234242
{
235243
m_heartbeat = value;
236-
// timers fire at half the interval to avoid race
244+
// timers fire at a quarter of the heartbeat interval to avoid race
237245
// conditions
238-
m_heartbeatTimeSpan = TimeSpan.FromMilliseconds((value * 1000) / 2.0);
239-
m_frameHandler.Timeout = (value * 1000) / 2;
246+
m_heartbeatTimeSpan = TimeSpan.FromMilliseconds((value * 1000) / 4);
247+
m_frameHandler.Timeout = value * 1000 * 2;
240248
}
241249
}
242250

@@ -465,8 +473,10 @@ public void EnsureIsOpen()
465473
public void FinishClose()
466474
{
467475
// Notify heartbeat loops that they can leave
476+
m_heartbeatRead.Set();
477+
m_heartbeatWrite.Set();
468478
m_closed = true;
469-
StopHeartbeatTimer();
479+
MaybeStopHeartbeatTimers();
470480

471481
m_frameHandler.Close();
472482
m_model0.SetCloseReason(m_closeReason);
@@ -522,33 +532,6 @@ public bool HardProtocolExceptionHandler(HardProtocolException hpe)
522532
return false;
523533
}
524534

525-
public void HeartbeatWriteTimerCallback(object state)
526-
{
527-
bool shouldTerminate = false;
528-
try
529-
{
530-
if (!m_closed)
531-
{
532-
WriteFrame(m_heartbeatFrame);
533-
}
534-
}
535-
catch (Exception e)
536-
{
537-
HandleMainLoopException(new ShutdownEventArgs(
538-
ShutdownInitiator.Library,
539-
0,
540-
"End of stream",
541-
e));
542-
shouldTerminate = true;
543-
}
544-
545-
if (m_closed || shouldTerminate)
546-
{
547-
TerminateMainloop();
548-
FinishClose();
549-
}
550-
}
551-
552535
public void InternalClose(ShutdownEventArgs reason)
553536
{
554537
if (!SetCloseReason(reason))
@@ -639,96 +622,62 @@ public void MainLoop()
639622

640623
public void MainLoopIteration()
641624
{
642-
try
643-
{
644-
Frame frame = m_frameHandler.ReadFrame();
625+
Frame frame = m_frameHandler.ReadFrame();
645626

646-
// We have received an actual frame.
647-
m_missedHeartbeats = 0;
648-
if (frame.Type == Constants.FrameHeartbeat)
649-
{
650-
// Ignore it: we've already just reset the heartbeat
651-
// counter.
652-
return;
653-
}
627+
NotifyHeartbeatListener();
628+
// We have received an actual frame.
629+
if (frame.Type == Constants.FrameHeartbeat)
630+
{
631+
// Ignore it: we've already just reset the heartbeat
632+
// latch.
633+
return;
634+
}
654635

655-
if (frame.Channel == 0)
656-
{
657-
// In theory, we could get non-connection.close-ok
658-
// frames here while we're quiescing (m_closeReason !=
659-
// null). In practice, there's a limited number of
660-
// things the server can ask of us on channel 0 -
661-
// essentially, just connection.close. That, combined
662-
// with the restrictions on pipelining, mean that
663-
// we're OK here to handle channel 0 traffic in a
664-
// quiescing situation, even though technically we
665-
// should be ignoring everything except
666-
// connection.close-ok.
667-
m_session0.HandleFrame(frame);
668-
}
669-
else
636+
if (frame.Channel == 0)
637+
{
638+
// In theory, we could get non-connection.close-ok
639+
// frames here while we're quiescing (m_closeReason !=
640+
// null). In practice, there's a limited number of
641+
// things the server can ask of us on channel 0 -
642+
// essentially, just connection.close. That, combined
643+
// with the restrictions on pipelining, mean that
644+
// we're OK here to handle channel 0 traffic in a
645+
// quiescing situation, even though technically we
646+
// should be ignoring everything except
647+
// connection.close-ok.
648+
m_session0.HandleFrame(frame);
649+
}
650+
else
651+
{
652+
// If we're still m_running, but have a m_closeReason,
653+
// then we must be quiescing, which means any inbound
654+
// frames for non-zero channels (and any inbound
655+
// commands on channel zero that aren't
656+
// Connection.CloseOk) must be discarded.
657+
if (m_closeReason == null)
670658
{
671-
// If we're still m_running, but have a m_closeReason,
672-
// then we must be quiescing, which means any inbound
673-
// frames for non-zero channels (and any inbound
674-
// commands on channel zero that aren't
675-
// Connection.CloseOk) must be discarded.
676-
if (m_closeReason == null)
659+
// No close reason, not quiescing the
660+
// connection. Handle the frame. (Of course, the
661+
// Session itself may be quiescing this particular
662+
// channel, but that's none of our concern.)
663+
ISession session = m_sessionManager.Lookup(frame.Channel);
664+
if (session == null)
677665
{
678-
// No close reason, not quiescing the
679-
// connection. Handle the frame. (Of course, the
680-
// Session itself may be quiescing this particular
681-
// channel, but that's none of our concern.)
682-
ISession session = m_sessionManager.Lookup(frame.Channel);
683-
if (session == null)
684-
{
685-
throw new ChannelErrorException(frame.Channel);
686-
}
687-
else
688-
{
689-
session.HandleFrame(frame);
690-
}
666+
throw new ChannelErrorException(frame.Channel);
667+
}
668+
else
669+
{
670+
session.HandleFrame(frame);
691671
}
692672
}
693673
}
694-
catch (SocketException ioe)
695-
{
696-
HandleIOException(ioe);
697-
}
698-
catch (IOException ioe)
699-
{
700-
HandleIOException(ioe);
701-
}
702674
}
703675

704-
// socket receive timeout is configured to be 1/2 of the heartbeat timeout
705-
// and the peer must be considered dead after two subsequent missed heartbeats:
706-
// terminate after 4 socket timeouts
707-
private const int SOCKET_TIMEOUTS_TO_CONSIDER_PEER_UNRESPONSIVE = 4;
708-
709-
protected void HandleIOException(Exception e)
676+
public void NotifyHeartbeatListener()
710677
{
711-
// socket error when in negotiation, throw BrokerUnreachableException
712-
// immediately
713-
if (m_inConnectionNegotiation)
714-
{
715-
var cfe = new ConnectFailureException("I/O error before connection negotiation was completed", e);
716-
throw new BrokerUnreachableException(cfe);
717-
}
718-
719-
if (++m_missedHeartbeats >= SOCKET_TIMEOUTS_TO_CONSIDER_PEER_UNRESPONSIVE)
678+
if (m_heartbeat != 0)
720679
{
721-
var description =
722-
String.Format("Peer missed 2 heartbeats with heartbeat timeout set to {0} seconds",
723-
m_heartbeat);
724-
var eose = new EndOfStreamException(description);
725-
m_shutdownReport.Add(new ShutdownReportEntry(description, eose));
726-
HandleMainLoopException(new ShutdownEventArgs(ShutdownInitiator.Library,
727-
0,
728-
"End of stream",
729-
eose));
730-
TerminateMainloop();
731-
FinishClose();
680+
m_heartbeatRead.Set();
732681
}
733682
}
734683

@@ -946,12 +895,15 @@ public bool SetCloseReason(ShutdownEventArgs reason)
946895
}
947896
}
948897

949-
public void StartHeartbeatTimer()
898+
public void MaybeStartHeartbeatTimers()
950899
{
951900
if (Heartbeat != 0)
952901
{
953902
_heartbeatWriteTimer = new Timer(HeartbeatWriteTimerCallback);
954-
_heartbeatWriteTimer.Change(TimeSpan.FromMilliseconds(0), m_heartbeatTimeSpan);
903+
_heartbeatWriteTimer.Change(TimeSpan.FromMilliseconds(200), m_heartbeatTimeSpan);
904+
905+
_heartbeatReadTimer = new Timer(HeartbeatReadTimerCallback);
906+
_heartbeatReadTimer.Change(TimeSpan.FromMilliseconds(200), m_heartbeatTimeSpan);
955907
}
956908
}
957909

@@ -963,9 +915,81 @@ public void StartMainLoop(bool useBackgroundThread)
963915
mainLoopThread.Start();
964916
}
965917

966-
protected void StopHeartbeatTimer()
918+
public void HeartbeatReadTimerCallback(object state)
919+
{
920+
bool shouldTerminate = false;
921+
if (!m_closed)
922+
{
923+
if (!m_heartbeatRead.WaitOne(0, false))
924+
{
925+
m_missedHeartbeats++;
926+
}
927+
else
928+
{
929+
m_missedHeartbeats = 0;
930+
}
931+
932+
// We check against 8 = 2 * 4 because we need to wait for at
933+
// least two complete heartbeat setting intervals before
934+
// complaining, and we've set the timer interval to a quarter
935+
// of the heartbeat setting in the Heartbeat property setter above.
936+
if (m_missedHeartbeats > 2 * 4)
937+
{
938+
String description = String.Format("Heartbeat missing with heartbeat == {0} seconds", m_heartbeat);
939+
var eose = new EndOfStreamException(description);
940+
m_shutdownReport.Add(new ShutdownReportEntry(description, eose));
941+
HandleMainLoopException(
942+
new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose));
943+
shouldTerminate = true;
944+
}
945+
}
946+
947+
if (shouldTerminate)
948+
{
949+
TerminateMainloop();
950+
FinishClose();
951+
}
952+
else
953+
{
954+
_heartbeatReadTimer.Change(Heartbeat * 1000, Timeout.Infinite);
955+
}
956+
}
957+
958+
public void HeartbeatWriteTimerCallback(object state)
967959
{
968-
if (_heartbeatWriteTimer != null)
960+
bool shouldTerminate = false;
961+
try
962+
{
963+
if (!m_closed)
964+
{
965+
WriteFrame(m_heartbeatFrame);
966+
m_frameHandler.Flush();
967+
}
968+
}
969+
catch (Exception e)
970+
{
971+
HandleMainLoopException(new ShutdownEventArgs(
972+
ShutdownInitiator.Library,
973+
0,
974+
"End of stream",
975+
e));
976+
shouldTerminate = true;
977+
}
978+
979+
if (m_closed || shouldTerminate)
980+
{
981+
TerminateMainloop();
982+
FinishClose();
983+
}
984+
}
985+
986+
protected void MaybeStopHeartbeatTimers()
987+
{
988+
if(_heartbeatReadTimer != null)
989+
{
990+
_heartbeatReadTimer.Dispose();
991+
}
992+
if(_heartbeatWriteTimer != null)
969993
{
970994
_heartbeatWriteTimer.Dispose();
971995
}
@@ -976,7 +1000,7 @@ protected void StopHeartbeatTimer()
9761000
///</remarks>
9771001
public void TerminateMainloop()
9781002
{
979-
StopHeartbeatTimer();
1003+
MaybeStopHeartbeatTimers();
9801004
m_running = false;
9811005
}
9821006

@@ -988,6 +1012,7 @@ public override string ToString()
9881012
public void WriteFrame(Frame f)
9891013
{
9901014
m_frameHandler.WriteFrame(f);
1015+
m_heartbeatWrite.Set();
9911016
}
9921017

9931018
///<summary>API-side invocation of connection abort.</summary>
@@ -1060,7 +1085,7 @@ public void HandleConnectionUnblocked()
10601085

10611086
void IDisposable.Dispose()
10621087
{
1063-
StopHeartbeatTimer();
1088+
MaybeStopHeartbeatTimers();
10641089
Abort();
10651090
if (ShutdownReport.Count > 0)
10661091
{
@@ -1190,6 +1215,8 @@ protected void StartAndTune()
11901215
heartbeat);
11911216

11921217
m_inConnectionNegotiation = false;
1218+
// now we can start heartbeat timers
1219+
MaybeStartHeartbeatTimers();
11931220
}
11941221

11951222
private static uint NegotiatedMaxValue(uint clientValue, uint serverValue)

projects/client/RabbitMQ.Client/src/client/impl/IFrameHandler.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,7 @@ public interface IFrameHandler
6868
void SendHeader();
6969

7070
void WriteFrame(Frame frame);
71+
72+
void Flush();
7173
}
7274
}

0 commit comments

Comments
 (0)