Skip to content

Commit 8e36473

Browse files
authored
Merge pull request #627 from imperugo/fix/pool-resilience-and-pubsub-handler
Fix pool resilience and PubSub silent exception swallowing
2 parents e2b05d4 + a6ff1d3 commit 8e36473

File tree

5 files changed

+141
-5
lines changed

5 files changed

+141
-5
lines changed

src/core/StackExchange.Redis.Extensions.Core/Implementations/RedisClient.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
// Copyright (c) Ugo Lattanzi. All Rights Reserved. Licensed under the MIT license. See License.txt in the project root for license information.
22

3+
using Microsoft.Extensions.Logging;
4+
35
using StackExchange.Redis.Extensions.Core.Abstractions;
46
using StackExchange.Redis.Extensions.Core.Configuration;
57

@@ -9,6 +11,7 @@ namespace StackExchange.Redis.Extensions.Core.Implementations;
911
public class RedisClient : IRedisClient
1012
{
1113
private readonly RedisConfiguration redisConfiguration;
14+
private readonly ILogger<RedisDatabase>? databaseLogger;
1215

1316
/// <summary>
1417
/// Initializes a new instance of the <see cref="RedisClient"/> class.
@@ -24,6 +27,7 @@ public RedisClient(
2427
ConnectionPoolManager = connectionPoolManager;
2528
Serializer = serializer;
2629
this.redisConfiguration = redisConfiguration;
30+
databaseLogger = redisConfiguration.LoggerFactory?.CreateLogger<RedisDatabase>();
2731
}
2832

2933
/// <inheritdoc/>
@@ -92,7 +96,8 @@ public IRedisDatabase GetDb(int dbNumber, string? keyPrefix = null)
9296
redisConfiguration.ServerEnumerationStrategy,
9397
dbNumber,
9498
redisConfiguration.MaxValueLength,
95-
keyPrefix);
99+
keyPrefix,
100+
databaseLogger);
96101
}
97102

98103
/// <inheritdoc/>

src/core/StackExchange.Redis.Extensions.Core/Implementations/RedisConnectionPoolManager.cs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,16 +109,45 @@ var nextIdx
109109
= new Random().Next(0, redisConfiguration.PoolSize);
110110
#endif
111111
connection = connections[nextIdx];
112+
113+
if (!connection.IsConnected())
114+
{
115+
// Selected connection is disconnected, try to find a connected one
116+
for (var i = 0; i < connections.Length; i++)
117+
{
118+
if (connections[i].IsConnected())
119+
{
120+
connection = connections[i];
121+
break;
122+
}
123+
}
124+
}
125+
112126
break;
113127

114128
case ConnectionSelectionStrategy.LeastLoaded:
115-
connection = connections.MinBy(x => x.TotalOutstanding());
129+
// Prefer connected connections; fall back to any if all are disconnected
130+
IStateAwareConnection? candidate = null;
131+
132+
for (var i = 0; i < connections.Length; i++)
133+
{
134+
if (!connections[i].IsConnected())
135+
continue;
136+
137+
if (candidate == null || connections[i].TotalOutstanding() < candidate.TotalOutstanding())
138+
candidate = connections[i];
139+
}
140+
141+
connection = candidate ?? connections.MinBy(x => x.TotalOutstanding());
116142
break;
117143

118144
default:
119145
throw new InvalidEnumArgumentException(nameof(redisConfiguration.ConnectionSelectionStrategy), (int)redisConfiguration.ConnectionSelectionStrategy, typeof(ConnectionSelectionStrategy));
120146
}
121147

148+
if (!connection.IsConnected() && logger.IsEnabled(LogLevel.Warning))
149+
logger.LogWarning("All Redis connections are disconnected. Using connection {HashCode} in degraded mode.", connection.Connection.GetHashCode().ToString(CultureInfo.InvariantCulture));
150+
122151
if (logger.IsEnabled(LogLevel.Debug))
123152
logger.LogDebug("Using connection {HashCode} with {OutStanding} outstanding!", connection.Connection.GetHashCode().ToString(CultureInfo.InvariantCulture), connection.TotalOutstanding().ToString(CultureInfo.InvariantCulture));
124153

src/core/StackExchange.Redis.Extensions.Core/Implementations/RedisDatabase.PubSub.cs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22

33
using System;
44
using System.Collections.Generic;
5+
using System.Threading;
56
using System.Threading.Tasks;
67

8+
using Microsoft.Extensions.Logging;
9+
710
using StackExchange.Redis.Extensions.Core.Helpers;
811

912
namespace StackExchange.Redis.Extensions.Core.Implementations;
@@ -31,8 +34,20 @@ public Task SubscribeAsync<T>(RedisChannel channel, Func<T?, Task> handler, Comm
3134

3235
return sub.SubscribeAsync(channel, Handler, flag);
3336

34-
void Handler(RedisChannel redisChannel, RedisValue value) =>
35-
_ = handler(Serializer.Deserialize<T>(value)).ConfigureAwait(false);
37+
void Handler(RedisChannel redisChannel, RedisValue value)
38+
{
39+
var task = Task.Run(async () =>
40+
{
41+
var deserialized = Serializer.Deserialize<T>(value);
42+
await handler(deserialized).ConfigureAwait(false);
43+
});
44+
45+
task.ContinueWith(
46+
t => logger.LogError(t.Exception, "Error processing subscription message on channel {Channel}", (string?)redisChannel),
47+
CancellationToken.None,
48+
TaskContinuationOptions.OnlyOnFaulted,
49+
TaskScheduler.Default);
50+
}
3651
}
3752

