Skip to content

Commit 991bfe1

Browse files
Start heartbeat timers after connection.tune
Also simplify socket timeout configuration and flush the socket after every heartbeat sent.
1 parent 310b8c0 commit 991bfe1

File tree

3 files changed

+31
-23
lines changed

3 files changed

+31
-23
lines changed

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
122122
StartMainLoop(factory.UseBackgroundThreadsForIO);
123123
Open(insist);
124124

125-
StartHeartbeatTimers();
126125
AppDomain.CurrentDomain.DomainUnload += HandleDomainUnload;
127126
}
128127

@@ -242,10 +241,10 @@ public ushort Heartbeat
242241
set
243242
{
244243
m_heartbeat = value;
245-
// timers fire at half the interval to avoid race
244+
// timers fire at slightly below half the interval to avoid race
246245
// conditions
247-
m_heartbeatTimeSpan = TimeSpan.FromMilliseconds((value * 1000) / 2.0);
248-
m_frameHandler.Timeout = (value * 1000) / 2;
246+
m_heartbeatTimeSpan = TimeSpan.FromMilliseconds((value * 1000) / 4);
247+
m_frameHandler.Timeout = value * 1000 * 2;
249248
}
250249
}
251250

@@ -477,7 +476,7 @@ public void FinishClose()
477476
m_heartbeatRead.Set();
478477
m_heartbeatWrite.Set();
479478
m_closed = true;
480-
StopHeartbeatTimers();
479+
MaybeStopHeartbeatTimers();
481480

482481
m_frameHandler.Close();
483482
m_model0.SetCloseReason(m_closeReason);
@@ -896,15 +895,15 @@ public bool SetCloseReason(ShutdownEventArgs reason)
896895
}
897896
}
898897

899-
public void StartHeartbeatTimers()
898+
public void MaybeStartHeartbeatTimers()
900899
{
901900
if (Heartbeat != 0)
902901
{
903902
_heartbeatWriteTimer = new Timer(HeartbeatWriteTimerCallback);
904-
_heartbeatWriteTimer.Change(TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(-1));
903+
_heartbeatWriteTimer.Change(TimeSpan.FromMilliseconds(200), m_heartbeatTimeSpan);
905904

906905
_heartbeatReadTimer = new Timer(HeartbeatReadTimerCallback);
907-
_heartbeatReadTimer.Change(TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(-1));
906+
_heartbeatReadTimer.Change(TimeSpan.FromMilliseconds(200), m_heartbeatTimeSpan);
908907
}
909908
}
910909

@@ -930,8 +929,11 @@ public void HeartbeatReadTimerCallback(object state)
930929
m_missedHeartbeats = 0;
931930
}
932931

933-
// Has to miss two full heartbeats to force socket close
934-
if (m_missedHeartbeats > 1)
932+
// We check against 8 = 2 * 4 because we need to wait for at
933+
// least two complete heartbeat setting intervals before
934+
// complaining, and we've set the socket timeout to a quarter
935+
// of the heartbeat setting in setHeartbeat above.
936+
if (m_missedHeartbeats > 2 * 4)
935937
{
936938
String description = String.Format("Heartbeat missing with heartbeat == {0} seconds", m_heartbeat);
937939
var eose = new EndOfStreamException(description);
@@ -960,10 +962,8 @@ public void HeartbeatWriteTimerCallback(object state)
960962
{
961963
if (!m_closed)
962964
{
963-
if (!m_heartbeatWrite.WaitOne(0, false))
964-
{
965-
WriteFrame(m_heartbeatFrame);
966-
}
965+
WriteFrame(m_heartbeatFrame);
966+
m_frameHandler.Flush();
967967
}
968968
}
969969
catch (Exception e)
@@ -981,13 +981,9 @@ public void HeartbeatWriteTimerCallback(object state)
981981
TerminateMainloop();
982982
FinishClose();
983983
}
984-
else
985-
{
986-
_heartbeatWriteTimer.Change(Heartbeat * 1000, Timeout.Infinite);
987-
}
988984
}
989985

990-
protected void StopHeartbeatTimers()
986+
protected void MaybeStopHeartbeatTimers()
991987
{
992988
if(_heartbeatReadTimer != null)
993989
{
@@ -1004,7 +1000,7 @@ protected void StopHeartbeatTimers()
10041000
///</remarks>
10051001
public void TerminateMainloop()
10061002
{
1007-
StopHeartbeatTimers();
1003+
MaybeStopHeartbeatTimers();
10081004
m_running = false;
10091005
}
10101006

@@ -1089,7 +1085,7 @@ public void HandleConnectionUnblocked()
10891085

10901086
void IDisposable.Dispose()
10911087
{
1092-
StopHeartbeatTimers();
1088+
MaybeStopHeartbeatTimers();
10931089
Abort();
10941090
if (ShutdownReport.Count > 0)
10951091
{
@@ -1219,6 +1215,8 @@ protected void StartAndTune()
12191215
heartbeat);
12201216

12211217
m_inConnectionNegotiation = false;
1218+
// now we can start heartbeat timers
1219+
MaybeStartHeartbeatTimers();
12221220
}
12231221

12241222
private static uint NegotiatedMaxValue(uint clientValue, uint serverValue)

projects/client/RabbitMQ.Client/src/client/impl/IFrameHandler.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,5 +68,7 @@ public interface IFrameHandler
6868
void SendHeader();
6969

7070
void WriteFrame(Frame frame);
71+
72+
void Flush();
7173
}
7274
}

projects/client/RabbitMQ.Client/src/client/impl/SocketFrameHandler.cs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
9595

9696
Stream netstream = m_socket.GetStream();
9797
// make sure the socket timeout is greater than heartbeat timeout
98-
netstream.ReadTimeout = timeout * 4;
99-
netstream.WriteTimeout = timeout * 4;
98+
netstream.ReadTimeout = timeout;
99+
netstream.WriteTimeout = timeout;
100100

101101
if (endpoint.Ssl.Enabled)
102102
{
@@ -226,6 +226,14 @@ public void WriteFrame(Frame frame)
226226
}
227227
}
228228

229+
public void Flush()
230+
{
231+
lock (m_writer)
232+
{
233+
m_writer.Flush();
234+
}
235+
}
236+
229237
private void Connect(TcpClient socket, AmqpTcpEndpoint endpoint, int timeout)
230238
{
231239
IAsyncResult ar = null;

0 commit comments

Comments
 (0)