Skip to content

Commit 4ac1c01

Browse files
KirillKurdyukovLiamHamsters
authored andcommitted
Feat ADO.NET: cache gRPC transport by gRPCConnectionString to reuse channels. (ydb-platform#520)
* Feat ADO.NET: cache gRPC transport by `gRPCConnectionString` to reuse channels. * refactoring * fix session leak in tests
1 parent a19f8bb commit 4ac1c01

File tree

13 files changed

+220
-196
lines changed

13 files changed

+220
-196
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
- Fix bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`
1+
- Feat ADO.NET: cache gRPC transport by `gRPCConnectionString` to reuse channels.
2+
- Fix bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`.
23
- Feat ADO.NET: Parameterized Decimal overflow check: `Precision` and `Scale`.
34
- Feat ADO.NET: Deleted support for `DateTimeOffset` was a mistake.
45
- Feat ADO.NET: Added support for `Date32`, `Datetime64`, `Timestamp64` and `Interval64` types in YDB.

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 12 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ namespace Ydb.Sdk.Ado;
66
internal static class PoolManager
77
{
88
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
9-
109
private static readonly ConcurrentDictionary<string, ISessionSource> Pools = new();
11-
private static readonly ConcurrentDictionary<string, ISessionSource> ImplicitPools = new();
1210

1311
internal static async Task<ISession> GetSession(
1412
YdbConnectionStringBuilder settings,
@@ -43,58 +41,29 @@ await PoolingSessionFactory.Create(settings), settings
4341
}
4442
}
4543

46-
internal static ISession GetImplicitSession(YdbConnectionStringBuilder settings)
44+
internal static async Task ClearPool(string connectionString)
4745
{
48-
if (ImplicitPools.TryGetValue(settings.ConnectionString, out var ready))
49-
return ready.OpenSession(CancellationToken.None).GetAwaiter().GetResult();
50-
51-
var driver = settings.BuildDriver().GetAwaiter().GetResult();
52-
ISessionSource source;
53-
54-
SemaphoreSlim.Wait();
55-
try
46+
if (Pools.Remove(connectionString, out var sessionPool))
5647
{
57-
if (!ImplicitPools.TryGetValue(settings.ConnectionString, out source))
48+
try
5849
{
59-
source = new ImplicitSessionSource(driver);
60-
ImplicitPools[settings.ConnectionString] = source;
61-
driver = null;
50+
await SemaphoreSlim.WaitAsync();
51+
52+
await sessionPool.DisposeAsync();
53+
}
54+
finally
55+
{
56+
SemaphoreSlim.Release();
6257
}
6358
}
64-
finally
65-
{
66-
SemaphoreSlim.Release();
67-
if (driver != null)
68-
driver.DisposeAsync().GetAwaiter().GetResult();
69-
}
70-
71-
return source.OpenSession(CancellationToken.None).GetAwaiter().GetResult();
72-
}
73-
74-
internal static async Task ClearPool(string connectionString)
75-
{
76-
Pools.TryRemove(connectionString, out var pooled);
77-
ImplicitPools.TryRemove(connectionString, out var implicitSrc);
78-
79-
var tasks = new List<Task>(2);
80-
if (pooled != null) tasks.Add(pooled.DisposeAsync().AsTask());
81-
if (implicitSrc != null) tasks.Add(implicitSrc.DisposeAsync().AsTask());
82-
83-
if (tasks.Count > 0)
84-
await Task.WhenAll(tasks);
8559
}
8660

8761
internal static async Task ClearAllPools()
8862
{
89-
var pooled = Pools.ToArray();
90-
var implicitArr = ImplicitPools.ToArray();
63+
var keys = Pools.Keys.ToList();
9164

92-
Pools.Clear();
93-
ImplicitPools.Clear();
65+
var tasks = keys.Select(ClearPool).ToList();
9466

95-
var tasks = new List<Task>(pooled.Length + implicitArr.Length);
96-
tasks.AddRange(pooled.Select(kv => kv.Value.DisposeAsync().AsTask()));
97-
tasks.AddRange(implicitArr.Select(kv => kv.Value.DisposeAsync().AsTask()));
9867
await Task.WhenAll(tasks);
9968
}
10069
}

src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using Microsoft.Extensions.Logging;
2-
using Microsoft.Extensions.Logging.Abstractions;
32

43
namespace Ydb.Sdk.Ado.Session;
54

@@ -9,16 +8,13 @@ internal class PoolingSessionFactory : IPoolingSessionFactory<PoolingSession>
98
private readonly bool _disableServerBalancer;
109
private readonly ILogger<PoolingSession> _logger;
1110

12-
internal PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory)
11+
internal PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings)
1312
{
1413
_driver = driver;
1514
_disableServerBalancer = settings.DisableServerBalancer;
16-
_logger = loggerFactory.CreateLogger<PoolingSession>();
15+
_logger = settings.LoggerFactory.CreateLogger<PoolingSession>();
1716
}
1817

