Commit 129d59f

NickCraver and mgravell authored
Timeouts: Improve Backlog handling and errors for users + GC rooting fixes for outstanding scenarios (#2408)
This combination PR both fixes a GC issue (see below, and #2413 for details) and improves timeout exceptions. If a timeout happens for a message that was in the backlog but was never sent, the user now gets a much more informative message like this:

> Exception: The message timed out in the backlog attempting to send because no connection became available - Last Connection Exception: InternalFailure on 127.0.0.1:6379/Interactive, Initializing/NotStarted, last: GET, origin: ConnectedAsync, outstanding: 0, last-read: 0s ago, last-write: 0s ago, keep-alive: 500s, state: Connecting, mgr: 10 of 10 available, last-heartbeat: never, last-mbeat: 0s ago, global: 0s ago, v: 2.6.99.22667, command=PING, inst: 0, qu: 0, qs: 0, aw: False, bw: CheckingForTimeout, last-in: 0, cur-in: 0, sync-ops: 1, async-ops: 1, serverEndpoint: 127.0.0.1:6379, conn-sec: n/a, aoc: 0, mc: 1/1/0, mgr: 10 of 10 available, clientName: NAMISTOU-3(SE.Redis-v2.6.99.22667), IOCP: (Busy=0,Free=1000,Min=32,Max=1000), WORKER: (Busy=2,Free=32765,Min=32,Max=32767), POOL: (Threads=18,QueuedItems=0,CompletedItems=65), v: 2.6.99.22667 (Please take a look at this article for some common client-side issues that can cause timeouts: https://stackexchange.github.io/StackExchange.Redis/Timeouts)

Today, this isn't intuitive, especially for connections with `AbortOnConnectFail` set to `false`: a multiplexer _never_ connects successfully, but the user just gets generic timeouts. This change makes the error more specific and includes the last connection exception (also as `.InnerException`) for more details, informing the user of the config/auth/whatever error underneath that explains why commands are never successfully sent.

Also adds `aoc: (0|1)` to the exception message for easier advice in issues (reflecting what `AbortOnConnectFail` is set to).

Co-authored-by: Nick Craver <[email protected]>
Co-authored-by: Marc Gravell <[email protected]>
1 parent f690d16 commit 129d59f
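
As a rough illustration of what the commit message describes, the sketch below shows how a caller might observe the new behavior. It assumes a Redis endpoint at `127.0.0.1:6379` that never becomes reachable and uses a placeholder key name (`some-key`); both are illustrative assumptions, not part of the commit. With `AbortOnConnectFail` set to `false`, a command that times out in the backlog is expected to surface as a `RedisConnectionException` carrying the last connection failure as `InnerException`, rather than a generic `RedisTimeoutException`.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

public static class BacklogTimeoutExample
{
    public static async Task Main()
    {
        // Assumption: nothing usable is listening on this endpoint.
        var options = ConfigurationOptions.Parse("127.0.0.1:6379");
        options.AbortOnConnectFail = false; // reported as "aoc: 0" in the message

        using var muxer = await ConnectionMultiplexer.ConnectAsync(options);
        var db = muxer.GetDatabase();

        try
        {
            // "some-key" is a placeholder; the command is queued in the backlog
            // while no connection is available.
            var value = await db.StringGetAsync("some-key");
            Console.WriteLine($"Value: {value}");
        }
        catch (RedisConnectionException ex)
        {
            // Backlog timeout with a known underlying connection failure.
            Console.WriteLine($"Connection problem ({ex.FailureType}): {ex.Message}");
            if (ex.InnerException is not null)
            {
                Console.WriteLine($"Underlying cause: {ex.InnerException.Message}");
            }
        }
        catch (RedisTimeoutException ex)
        {
            // Ordinary timeout, with no recorded connection exception to report.
            Console.WriteLine($"Timeout: {ex.Message}");
        }
    }
}
```

Catching `RedisConnectionException` before `RedisTimeoutException` lets calling code distinguish "never had a connection" from "the server was slow", which is the distinction this change aims to expose.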

15 files changed: +414 -45 lines changed

Directory.Packages.props

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@
     <PackageVersion Include="Nerdbank.GitVersioning" Version="3.4.255" />
     <PackageVersion Include="Newtonsoft.Json" Version="13.0.1" />
     <PackageVersion Include="NSubstitute" Version="4.3.0" />
-    <PackageVersion Include="StackExchange.Redis" Version="2.5.43" />
+    <PackageVersion Include="StackExchange.Redis" Version="2.6.96" />
     <!-- For binding redirect testing, main package gets this transitively -->
     <PackageVersion Include="System.IO.Pipelines" Version="5.0.1" />
     <PackageVersion Include="System.Runtime.Caching" Version="5.0.0" />

docs/ReleaseNotes.md

Lines changed: 3 additions & 1 deletion
@@ -8,9 +8,11 @@ Current package versions:
 
 ## Unreleased
 
+- Fix [#2412](https://github.com/StackExchange/StackExchange.Redis/issues/2412): Critical (but rare) GC bug that can lead to async tasks never completing if the multiplexer is not held by the consumer ([#2408 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2408))
+- Add: Better error messages (over generic timeout) when commands are backlogged and unable to write to any connection ([#2408 by NickCraver](https://github.com/StackExchange/StackExchange.Redis/pull/2408))
 - Fix [#2392](https://github.com/StackExchange/StackExchange.Redis/issues/2392): Dequeue *all* timed out messages from the backlog when not connected (including Fire+Forget) ([#2397 by kornelpal](https://github.com/StackExchange/StackExchange.Redis/pull/2397))
 - Fix [#2400](https://github.com/StackExchange/StackExchange.Redis/issues/2400): Expose `ChannelMessageQueue` as `IAsyncEnumerable<ChannelMessage>` ([#2402 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2402))
-- Add: support for `CLIENT SETINFO` (lib name/version) during handshake; opt-out is via `ConfigurationOptions`; also support read of `resp`, `lib-ver` and `lib-name` via `CLIENT LIST` ([#2414 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2414))
+- Add: Support for `CLIENT SETINFO` (lib name/version) during handshake; opt-out is via `ConfigurationOptions`; also support read of `resp`, `lib-ver` and `lib-name` via `CLIENT LIST` ([#2414 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2414))
 - Documentation: clarify the meaning of `RedisValue.IsInteger` re [#2418](https://github.com/StackExchange/StackExchange.Redis/issues/2418) ([#2420 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2420))
 
 ## 2.6.96

src/Directory.Build.props

Lines changed: 1 addition & 0 deletions
@@ -3,6 +3,7 @@
   <PropertyGroup>
     <GenerateDocumentationFile>true</GenerateDocumentationFile>
     <IsPackable>true</IsPackable>
+    <CheckEolTargetFramework>false</CheckEolTargetFramework>
   </PropertyGroup>
   <ItemGroup>
     <PackageReference Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" PrivateAssets="all" />

src/StackExchange.Redis/ConnectionMultiplexer.cs

Lines changed: 123 additions & 18 deletions
@@ -34,7 +34,7 @@ public sealed partial class ConnectionMultiplexer : IInternalConnectionMultiplex
         internal long syncOps, asyncOps;
         private long syncTimeouts, fireAndForgets, asyncTimeouts;
         private string? failureMessage, activeConfigCause;
-        private IDisposable? pulse;
+        private TimerToken? pulse;
 
         private readonly Hashtable servers = new Hashtable();
         private volatile ServerSnapshot _serverSnapshot = ServerSnapshot.Empty;
@@ -874,7 +874,7 @@ public ServerSnapshotFiltered(ServerEndPoint[] endpoints, int count, Func<Server
             }
         }
 
-        [return: NotNullIfNotNull("endpoint")]
+        [return: NotNullIfNotNull(nameof(endpoint))]
         internal ServerEndPoint? GetServerEndPoint(EndPoint? endpoint, LogProxy? log = null, bool activate = true)
         {
             if (endpoint == null) return null;
@@ -909,40 +909,130 @@ public ServerSnapshotFiltered(ServerEndPoint[] endpoints, int count, Func<Server
             return server;
         }
 
-        private sealed class TimerToken
+        internal void Root() => pulse?.Root(this);
+
+        // note that this also acts (conditionally) as the GC root for the multiplexer
+        // when there are in-flight messages; the timer can then act as the heartbeat
+        // to make sure that everything *eventually* completes
+        private sealed class TimerToken : IDisposable
         {
             private TimerToken(ConnectionMultiplexer muxer)
             {
-                _ref = new WeakReference(muxer);
+                _weakRef = new(muxer);
             }
             private Timer? _timer;
             public void SetTimer(Timer timer) => _timer = timer;
-            private readonly WeakReference _ref;
+
+            private readonly WeakReference<ConnectionMultiplexer> _weakRef;
+
+            private object StrongRefSyncLock => _weakRef; // private and readonly? it'll do
+            private ConnectionMultiplexer? _strongRef;
+            private int _strongRefToken;
 
             private static readonly TimerCallback Heartbeat = state =>
             {
                 var token = (TimerToken)state!;
-                var muxer = (ConnectionMultiplexer?)(token._ref?.Target);
-                if (muxer != null)
+                if (token._weakRef.TryGetTarget(out var muxer))
                 {
                     muxer.OnHeartbeat();
                 }
                 else
                 {
                     // the muxer got disposed from out of us; kill the timer
-                    var tmp = token._timer;
-                    token._timer = null;
-                    if (tmp != null) try { tmp.Dispose(); } catch { }
+                    token.Dispose();
                 }
             };
 
-            internal static IDisposable Create(ConnectionMultiplexer connection)
+            internal static TimerToken Create(ConnectionMultiplexer connection)
             {
                 var token = new TimerToken(connection);
                 var heartbeatMilliseconds = (int)connection.RawConfig.HeartbeatInterval.TotalMilliseconds;
                 var timer = new Timer(Heartbeat, token, heartbeatMilliseconds, heartbeatMilliseconds);
                 token.SetTimer(timer);
-                return timer;
+                return token;
+            }
+
+            public void Dispose()
+            {
+                var tmp = _timer;
+                _timer = null;
+                if (tmp is not null) try { tmp.Dispose(); } catch { }
+
+                _strongRef = null; // note that this shouldn't be relevant since we've unrooted the TimerToken
+            }
+
+
+            // explanation of rooting model:
+            //
+            // the timer has a reference to the TimerToken; this *always* has a weak-ref,
+            // and *may* sometimes have a strong-ref; this is so that if a consumer
+            // drops a multiplexer, it can be garbage collected, i.e. the heartbeat timer
+            // doesn't keep the entire thing alive forever; instead, if the heartbeat detects
+            // the weak-ref has been collected, it can cancel the timer and *itself* go away;
+            // however: this leaves a problem where there is *in flight work* when the consumer
+            // drops the multiplexer; in particular, if that happens when disconnected, there
+            // could be consumer-visible pending TCS items *in the backlog queue*; we don't want
+            // to leave those incomplete, as that fails the contractual expectations of async/await;
+            // instead we need to root ourselves. The natural place to do this is by rooting the
+            // multiplexer, allowing the heartbeat to keep poking things, so that the usual
+            // message-processing and timeout rules apply. This is why we *sometimes* also keep
+            // a strong-ref to the same multiplexer.
+            //
+            // The TimerToken is rooted by the timer callback; this then roots the multiplexer,
+            // which keeps our bridges and connections in scope - until we're sure we're done
+            // with them.
+            //
+            // 1) any bridge or connection will trigger rooting by calling Root when
+            //    they change from "empty" to "non-empty" i.e. whenever there are
+            //    in-flight items; this always changes the token; this includes both the
+            //    backlog and awaiting-reply queues.
+            //
+            // 2) the heartbeat is responsible for unrooting, after processing timeouts
+            //    etc; first it checks whether it is needed (IsRooted), which also gives
+            //    it the current token.
+            //
+            // 3) if so, the heartbeat will (outside of the lock) query all sources to
+            //    see if they still have outstanding work; if everyone reports negatively,
+            //    then the heartbeat calls UnRoot passing in the old token; if this still
+            //    matches (i.e. no new work came in while we were looking away), then the
+            //    strong reference is removed; note that "has outstanding work" ignores
+            //    internal-call messages; we are only interested in consumer-facing items
+            //    (but we need to check this *here* rather than when adding, as otherwise
+            //    the definition of "is empty, should root" becomes more complicated, which
+            //    impacts the write path, rather than the heartbeat path).
+            //
+            // This means that the multiplexer (via the timer) lasts as long as there are
+            // outstanding messages; if the consumer has dropped the multiplexer, then
+            // there will be no new incoming messages, and after timeouts: everything
+            // should drop.
+
+            public void Root(ConnectionMultiplexer multiplexer)
+            {
+                lock (StrongRefSyncLock)
+                {
+                    _strongRef = multiplexer;
+                    _strongRefToken++;
+                }
+            }
+
+            public bool IsRooted(out int token)
+            {
+                lock (StrongRefSyncLock)
+                {
+                    token = _strongRefToken;
+                    return _strongRef is not null;
+                }
+            }
+
+            public void UnRoot(int token)
+            {
+                lock (StrongRefSyncLock)
+                {
+                    if (token == _strongRefToken)
+                    {
+                        _strongRef = null;
+                    }
+                }
             }
         }
 
@@ -956,8 +1046,21 @@ private void OnHeartbeat()
                 Trace("heartbeat");
 
                 var tmp = GetServerSnapshot();
+                int token = 0;
+                bool isRooted = pulse?.IsRooted(out token) ?? false, hasPendingCallerFacingItems = false;
+
                 for (int i = 0; i < tmp.Length; i++)
+                {
                     tmp[i].OnHeartbeat();
+                    if (isRooted && !hasPendingCallerFacingItems)
+                    {
+                        hasPendingCallerFacingItems = tmp[i].HasPendingCallerFacingItems();
+                    }
+                }
+                if (isRooted && !hasPendingCallerFacingItems)
+                { // release the GC root on the heartbeat *if* the token still matches
+                    pulse?.UnRoot(token);
+                }
             }
             catch (Exception ex)
             {
9621065
catch (Exception ex)
9631066
{
@@ -1909,11 +2012,11 @@ private WriteResult TryPushMessageToBridgeSync<T>(Message message, ResultProcess
         /// </summary>
         public override string ToString() => string.IsNullOrWhiteSpace(ClientName) ? GetType().Name : ClientName;
 
-        internal Exception GetException(WriteResult result, Message message, ServerEndPoint? server) => result switch
+        internal Exception GetException(WriteResult result, Message message, ServerEndPoint? server, PhysicalBridge? bridge = null) => result switch
         {
             WriteResult.Success => throw new ArgumentOutOfRangeException(nameof(result), "Be sure to check result isn't successful before calling GetException."),
             WriteResult.NoConnectionAvailable => ExceptionFactory.NoConnectionAvailable(this, message, server),
-            WriteResult.TimeoutBeforeWrite => ExceptionFactory.Timeout(this, "The timeout was reached before the message could be written to the output buffer, and it was not sent", message, server, result),
+            WriteResult.TimeoutBeforeWrite => ExceptionFactory.Timeout(this, null, message, server, result, bridge),
             _ => ExceptionFactory.ConnectionFailure(RawConfig.IncludeDetailInExceptions, ConnectionFailureType.ProtocolFailure, "An unknown error occurred when writing the message", server),
         };
 
@@ -1935,7 +2038,7 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T>? source, Exception u
             }
         }
 
-        [return: NotNullIfNotNull("defaultValue")]
+        [return: NotNullIfNotNull(nameof(defaultValue))]
         internal T? ExecuteSyncImpl<T>(Message message, ResultProcessor<T>? processor, ServerEndPoint? server, T? defaultValue = default)
         {
             if (_isDisposed) throw new ObjectDisposedException(ToString());
@@ -1960,10 +2063,11 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T>? source, Exception u
                 var source = SimpleResultBox<T>.Get();
 
                 bool timeout = false;
+                WriteResult result;
                 lock (source)
                 {
 #pragma warning disable CS0618 // Type or member is obsolete
-                    var result = TryPushMessageToBridgeSync(message, processor, source, ref server);
+                    result = TryPushMessageToBridgeSync(message, processor, source, ref server);
 #pragma warning restore CS0618
                     if (result != WriteResult.Success)
                     {
@@ -1985,7 +2089,8 @@ internal static void ThrowFailed<T>(TaskCompletionSource<T>? source, Exception u
                 {
                     Interlocked.Increment(ref syncTimeouts);
                     // Very important not to return "source" to the pool here
-                    throw ExceptionFactory.Timeout(this, null, message, server);
+                    // Also note we return "success" when queueing a message to the backlog, so we need to manually fake it back here when timing out in the backlog
+                    throw ExceptionFactory.Timeout(this, null, message, server, message.IsBacklogged ? WriteResult.TimeoutBeforeWrite : result, server?.GetBridge(message.Command, create: false));
                 }
                 // Snapshot these so that we can recycle the box
                 var val = source.GetResult(out var ex, canRecycle: true); // now that we aren't locking it...
@@ -2047,7 +2152,7 @@ static async Task<T> ExecuteAsyncImpl_Awaited(ConnectionMultiplexer @this, Value
 
         internal Task<T?> ExecuteAsyncImpl<T>(Message? message, ResultProcessor<T>? processor, object? state, ServerEndPoint? server)
         {
-            [return: NotNullIfNotNull("tcs")]
+            [return: NotNullIfNotNull(nameof(tcs))]
             static async Task<T?> ExecuteAsyncImpl_Awaited(ConnectionMultiplexer @this, ValueTask<WriteResult> write, TaskCompletionSource<T?>? tcs, Message message, ServerEndPoint? server)
             {
                 var result = await write.ForAwait();
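
The rooting model described in the `TimerToken` comments in this file's diff can be hard to follow inside the diff itself. Below is a minimal, standalone sketch of just the token check. `RootSlot<T>` and `RootSlotDemo` are hypothetical names invented for this illustration and are not part of the library. The sketch shows why `UnRoot` compares tokens: if new work calls `Root` after the heartbeat took its snapshot, the stale token no longer matches and the strong reference is kept.

```csharp
#nullable enable
using System;

// Hypothetical, simplified stand-in for the strong-reference part of TimerToken.
public sealed class RootSlot<T> where T : class
{
    private readonly object _sync = new object();
    private T? _strongRef;
    private int _token;

    // Called when a queue goes from empty to non-empty; always bumps the token.
    public void Root(T target)
    {
        lock (_sync)
        {
            _strongRef = target;
            _token++;
        }
    }

    // Called by the heartbeat: reports whether a strong reference is held
    // and hands back the current token.
    public bool IsRooted(out int token)
    {
        lock (_sync)
        {
            token = _token;
            return _strongRef is not null;
        }
    }

    // Drops the strong reference only if no Root call happened since the
    // token was observed. (Returns a bool here purely for demonstration;
    // the method in the diff returns void.)
    public bool UnRoot(int token)
    {
        lock (_sync)
        {
            if (token != _token) return false;
            _strongRef = null;
            return true;
        }
    }
}

public static class RootSlotDemo
{
    public static void Main()
    {
        var slot = new RootSlot<object>();
        var owner = new object();

        slot.Root(owner);              // first in-flight item arrives
        slot.IsRooted(out int seen);   // heartbeat snapshots the token
        slot.Root(owner);              // new work arrives while the heartbeat is checking queues

        Console.WriteLine(slot.UnRoot(seen)); // stale token, so the root is kept: False

        slot.IsRooted(out seen);       // next heartbeat takes a fresh snapshot
        Console.WriteLine(slot.UnRoot(seen)); // nothing changed since, so the root is released: True
    }
}
```

The token avoids holding the lock while the heartbeat inspects every bridge, which keeps the write path (`Root`) cheap at the cost of an occasional extra heartbeat before the reference is released.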

src/StackExchange.Redis/Enums/CommandStatus.cs

Lines changed: 5 additions & 1 deletion
@@ -10,12 +10,16 @@ public enum CommandStatus
         /// </summary>
         Unknown,
         /// <summary>
-        /// ConnectionMultiplexer has not yet started writing this command to redis.
+        /// ConnectionMultiplexer has not yet started writing this command to Redis.
         /// </summary>
         WaitingToBeSent,
         /// <summary>
         /// Command has been sent to Redis.
         /// </summary>
         Sent,
+        /// <summary>
+        /// Command is in the backlog, waiting to be processed and written to Redis.
+        /// </summary>
+        WaitingInBacklog,
     }
 }

src/StackExchange.Redis/ExceptionFactory.cs

Lines changed: 35 additions & 9 deletions
@@ -212,13 +212,32 @@ private static void Add(List<Tuple<string, string>> data, StringBuilder sb, stri
             }
         }
 
-        internal static Exception Timeout(ConnectionMultiplexer multiplexer, string? baseErrorMessage, Message message, ServerEndPoint? server, WriteResult? result = null)
+        internal static Exception Timeout(ConnectionMultiplexer multiplexer, string? baseErrorMessage, Message message, ServerEndPoint? server, WriteResult? result = null, PhysicalBridge? bridge = null)
         {
             List<Tuple<string, string>> data = new List<Tuple<string, string>> { Tuple.Create("Message", message.CommandAndKey) };
             var sb = new StringBuilder();
+
+            // We timeout writing messages in quite different ways sync/async - so centralize messaging here.
+            if (string.IsNullOrEmpty(baseErrorMessage) && result == WriteResult.TimeoutBeforeWrite)
+            {
+                baseErrorMessage = message.IsBacklogged
+                    ? "The message timed out in the backlog attempting to send because no connection became available"
+                    : "The timeout was reached before the message could be written to the output buffer, and it was not sent";
+            }
+
+            var lastConnectionException = bridge?.LastException as RedisConnectionException;
+            var logConnectionException = message.IsBacklogged && lastConnectionException is not null;
+
             if (!string.IsNullOrEmpty(baseErrorMessage))
             {
                 sb.Append(baseErrorMessage);
+
+                // If we're in the situation where we've never connected
+                if (logConnectionException && lastConnectionException is not null)
+                {
+                    sb.Append(" - Last Connection Exception: ").Append(lastConnectionException.Message);
+                }
+
                 if (message != null)
                 {
                     sb.Append(", command=").Append(message.Command); // no key here, note
@@ -252,17 +271,23 @@ internal static Exception Timeout(ConnectionMultiplexer multiplexer, string? bas
                 }
                 catch { }
             }
-
             AddCommonDetail(data, sb, message, multiplexer, server);
 
-            sb.Append(" (Please take a look at this article for some common client-side issues that can cause timeouts: ");
-            sb.Append(TimeoutHelpLink);
-            sb.Append(')');
+            sb.Append(" (Please take a look at this article for some common client-side issues that can cause timeouts: ")
+              .Append(TimeoutHelpLink)
+              .Append(')');
 
-            var ex = new RedisTimeoutException(sb.ToString(), message?.Status ?? CommandStatus.Unknown)
-            {
-                HelpLink = TimeoutHelpLink
-            };
+            // If we're from a backlog timeout scenario, we log a more intuitive connection exception for the timeout...because the timeout was a symptom
+            // and we have a more direct cause: we had no connection to send it on.
+            Exception ex = logConnectionException && lastConnectionException is not null
+                ? new RedisConnectionException(lastConnectionException.FailureType, sb.ToString(), lastConnectionException, message?.Status ?? CommandStatus.Unknown)
+                {
+                    HelpLink = TimeoutHelpLink
+                }
+                : new RedisTimeoutException(sb.ToString(), message?.Status ?? CommandStatus.Unknown)
+                {
+                    HelpLink = TimeoutHelpLink
+                };
             CopyDataToException(data, ex);
 
             if (multiplexer.RawConfig.IncludeDetailInExceptions) AddExceptionDetail(ex, message, server, null);
@@ -333,6 +358,7 @@ private static void AddCommonDetail(
                 }
                 Add(data, sb, "Server-Endpoint", "serverEndpoint", (server.EndPoint.ToString() ?? "Unknown").Replace("Unspecified/", ""));
                 Add(data, sb, "Server-Connected-Seconds", "conn-sec", bs.ConnectedAt is DateTime dt ? (DateTime.UtcNow - dt).TotalSeconds.ToString("0.##") : "n/a");
+                Add(data, sb, "Abort-On-Connect", "aoc", multiplexer.RawConfig.AbortOnConnectFail ? "1" : "0");
             }
             Add(data, sb, "Multiplexer-Connects", "mc", $"{multiplexer._connectAttemptCount}/{multiplexer._connectCompletedCount}/{multiplexer._connectionCloseCount}");
             Add(data, sb, "Manager", "mgr", multiplexer.SocketManager?.GetState());
