Skip to content

Commit ee4e676

Browse files
bugfix: connection multiplexer should not be scoped
repository factory is scoped in non-test-mode, so the connection multiplexer part needs to be factored out
1 parent f80afdb commit ee4e676

File tree

3 files changed

+49
-30
lines changed

3 files changed

+49
-30
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using EntityDb.Common.Disposables;
2+
using StackExchange.Redis;
3+
using System.Collections.Concurrent;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
7+
namespace EntityDb.Redis.ConnectionMultiplexers;
8+
9+
internal class ConnectionMultiplexerFactory : DisposableResourceBaseClass
10+
{
11+
private readonly SemaphoreSlim _connectionSemaphore = new(1);
12+
private readonly ConcurrentDictionary<string, IConnectionMultiplexer> _connectionMultiplexers = new();
13+
14+
public async Task<IConnectionMultiplexer> CreateConnectionMultiplexer(string connectionString, CancellationToken cancellationToken)
15+
{
16+
await _connectionSemaphore.WaitAsync(cancellationToken);
17+
18+
var connectionMultiplexer = _connectionMultiplexers.GetOrAdd(connectionString, _ =>
19+
{
20+
var configurationOptions = ConfigurationOptions.Parse(connectionString);
21+
22+
return ConnectionMultiplexer.Connect(configurationOptions);
23+
});
24+
25+
_connectionSemaphore.Release();
26+
27+
return connectionMultiplexer;
28+
}
29+
30+
public override async ValueTask DisposeAsync()
31+
{
32+
await _connectionSemaphore.WaitAsync();
33+
34+
foreach (var connectionMultiplexer in _connectionMultiplexers.Values)
35+
{
36+
connectionMultiplexer.Dispose();
37+
}
38+
39+
_connectionSemaphore.Release();
40+
}
41+
}

src/EntityDb.Redis/Extensions/ServiceCollectionExtensions.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using EntityDb.Abstractions.Snapshots;
22
using EntityDb.Common.Envelopes;
33
using EntityDb.Common.Extensions;
4+
using EntityDb.Redis.ConnectionMultiplexers;
45
using EntityDb.Redis.Envelopes;
56
using EntityDb.Redis.Snapshots;
67
using Microsoft.Extensions.Configuration;
@@ -35,6 +36,8 @@ public static void AddRedisSnapshots<TSnapshot>(this IServiceCollection serviceC
3536
Func<IConfiguration, string> getConnectionString, bool testMode = false)
3637
{
3738
serviceCollection.AddJsonElementEnvelopeService();
39+
40+
serviceCollection.AddSingleton<ConnectionMultiplexerFactory>();
3841

3942
serviceCollection.Add
4043
(

src/EntityDb.Redis/Snapshots/RedisSnapshotRepositoryFactory.cs

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using EntityDb.Common.Envelopes;
44
using EntityDb.Common.Snapshots;
55
using EntityDb.Common.TypeResolvers;
6+
using EntityDb.Redis.ConnectionMultiplexers;
67
using EntityDb.Redis.Sessions;
78
using Microsoft.Extensions.DependencyInjection;
89
using Microsoft.Extensions.Options;
@@ -17,17 +18,16 @@ namespace EntityDb.Redis.Snapshots;
1718
internal class RedisSnapshotRepositoryFactory<TSnapshot> : DisposableResourceBaseClass, ISnapshotRepositoryFactory<TSnapshot>
1819
{
1920
private readonly IServiceProvider _serviceProvider;
21+
private readonly ConnectionMultiplexerFactory _connectionMultiplexerFactory;
2022
private readonly IOptionsFactory<SnapshotSessionOptions> _optionsFactory;
2123
private readonly IEnvelopeService<JsonElement> _envelopeService;
2224
private readonly string _connectionString;
2325
private readonly string _keyNamespace;
24-
private readonly SemaphoreSlim _connectionSemaphore = new(1);
25-
26-
private IConnectionMultiplexer? _connectionMultiplexer;
2726

2827
public RedisSnapshotRepositoryFactory
2928
(
3029
IServiceProvider serviceProvider,
30+
ConnectionMultiplexerFactory connectionMultiplexerFactory,
3131
IOptionsFactory<SnapshotSessionOptions> optionsFactory,
3232
IEnvelopeService<JsonElement> envelopeService,
3333
ITypeResolver typeResolver,
@@ -36,35 +36,17 @@ string keyNamespace
3636
)
3737
{
3838
_serviceProvider = serviceProvider;
39+
_connectionMultiplexerFactory = connectionMultiplexerFactory;
3940
_optionsFactory = optionsFactory;
4041
_envelopeService = envelopeService;
4142
_connectionString = connectionString;
4243
_keyNamespace = keyNamespace;
4344
}
4445

45-
private async Task<IConnectionMultiplexer> OpenConnectionMultiplexer(CancellationToken cancellationToken)
46-
{
47-
await _connectionSemaphore.WaitAsync(cancellationToken);
48-
49-
if (_connectionMultiplexer != null)
50-
{
51-
_connectionSemaphore.Release();
52-
53-
return _connectionMultiplexer;
54-
}
55-
56-
var configurationOptions = ConfigurationOptions.Parse(_connectionString);
57-
58-
_connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync(configurationOptions).WaitAsync(cancellationToken);
59-
60-
_connectionSemaphore.Release();
61-
62-
return _connectionMultiplexer;
63-
}
6446

6547
private async Task<IRedisSession> CreateSession(SnapshotSessionOptions snapshotSessionOptions, CancellationToken cancellationToken)
6648
{
67-
var connectionMultiplexer = await OpenConnectionMultiplexer(cancellationToken);
49+
var connectionMultiplexer = await _connectionMultiplexerFactory.CreateConnectionMultiplexer(_connectionString, cancellationToken);
6850

6951
return RedisSession.Create(_serviceProvider, connectionMultiplexer.GetDatabase(), snapshotSessionOptions);
7052
}
@@ -95,11 +77,4 @@ public static RedisSnapshotRepositoryFactory<TSnapshot> Create(IServiceProvider
9577
keyNamespace
9678
);
9779
}
98-
99-
public override ValueTask DisposeAsync()
100-
{
101-
_connectionMultiplexer?.Dispose();
102-
103-
return ValueTask.CompletedTask;
104-
}
10580
}

0 commit comments

Comments
 (0)