19-
public static async Task<PoolingSessionFactory> Create(YdbConnectionStringBuilder settings) =>
20-
new(await settings.BuildDriver(), settings, settings.LoggerFactory ?? NullLoggerFactory.Instance);
21-
2218
public PoolingSession NewSession(PoolingSessionSource<PoolingSession> source) =>
2319
new(_driver, source, _disableServerBalancer, _logger);
2420

src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ private void InitDefaultValues()
4040
_maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize;
4141
_disableDiscovery = GrpcDefaultSettings.DisableDiscovery;
4242
_disableServerBalancer = false;
43-
_enableImplicitSession = false;
4443
}
4544

4645
public string Host
@@ -315,18 +314,6 @@ public int CreateSessionTimeout
315314

316315
private int _createSessionTimeout;
317316

318-
public bool EnableImplicitSession
319-
{
320-
get => _enableImplicitSession;
321-
set
322-
{
323-
_enableImplicitSession = value;
324-
SaveValue(nameof(EnableImplicitSession), value);
325-
}
326-
}
327-
328-
private bool _enableImplicitSession;
329-
330317
public ILoggerFactory? LoggerFactory { get; init; }
331318

332319
public ICredentialsProvider? CredentialsProvider { get; init; }
@@ -503,9 +490,6 @@ static YdbConnectionOption()
503490
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
504491
(builder, disableServerBalancer) => builder.DisableServerBalancer = disableServerBalancer),
505492
"DisableServerBalancer", "Disable Server Balancer");
506-
AddOption(new YdbConnectionOption<bool>(BoolExtractor,
507-
(builder, enableImplicit) => builder.EnableImplicitSession = enableImplicit),
508-
"EnableImplicitSession", "ImplicitSession");
509493
}
510494

511495
private static void AddOption(YdbConnectionOption option, params string[] keys)

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public ValueTask<IBidirectionalStream<TRequest, TResponse>> BidirectionalStreamC
3131
where TResponse : class;
3232

3333
ILoggerFactory LoggerFactory { get; }
34+
35+
void RegisterOwner();
36+
37+
bool IsDisposed { get; }
3438
}
3539

3640
public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
@@ -63,6 +67,8 @@ public abstract class BaseDriver : IDriver
6367
internal readonly GrpcChannelFactory GrpcChannelFactory;
6468
internal readonly ChannelPool<GrpcChannel> ChannelPool;
6569

70+
private int _ownerCount;
71+
6672
protected int Disposed;
6773

