Skip to content

Commit a9cf378

Browse files
author
Francois ROBION
committed
Allow heartbeat to restart the pipe thread with only sync commands
A dedicated thread loops in PhysicalConnection.ReadFromPipe to process responses from Redis, match each one with the command that was sent, and signal completion of the corresponding message. If this thread hits an exception, its catch block calls RecordConnectionFailed, which starts a new thread to continue reading Redis responses. However, if another exception occurs in the catch block before the new thread can be started (under high memory pressure, OOM exceptions can happen anywhere), we end up in a state where nothing is reading the pipe of Redis responses, and every command sent ends in a timeout. If at least one async command is sent, the heartbeat thread detects the timeout in the OnBridgeHeartbeat method, and if nothing has been read for four async timeouts' worth of time it issues a connection failure. With this commit, the same now holds for sync commands as well. This ensures we can no longer get stuck in a state where all commands end in a timeout.
1 parent 862a70e commit a9cf378

File tree

2 files changed

+16
-8
lines changed

2 files changed

+16
-8
lines changed

src/StackExchange.Redis/PhysicalBridge.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,7 @@ internal void OnHeartbeat(bool ifConnectedOnly)
615615
Interlocked.Exchange(ref connectTimeoutRetryCount, 0);
616616
tmp.BridgeCouldBeNull?.ServerEndPoint?.ClearUnselectable(UnselectableFlags.DidNotRespond);
617617
}
618-
int timedOutThisHeartbeat = tmp.OnBridgeHeartbeat();
618+
tmp.OnBridgeHeartbeat(out int asyncTimeoutThisHeartbeat, out int syncTimeoutThisHeartbeat);
619619
int writeEverySeconds = ServerEndPoint.WriteEverySeconds;
620620
bool configCheckDue = ServerEndPoint.ConfigCheckSeconds > 0 && ServerEndPoint.LastInfoReplicationCheckSecondsAgo >= ServerEndPoint.ConfigCheckSeconds;
621621

@@ -664,14 +664,15 @@ internal void OnHeartbeat(bool ifConnectedOnly)
664664
}
665665

666666
// This is an "always" check - we always want to evaluate a dead connection from a non-responsive server regardless of the need to heartbeat above
667-
if (timedOutThisHeartbeat > 0
667+
var totalTimeoutThisHeartbeat = asyncTimeoutThisHeartbeat + syncTimeoutThisHeartbeat;
668+
if ((totalTimeoutThisHeartbeat > 0)
668669
&& tmp.LastReadSecondsAgo * 1_000 > (tmp.BridgeCouldBeNull?.Multiplexer.AsyncTimeoutMilliseconds * 4))
669670
{
670671
// If we've received *NOTHING* on the pipe in 4 timeouts worth of time and we're timing out commands, issue a connection failure so that we reconnect
671672
// This is meant to address the scenario we see often in Linux configs where TCP retries will happen for 15 minutes.
672673
// To us as a client, we'll see the socket as green/open/fine when writing but we'll be getting nothing back.
673674
// Since we can't depend on the pipe to fail in that case, we want to error here based on the criteria above so we reconnect broken clients much faster.
674-
tmp.BridgeCouldBeNull?.Multiplexer.Logger?.LogWarningDeadSocketDetected(tmp.LastReadSecondsAgo, timedOutThisHeartbeat);
675+
tmp.BridgeCouldBeNull?.Multiplexer.Logger?.LogWarningDeadSocketDetected(tmp.LastReadSecondsAgo, totalTimeoutThisHeartbeat);
675676
OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out _, out State oldState);
676677
tmp.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely
677678
}

src/StackExchange.Redis/PhysicalConnection.cs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -746,10 +746,12 @@ internal void GetStormLog(StringBuilder sb)
746746
/// <summary>
747747
/// Runs on every heartbeat for a bridge, timing out any async commands that are overdue and counting how many async and sync commands were found overdue.
748748
/// </summary>
749-
/// <returns>How many commands were overdue and threw timeout exceptions.</returns>
750-
internal int OnBridgeHeartbeat()
749+
/// <param name="asyncTimeoutDetected">How many async commands were overdue and threw timeout exceptions.</param>
750+
/// <param name="syncTimeoutDetected">How many sync commands were overdue. No exceptions are thrown for these commands here.</param>
751+
internal void OnBridgeHeartbeat(out int asyncTimeoutDetected, out int syncTimeoutDetected)
751752
{
752-
var result = 0;
753+
asyncTimeoutDetected = 0;
754+
syncTimeoutDetected = 0;
753755
var now = Environment.TickCount;
754756
Interlocked.Exchange(ref lastBeatTickCount, now);
755757

@@ -776,7 +778,13 @@ internal int OnBridgeHeartbeat()
776778
multiplexer.OnMessageFaulted(msg, timeoutEx);
777779
msg.SetExceptionAndComplete(timeoutEx, bridge); // tell the message that it is doomed
778780
multiplexer.OnAsyncTimeout();
779-
result++;
781+
asyncTimeoutDetected++;
782+
}
783+
else
784+
{
785+
// Only count how many sync timeouts we detect here.
786+
// The actual timeout is handled in ConnectionMultiplexer.ExecuteSyncImpl().
787+
syncTimeoutDetected++;
780788
}
781789
}
782790
else
@@ -791,7 +799,6 @@ internal int OnBridgeHeartbeat()
791799
}
792800
}
793801
}
794-
return result;
795802
}
796803

797804
internal void OnInternalError(Exception exception, [CallerMemberName] string? origin = null)

0 commit comments

Comments
 (0)