From 18e57ad88c671fc3f46ebab5e80bd73b031c61cb Mon Sep 17 00:00:00 2001 From: Jean-Christophe Cura Date: Mon, 6 Oct 2025 16:11:53 +0200 Subject: [PATCH 1/3] Move PhysicalConnection? physical member in a new class with all accesses threadsafe --- src/StackExchange.Redis/PhysicalBridge.cs | 344 +++++++++--------- .../ThreadSafePhysicalConnectionAccessor.cs | 263 +++++++++++++ 2 files changed, 441 insertions(+), 166 deletions(-) create mode 100644 src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index c430cf5af..e640dae2c 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -52,7 +52,8 @@ internal sealed class PhysicalBridge : IDisposable // private volatile int missedHeartbeats; private long operationCount, socketCount; - private volatile PhysicalConnection? physical; + + private readonly ThreadSafePhysicalConnectionAccessor threadSafePhysicalConnectionAccessor = new ThreadSafePhysicalConnectionAccessor(); private long profileLastLog; private int profileLogIndex; @@ -61,7 +62,7 @@ internal sealed class PhysicalBridge : IDisposable private volatile int state = (int)State.Disconnected; - internal long? ConnectionId => physical?.ConnectionId; + internal long? ConnectionId => threadSafePhysicalConnectionAccessor.GetConnectionId(); #if NETCOREAPP private readonly SemaphoreSlim _singleWriterMutex = new(1, 1); @@ -69,7 +70,7 @@ internal sealed class PhysicalBridge : IDisposable private readonly MutexSlim _singleWriterMutex; #endif - internal string? PhysicalName => physical?.ToString(); + internal string? PhysicalName => threadSafePhysicalConnectionAccessor.GetPhysicalName(); private uint _nextHighIntegrityToken; // zero means not enabled @@ -114,7 +115,7 @@ public enum State : byte public ServerEndPoint ServerEndPoint { get; } - public long SubscriptionCount => physical?.SubscriptionCount ?? 0; + public long SubscriptionCount => threadSafePhysicalConnectionAccessor.GetSubscriptionCount(); internal State ConnectionState => (State)state; internal bool IsBeating => Interlocked.CompareExchange(ref beating, 0, 0) == 1; @@ -142,10 +143,7 @@ public void Dispose() _backlogAutoReset?.Dispose(); } catch { } - using (var tmp = physical) - { - physical = null; - } + threadSafePhysicalConnectionAccessor.DisposePhysicalConnection(); GC.SuppressFinalize(this); } @@ -160,13 +158,7 @@ public void Dispose() // in a finalizer, but we need to kill that socket, // and this is the first place that isn't going to // be rooted by the socket async bits - try - { - var tmp = physical; - physical = null; - tmp?.Shutdown(); - } - catch { } + threadSafePhysicalConnectionAccessor.Shutdown(); } public void ReportNextFailure() { @@ -212,8 +204,16 @@ public WriteResult TryWriteSync(Message message, bool isReplica) if (isDisposed) throw new ObjectDisposedException(Name); if (!IsConnected) return QueueOrFailMessage(message); - var physical = this.physical; - if (physical == null) + WriteResult result = WriteResult.WriteFailure; + if (threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => + { + result = WriteMessageTakingWriteLockSync(physical, message); + LogNonPreferred(message.Flags, isReplica); + })) + { + return result; + } + else { // If we're not connected yet and supposed to, queue it up if (Multiplexer.RawConfig.BacklogPolicy.QueueWhileDisconnected) @@ -226,18 +226,26 @@ public WriteResult TryWriteSync(Message message, bool isReplica) } return FailDueToNoConnection(message); } - var result = WriteMessageTakingWriteLockSync(physical, message); - LogNonPreferred(message.Flags, isReplica); - return result; } public ValueTask TryWriteAsync(Message message, bool isReplica, bool bypassBacklog = false) { if (isDisposed) throw new ObjectDisposedException(Name); - if (!IsConnected && !bypassBacklog) return new ValueTask(QueueOrFailMessage(message)); + if (!IsConnected && !bypassBacklog) + { + return new ValueTask(QueueOrFailMessage(message)); + } - var physical = this.physical; - if (physical == null) + ValueTask result = new ValueTask(WriteResult.Success); + if (threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => + { + result = WriteMessageTakingWriteLockAsync(physical, message, bypassBacklog: bypassBacklog); + LogNonPreferred(message.Flags, isReplica); + })) + { + return result; + } + else { // If we're not connected yet and supposed to, queue it up if (!bypassBacklog && Multiplexer.RawConfig.BacklogPolicy.QueueWhileDisconnected) @@ -250,10 +258,6 @@ public ValueTask TryWriteAsync(Message message, bool isReplica, boo } return new ValueTask(FailDueToNoConnection(message)); } - - var result = WriteMessageTakingWriteLockAsync(physical, message, bypassBacklog: bypassBacklog); - LogNonPreferred(message.Flags, isReplica); - return result; } internal void AppendProfile(StringBuilder sb) @@ -287,7 +291,7 @@ internal void GetCounters(ConnectionCounters counters) counters.SocketCount = Interlocked.Read(ref socketCount); counters.WriterCount = Interlocked.CompareExchange(ref activeWriters, 0, 0); counters.NonPreferredEndpointCount = Interlocked.Read(ref nonPreferredEndpointCount); - physical?.GetCounters(counters); + threadSafePhysicalConnectionAccessor.GetCounters(counters); } internal readonly struct BridgeStatus @@ -354,7 +358,7 @@ public override string ToString() => BacklogMessagesPendingCounter = Volatile.Read(ref _backlogCurrentEnqueued), BacklogStatus = _backlogStatus, TotalBacklogMessagesQueued = _backlogTotalEnqueued, - Connection = physical?.GetStatus() ?? PhysicalConnection.ConnectionStatus.Default, + Connection = threadSafePhysicalConnectionAccessor.GetStatus(), }; internal string GetStormLog() @@ -362,7 +366,7 @@ internal string GetStormLog() var sb = new StringBuilder("Storm log for ").Append(Format.ToString(ServerEndPoint.EndPoint)).Append(" / ").Append(ConnectionType) .Append(" at ").Append(DateTime.UtcNow) .AppendLine().AppendLine(); - physical?.GetStormLog(sb); + threadSafePhysicalConnectionAccessor.GetStormLog(sb); sb.Append("Circular op-count snapshot:"); AppendProfile(sb); sb.AppendLine(); @@ -380,7 +384,7 @@ internal void IncrementOpCount() /// Whether to run even then the connection isn't idle. internal void KeepAlive(bool forceRun = false) { - if (!forceRun && !(physical?.IsIdle() ?? false)) return; // don't pile on if already doing something + if (!forceRun && !threadSafePhysicalConnectionAccessor.IsIdle()) return; // don't pile on if already doing something var commandMap = Multiplexer.CommandMap; Message? msg = null; @@ -410,8 +414,12 @@ internal void KeepAlive(bool forceRun = false) { msg.SetInternalCall(); Multiplexer.Trace("Enqueue: " + msg); - Multiplexer.OnInfoMessage($"heartbeat ({physical?.LastWriteSecondsAgo}s >= {ServerEndPoint.WriteEverySeconds}s, {physical?.GetSentAwaitingResponseCount()} waiting) '{msg.CommandAndKey}' on '{PhysicalName}' (v{features.Version})"); - physical?.UpdateLastWriteTime(); // preemptively + threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => + { + Multiplexer.OnInfoMessage($"heartbeat ({physical?.LastWriteSecondsAgo}s >= {ServerEndPoint.WriteEverySeconds}s, {physical?.GetSentAwaitingResponseCount()} waiting) '{msg.CommandAndKey}' on '{PhysicalName}' (v{features.Version})"); + physical?.UpdateLastWriteTime(); // preemptively + }); + #pragma warning disable CS0618 // Type or member is obsolete var result = TryWriteSync(msg, ServerEndPoint.IsReplica); #pragma warning restore CS0618 @@ -427,7 +435,7 @@ internal void KeepAlive(bool forceRun = false) internal async Task OnConnectedAsync(PhysicalConnection connection, ILogger? log) { Trace("OnConnected"); - if (physical == connection && !isDisposed && ChangeState(State.Connecting, State.ConnectedEstablishing)) + if (threadSafePhysicalConnectionAccessor.ConnectionMatch(connection) && !isDisposed && ChangeState(State.Connecting, State.ConnectedEstablishing)) { ConnectedAt ??= DateTime.UtcNow; await ServerEndPoint.OnEstablishingAsync(connection, log).ForAwait(); @@ -445,11 +453,14 @@ internal async Task OnConnectedAsync(PhysicalConnection connection, ILogger? log internal void ResetNonConnected() { - var tmp = physical; - if (tmp != null && state != (int)State.ConnectedEstablished) + threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => { - tmp.RecordConnectionFailed(ConnectionFailureType.UnableToConnect); - } + if (state != (int)State.ConnectedEstablished) + { + physical.RecordConnectionFailed(ConnectionFailureType.UnableToConnect); + } + }); + TryConnect(null); } @@ -485,11 +496,13 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti oldState = default(State); // only defined when isCurrent = true ConnectedAt = default; - if (isCurrent = physical == connection) + isCurrent = false; + + if (threadSafePhysicalConnectionAccessor.ClearPhysicalIfMatch(connection, out bool isNull)) { + isCurrent = true; Trace("Bridge noting disconnect from active connection" + (isDisposed ? " (disposed)" : "")); oldState = ChangeState(State.Disconnected); - physical = null; if (oldState == State.ConnectedEstablished && !ServerEndPoint.IsReplica) { // if the disconnected endpoint was a primary endpoint run info replication @@ -506,7 +519,7 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti TryConnect(null); // try to connect immediately } } - else if (physical == null) + else if (isNull) { Trace("Bridge noting disconnect (already terminated)"); } @@ -529,7 +542,7 @@ internal void OnFullyEstablished(PhysicalConnection connection, string source) { Trace("OnFullyEstablished"); connection.SetIdle(); - if (physical == connection && !isDisposed && ChangeState(State.ConnectedEstablishing, State.ConnectedEstablished)) + if (threadSafePhysicalConnectionAccessor.ConnectionMatch(connection) && !isDisposed && ChangeState(State.ConnectedEstablishing, State.ConnectedEstablished)) { reportNextFailure = reconfigureNextFailure = true; LastException = null; @@ -592,90 +605,24 @@ internal void OnHeartbeat(bool ifConnectedOnly) Multiplexer.Logger?.LogErrorConnectionIssue(ex, ex.Message); Trace("Aborting connect"); // abort and reconnect - var snapshot = physical; - OnDisconnected(ConnectionFailureType.UnableToConnect, snapshot, out bool isCurrent, out State oldState); - snapshot?.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely + threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => + { + OnDisconnected(ConnectionFailureType.UnableToConnect, physical, out bool isCurrent, out State oldState); + physical.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely + }); TryConnect(null); } break; case (int)State.ConnectedEstablishing: - // (Fall through) Happens when we successfully connected via TCP, but no Redis handshake completion yet. - // This can happen brief (usual) or when the server never answers (rare). When we're in this state, - // a socket is open and reader likely listening indefinitely for incoming data on an async background task. - // We need to time that out and cleanup the PhysicalConnection if needed, otherwise that reader and socket will remain open - // for the lifetime of the application due to being orphaned, yet still referenced by the active task doing the pipe read. + // (Fall through) Happens when we successfully connected via TCP, but no Redis handshake completion yet. + // This can happen brief (usual) or when the server never answers (rare). When we're in this state, + // a socket is open and reader likely listening indefinitely for incoming data on an async background task. + // We need to time that out and cleanup the PhysicalConnection if needed, otherwise that reader and socket will remain open + // for the lifetime of the application due to being orphaned, yet still referenced by the active task doing the pipe read. case (int)State.ConnectedEstablished: // Track that we should reset the count on the next disconnect, but not do so in a loop shouldResetConnectionRetryCount = true; - var tmp = physical; - if (tmp != null) - { - if (state == (int)State.ConnectedEstablished) - { - Interlocked.Exchange(ref connectTimeoutRetryCount, 0); - tmp.BridgeCouldBeNull?.ServerEndPoint?.ClearUnselectable(UnselectableFlags.DidNotRespond); - } - int timedOutThisHeartbeat = tmp.OnBridgeHeartbeat(); - int writeEverySeconds = ServerEndPoint.WriteEverySeconds; - bool configCheckDue = ServerEndPoint.ConfigCheckSeconds > 0 && ServerEndPoint.LastInfoReplicationCheckSecondsAgo >= ServerEndPoint.ConfigCheckSeconds; - - if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive - && tmp.BridgeCouldBeNull?.Multiplexer.RawConfig.HeartbeatConsistencyChecks == true) - { - // If HeartbeatConsistencyChecks are enabled, we're sending a PING (expecting PONG) or ECHO (expecting UniqueID back) every single - // heartbeat as an opt-in measure to react to any network stream drop ASAP to terminate the connection as faulted. - // If we don't get the expected response to that command, then the connection is terminated. - // This is to prevent the case of things like 100% string command usage where a protocol error isn't otherwise encountered. - KeepAlive(forceRun: true); - - // If we're configured to check info replication, perform that too - if (configCheckDue) - { - ServerEndPoint.CheckInfoReplication(); - } - } - else if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive - && configCheckDue - && ServerEndPoint.CheckInfoReplication()) - { - // that serves as a keep-alive, if it is accepted - } - else if (writeEverySeconds > 0 && tmp.LastWriteSecondsAgo >= writeEverySeconds) - { - Trace("OnHeartbeat - overdue"); - if (state == (int)State.ConnectedEstablished) - { - KeepAlive(); - } - else - { - OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out bool ignore, out State oldState); - tmp.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely - } - } - else if (writeEverySeconds <= 0 - && tmp.IsIdle() - && tmp.LastWriteSecondsAgo > 2 - && tmp.GetSentAwaitingResponseCount() != 0) - { - // There's a chance this is a dead socket; sending data will shake that up a bit, - // so if we have an empty unsent queue and a non-empty sent queue, test the socket. - KeepAlive(); - } - - // This is an "always" check - we always want to evaluate a dead connection from a non-responsive sever regardless of the need to heartbeat above - if (timedOutThisHeartbeat > 0 - && tmp.LastReadSecondsAgo * 1_000 > (tmp.BridgeCouldBeNull?.Multiplexer.AsyncTimeoutMilliseconds * 4)) - { - // If we've received *NOTHING* on the pipe in 4 timeouts worth of time and we're timing out commands, issue a connection failure so that we reconnect - // This is meant to address the scenario we see often in Linux configs where TCP retries will happen for 15 minutes. - // To us as a client, we'll see the socket as green/open/fine when writing but we'll bet getting nothing back. - // Since we can't depend on the pipe to fail in that case, we want to error here based on the criteria above so we reconnect broken clients much faster. - tmp.BridgeCouldBeNull?.Multiplexer.Logger?.LogWarningDeadSocketDetected(tmp.LastReadSecondsAgo, timedOutThisHeartbeat); - OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out _, out State oldState); - tmp.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely - } - } + threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(ManageHeartBeatPhysicalConnection()); break; case (int)State.Disconnected: // Only if we should reset the connection count @@ -712,6 +659,74 @@ internal void OnHeartbeat(bool ifConnectedOnly) } } + private Action ManageHeartBeatPhysicalConnection() => physical => + { + if (state == (int)State.ConnectedEstablished) + { + Interlocked.Exchange(ref connectTimeoutRetryCount, 0); + physical.BridgeCouldBeNull?.ServerEndPoint?.ClearUnselectable(UnselectableFlags.DidNotRespond); + } + int timedOutThisHeartbeat = physical.OnBridgeHeartbeat(); + int writeEverySeconds = ServerEndPoint.WriteEverySeconds; + bool configCheckDue = ServerEndPoint.ConfigCheckSeconds > 0 && ServerEndPoint.LastInfoReplicationCheckSecondsAgo >= ServerEndPoint.ConfigCheckSeconds; + + if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive + && physical.BridgeCouldBeNull?.Multiplexer.RawConfig.HeartbeatConsistencyChecks == true) + { + // If HeartbeatConsistencyChecks are enabled, we're sending a PING (expecting PONG) or ECHO (expecting UniqueID back) every single + // heartbeat as an opt-in measure to react to any network stream drop ASAP to terminate the connection as faulted. + // If we don't get the expected response to that command, then the connection is terminated. + // This is to prevent the case of things like 100% string command usage where a protocol error isn't otherwise encountered. + KeepAlive(forceRun: true); + + // If we're configured to check info replication, perform that too + if (configCheckDue) + { + ServerEndPoint.CheckInfoReplication(); + } + } + else if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive + && configCheckDue + && ServerEndPoint.CheckInfoReplication()) + { + // that serves as a keep-alive, if it is accepted + } + else if (writeEverySeconds > 0 && physical.LastWriteSecondsAgo >= writeEverySeconds) + { + Trace("OnHeartbeat - overdue"); + if (state == (int)State.ConnectedEstablished) + { + KeepAlive(); + } + else + { + OnDisconnected(ConnectionFailureType.SocketFailure, physical, out bool ignore, out State oldState); + physical.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely + } + } + else if (writeEverySeconds <= 0 + && physical.IsIdle() + && physical.LastWriteSecondsAgo > 2 + && physical.GetSentAwaitingResponseCount() != 0) + { + // There's a chance this is a dead socket; sending data will shake that up a bit, + // so if we have an empty unsent queue and a non-empty sent queue, test the socket. + KeepAlive(); + } + + // This is an "always" check - we always want to evaluate a dead connection from a non-responsive sever regardless of the need to heartbeat above + if (timedOutThisHeartbeat > 0 + && physical.LastReadSecondsAgo * 1_000 > (physical.BridgeCouldBeNull?.Multiplexer.AsyncTimeoutMilliseconds * 4)) + { + // If we've received *NOTHING* on the pipe in 4 timeouts worth of time and we're timing out commands, issue a connection failure so that we reconnect + // This is meant to address the scenario we see often in Linux configs where TCP retries will happen for 15 minutes. + // To us as a client, we'll see the socket as green/open/fine when writing but we'll bet getting nothing back. + // Since we can't depend on the pipe to fail in that case, we want to error here based on the criteria above so we reconnect broken clients much faster. + physical.BridgeCouldBeNull?.Multiplexer.Logger?.LogWarningDeadSocketDetected(physical.LastReadSecondsAgo, timedOutThisHeartbeat); + OnDisconnected(ConnectionFailureType.SocketFailure, physical, out _, out State oldState); + physical.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely + } + }; [Conditional("VERBOSE")] internal void Trace(string message) => Multiplexer.Trace(message, ToString()); @@ -732,19 +747,19 @@ internal bool TryEnqueue(List messages, bool isReplica) return false; } - var physical = this.physical; - if (physical == null) return false; - foreach (var message in messages) + return threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => { - // deliberately not taking a single lock here; we don't care if - // other threads manage to interleave - in fact, it would be desirable - // (to avoid a batch monopolising the connection) + foreach (var message in messages) + { + // deliberately not taking a single lock here; we don't care if + // other threads manage to interleave - in fact, it would be desirable + // (to avoid a batch monopolising the connection) #pragma warning disable CS0618 // Type or member is obsolete - WriteMessageTakingWriteLockSync(physical, message); + WriteMessageTakingWriteLockSync(physical, message); #pragma warning restore CS0618 - LogNonPreferred(message.Flags, isReplica); - } - return true; + LogNonPreferred(message.Flags, isReplica); + } + }); } private Message? _activeMessage; @@ -1147,7 +1162,7 @@ private void ProcessBridgeBacklog() // Only execute if we're connected. // Timeouts are handled above, so we're exclusively into backlog items eligible to write at this point. // If we can't write them, abort and wait for the next heartbeat or activation to try this again. - while (IsConnected && physical?.HasOutputPipe == true) + while (IsConnected && threadSafePhysicalConnectionAccessor.HasOutputPipe == true) { Message? message; _backlogStatus = BacklogStatus.CheckingForWork; @@ -1165,23 +1180,24 @@ private void ProcessBridgeBacklog() try { _backlogStatus = BacklogStatus.WritingMessage; - var result = WriteMessageInsideLock(physical, message); - - if (result == WriteResult.Success) + threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => { - _backlogStatus = BacklogStatus.Flushing; + var result = WriteMessageInsideLock(physical, message); + if (result == WriteResult.Success) + { + _backlogStatus = BacklogStatus.Flushing; #pragma warning disable CS0618 // Type or member is obsolete - result = physical.FlushSync(false, TimeoutMilliseconds); + result = physical.FlushSync(false, TimeoutMilliseconds); #pragma warning restore CS0618 // Type or member is obsolete - } - - _backlogStatus = BacklogStatus.MarkingInactive; - if (result != WriteResult.Success) - { - _backlogStatus = BacklogStatus.RecordingWriteFailure; - var ex = Multiplexer.GetException(result, message, ServerEndPoint); - HandleWriteException(message, ex); - } + } + _backlogStatus = BacklogStatus.MarkingInactive; + if (result != WriteResult.Success) + { + _backlogStatus = BacklogStatus.RecordingWriteFailure; + var ex = Multiplexer.GetException(result, message, ServerEndPoint); + HandleWriteException(message, ex); + } + }); } catch (Exception ex) { @@ -1194,7 +1210,7 @@ private void ProcessBridgeBacklog() } } _backlogStatus = BacklogStatus.SettingIdle; - physical?.SetIdle(); + threadSafePhysicalConnectionAccessor.SetIdle(); _backlogStatus = BacklogStatus.Inactive; } finally @@ -1219,7 +1235,7 @@ public bool HasPendingCallerFacingItems() if (!item.IsInternalCall) return true; } } - return physical?.HasPendingCallerFacingItems() ?? false; + return threadSafePhysicalConnectionAccessor.HasPendingCallerFacingItems; } private WriteResult TimedOutBeforeWrite(Message message) @@ -1396,22 +1412,22 @@ private async ValueTask CompleteWriteAndReleaseLockAsync( #if !NETCOREAPP using (lockToken) #endif - try - { - var result = await flush.ForAwait(); - physical?.SetIdle(); - return result; - } - catch (Exception ex) - { - return HandleWriteException(message, ex); - } - finally - { + try + { + var result = await flush.ForAwait(); + threadSafePhysicalConnectionAccessor.SetIdle(); + return result; + } + catch (Exception ex) + { + return HandleWriteException(message, ex); + } + finally + { #if NETCOREAPP _singleWriterMutex.Release(); #endif - } + } } private WriteResult HandleWriteException(Message message, Exception ex) @@ -1445,7 +1461,7 @@ private bool ChangeState(State oldState, State newState) return result; } - public PhysicalConnection? TryConnect(ILogger? log) + public void TryConnect(ILogger? log) { if (state == (int)State.Disconnected) { @@ -1461,12 +1477,9 @@ private bool ChangeState(State oldState, State newState) Interlocked.Exchange(ref connectStartTicks, Environment.TickCount); // separate creation and connection for case when connection completes synchronously // in that case PhysicalConnection will call back to PhysicalBridge, and most PhysicalBridge methods assume that physical is not null; - physical = new PhysicalConnection(this); - - physical.BeginConnectAsync(log).RedisFireAndForget(); + threadSafePhysicalConnectionAccessor.CreateConnection(this, log); } } - return null; } catch (Exception ex) { @@ -1477,7 +1490,6 @@ private bool ChangeState(State oldState, State newState) throw; } } - return physical; } private void LogNonPreferred(CommandFlags flags, bool isReplica) @@ -1685,7 +1697,7 @@ internal void SimulateConnectionFailure(SimulatedFailureType failureType) { throw ExceptionFactory.AdminModeNotEnabled(Multiplexer.RawConfig.IncludeDetailInExceptions, RedisCommand.DEBUG, null, ServerEndPoint); // close enough } - physical?.SimulateConnectionFailure(failureType); + threadSafePhysicalConnectionAccessor.SimulateConnectionFailure(failureType); } internal RedisCommand? GetActiveMessage() => Volatile.Read(ref _activeMessage)?.Command; diff --git a/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs b/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs new file mode 100644 index 000000000..01705255f --- /dev/null +++ b/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs @@ -0,0 +1,263 @@ +using System; +using System.Text; +using System.Threading; +using Microsoft.Extensions.Logging; + +namespace StackExchange.Redis +{ + internal sealed class ThreadSafePhysicalConnectionAccessor + { + private PhysicalConnection? physical; + private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); + internal ThreadSafePhysicalConnectionAccessor() + { + } + + public bool HasOutputPipe + { + get + { + try + { + rwLock.EnterReadLock(); + return physical?.HasOutputPipe ?? false; + } + finally + { + rwLock.ExitReadLock(); + } + } + } + + public bool HasPendingCallerFacingItems + { + get + { + try + { + rwLock.EnterReadLock(); + return physical?.HasPendingCallerFacingItems() ?? false; + } + finally + { + rwLock.ExitReadLock(); + } + } + } + + internal void CreateConnection(PhysicalBridge bridge, ILogger? log) + { + try + { + rwLock.EnterWriteLock(); + physical = new PhysicalConnection(bridge); + physical.BeginConnectAsync(log).RedisFireAndForget(); + } + finally + { + rwLock.ExitWriteLock(); + } + } + + internal long? GetConnectionId() + { + try + { + rwLock.EnterReadLock(); + return physical?.ConnectionId; + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal string? GetPhysicalName() + { + try + { + rwLock.EnterReadLock(); + return physical?.ToString(); + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal long GetSubscriptionCount() + { + try + { + rwLock.EnterReadLock(); + return physical?.SubscriptionCount ?? 0; + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal void DisposePhysicalConnection() + { + try + { + rwLock.EnterWriteLock(); + if (physical != null) + { + physical.Dispose(); + physical = null; + } + } + finally + { + rwLock.ExitWriteLock(); + } + } + + internal void Shutdown() + { + try + { + rwLock.EnterWriteLock(); + try + { + physical?.Shutdown(); + } + catch { } + } + finally + { + rwLock.ExitWriteLock(); + } + } + + internal bool WorkOnPhysicalWithLock(Action callBack) + { + try + { + rwLock.EnterWriteLock(); + if (physical != null) + { + callBack(physical); + return true; + } + return false; + } + finally + { + rwLock.ExitWriteLock(); + } + } + + internal void SimulateConnectionFailure(SimulatedFailureType failureType) + { + try + { + rwLock.EnterWriteLock(); + physical?.SimulateConnectionFailure(failureType); + } + finally + { + rwLock.ExitWriteLock(); + } + } + + internal void SetIdle() + { + try + { + rwLock.EnterWriteLock(); + physical?.SetIdle(); + } + finally + { + rwLock.ExitWriteLock(); + } + } + + internal void GetCounters(ConnectionCounters counters) + { + try + { + rwLock.EnterReadLock(); + physical?.GetCounters(counters); + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal PhysicalConnection.ConnectionStatus GetStatus() + { + try + { + rwLock.EnterReadLock(); + return physical?.GetStatus() ?? PhysicalConnection.ConnectionStatus.Default; + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal void GetStormLog(StringBuilder sb) + { + try + { + rwLock.EnterReadLock(); + physical?.GetStormLog(sb); + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal bool IsIdle() + { + try + { + rwLock.EnterReadLock(); + return physical?.IsIdle() ?? false; + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal bool ConnectionMatch(PhysicalConnection connection) + { + try + { + rwLock.EnterReadLock(); + return physical == connection; + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal bool ClearPhysicalIfMatch(PhysicalConnection? connection, out bool isNull) + { + try + { + rwLock.EnterWriteLock(); + isNull = physical == null; + if (physical == connection) + { + physical = null; + return true; + } + return false; + } + finally + { + rwLock.ExitWriteLock(); + } + } + } +} From 44c765ad2dac583fd8f78b72b3f7bbd199190c51 Mon Sep 17 00:00:00 2001 From: Jean-Christophe Cura Date: Mon, 6 Oct 2025 16:17:54 +0200 Subject: [PATCH 2/3] fix access modifiers --- .../ThreadSafePhysicalConnectionAccessor.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs b/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs index 01705255f..773d48203 100644 --- a/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs +++ b/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs @@ -13,7 +13,7 @@ internal ThreadSafePhysicalConnectionAccessor() { } - public bool HasOutputPipe + internal bool HasOutputPipe { get { @@ -29,7 +29,7 @@ public bool HasOutputPipe } } - public bool HasPendingCallerFacingItems + internal bool HasPendingCallerFacingItems { get { From 286b096e76244b82f99c34e02a32ed6934c178f3 Mon Sep 17 00:00:00 2001 From: Jean-Christophe Cura Date: Wed, 8 Oct 2025 15:29:07 +0200 Subject: [PATCH 3/3] Write lock only when applying changes on physical member (dispose, shutdown, =null ...) => so in normal run, only read locks are done => no slowdown out of connections problems because of read lock are reentrant --- src/StackExchange.Redis/PhysicalBridge.cs | 10 +++---- .../ThreadSafePhysicalConnectionAccessor.cs | 26 ++++++++++++++++--- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index e640dae2c..a6d9e5d73 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -205,7 +205,7 @@ public WriteResult TryWriteSync(Message message, bool isReplica) if (!IsConnected) return QueueOrFailMessage(message); WriteResult result = WriteResult.WriteFailure; - if (threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => + if (threadSafePhysicalConnectionAccessor.UsePhysicalWithLock(physical => { result = WriteMessageTakingWriteLockSync(physical, message); LogNonPreferred(message.Flags, isReplica); @@ -237,7 +237,7 @@ public ValueTask TryWriteAsync(Message message, bool isReplica, boo } ValueTask result = new ValueTask(WriteResult.Success); - if (threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => + if (threadSafePhysicalConnectionAccessor.UsePhysicalWithLock(physical => { result = WriteMessageTakingWriteLockAsync(physical, message, bypassBacklog: bypassBacklog); LogNonPreferred(message.Flags, isReplica); @@ -414,7 +414,7 @@ internal void KeepAlive(bool forceRun = false) { msg.SetInternalCall(); Multiplexer.Trace("Enqueue: " + msg); - threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => + threadSafePhysicalConnectionAccessor.UsePhysicalWithLock(physical => { Multiplexer.OnInfoMessage($"heartbeat ({physical?.LastWriteSecondsAgo}s >= {ServerEndPoint.WriteEverySeconds}s, {physical?.GetSentAwaitingResponseCount()} waiting) '{msg.CommandAndKey}' on '{PhysicalName}' (v{features.Version})"); physical?.UpdateLastWriteTime(); // preemptively @@ -747,7 +747,7 @@ internal bool TryEnqueue(List messages, bool isReplica) return false; } - return threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => + return threadSafePhysicalConnectionAccessor.UsePhysicalWithLock(physical => { foreach (var message in messages) { @@ -1180,7 +1180,7 @@ private void ProcessBridgeBacklog() try { _backlogStatus = BacklogStatus.WritingMessage; - threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => + threadSafePhysicalConnectionAccessor.UsePhysicalWithLock(physical => { var result = WriteMessageInsideLock(physical, message); if (result == WriteResult.Success) diff --git a/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs b/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs index 773d48203..9cbab9e33 100644 --- a/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs +++ b/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs @@ -132,6 +132,24 @@ internal void Shutdown() } } + internal bool UsePhysicalWithLock(Action callBack) + { + try + { + rwLock.EnterReadLock(); + if (physical != null) + { + callBack(physical); + return true; + } + return false; + } + finally + { + rwLock.ExitReadLock(); + } + } + internal bool WorkOnPhysicalWithLock(Action callBack) { try @@ -154,12 +172,12 @@ internal void SimulateConnectionFailure(SimulatedFailureType failureType) { try { - rwLock.EnterWriteLock(); + rwLock.EnterReadLock(); physical?.SimulateConnectionFailure(failureType); } finally { - rwLock.ExitWriteLock(); + rwLock.ExitReadLock(); } } @@ -167,12 +185,12 @@ internal void SetIdle() { try { - rwLock.EnterWriteLock(); + rwLock.EnterReadLock(); physical?.SetIdle(); } finally { - rwLock.ExitWriteLock(); + rwLock.ExitReadLock(); } }