Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
- Fix bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`
- Feat ADO.NET: cache gRPC transport by `gRPCConnectionString` to reuse channels.
- Fix bug wrap-around ADO.NET: Big parameterized Decimal — `((ulong)bits[1] << 32)` -> `((ulong)(uint)bits[1] << 32)`.
- Feat ADO.NET: Parameterized Decimal overflow check: `Precision` and `Scale`.
- Feat ADO.NET: Deleted support for `DateTimeOffset` was a mistake.
- Feat ADO.NET: Added support for `Date32`, `Datetime64`, `Timestamp64` and `Interval64` types in YDB.
Expand Down
15 changes: 11 additions & 4 deletions src/Ydb.Sdk/src/Ado/PoolManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ namespace Ydb.Sdk.Ado;
internal static class PoolManager
{
private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex
private static readonly ConcurrentDictionary<string, ISessionSource> Pools = new();

internal static readonly ConcurrentDictionary<string, IDriver> Drivers = new();
internal static readonly ConcurrentDictionary<string, ISessionSource> Pools = new();

internal static async Task<ISession> GetSession(
YdbConnectionStringBuilder settings,
Expand All @@ -27,9 +29,14 @@ CancellationToken cancellationToken
return await pool.OpenSession(cancellationToken);
}

var newSessionPool = new PoolingSessionSource<PoolingSession>(
await PoolingSessionFactory.Create(settings), settings
);
var driver = Drivers.TryGetValue(settings.GrpcConnectionString, out var cacheDriver) &&
!cacheDriver.IsDisposed
? cacheDriver
: Drivers[settings.GrpcConnectionString] = await settings.BuildDriver();
driver.RegisterOwner();

var factory = new PoolingSessionFactory(driver, settings);
var newSessionPool = new PoolingSessionSource<PoolingSession>(factory, settings);

Pools[settings.ConnectionString] = newSessionPool;

Expand Down
8 changes: 2 additions & 6 deletions src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

namespace Ydb.Sdk.Ado.Session;

Expand All @@ -9,16 +8,13 @@ internal class PoolingSessionFactory : IPoolingSessionFactory<PoolingSession>
private readonly bool _disableServerBalancer;
private readonly ILogger<PoolingSession> _logger;

internal PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory)
internal PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings)
{
_driver = driver;
_disableServerBalancer = settings.DisableServerBalancer;
_logger = loggerFactory.CreateLogger<PoolingSession>();
_logger = settings.LoggerFactory.CreateLogger<PoolingSession>();
}

public static async Task<PoolingSessionFactory> Create(YdbConnectionStringBuilder settings) =>
new(await settings.BuildDriver(), settings, settings.LoggerFactory ?? NullLoggerFactory.Instance);

public PoolingSession NewSession(PoolingSessionSource<PoolingSession> source) =>
new(_driver, source, _disableServerBalancer, _logger);

Expand Down
13 changes: 9 additions & 4 deletions src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ public int CreateSessionTimeout

private int _createSessionTimeout;

public ILoggerFactory? LoggerFactory { get; init; }
public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance;

public ICredentialsProvider? CredentialsProvider { get; init; }

Expand Down Expand Up @@ -358,6 +358,12 @@ public override object this[string keyword]

private string Endpoint => $"{(UseTls ? "grpcs" : "grpc")}://{Host}:{Port}";

internal string GrpcConnectionString =>
$"UseTls={UseTls};Host={Host};Port={Port};Database={Database};User={User};Password={Password};" +
$"ConnectTimeout={ConnectTimeout};KeepAlivePingDelay={KeepAlivePingDelay};KeepAlivePingTimeout={KeepAlivePingTimeout};" +
$"EnableMultipleHttp2Connections={EnableMultipleHttp2Connections};MaxSendMessageSize={MaxSendMessageSize};" +
$"MaxReceiveMessageSize={MaxReceiveMessageSize};DisableDiscovery={DisableDiscovery}";

internal async Task<IDriver> BuildDriver()
{
var cert = RootCertificate != null ? X509Certificate.CreateFromCertFile(RootCertificate) : null;
Expand All @@ -384,11 +390,10 @@ internal async Task<IDriver> BuildDriver()
MaxSendMessageSize = MaxSendMessageSize,
MaxReceiveMessageSize = MaxReceiveMessageSize
};
var loggerFactory = LoggerFactory ?? NullLoggerFactory.Instance;

return DisableDiscovery
? new DirectGrpcChannelDriver(driverConfig, loggerFactory)
: await Driver.CreateInitialized(driverConfig, loggerFactory);
? new DirectGrpcChannelDriver(driverConfig, LoggerFactory)
: await Driver.CreateInitialized(driverConfig, LoggerFactory);
}

public override void Clear()
Expand Down
10 changes: 9 additions & 1 deletion src/Ydb.Sdk/src/IDriver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public ValueTask<IBidirectionalStream<TRequest, TResponse>> BidirectionalStreamC
where TResponse : class;

ILoggerFactory LoggerFactory { get; }

void RegisterOwner();

bool IsDisposed { get; }
}

public interface IBidirectionalStream<in TRequest, out TResponse> : IDisposable
Expand Down Expand Up @@ -63,6 +67,8 @@ public abstract class BaseDriver : IDriver
internal readonly GrpcChannelFactory GrpcChannelFactory;
internal readonly ChannelPool<GrpcChannel> ChannelPool;

private int _ownerCount;

protected int Disposed;

internal BaseDriver(
Expand Down Expand Up @@ -204,12 +210,14 @@ protected async ValueTask<CallOptions> GetCallOptions(GrpcRequestSettings settin
}

public ILoggerFactory LoggerFactory { get; }
public void RegisterOwner() => _ownerCount++;
public bool IsDisposed => Disposed == 1;

public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
if (--_ownerCount <= 0 && Interlocked.CompareExchange(ref Disposed, 1, 0) == 0)
{
await ChannelPool.DisposeAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,8 @@ public Task<TResponse>
NotImplementedException();

public ILoggerFactory LoggerFactory => null!;

public void RegisterOwner() => throw new NotImplementedException();

public bool IsDisposed => false;
}
71 changes: 71 additions & 0 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/PoolManagerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using System.Collections.Immutable;
using Xunit;

namespace Ydb.Sdk.Ado.Tests;

[Collection("PoolManagerTests")]
[CollectionDefinition("PoolManagerTests", DisableParallelization = true)]
public class PoolManagerTests
{
[Theory]
[InlineData(new[]
{
"MinSessionSize=1", "MinSessionSize=2", "MinSessionSize=3",
"MinSessionSize=1;DisableDiscovery=True", "MinSessionSize=2;DisableDiscovery=True"
}, 2, 5)] // 2 transports (by the DisableDiscovery flag), 5 pools
[InlineData(
new[] { "MinSessionSize=1", "MinSessionSize=2", "MinSessionSize=3", "MinSessionSize=4", "MinSessionSize=5" },
1, 5)] // 1 transport, 5 five pools
[InlineData(new[]
{ "MinSessionSize=1", "MinSessionSize=1", "MinSessionSize=2", "MinSessionSize=2", "MinSessionSize=3" }, 1,
3)] // duplicate rows — we expect 1 transport, 3 pools
[InlineData(new[]
{
"MinSessionSize=1;ConnectTimeout=5", "MinSessionSize=1;ConnectTimeout=6", "MinSessionSize=1;ConnectTimeout=7",
"MinSessionSize=1;ConnectTimeout=8", "MinSessionSize=1;ConnectTimeout=9"
}, 5, 5)] // 5 transport, 5 five pools
[InlineData(new[] { "MinSessionSize=1" }, 1, 1)] // simple case
public async Task PoolManager_CachingAndCleanup(string[] connectionStrings, int expectedDrivers, int expectedPools)
{
await YdbConnection.ClearAllPools();
PoolManager.Drivers.Clear();

var connections = connectionStrings
.Select(connectionString => new YdbConnection(connectionString))
.ToImmutableArray();
var parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList();
await Task.WhenAll(parallelTasks);

Assert.Equal(expectedDrivers, PoolManager.Drivers.Count);
Assert.Equal(expectedPools, PoolManager.Pools.Count);

await ClearAllConnections(connections);

parallelTasks = connections.Select(connection => connection.OpenAsync()).ToList();
await Task.WhenAll(parallelTasks);

foreach (var (_, driver) in PoolManager.Drivers)
{
Assert.False(driver.IsDisposed);
}

Assert.Equal(expectedDrivers, PoolManager.Drivers.Count);
Assert.Equal(expectedPools, PoolManager.Pools.Count);

await ClearAllConnections(connections);
}

private static async Task ClearAllConnections(IReadOnlyCollection<YdbConnection> connections)
{
foreach (var connection in connections)
await connection.CloseAsync();

await YdbConnection.ClearAllPools();
Assert.Empty(PoolManager.Pools);

foreach (var (_, driver) in PoolManager.Drivers)
{
Assert.True(driver.IsDisposed);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class PoolingSessionTests

public PoolingSessionTests()
{
var settings = new YdbConnectionStringBuilder();
var settings = new YdbConnectionStringBuilder { LoggerFactory = TestUtils.LoggerFactory };

_mockIDriver = new Mock<IDriver>(MockBehavior.Strict);
_mockIDriver.Setup(driver => driver.LoggerFactory).Returns(TestUtils.LoggerFactory);
Expand All @@ -31,7 +31,7 @@ public PoolingSessionTests()
It.Is<GrpcRequestSettings>(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId))
).ReturnsAsync(_mockAttachStream.Object);
_mockAttachStream.Setup(stream => stream.Dispose());
_poolingSessionFactory = new PoolingSessionFactory(_mockIDriver.Object, settings, TestUtils.LoggerFactory);
_poolingSessionFactory = new PoolingSessionFactory(_mockIDriver.Object, settings);
_poolingSessionSource = new PoolingSessionSource<PoolingSession>(_poolingSessionFactory, settings);
}

Expand Down
1 change: 0 additions & 1 deletion src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ public async Task ExecuteReaderAsync_WhenSelectNull_ThrowFieldIsNull()
dbCommand.CommandText = "SELECT NULL";
var reader = await dbCommand.ExecuteReaderAsync();
await reader.ReadAsync();

Assert.Equal("Field is null.", Assert.Throws<InvalidCastException>(() => reader.GetFloat(0)).Message);
}

Expand Down
104 changes: 62 additions & 42 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ namespace Ydb.Sdk.Ado.Tests;

public class YdbConnectionStringBuilderTests
{
private const int MessageSize = 64 * 1024 * 1024;

[Fact]
public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnectionString()
{
Expand All @@ -23,11 +25,16 @@ public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnection
Assert.Equal(10, ydbConnectionStringBuilder.KeepAlivePingTimeout);
Assert.Equal("", ydbConnectionStringBuilder.ConnectionString);
Assert.False(ydbConnectionStringBuilder.EnableMultipleHttp2Connections);
Assert.Equal(64 * 1024 * 1024, ydbConnectionStringBuilder.MaxSendMessageSize);
Assert.Equal(64 * 1024 * 1024, ydbConnectionStringBuilder.MaxReceiveMessageSize);
Assert.Equal(MessageSize, ydbConnectionStringBuilder.MaxSendMessageSize);
Assert.Equal(MessageSize, ydbConnectionStringBuilder.MaxReceiveMessageSize);
Assert.False(ydbConnectionStringBuilder.DisableDiscovery);
Assert.False(ydbConnectionStringBuilder.DisableServerBalancer);
Assert.False(ydbConnectionStringBuilder.UseTls);

Assert.Equal("UseTls=False;Host=localhost;Port=2136;Database=/local;User=;Password=;ConnectTimeout=5;" +
"KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" +
$"MaxSendMessageSize={MessageSize};MaxReceiveMessageSize={MessageSize};DisableDiscovery=False",
ydbConnectionStringBuilder.GrpcConnectionString);
}

[Fact]
Expand All @@ -43,68 +50,81 @@ public void InitConnectionStringBuilder_WhenUnexpectedKey_ThrowException()
[Fact]
public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnectionString()
{
var connectionString = new YdbConnectionStringBuilder(
"Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;" +
"MinSessionPool=10;MaxSessionPool=50;CreateSessionTimeout=30;" +
"SessionIdleTimeout=600;" +
"ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
"EnableMultipleHttp2Connections=true;" +
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" +
"DisableDiscovery=true;DisableServerBalancer=true;"
var ydbConnectionStringBuilder = new YdbConnectionStringBuilder(
"Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;MinSessionPool=10;MaxSessionPool=50;" +
"CreateSessionTimeout=30;SessionIdleTimeout=600;ConnectTimeout=30;KeepAlivePingDelay=30;" +
"KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=true;MaxSendMessageSize=1000000;" +
"MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;"
);

Assert.Equal(2135, connectionString.Port);
Assert.Equal("server", connectionString.Host);
Assert.Equal("/my/path", connectionString.Database);
Assert.Equal(10, connectionString.MinSessionPool);
Assert.Equal(50, connectionString.MaxSessionPool);
Assert.Equal(30, connectionString.CreateSessionTimeout);
Assert.Equal(600, connectionString.SessionIdleTimeout);
Assert.Equal("Kirill", connectionString.User);
Assert.Equal(30, connectionString.ConnectTimeout);
Assert.Equal(30, connectionString.KeepAlivePingDelay);
Assert.Equal(60, connectionString.KeepAlivePingTimeout);
Assert.Null(connectionString.Password);
Assert.True(connectionString.EnableMultipleHttp2Connections);
Assert.Equal(1000000, connectionString.MaxSendMessageSize);
Assert.Equal(1000000, connectionString.MaxReceiveMessageSize);
Assert.Equal(2135, ydbConnectionStringBuilder.Port);
Assert.Equal("server", ydbConnectionStringBuilder.Host);
Assert.Equal("/my/path", ydbConnectionStringBuilder.Database);
Assert.Equal(10, ydbConnectionStringBuilder.MinSessionPool);
Assert.Equal(50, ydbConnectionStringBuilder.MaxSessionPool);
Assert.Equal(30, ydbConnectionStringBuilder.CreateSessionTimeout);
Assert.Equal(600, ydbConnectionStringBuilder.SessionIdleTimeout);
Assert.Equal("Kirill", ydbConnectionStringBuilder.User);
Assert.Equal(30, ydbConnectionStringBuilder.ConnectTimeout);
Assert.Equal(30, ydbConnectionStringBuilder.KeepAlivePingDelay);
Assert.Equal(60, ydbConnectionStringBuilder.KeepAlivePingTimeout);
Assert.Null(ydbConnectionStringBuilder.Password);
Assert.True(ydbConnectionStringBuilder.EnableMultipleHttp2Connections);
Assert.Equal(1000000, ydbConnectionStringBuilder.MaxSendMessageSize);
Assert.Equal(1000000, ydbConnectionStringBuilder.MaxReceiveMessageSize);
Assert.Equal("Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=True;" +
"MinSessionPool=10;MaxSessionPool=50;CreateSessionTimeout=30;" +
"SessionIdleTimeout=600;" +
"ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" +
"EnableMultipleHttp2Connections=True;" +
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" +
"DisableDiscovery=True;DisableServerBalancer=True", connectionString.ConnectionString);
Assert.True(connectionString.DisableDiscovery);
Assert.True(connectionString.DisableServerBalancer);
"DisableDiscovery=True;DisableServerBalancer=True", ydbConnectionStringBuilder.ConnectionString);
Assert.True(ydbConnectionStringBuilder.DisableDiscovery);
Assert.True(ydbConnectionStringBuilder.DisableServerBalancer);
Assert.Equal("UseTls=True;Host=server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=30;" +
"KeepAlivePingDelay=30;KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=True;" +
"MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;DisableDiscovery=True",
ydbConnectionStringBuilder.GrpcConnectionString);
}

[Fact]
public void Host_WhenSetInProperty_ReturnUpdatedConnectionString()
{
var connectionString = new YdbConnectionStringBuilder("Host=server;Port=2135;Database=/my/path;User=Kirill");

Assert.Equal("server", connectionString.Host);
connectionString.Host = "new_server";
Assert.Equal("new_server", connectionString.Host);

Assert.Equal("Host=new_server;Port=2135;Database=/my/path;User=Kirill", connectionString.ConnectionString);
var ydbConnectionStringBuilder =
new YdbConnectionStringBuilder("Host=server;Port=2135;Database=/my/path;User=Kirill");
Assert.Equal(
"UseTls=False;Host=server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=5;" +
"KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" +
$"MaxSendMessageSize={MessageSize};MaxReceiveMessageSize={MessageSize};DisableDiscovery=False",
ydbConnectionStringBuilder.GrpcConnectionString);
Assert.Equal("server", ydbConnectionStringBuilder.Host);
ydbConnectionStringBuilder.Host = "new_server";
Assert.Equal("new_server", ydbConnectionStringBuilder.Host);
Assert.Equal(
"UseTls=False;Host=new_server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=5;" +
"KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" +
$"MaxSendMessageSize={MessageSize};MaxReceiveMessageSize={MessageSize};DisableDiscovery=False",
ydbConnectionStringBuilder.GrpcConnectionString);
Assert.Equal("Host=new_server;Port=2135;Database=/my/path;User=Kirill",
ydbConnectionStringBuilder.ConnectionString);
}

[Fact]
public void SetProperty_WhenPropertyNeedsTrimOperation_ReturnUpdatedConnectionString()
{
var connectionString =
var ydbConnectionStringBuilder =
new YdbConnectionStringBuilder(" Host =server;Port=2135; EnableMultipleHttp2Connections =true");

Assert.Equal(2135, connectionString.Port);
Assert.Equal("server", connectionString.Host);
Assert.True(connectionString.EnableMultipleHttp2Connections);
Assert.Equal(2135, ydbConnectionStringBuilder.Port);
Assert.Equal("server", ydbConnectionStringBuilder.Host);
Assert.True(ydbConnectionStringBuilder.EnableMultipleHttp2Connections);

Assert.Equal("Host=server;Port=2135;EnableMultipleHttp2Connections=True", connectionString.ConnectionString);
Assert.Equal("Host=server;Port=2135;EnableMultipleHttp2Connections=True",
ydbConnectionStringBuilder.ConnectionString);

connectionString.EnableMultipleHttp2Connections = false;
ydbConnectionStringBuilder.EnableMultipleHttp2Connections = false;

Assert.Equal("Host=server;Port=2135;EnableMultipleHttp2Connections=False", connectionString.ConnectionString);
Assert.Equal("Host=server;Port=2135;EnableMultipleHttp2Connections=False",
ydbConnectionStringBuilder.ConnectionString);
}
}
Loading
Loading