6874
internal BaseDriver(
@@ -204,12 +210,14 @@ protected async ValueTask<CallOptions> GetCallOptions(GrpcRequestSettings settin
204210
}
205211

206212
public ILoggerFactory LoggerFactory { get; }
213+
public void RegisterOwner() => _ownerCount++;
214+
public bool IsDisposed => Disposed == 1;
207215

208216
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
209217

210218
public async ValueTask DisposeAsync()
211219
{
212-
if (Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
220+
if (--_ownerCount <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
213221
{
214222
await ChannelPool.DisposeAsync();
215223

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/BulkUpsertImporterBenchmark.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,8 @@ public Task<TResponse>
6363
NotImplementedException();
6464

6565
public ILoggerFactory LoggerFactory => null!;
66+
67+
public void RegisterOwner() => throw new NotImplementedException();
68+
69+
public bool IsDisposed => false;
6670
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
using System.Collections.Immutable;
2+
using Xunit;
3+
4+
namespace Ydb.Sdk.Ado.Tests;
5+
6+
[Collection("PoolManagerTests")]
7+
[CollectionDefinition("PoolManagerTests", DisableParallelization = true)]
8+
public class PoolManagerTests
9+
{
10+
[Theory]
11+
[InlineData(new[]
12+
{
13+
"MinSessionSize=1", "MinSessionSize=2", "MinSessionSize=3",
14+
"MinSessionSize=1;DisableDiscovery=True", "MinSessionSize=2;DisableDiscovery=True"
15+
}, 2, 5)] // 2 transports (by the DisableDiscovery flag), 5 pools
16+
[InlineData(
17+
new[] { "MinSessionSize=1", "MinSessionSize=2", "MinSessionSize=3", "MinSessionSize=4", "MinSessionSize=5" },
18+
1, 5)] // 1 transport, 5 five pools
19+
[InlineData(new[]
20+
{ "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=3" }, 1,
21+
3)] // duplicate rows — we expect 1 transport, 3 pools
22+
[InlineData(new[]
23+
{
24+
"MinSessionSize=1;ConnectTimeout=5", "MinSessionSize=1;ConnectTimeout=6", "MinSessionSize=1;ConnectTimeout=7",
25+
"MinSessionSize=1;ConnectTimeout=8", "MinSessionSize=1;ConnectTimeout=9"
26+
}, 5, 5)] // 5 transport, 5 five pools
27+
[InlineData(new[] { "MinSessionSize=1" }, 1, 1)] // simple case
28+
public async Task PoolManager_CachingAndCleanup(string[] connectionStrings, int expectedDrivers, int expectedPools)
29+
{
30+
await YdbConnection.ClearAllPools();
31+
PoolManager.Drivers.Clear();
32+
33+
var connections = connectionStrings
34+
.Select(connectionString => new YdbConnection(connectionString))
35+
.ToImmutableArray();
36+
var parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList();
37+
await Task.WhenAll(parallelTasks);
38+
39+
Assert.Equal(expectedDrivers, PoolManager.Drivers.Count);
40+
Assert.Equal(expectedPools, PoolManager.Pools.Count);
41+
42+
await ClearAllConnections(connections);
43+
44+
parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList();
45+
await Task.WhenAll(parallelTasks);
46+
47+
foreach (var (_, driver) in PoolManager.Drivers)
48+
{
49+
Assert.False(driver.IsDisposed);
50+
}
51+
52+
Assert.Equal(expectedDrivers, PoolManager.Drivers.Count);
53+
Assert.Equal(expectedPools, PoolManager.Pools.Count);
54+
55+
await ClearAllConnections(connections);
56+
}
57+
58+
private static async Task ClearAllConnections(IReadOnlyCollection<YdbConnection> connections)
59+
{
60+
foreach (var connection in connections)
61+
await connection.CloseAsync();
62+
63+
await YdbConnection.ClearAllPools();
64+
Assert.Empty(PoolManager.Pools);
65+
66+
foreach (var (_, driver) in PoolManager.Drivers)
67+
{
68+
Assert.True(driver.IsDisposed);
69+
}
70+
}
71+
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class PoolingSessionTests
2121

2222
public PoolingSessionTests()
2323
{
24-
var settings = new YdbConnectionStringBuilder();
24+
var settings = new YdbConnectionStringBuilder { LoggerFactory = TestUtils.LoggerFactory };
2525

2626
_mockIDriver = new Mock<IDriver>(MockBehavior.Strict);
2727
_mockIDriver.Setup(driver => driver.LoggerFactory).Returns(TestUtils.LoggerFactory);
@@ -31,7 +31,7 @@ public PoolingSessionTests()
3131
It.Is<GrpcRequestSettings>(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId))
3232
).ReturnsAsync(_mockAttachStream.Object);
3333
_mockAttachStream.Setup(stream => stream.Dispose());
34-
_poolingSessionFactory = new PoolingSessionFactory(_mockIDriver.Object, settings, TestUtils.LoggerFactory);
34+
_poolingSessionFactory = new PoolingSessionFactory(_mockIDriver.Object, settings);
3535
_poolingSessionSource = new PoolingSessionSource<PoolingSession>(_poolingSessionFactory, settings);
3636
}
3737

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ public async Task ExecuteReaderAsync_WhenSelectNull_ThrowFieldIsNull()
6767
dbCommand.CommandText = "SELECT NULL";
6868
var reader = await dbCommand.ExecuteReaderAsync();
6969
await reader.ReadAsync();
70-
7170
Assert.Equal("Field is null.", Assert.Throws<InvalidCastException>(() => reader.GetFloat(0)).Message);
7271
}
7372

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnection
2828
Assert.False(ydbConnectionStringBuilder.DisableDiscovery);
2929
Assert.False(ydbConnectionStringBuilder.DisableServerBalancer);
3030
Assert.False(ydbConnectionStringBuilder.UseTls);
31-
Assert.False(ydbConnectionStringBuilder.EnableImplicitSession);
3231
}
3332

3433
[Fact]
@@ -51,7 +50,7 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
5150
"ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
5251
"EnableMultipleHttp2Connections=true;" +
5352
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" +
54-
"DisableDiscovery=true;DisableServerBalancer=true;EnableImplicitSession=true;"
53+
"DisableDiscovery=true;DisableServerBalancer=true;"
5554
);
5655

5756
Assert.Equal(2135, connectionString.Port);
@@ -75,11 +74,9 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
7574
"ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
7675
"EnableMultipleHttp2Connections=True;" +
7776
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" +
78-
"DisableDiscovery=True;DisableServerBalancer=True;EnableImplicitSession=True",
79-
connectionString.ConnectionString);
77+
"DisableDiscovery=True;DisableServerBalancer=True", connectionString.ConnectionString);
8078
Assert.True(connectionString.DisableDiscovery);
8179
Assert.True(connectionString.DisableServerBalancer);
82-
Assert.True(connectionString.EnableImplicitSession);
8380
}
8481

8582
[Fact]

0 commit comments

Comments
 (0)