diff --git a/src/StackExchange.Redis/PhysicalBridge.cs b/src/StackExchange.Redis/PhysicalBridge.cs index c430cf5af..a6d9e5d73 100644 --- a/src/StackExchange.Redis/PhysicalBridge.cs +++ b/src/StackExchange.Redis/PhysicalBridge.cs @@ -52,7 +52,8 @@ internal sealed class PhysicalBridge : IDisposable // private volatile int missedHeartbeats; private long operationCount, socketCount; - private volatile PhysicalConnection? physical; + + private readonly ThreadSafePhysicalConnectionAccessor threadSafePhysicalConnectionAccessor = new ThreadSafePhysicalConnectionAccessor(); private long profileLastLog; private int profileLogIndex; @@ -61,7 +62,7 @@ internal sealed class PhysicalBridge : IDisposable private volatile int state = (int)State.Disconnected; - internal long? ConnectionId => physical?.ConnectionId; + internal long? ConnectionId => threadSafePhysicalConnectionAccessor.GetConnectionId(); #if NETCOREAPP private readonly SemaphoreSlim _singleWriterMutex = new(1, 1); @@ -69,7 +70,7 @@ internal sealed class PhysicalBridge : IDisposable private readonly MutexSlim _singleWriterMutex; #endif - internal string? PhysicalName => physical?.ToString(); + internal string? PhysicalName => threadSafePhysicalConnectionAccessor.GetPhysicalName(); private uint _nextHighIntegrityToken; // zero means not enabled @@ -114,7 +115,7 @@ public enum State : byte public ServerEndPoint ServerEndPoint { get; } - public long SubscriptionCount => physical?.SubscriptionCount ?? 0; + public long SubscriptionCount => threadSafePhysicalConnectionAccessor.GetSubscriptionCount(); internal State ConnectionState => (State)state; internal bool IsBeating => Interlocked.CompareExchange(ref beating, 0, 0) == 1; @@ -142,10 +143,7 @@ public void Dispose() _backlogAutoReset?.Dispose(); } catch { } - using (var tmp = physical) - { - physical = null; - } + threadSafePhysicalConnectionAccessor.DisposePhysicalConnection(); GC.SuppressFinalize(this); } @@ -160,13 +158,7 @@ public void Dispose() // in a finalizer, but we need to kill that socket, // and this is the first place that isn't going to // be rooted by the socket async bits - try - { - var tmp = physical; - physical = null; - tmp?.Shutdown(); - } - catch { } + threadSafePhysicalConnectionAccessor.Shutdown(); } public void ReportNextFailure() { @@ -212,8 +204,16 @@ public WriteResult TryWriteSync(Message message, bool isReplica) if (isDisposed) throw new ObjectDisposedException(Name); if (!IsConnected) return QueueOrFailMessage(message); - var physical = this.physical; - if (physical == null) + WriteResult result = WriteResult.WriteFailure; + if (threadSafePhysicalConnectionAccessor.UsePhysicalWithLock(physical => + { + result = WriteMessageTakingWriteLockSync(physical, message); + LogNonPreferred(message.Flags, isReplica); + })) + { + return result; + } + else { // If we're not connected yet and supposed to, queue it up if (Multiplexer.RawConfig.BacklogPolicy.QueueWhileDisconnected) @@ -226,18 +226,26 @@ public WriteResult TryWriteSync(Message message, bool isReplica) } return FailDueToNoConnection(message); } - var result = WriteMessageTakingWriteLockSync(physical, message); - LogNonPreferred(message.Flags, isReplica); - return result; } public ValueTask TryWriteAsync(Message message, bool isReplica, bool bypassBacklog = false) { if (isDisposed) throw new ObjectDisposedException(Name); - if (!IsConnected && !bypassBacklog) return new ValueTask(QueueOrFailMessage(message)); + if (!IsConnected && !bypassBacklog) + { + return new ValueTask(QueueOrFailMessage(message)); + } - var physical = this.physical; - if (physical == null) + ValueTask result = new ValueTask(WriteResult.Success); + if (threadSafePhysicalConnectionAccessor.UsePhysicalWithLock(physical => + { + result = WriteMessageTakingWriteLockAsync(physical, message, bypassBacklog: bypassBacklog); + LogNonPreferred(message.Flags, isReplica); + })) + { + return result; + } + else { // If we're not connected yet and supposed to, queue it up if (!bypassBacklog && Multiplexer.RawConfig.BacklogPolicy.QueueWhileDisconnected) @@ -250,10 +258,6 @@ public ValueTask TryWriteAsync(Message message, bool isReplica, boo } return new ValueTask(FailDueToNoConnection(message)); } - - var result = WriteMessageTakingWriteLockAsync(physical, message, bypassBacklog: bypassBacklog); - LogNonPreferred(message.Flags, isReplica); - return result; } internal void AppendProfile(StringBuilder sb) @@ -287,7 +291,7 @@ internal void GetCounters(ConnectionCounters counters) counters.SocketCount = Interlocked.Read(ref socketCount); counters.WriterCount = Interlocked.CompareExchange(ref activeWriters, 0, 0); counters.NonPreferredEndpointCount = Interlocked.Read(ref nonPreferredEndpointCount); - physical?.GetCounters(counters); + threadSafePhysicalConnectionAccessor.GetCounters(counters); } internal readonly struct BridgeStatus @@ -354,7 +358,7 @@ public override string ToString() => BacklogMessagesPendingCounter = Volatile.Read(ref _backlogCurrentEnqueued), BacklogStatus = _backlogStatus, TotalBacklogMessagesQueued = _backlogTotalEnqueued, - Connection = physical?.GetStatus() ?? PhysicalConnection.ConnectionStatus.Default, + Connection = threadSafePhysicalConnectionAccessor.GetStatus(), }; internal string GetStormLog() @@ -362,7 +366,7 @@ internal string GetStormLog() var sb = new StringBuilder("Storm log for ").Append(Format.ToString(ServerEndPoint.EndPoint)).Append(" / ").Append(ConnectionType) .Append(" at ").Append(DateTime.UtcNow) .AppendLine().AppendLine(); - physical?.GetStormLog(sb); + threadSafePhysicalConnectionAccessor.GetStormLog(sb); sb.Append("Circular op-count snapshot:"); AppendProfile(sb); sb.AppendLine(); @@ -380,7 +384,7 @@ internal void IncrementOpCount() /// Whether to run even then the connection isn't idle. internal void KeepAlive(bool forceRun = false) { - if (!forceRun && !(physical?.IsIdle() ?? false)) return; // don't pile on if already doing something + if (!forceRun && !threadSafePhysicalConnectionAccessor.IsIdle()) return; // don't pile on if already doing something var commandMap = Multiplexer.CommandMap; Message? msg = null; @@ -410,8 +414,12 @@ internal void KeepAlive(bool forceRun = false) { msg.SetInternalCall(); Multiplexer.Trace("Enqueue: " + msg); - Multiplexer.OnInfoMessage($"heartbeat ({physical?.LastWriteSecondsAgo}s >= {ServerEndPoint.WriteEverySeconds}s, {physical?.GetSentAwaitingResponseCount()} waiting) '{msg.CommandAndKey}' on '{PhysicalName}' (v{features.Version})"); - physical?.UpdateLastWriteTime(); // preemptively + threadSafePhysicalConnectionAccessor.UsePhysicalWithLock(physical => + { + Multiplexer.OnInfoMessage($"heartbeat ({physical?.LastWriteSecondsAgo}s >= {ServerEndPoint.WriteEverySeconds}s, {physical?.GetSentAwaitingResponseCount()} waiting) '{msg.CommandAndKey}' on '{PhysicalName}' (v{features.Version})"); + physical?.UpdateLastWriteTime(); // preemptively + }); + #pragma warning disable CS0618 // Type or member is obsolete var result = TryWriteSync(msg, ServerEndPoint.IsReplica); #pragma warning restore CS0618 @@ -427,7 +435,7 @@ internal void KeepAlive(bool forceRun = false) internal async Task OnConnectedAsync(PhysicalConnection connection, ILogger? log) { Trace("OnConnected"); - if (physical == connection && !isDisposed && ChangeState(State.Connecting, State.ConnectedEstablishing)) + if (threadSafePhysicalConnectionAccessor.ConnectionMatch(connection) && !isDisposed && ChangeState(State.Connecting, State.ConnectedEstablishing)) { ConnectedAt ??= DateTime.UtcNow; await ServerEndPoint.OnEstablishingAsync(connection, log).ForAwait(); @@ -445,11 +453,14 @@ internal async Task OnConnectedAsync(PhysicalConnection connection, ILogger? log internal void ResetNonConnected() { - var tmp = physical; - if (tmp != null && state != (int)State.ConnectedEstablished) + threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => { - tmp.RecordConnectionFailed(ConnectionFailureType.UnableToConnect); - } + if (state != (int)State.ConnectedEstablished) + { + physical.RecordConnectionFailed(ConnectionFailureType.UnableToConnect); + } + }); + TryConnect(null); } @@ -485,11 +496,13 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti oldState = default(State); // only defined when isCurrent = true ConnectedAt = default; - if (isCurrent = physical == connection) + isCurrent = false; + + if (threadSafePhysicalConnectionAccessor.ClearPhysicalIfMatch(connection, out bool isNull)) { + isCurrent = true; Trace("Bridge noting disconnect from active connection" + (isDisposed ? " (disposed)" : "")); oldState = ChangeState(State.Disconnected); - physical = null; if (oldState == State.ConnectedEstablished && !ServerEndPoint.IsReplica) { // if the disconnected endpoint was a primary endpoint run info replication @@ -506,7 +519,7 @@ internal void OnDisconnected(ConnectionFailureType failureType, PhysicalConnecti TryConnect(null); // try to connect immediately } } - else if (physical == null) + else if (isNull) { Trace("Bridge noting disconnect (already terminated)"); } @@ -529,7 +542,7 @@ internal void OnFullyEstablished(PhysicalConnection connection, string source) { Trace("OnFullyEstablished"); connection.SetIdle(); - if (physical == connection && !isDisposed && ChangeState(State.ConnectedEstablishing, State.ConnectedEstablished)) + if (threadSafePhysicalConnectionAccessor.ConnectionMatch(connection) && !isDisposed && ChangeState(State.ConnectedEstablishing, State.ConnectedEstablished)) { reportNextFailure = reconfigureNextFailure = true; LastException = null; @@ -592,90 +605,24 @@ internal void OnHeartbeat(bool ifConnectedOnly) Multiplexer.Logger?.LogErrorConnectionIssue(ex, ex.Message); Trace("Aborting connect"); // abort and reconnect - var snapshot = physical; - OnDisconnected(ConnectionFailureType.UnableToConnect, snapshot, out bool isCurrent, out State oldState); - snapshot?.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely + threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(physical => + { + OnDisconnected(ConnectionFailureType.UnableToConnect, physical, out bool isCurrent, out State oldState); + physical.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely + }); TryConnect(null); } break; case (int)State.ConnectedEstablishing: - // (Fall through) Happens when we successfully connected via TCP, but no Redis handshake completion yet. - // This can happen brief (usual) or when the server never answers (rare). When we're in this state, - // a socket is open and reader likely listening indefinitely for incoming data on an async background task. - // We need to time that out and cleanup the PhysicalConnection if needed, otherwise that reader and socket will remain open - // for the lifetime of the application due to being orphaned, yet still referenced by the active task doing the pipe read. + // (Fall through) Happens when we successfully connected via TCP, but no Redis handshake completion yet. + // This can happen brief (usual) or when the server never answers (rare). When we're in this state, + // a socket is open and reader likely listening indefinitely for incoming data on an async background task. + // We need to time that out and cleanup the PhysicalConnection if needed, otherwise that reader and socket will remain open + // for the lifetime of the application due to being orphaned, yet still referenced by the active task doing the pipe read. case (int)State.ConnectedEstablished: // Track that we should reset the count on the next disconnect, but not do so in a loop shouldResetConnectionRetryCount = true; - var tmp = physical; - if (tmp != null) - { - if (state == (int)State.ConnectedEstablished) - { - Interlocked.Exchange(ref connectTimeoutRetryCount, 0); - tmp.BridgeCouldBeNull?.ServerEndPoint?.ClearUnselectable(UnselectableFlags.DidNotRespond); - } - int timedOutThisHeartbeat = tmp.OnBridgeHeartbeat(); - int writeEverySeconds = ServerEndPoint.WriteEverySeconds; - bool configCheckDue = ServerEndPoint.ConfigCheckSeconds > 0 && ServerEndPoint.LastInfoReplicationCheckSecondsAgo >= ServerEndPoint.ConfigCheckSeconds; - - if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive - && tmp.BridgeCouldBeNull?.Multiplexer.RawConfig.HeartbeatConsistencyChecks == true) - { - // If HeartbeatConsistencyChecks are enabled, we're sending a PING (expecting PONG) or ECHO (expecting UniqueID back) every single - // heartbeat as an opt-in measure to react to any network stream drop ASAP to terminate the connection as faulted. - // If we don't get the expected response to that command, then the connection is terminated. - // This is to prevent the case of things like 100% string command usage where a protocol error isn't otherwise encountered. - KeepAlive(forceRun: true); - - // If we're configured to check info replication, perform that too - if (configCheckDue) - { - ServerEndPoint.CheckInfoReplication(); - } - } - else if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive - && configCheckDue - && ServerEndPoint.CheckInfoReplication()) - { - // that serves as a keep-alive, if it is accepted - } - else if (writeEverySeconds > 0 && tmp.LastWriteSecondsAgo >= writeEverySeconds) - { - Trace("OnHeartbeat - overdue"); - if (state == (int)State.ConnectedEstablished) - { - KeepAlive(); - } - else - { - OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out bool ignore, out State oldState); - tmp.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely - } - } - else if (writeEverySeconds <= 0 - && tmp.IsIdle() - && tmp.LastWriteSecondsAgo > 2 - && tmp.GetSentAwaitingResponseCount() != 0) - { - // There's a chance this is a dead socket; sending data will shake that up a bit, - // so if we have an empty unsent queue and a non-empty sent queue, test the socket. - KeepAlive(); - } - - // This is an "always" check - we always want to evaluate a dead connection from a non-responsive sever regardless of the need to heartbeat above - if (timedOutThisHeartbeat > 0 - && tmp.LastReadSecondsAgo * 1_000 > (tmp.BridgeCouldBeNull?.Multiplexer.AsyncTimeoutMilliseconds * 4)) - { - // If we've received *NOTHING* on the pipe in 4 timeouts worth of time and we're timing out commands, issue a connection failure so that we reconnect - // This is meant to address the scenario we see often in Linux configs where TCP retries will happen for 15 minutes. - // To us as a client, we'll see the socket as green/open/fine when writing but we'll bet getting nothing back. - // Since we can't depend on the pipe to fail in that case, we want to error here based on the criteria above so we reconnect broken clients much faster. - tmp.BridgeCouldBeNull?.Multiplexer.Logger?.LogWarningDeadSocketDetected(tmp.LastReadSecondsAgo, timedOutThisHeartbeat); - OnDisconnected(ConnectionFailureType.SocketFailure, tmp, out _, out State oldState); - tmp.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely - } - } + threadSafePhysicalConnectionAccessor.WorkOnPhysicalWithLock(ManageHeartBeatPhysicalConnection()); break; case (int)State.Disconnected: // Only if we should reset the connection count @@ -712,6 +659,74 @@ internal void OnHeartbeat(bool ifConnectedOnly) } } + private Action ManageHeartBeatPhysicalConnection() => physical => + { + if (state == (int)State.ConnectedEstablished) + { + Interlocked.Exchange(ref connectTimeoutRetryCount, 0); + physical.BridgeCouldBeNull?.ServerEndPoint?.ClearUnselectable(UnselectableFlags.DidNotRespond); + } + int timedOutThisHeartbeat = physical.OnBridgeHeartbeat(); + int writeEverySeconds = ServerEndPoint.WriteEverySeconds; + bool configCheckDue = ServerEndPoint.ConfigCheckSeconds > 0 && ServerEndPoint.LastInfoReplicationCheckSecondsAgo >= ServerEndPoint.ConfigCheckSeconds; + + if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive + && physical.BridgeCouldBeNull?.Multiplexer.RawConfig.HeartbeatConsistencyChecks == true) + { + // If HeartbeatConsistencyChecks are enabled, we're sending a PING (expecting PONG) or ECHO (expecting UniqueID back) every single + // heartbeat as an opt-in measure to react to any network stream drop ASAP to terminate the connection as faulted. + // If we don't get the expected response to that command, then the connection is terminated. + // This is to prevent the case of things like 100% string command usage where a protocol error isn't otherwise encountered. + KeepAlive(forceRun: true); + + // If we're configured to check info replication, perform that too + if (configCheckDue) + { + ServerEndPoint.CheckInfoReplication(); + } + } + else if (state == (int)State.ConnectedEstablished && ConnectionType == ConnectionType.Interactive + && configCheckDue + && ServerEndPoint.CheckInfoReplication()) + { + // that serves as a keep-alive, if it is accepted + } + else if (writeEverySeconds > 0 && physical.LastWriteSecondsAgo >= writeEverySeconds) + { + Trace("OnHeartbeat - overdue"); + if (state == (int)State.ConnectedEstablished) + { + KeepAlive(); + } + else + { + OnDisconnected(ConnectionFailureType.SocketFailure, physical, out bool ignore, out State oldState); + physical.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely + } + } + else if (writeEverySeconds <= 0 + && physical.IsIdle() + && physical.LastWriteSecondsAgo > 2 + && physical.GetSentAwaitingResponseCount() != 0) + { + // There's a chance this is a dead socket; sending data will shake that up a bit, + // so if we have an empty unsent queue and a non-empty sent queue, test the socket. + KeepAlive(); + } + + // This is an "always" check - we always want to evaluate a dead connection from a non-responsive sever regardless of the need to heartbeat above + if (timedOutThisHeartbeat > 0 + && physical.LastReadSecondsAgo * 1_000 > (physical.BridgeCouldBeNull?.Multiplexer.AsyncTimeoutMilliseconds * 4)) + { + // If we've received *NOTHING* on the pipe in 4 timeouts worth of time and we're timing out commands, issue a connection failure so that we reconnect + // This is meant to address the scenario we see often in Linux configs where TCP retries will happen for 15 minutes. + // To us as a client, we'll see the socket as green/open/fine when writing but we'll bet getting nothing back. + // Since we can't depend on the pipe to fail in that case, we want to error here based on the criteria above so we reconnect broken clients much faster. + physical.BridgeCouldBeNull?.Multiplexer.Logger?.LogWarningDeadSocketDetected(physical.LastReadSecondsAgo, timedOutThisHeartbeat); + OnDisconnected(ConnectionFailureType.SocketFailure, physical, out _, out State oldState); + physical.Dispose(); // Cleanup the existing connection/socket if any, otherwise it will wait reading indefinitely + } + }; [Conditional("VERBOSE")] internal void Trace(string message) => Multiplexer.Trace(message, ToString()); @@ -732,19 +747,19 @@ internal bool TryEnqueue(List messages, bool isReplica) return false; } - var physical = this.physical; - if (physical == null) return false; - foreach (var message in messages) + return threadSafePhysicalConnectionAccessor.UsePhysicalWithLock(physical => { - // deliberately not taking a single lock here; we don't care if - // other threads manage to interleave - in fact, it would be desirable - // (to avoid a batch monopolising the connection) + foreach (var message in messages) + { + // deliberately not taking a single lock here; we don't care if + // other threads manage to interleave - in fact, it would be desirable + // (to avoid a batch monopolising the connection) #pragma warning disable CS0618 // Type or member is obsolete - WriteMessageTakingWriteLockSync(physical, message); + WriteMessageTakingWriteLockSync(physical, message); #pragma warning restore CS0618 - LogNonPreferred(message.Flags, isReplica); - } - return true; + LogNonPreferred(message.Flags, isReplica); + } + }); } private Message? _activeMessage; @@ -1147,7 +1162,7 @@ private void ProcessBridgeBacklog() // Only execute if we're connected. // Timeouts are handled above, so we're exclusively into backlog items eligible to write at this point. // If we can't write them, abort and wait for the next heartbeat or activation to try this again. - while (IsConnected && physical?.HasOutputPipe == true) + while (IsConnected && threadSafePhysicalConnectionAccessor.HasOutputPipe == true) { Message? message; _backlogStatus = BacklogStatus.CheckingForWork; @@ -1165,23 +1180,24 @@ private void ProcessBridgeBacklog() try { _backlogStatus = BacklogStatus.WritingMessage; - var result = WriteMessageInsideLock(physical, message); - - if (result == WriteResult.Success) + threadSafePhysicalConnectionAccessor.UsePhysicalWithLock(physical => { - _backlogStatus = BacklogStatus.Flushing; + var result = WriteMessageInsideLock(physical, message); + if (result == WriteResult.Success) + { + _backlogStatus = BacklogStatus.Flushing; #pragma warning disable CS0618 // Type or member is obsolete - result = physical.FlushSync(false, TimeoutMilliseconds); + result = physical.FlushSync(false, TimeoutMilliseconds); #pragma warning restore CS0618 // Type or member is obsolete - } - - _backlogStatus = BacklogStatus.MarkingInactive; - if (result != WriteResult.Success) - { - _backlogStatus = BacklogStatus.RecordingWriteFailure; - var ex = Multiplexer.GetException(result, message, ServerEndPoint); - HandleWriteException(message, ex); - } + } + _backlogStatus = BacklogStatus.MarkingInactive; + if (result != WriteResult.Success) + { + _backlogStatus = BacklogStatus.RecordingWriteFailure; + var ex = Multiplexer.GetException(result, message, ServerEndPoint); + HandleWriteException(message, ex); + } + }); } catch (Exception ex) { @@ -1194,7 +1210,7 @@ private void ProcessBridgeBacklog() } } _backlogStatus = BacklogStatus.SettingIdle; - physical?.SetIdle(); + threadSafePhysicalConnectionAccessor.SetIdle(); _backlogStatus = BacklogStatus.Inactive; } finally @@ -1219,7 +1235,7 @@ public bool HasPendingCallerFacingItems() if (!item.IsInternalCall) return true; } } - return physical?.HasPendingCallerFacingItems() ?? false; + return threadSafePhysicalConnectionAccessor.HasPendingCallerFacingItems; } private WriteResult TimedOutBeforeWrite(Message message) @@ -1396,22 +1412,22 @@ private async ValueTask CompleteWriteAndReleaseLockAsync( #if !NETCOREAPP using (lockToken) #endif - try - { - var result = await flush.ForAwait(); - physical?.SetIdle(); - return result; - } - catch (Exception ex) - { - return HandleWriteException(message, ex); - } - finally - { + try + { + var result = await flush.ForAwait(); + threadSafePhysicalConnectionAccessor.SetIdle(); + return result; + } + catch (Exception ex) + { + return HandleWriteException(message, ex); + } + finally + { #if NETCOREAPP _singleWriterMutex.Release(); #endif - } + } } private WriteResult HandleWriteException(Message message, Exception ex) @@ -1445,7 +1461,7 @@ private bool ChangeState(State oldState, State newState) return result; } - public PhysicalConnection? TryConnect(ILogger? log) + public void TryConnect(ILogger? log) { if (state == (int)State.Disconnected) { @@ -1461,12 +1477,9 @@ private bool ChangeState(State oldState, State newState) Interlocked.Exchange(ref connectStartTicks, Environment.TickCount); // separate creation and connection for case when connection completes synchronously // in that case PhysicalConnection will call back to PhysicalBridge, and most PhysicalBridge methods assume that physical is not null; - physical = new PhysicalConnection(this); - - physical.BeginConnectAsync(log).RedisFireAndForget(); + threadSafePhysicalConnectionAccessor.CreateConnection(this, log); } } - return null; } catch (Exception ex) { @@ -1477,7 +1490,6 @@ private bool ChangeState(State oldState, State newState) throw; } } - return physical; } private void LogNonPreferred(CommandFlags flags, bool isReplica) @@ -1685,7 +1697,7 @@ internal void SimulateConnectionFailure(SimulatedFailureType failureType) { throw ExceptionFactory.AdminModeNotEnabled(Multiplexer.RawConfig.IncludeDetailInExceptions, RedisCommand.DEBUG, null, ServerEndPoint); // close enough } - physical?.SimulateConnectionFailure(failureType); + threadSafePhysicalConnectionAccessor.SimulateConnectionFailure(failureType); } internal RedisCommand? GetActiveMessage() => Volatile.Read(ref _activeMessage)?.Command; diff --git a/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs b/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs new file mode 100644 index 000000000..9cbab9e33 --- /dev/null +++ b/src/StackExchange.Redis/ThreadSafePhysicalConnectionAccessor.cs @@ -0,0 +1,281 @@ +using System; +using System.Text; +using System.Threading; +using Microsoft.Extensions.Logging; + +namespace StackExchange.Redis +{ + internal sealed class ThreadSafePhysicalConnectionAccessor + { + private PhysicalConnection? physical; + private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); + internal ThreadSafePhysicalConnectionAccessor() + { + } + + internal bool HasOutputPipe + { + get + { + try + { + rwLock.EnterReadLock(); + return physical?.HasOutputPipe ?? false; + } + finally + { + rwLock.ExitReadLock(); + } + } + } + + internal bool HasPendingCallerFacingItems + { + get + { + try + { + rwLock.EnterReadLock(); + return physical?.HasPendingCallerFacingItems() ?? false; + } + finally + { + rwLock.ExitReadLock(); + } + } + } + + internal void CreateConnection(PhysicalBridge bridge, ILogger? log) + { + try + { + rwLock.EnterWriteLock(); + physical = new PhysicalConnection(bridge); + physical.BeginConnectAsync(log).RedisFireAndForget(); + } + finally + { + rwLock.ExitWriteLock(); + } + } + + internal long? GetConnectionId() + { + try + { + rwLock.EnterReadLock(); + return physical?.ConnectionId; + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal string? GetPhysicalName() + { + try + { + rwLock.EnterReadLock(); + return physical?.ToString(); + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal long GetSubscriptionCount() + { + try + { + rwLock.EnterReadLock(); + return physical?.SubscriptionCount ?? 0; + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal void DisposePhysicalConnection() + { + try + { + rwLock.EnterWriteLock(); + if (physical != null) + { + physical.Dispose(); + physical = null; + } + } + finally + { + rwLock.ExitWriteLock(); + } + } + + internal void Shutdown() + { + try + { + rwLock.EnterWriteLock(); + try + { + physical?.Shutdown(); + } + catch { } + } + finally + { + rwLock.ExitWriteLock(); + } + } + + internal bool UsePhysicalWithLock(Action callBack) + { + try + { + rwLock.EnterReadLock(); + if (physical != null) + { + callBack(physical); + return true; + } + return false; + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal bool WorkOnPhysicalWithLock(Action callBack) + { + try + { + rwLock.EnterWriteLock(); + if (physical != null) + { + callBack(physical); + return true; + } + return false; + } + finally + { + rwLock.ExitWriteLock(); + } + } + + internal void SimulateConnectionFailure(SimulatedFailureType failureType) + { + try + { + rwLock.EnterReadLock(); + physical?.SimulateConnectionFailure(failureType); + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal void SetIdle() + { + try + { + rwLock.EnterReadLock(); + physical?.SetIdle(); + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal void GetCounters(ConnectionCounters counters) + { + try + { + rwLock.EnterReadLock(); + physical?.GetCounters(counters); + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal PhysicalConnection.ConnectionStatus GetStatus() + { + try + { + rwLock.EnterReadLock(); + return physical?.GetStatus() ?? PhysicalConnection.ConnectionStatus.Default; + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal void GetStormLog(StringBuilder sb) + { + try + { + rwLock.EnterReadLock(); + physical?.GetStormLog(sb); + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal bool IsIdle() + { + try + { + rwLock.EnterReadLock(); + return physical?.IsIdle() ?? false; + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal bool ConnectionMatch(PhysicalConnection connection) + { + try + { + rwLock.EnterReadLock(); + return physical == connection; + } + finally + { + rwLock.ExitReadLock(); + } + } + + internal bool ClearPhysicalIfMatch(PhysicalConnection? connection, out bool isNull) + { + try + { + rwLock.EnterWriteLock(); + isNull = physical == null; + if (physical == connection) + { + physical = null; + return true; + } + return false; + } + finally + { + rwLock.ExitWriteLock(); + } + } + } +}