Skip to content

Commit 3b75646

Browse files
Feat ADO.NET: cache gRPC transport by gRPCConnectionString to reuse channels.
1 parent 531b609 commit 3b75646

File tree

9 files changed

+162
-47
lines changed

9 files changed

+162
-47
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
- Fix bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`
1+
- Feat ADO.NET: cache gRPC transport by `gRPCConnectionString` to reuse channels.
2+
- Fix bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`.
23
- Feat ADO.NET: Parameterized Decimal overflow check: `Precision` and `Scale`.
34
- Feat ADO.NET: Deleted support for `DateTimeOffset` was a mistake.
45
- Feat ADO.NET: Added support for `Date32`, `Datetime64`, `Timestamp64` and `Interval64` types in YDB.

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ namespace Ydb.Sdk.Ado;
66
internal static class PoolManager
77
{
88
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
9-
private static readonly ConcurrentDictionary<string, ISessionSource> Pools = new();
9+
10+
internal static readonly ConcurrentDictionary<string, IDriver> Drivers = new();
11+
internal static readonly ConcurrentDictionary<string, ISessionSource> Pools = new();
1012

1113
internal static async Task<ISession> GetSession(
1214
YdbConnectionStringBuilder settings,
@@ -27,9 +29,14 @@ CancellationToken cancellationToken
2729
return await pool.OpenSession(cancellationToken);
2830
}
2931

30-
var newSessionPool = new PoolingSessionSource<PoolingSession>(
31-
await PoolingSessionFactory.Create(settings), settings
32-
);
32+
var driver = Drivers.TryGetValue(settings.GrpcConnectionString, out var cacheDriver) &&
33+
!cacheDriver.IsDisposed
34+
? cacheDriver
35+
: Drivers[settings.GrpcConnectionString] = await settings.BuildDriver();
36+
driver.RegisterOwner();
37+
38+
var factory = new PoolingSessionFactory(driver, settings);
39+
var newSessionPool = new PoolingSessionSource<PoolingSession>(factory, settings);
3340

3441
Pools[settings.ConnectionString] = newSessionPool;
3542

src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using Microsoft.Extensions.Logging;
2-
using Microsoft.Extensions.Logging.Abstractions;
32

43
namespace Ydb.Sdk.Ado.Session;
54

@@ -9,16 +8,13 @@ internal class PoolingSessionFactory : IPoolingSessionFactory<PoolingSession>
98
private readonly bool _disableServerBalancer;
109
private readonly ILogger<PoolingSession> _logger;
1110

12-
internal PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory)
11+
internal PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings)
1312
{
1413
_driver = driver;
1514
_disableServerBalancer = settings.DisableServerBalancer;
16-
_logger = loggerFactory.CreateLogger<PoolingSession>();
15+
_logger = settings.LoggerFactory.CreateLogger<PoolingSession>();
1716
}
1817

19-
public static async Task<PoolingSessionFactory> Create(YdbConnectionStringBuilder settings) =>
20-
new(await settings.BuildDriver(), settings, settings.LoggerFactory ?? NullLoggerFactory.Instance);
21-
2218
public PoolingSession NewSession(PoolingSessionSource<PoolingSession> source) =>
2319
new(_driver, source, _disableServerBalancer, _logger);
2420

src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ public int CreateSessionTimeout
314314

315315
private int _createSessionTimeout;
316316

317-
public ILoggerFactory? LoggerFactory { get; init; }
317+
public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance;
318318

319319
public ICredentialsProvider? CredentialsProvider { get; init; }
320320

@@ -358,6 +358,12 @@ public override object this[string keyword]
358358

359359
private string Endpoint => $"{(UseTls ? "grpcs" : "grpc")}://{Host}:{Port}";
360360

361+
internal string GrpcConnectionString =>
362+
$"UseTls={UseTls};Host={Host};Port={Port};Database={Database};User={User};Password={Password};" +
363+
$"ConnectTimeout={ConnectTimeout};KeepAlivePingDelay={KeepAlivePingDelay};KeepAlivePingTimeout={KeepAlivePingTimeout};" +
364+
$"EnableMultipleHttp2Connections={EnableMultipleHttp2Connections};MaxSendMessageSize={MaxSendMessageSize};" +
365+
$"MaxReceiveMessageSize={MaxReceiveMessageSize};DisableDiscovery={DisableDiscovery}";
366+
361367
internal async Task<IDriver> BuildDriver()
362368
{
363369
var cert = RootCertificate != null ? X509Certificate.CreateFromCertFile(RootCertificate) : null;
@@ -384,11 +390,10 @@ internal async Task<IDriver> BuildDriver()
384390
MaxSendMessageSize = MaxSendMessageSize,
385391
MaxReceiveMessageSize = MaxReceiveMessageSize
386392
};
387-
var loggerFactory = LoggerFactory ?? NullLoggerFactory.Instance;
388393