3853
/// <inheritdoc/>

src/core/StackExchange.Redis.Extensions.Core/Implementations/RedisDatabase.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
using System.Runtime.InteropServices;
99
using System.Threading.Tasks;
1010

11+
using Microsoft.Extensions.Logging;
12+
using Microsoft.Extensions.Logging.Abstractions;
13+
1114
using StackExchange.Redis.Extensions.Core.Abstractions;
1215
using StackExchange.Redis.Extensions.Core.Configuration;
1316
using StackExchange.Redis.Extensions.Core.Extensions;
@@ -26,6 +29,7 @@ public partial class RedisDatabase : IRedisDatabase
2629
private readonly string keyPrefix;
2730
private readonly uint maxValueLength;
2831
private readonly int dbNumber;
32+
private readonly ILogger logger;
2933

3034
/// <summary>
3135
/// Initializes a new instance of the <see cref="RedisDatabase"/> class.
@@ -36,20 +40,23 @@ public partial class RedisDatabase : IRedisDatabase
3640
/// <param name="dbNumber">The database to use.</param>
3741
/// <param name="maxvalueLength">The max lenght of the cache object.</param>
3842
/// <param name="keyPrefix">The key prefix.</param>
43+
/// <param name="logger">The logger.</param>
3944
public RedisDatabase(
4045
IRedisConnectionPoolManager connectionPoolManager,
4146
ISerializer serializer,
4247
ServerEnumerationStrategy serverEnumerationStrategy,
4348
int dbNumber,
4449
uint maxvalueLength,
45-
string? keyPrefix = null)
50+
string? keyPrefix = null,
51+
ILogger<RedisDatabase>? logger = null)
4652
{
4753
Serializer = serializer ?? throw new ArgumentNullException(nameof(serializer));
4854
this.connectionPoolManager = connectionPoolManager ?? throw new ArgumentNullException(nameof(connectionPoolManager));
4955
this.serverEnumerationStrategy = serverEnumerationStrategy;
5056
this.dbNumber = dbNumber;
5157
this.keyPrefix = keyPrefix ?? string.Empty;
5258
maxValueLength = maxvalueLength;
59+
this.logger = logger ?? (ILogger)NullLogger<RedisDatabase>.Instance;
5360
}
5461

5562
/// <inheritdoc/>
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Copyright (c) Ugo Lattanzi. All Rights Reserved. Licensed under the MIT license. See License.txt in the project root for license information.
2+
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
using Xunit;
8+
9+
namespace StackExchange.Redis.Extensions.Core.Tests;
10+
11+
public abstract partial class CacheClientTestBase
12+
{
13+
[Fact]
14+
public async Task SubscribeAsync_ValidMessage_ShouldInvokeHandler_Async()
15+
{
16+
var channel = new RedisChannel(Guid.NewGuid().ToString(), RedisChannel.PatternMode.Literal);
17+
var received = new TaskCompletionSource<string?>();
18+
19+
await Sut.GetDefaultDatabase().SubscribeAsync<string>(channel, msg =>
20+
{
21+
received.TrySetResult(msg);
22+
return Task.CompletedTask;
23+
});
24+
25+
await Sut.GetDefaultDatabase().PublishAsync(channel, "hello");
26+
27+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
28+
cts.Token.Register(() => received.TrySetCanceled());
29+
30+
var result = await received.Task;
31+
32+
Assert.Equal("hello", result);
33+
34+
await Sut.GetDefaultDatabase().UnsubscribeAllAsync();
35+
}
36+
37+
[Fact]
38+
public async Task SubscribeAsync_HandlerThrows_ShouldNotCrashProcess_Async()
39+
{
40+
var channel = new RedisChannel(Guid.NewGuid().ToString(), RedisChannel.PatternMode.Literal);
41+
var secondMessageReceived = new TaskCompletionSource<bool>();
42+
var firstHandlerCompleted = new TaskCompletionSource<bool>();
43+
var callCount = 0;
44+
45+
await Sut.GetDefaultDatabase().SubscribeAsync<string>(channel, _ =>
46+
{
47+
var count = Interlocked.Increment(ref callCount);
48+
49+
if (count == 1)
50+
{
51+
firstHandlerCompleted.TrySetResult(true);
52+
throw new InvalidOperationException("Simulated handler failure");
53+
}
54+
55+
secondMessageReceived.TrySetResult(true);
56+
return Task.CompletedTask;
57+
});
58+
59+
// First message — handler throws, should be logged not crash
60+
await Sut.GetDefaultDatabase().PublishAsync(channel, "msg1");
61+
62+
// Wait for first handler to complete before sending second message
63+
using var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(5));
64+
cts1.Token.Register(() => firstHandlerCompleted.TrySetCanceled());
65+
await firstHandlerCompleted.Task;
66+
67+
// Second message — handler should still be active
68+
await Sut.GetDefaultDatabase().PublishAsync(channel, "msg2");
69+
70+
using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(5));
71+
cts2.Token.Register(() => secondMessageReceived.TrySetCanceled());
72+
73+
var result = await secondMessageReceived.Task;
74+
75+
Assert.True(result, "Second message should have been received even after first handler threw");
76+
Assert.True(callCount >= 2);
77+
78+
await Sut.GetDefaultDatabase().UnsubscribeAllAsync();
79+
}
80+
}

0 commit comments

Comments
 (0)