389394
return DisableDiscovery
390-
? new DirectGrpcChannelDriver(driverConfig, loggerFactory)
391-
: await Driver.CreateInitialized(driverConfig, loggerFactory);
395+
? new DirectGrpcChannelDriver(driverConfig, LoggerFactory)
396+
: await Driver.CreateInitialized(driverConfig, LoggerFactory);
392397
}
393398

394399
public override void Clear()

src/Ydb.Sdk/src/IDriver.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public ValueTask<IBidirectionalStream<TRequest, TResponse>> BidirectionalStreamC
3131
where TResponse : class;
3232

3333
ILoggerFactory LoggerFactory { get; }
34+
35+
void RegisterOwner();
36+
37+
bool IsDisposed { get; }
3438
}
3539

3640
public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
@@ -63,6 +67,8 @@ public abstract class BaseDriver : IDriver
6367
internal readonly GrpcChannelFactory GrpcChannelFactory;
6468
internal readonly ChannelPool<GrpcChannel> ChannelPool;
6569

70+
private int _ownerCount;
71+
6672
protected int Disposed;
6773

6874
internal BaseDriver(
@@ -204,12 +210,14 @@ protected async ValueTask<CallOptions> GetCallOptions(GrpcRequestSettings settin
204210
}
205211

206212
public ILoggerFactory LoggerFactory { get; }
213+
public void RegisterOwner() => _ownerCount++;
214+
public bool IsDisposed => Disposed == 1;
207215

208216
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
209217

210218
public async ValueTask DisposeAsync()
211219
{
212-
if (Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
220+
if (--_ownerCount <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
213221
{
214222
await ChannelPool.DisposeAsync();
215223

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/BulkUpsertImporterBenchmark.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,8 @@ public Task<TResponse>
6363
NotImplementedException();
6464

6565
public ILoggerFactory LoggerFactory => null!;
66+
67+
public void RegisterOwner() => throw new NotImplementedException();
68+
69+
public bool IsDisposed => false;
6670
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
using System.Collections.Immutable;
2+
using Xunit;
3+
4+
namespace Ydb.Sdk.Ado.Tests;
5+
6+
[CollectionDefinition("PoolManagerTests", DisableParallelization = true)]
7+
public class PoolManagerTests
8+
{
9+
[Theory]
10+
[InlineData(new[]
11+
{
12+
"MinSessionSize=1", "MinSessionSize=2", "MinSessionSize=3",
13+
"MinSessionSize=1;DisableDiscovery=True", "MinSessionSize=2;DisableDiscovery=True"
14+
}, 2, 5)] // 2 transports (by the DisableDiscovery flag), 5 pools
15+
[InlineData(
16+
new[] { "MinSessionSize=1", "MinSessionSize=2", "MinSessionSize=3", "MinSessionSize=4", "MinSessionSize=5" },
17+
1, 5)] // 1 transport, 5 five pools
18+
[InlineData(new[]
19+
{ "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=3" }, 1,
20+
3)] // duplicate rows — we expect 1 transport, 3 pools
21+
[InlineData(new[]
22+
{
23+
"MinSessionSize=1;ConnectTimeout=5", "MinSessionSize=1;ConnectTimeout=6", "MinSessionSize=1;ConnectTimeout=7",
24+
"MinSessionSize=1;ConnectTimeout=8", "MinSessionSize=1;ConnectTimeout=9"
25+
}, 5, 5)] // 5 transport, 5 five pools
26+
[InlineData(new[]
27+
{
28+
"MinSessionSize=1"
29+
}, 1, 1)] // simple case
30+
public async Task PoolManager_CachingAndCleanup(string[] connectionStrings, int expectedDrivers, int expectedPools)
31+
{
32+
foreach (var (_, pool) in PoolManager.Pools)
33+
{
34+
await pool.DisposeAsync();
35+
}
36+
37+
PoolManager.Pools.Clear();
38+
PoolManager.Drivers.Clear();
39+
40+
var connections = connectionStrings
41+
.Select(connectionString => new YdbConnection(connectionString))
42+
.ToImmutableArray();
43+
var parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList();
44+
await Task.WhenAll(parallelTasks);
45+
46+
Assert.Equal(expectedDrivers, PoolManager.Drivers.Count);
47+
Assert.Equal(expectedPools, PoolManager.Pools.Count);
48+
49+
await ClearAllConnections(connections);
50+
51+
parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList();
52+
await Task.WhenAll(parallelTasks);
53+
54+
foreach (var (_, driver) in PoolManager.Drivers)
55+
{
56+
Assert.False(driver.IsDisposed);
57+
}
58+
59+
Assert.Equal(expectedDrivers, PoolManager.Drivers.Count);
60+
Assert.Equal(expectedPools, PoolManager.Pools.Count);
61+
62+
await ClearAllConnections(connections);
63+
}
64+
65+
private static async Task ClearAllConnections(IReadOnlyCollection<YdbConnection> connections)
66+
{
67+
foreach (var connection in connections)
68+
await connection.CloseAsync();
69+
70+
await YdbConnection.ClearAllPools();
71+
Assert.Empty(PoolManager.Pools);
72+
73+
foreach (var (_, driver) in PoolManager.Drivers)
74+
{
75+
Assert.True(driver.IsDisposed);
76+
}
77+
}
78+
}

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class PoolingSessionTests
2121

2222
public PoolingSessionTests()
2323
{
24-
var settings = new YdbConnectionStringBuilder();
24+
var settings = new YdbConnectionStringBuilder { LoggerFactory = TestUtils.LoggerFactory};
2525

2626
_mockIDriver = new Mock<IDriver>(MockBehavior.Strict);
2727
_mockIDriver.Setup(driver => driver.LoggerFactory).Returns(TestUtils.LoggerFactory);
@@ -31,7 +31,7 @@ public PoolingSessionTests()
3131
It.Is<GrpcRequestSettings>(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId))
3232
).ReturnsAsync(_mockAttachStream.Object);
3333
_mockAttachStream.Setup(stream => stream.Dispose());
34-
_poolingSessionFactory = new PoolingSessionFactory(_mockIDriver.Object, settings, TestUtils.LoggerFactory);
34+
_poolingSessionFactory = new PoolingSessionFactory(_mockIDriver.Object, settings);
3535
_poolingSessionSource = new PoolingSessionSource<PoolingSession>(_poolingSessionFactory, settings);
3636
}
3737

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,37 @@ namespace Ydb.Sdk.Ado.Tests;
44

55
public class YdbConnectionStringBuilderTests
66
{
7+
private const int MessageSize = 64 * 1024 * 1024;
8+
79
[Fact]
810
public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnectionString()
911
{
10-
var ydbConnectionStringBuilder = new YdbConnectionStringBuilder();
12+
var connectionString = new YdbConnectionStringBuilder();
13+
14+
Assert.Equal(2136, connectionString.Port);
15+
Assert.Equal("localhost", connectionString.Host);
16+
Assert.Equal("/local", connectionString.Database);
17+
Assert.Equal(0, connectionString.MinSessionPool);
18+
Assert.Equal(100, connectionString.MaxSessionPool);
19+
Assert.Equal(5, connectionString.CreateSessionTimeout);
20+
Assert.Equal(300, connectionString.SessionIdleTimeout);
21+
Assert.Null(connectionString.User);
22+
Assert.Null(connectionString.Password);
23+
Assert.Equal(5, connectionString.ConnectTimeout);
24+
Assert.Equal(10, connectionString.KeepAlivePingDelay);
25+
Assert.Equal(10, connectionString.KeepAlivePingTimeout);
26+
Assert.Equal("", connectionString.ConnectionString);
27+
Assert.False(connectionString.EnableMultipleHttp2Connections);
28+
Assert.Equal(MessageSize, connectionString.MaxSendMessageSize);
29+
Assert.Equal(MessageSize, connectionString.MaxReceiveMessageSize);
30+
Assert.False(connectionString.DisableDiscovery);
31+
Assert.False(connectionString.DisableServerBalancer);
32+
Assert.False(connectionString.UseTls);
1133

12-
Assert.Equal(2136, ydbConnectionStringBuilder.Port);
13-
Assert.Equal("localhost", ydbConnectionStringBuilder.Host);
14-
Assert.Equal("/local", ydbConnectionStringBuilder.Database);
15-
Assert.Equal(0, ydbConnectionStringBuilder.MinSessionPool);
16-
Assert.Equal(100, ydbConnectionStringBuilder.MaxSessionPool);
17-
Assert.Equal(5, ydbConnectionStringBuilder.CreateSessionTimeout);
18-
Assert.Equal(300, ydbConnectionStringBuilder.SessionIdleTimeout);
19-
Assert.Null(ydbConnectionStringBuilder.User);
20-
Assert.Null(ydbConnectionStringBuilder.Password);
21-
Assert.Equal(5, ydbConnectionStringBuilder.ConnectTimeout);
22-
Assert.Equal(10, ydbConnectionStringBuilder.KeepAlivePingDelay);
23-
Assert.Equal(10, ydbConnectionStringBuilder.KeepAlivePingTimeout);
24-
Assert.Equal("", ydbConnectionStringBuilder.ConnectionString);
25-
Assert.False(ydbConnectionStringBuilder.EnableMultipleHttp2Connections);
26-
Assert.Equal(64 * 1024 * 1024, ydbConnectionStringBuilder.MaxSendMessageSize);
27-
Assert.Equal(64 * 1024 * 1024, ydbConnectionStringBuilder.MaxReceiveMessageSize);
28-
Assert.False(ydbConnectionStringBuilder.DisableDiscovery);
29-
Assert.False(ydbConnectionStringBuilder.DisableServerBalancer);
30-
Assert.False(ydbConnectionStringBuilder.UseTls);
34+
Assert.Equal("UseTls=False;Host=localhost;Port=2136;Database=/local;User=;Password=;ConnectTimeout=5;" +
35+
"KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" +
36+
$"MaxSendMessageSize={MessageSize};MaxReceiveMessageSize={MessageSize};DisableDiscovery=False",
37+
connectionString.GrpcConnectionString);
3138
}
3239

3340
[Fact]
@@ -44,13 +51,10 @@ public void InitConnectionStringBuilder_WhenUnexpectedKey_ThrowException()
4451
public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnectionString()
4552
{
4653
var connectionString = new YdbConnectionStringBuilder(
47-
"Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;" +
48-
"MinSessionPool=10;MaxSessionPool=50;CreateSessionTimeout=30;" +
49-
"SessionIdleTimeout=600;" +
50-
"ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
51-
"EnableMultipleHttp2Connections=true;" +
52-
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" +
53-
"DisableDiscovery=true;DisableServerBalancer=true;"
54+
"Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;MinSessionPool=10;MaxSessionPool=50;" +
55+
"CreateSessionTimeout=30;SessionIdleTimeout=600;ConnectTimeout=30;KeepAlivePingDelay=30;" +
56+
"KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=true;MaxSendMessageSize=1000000;" +
57+
"MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;"
5458
);
5559

5660
Assert.Equal(2135, connectionString.Port);
@@ -77,17 +81,29 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection
7781
"DisableDiscovery=True;DisableServerBalancer=True", connectionString.ConnectionString);
7882
Assert.True(connectionString.DisableDiscovery);
7983
Assert.True(connectionString.DisableServerBalancer);
84+
Assert.Equal("UseTls=True;Host=server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=30;" +
85+
"KeepAlivePingDelay=30;KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=True;" +
86+
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;DisableDiscovery=True",
87+
connectionString.GrpcConnectionString);
8088
}
8189

8290
[Fact]
8391
public void Host_WhenSetInProperty_ReturnUpdatedConnectionString()
8492
{
8593
var connectionString = new YdbConnectionStringBuilder("Host=server;Port=2135;Database=/my/path;User=Kirill");
86-
94+
Assert.Equal(
95+
"UseTls=False;Host=server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=5;" +
96+
"KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" +
97+
$"MaxSendMessageSize={MessageSize};MaxReceiveMessageSize={MessageSize};DisableDiscovery=False",
98+
connectionString.GrpcConnectionString);
8799
Assert.Equal("server", connectionString.Host);
88100
connectionString.Host = "new_server";
89101
Assert.Equal("new_server", connectionString.Host);
90-
102+
Assert.Equal(
103+
"UseTls=False;Host=new_server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=5;" +
104+
"KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" +
105+
$"MaxSendMessageSize={MessageSize};MaxReceiveMessageSize={MessageSize};DisableDiscovery=False",
106+
connectionString.GrpcConnectionString);
91107
Assert.Equal("Host=new_server;Port=2135;Database=/my/path;User=Kirill", connectionString.ConnectionString);
92108
}
93109

0 commit comments

Comments
 (0)