diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index 78c5929b..c3908d52 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,3 +1,5 @@ +- Feat ADO.NET: Added `EnableImplicitSession` to support implicit sessions. + ## v0.23.1 - Fixed bug Topic Reader: NullReferenceException when handling StopPartitionSessionRequest ([#528](https://github.com/ydb-platform/ydb-dotnet-sdk/issues/528)). diff --git a/src/Ydb.Sdk/src/Ado/PoolManager.cs b/src/Ydb.Sdk/src/Ado/PoolManager.cs index 3f14c285..6438c2fc 100644 --- a/src/Ydb.Sdk/src/Ado/PoolManager.cs +++ b/src/Ydb.Sdk/src/Ado/PoolManager.cs @@ -35,12 +35,9 @@ CancellationToken cancellationToken : Drivers[settings.GrpcConnectionString] = await settings.BuildDriver(); driver.RegisterOwner(); - var factory = new PoolingSessionFactory(driver, settings); - var newSessionPool = new PoolingSessionSource(factory, settings); - - Pools[settings.ConnectionString] = newSessionPool; - - return newSessionPool; + return Pools[settings.ConnectionString] = settings.EnableImplicitSession + ? new ImplicitSessionSource(driver, settings.LoggerFactory) + : new PoolingSessionSource(new PoolingSessionFactory(driver, settings), settings); } finally { diff --git a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs index 2a3c1e0e..3deabb27 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs @@ -5,9 +5,12 @@ namespace Ydb.Sdk.Ado.Session; internal class ImplicitSession : ISession { - public ImplicitSession(IDriver driver) + private readonly ImplicitSessionSource _source; + + public ImplicitSession(IDriver driver, ImplicitSessionSource source) { Driver = driver; + _source = source; } public IDriver Driver { get; } @@ -47,9 +50,7 @@ public void OnNotSuccessStatusCode(StatusCode code) { } - public void Dispose() - { - } + public void Dispose() => _source.ReleaseLease(); - private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit sessions"); + private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit session"); } diff --git a/src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs new file mode 100644 index 00000000..66d9c870 --- /dev/null +++ b/src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs @@ -0,0 +1,79 @@ +using Microsoft.Extensions.Logging; + +namespace Ydb.Sdk.Ado.Session; + +internal sealed class ImplicitSessionSource : ISessionSource +{ + private const int DisposeTimeoutSeconds = 10; + + private readonly IDriver _driver; + private readonly ILogger _logger; + private readonly TaskCompletionSource _drainedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); + + private int _isDisposed; + private int _activeLeaseCount; + + internal ImplicitSessionSource(IDriver driver, ILoggerFactory loggerFactory) + { + _driver = driver; + _logger = loggerFactory.CreateLogger(); + } + + public ValueTask OpenSession(CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + return TryAcquireLease() + ? new ValueTask(new ImplicitSession(_driver, this)) + : throw new ObjectDisposedException(nameof(ImplicitSessionSource)); + } + + private bool TryAcquireLease() + { + Interlocked.Increment(ref _activeLeaseCount); + + if (Volatile.Read(ref _isDisposed) == 0) + return true; + + ReleaseLease(); + return false; + } + + internal void ReleaseLease() + { + if (Interlocked.Decrement(ref _activeLeaseCount) == 0 && Volatile.Read(ref _isDisposed) == 1) + _drainedTcs.TrySetResult(); + } + + public async ValueTask DisposeAsync() + { + if (Interlocked.CompareExchange(ref _isDisposed, 1, 0) != 0) + return; + + try + { + if (Volatile.Read(ref _activeLeaseCount) != 0) + { + await _drainedTcs.Task.WaitAsync(TimeSpan.FromSeconds(DisposeTimeoutSeconds)); + } + } + catch (TimeoutException) + { + _logger.LogCritical("Disposal timed out: Some implicit sessions are still active"); + + throw new YdbException("Timeout while disposing of the pool: some implicit sessions are still active. " + + "This may indicate a connection leak or suspended operations."); + } + finally + { + try + { + await _driver.DisposeAsync(); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to dispose the transport driver"); + } + } + } +} diff --git a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs index d5ce7f51..79ef36e7 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs @@ -40,6 +40,7 @@ private void InitDefaultValues() _maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize; _disableDiscovery = GrpcDefaultSettings.DisableDiscovery; _disableServerBalancer = false; + _enableImplicitSession = false; } public string Host @@ -314,6 +315,18 @@ public int CreateSessionTimeout private int _createSessionTimeout; + public bool EnableImplicitSession + { + get => _enableImplicitSession; + set + { + _enableImplicitSession = value; + SaveValue(nameof(EnableImplicitSession), value); + } + } + + private bool _enableImplicitSession; + public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance; public ICredentialsProvider? CredentialsProvider { get; init; } @@ -495,6 +508,9 @@ static YdbConnectionOption() AddOption(new YdbConnectionOption(BoolExtractor, (builder, disableServerBalancer) => builder.DisableServerBalancer = disableServerBalancer), "DisableServerBalancer", "Disable Server Balancer"); + AddOption(new YdbConnectionOption(BoolExtractor, + (builder, enableImplicitSession) => builder.EnableImplicitSession = enableImplicitSession), + "EnableImplicitSession", "Enable Implicit Session"); } private static void AddOption(YdbConnectionOption option, params string[] keys) diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs new file mode 100644 index 00000000..aac23983 --- /dev/null +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs @@ -0,0 +1,70 @@ +using Moq; +using Xunit; +using Ydb.Sdk.Ado.Session; +using Ydb.Sdk.Ado.Tests.Utils; + +namespace Ydb.Sdk.Ado.Tests.Session; + +public class YdbImplicitStressTests +{ + private volatile bool _isDisposed; + + private IDriver DummyDriver() + { + var m = new Mock(MockBehavior.Loose); + m.Setup(d => d.DisposeAsync()) + .Callback(() => _isDisposed = true) + .Returns(ValueTask.CompletedTask); + return m.Object; + } + + [Fact] + public async Task StressTest_OpenSession_RaceWithDispose_SuccessfulOpensAreNotDisposed() + { + for (var it = 0; it < 1000; it++) + { + var driver = DummyDriver(); + var source = new ImplicitSessionSource(driver, TestUtils.LoggerFactory); + + var workers = Enumerable.Range(0, 1000).Select(async _ => + { + await Task.Delay(Random.Shared.Next(0, 5)); + try + { + using var s = await source.OpenSession(CancellationToken.None); + Assert.False(_isDisposed); + } + catch (ObjectDisposedException) + { + } + }).ToArray(); + + await Task.WhenAll(workers.Append(Task.Run(async () => + { + await Task.Delay(Random.Shared.Next(0, 3)); + await source.DisposeAsync(); + }))); + + Assert.True(_isDisposed); + await Assert.ThrowsAsync(() => + source.OpenSession(CancellationToken.None).AsTask()); + _isDisposed = false; + } + } + + [Fact] + public async Task DisposeAsync_WhenSessionIsLeaked_ThrowsYdbExceptionWithTimeoutMessage() + { + var driver = DummyDriver(); + var source = new ImplicitSessionSource(driver, TestUtils.LoggerFactory); +#pragma warning disable CA2012 + _ = source.OpenSession(CancellationToken.None); +#pragma warning restore CA2012 + + Assert.Equal("Timeout while disposing of the pool: some implicit sessions are still active. " + + "This may indicate a connection leak or suspended operations.", + (await Assert.ThrowsAsync(async () => await source.DisposeAsync())).Message); + Assert.True(_isDisposed); + await Assert.ThrowsAsync(() => source.OpenSession(CancellationToken.None).AsTask()); + } +} diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs index e322da34..21b2f97e 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs @@ -204,7 +204,6 @@ public async Task ExecuteScalar_WhenSelectNoRows_ReturnNull() .ExecuteScalarAsync()); } - public class Data(DbType dbType, T expected, bool isNullable = false) { public bool IsNullable { get; } = isNullable || expected == null; diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs index 87f02ea3..8a7aa70a 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs @@ -30,6 +30,7 @@ public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnection Assert.False(ydbConnectionStringBuilder.DisableDiscovery); Assert.False(ydbConnectionStringBuilder.DisableServerBalancer); Assert.False(ydbConnectionStringBuilder.UseTls); + Assert.False(ydbConnectionStringBuilder.EnableImplicitSession); Assert.Equal("UseTls=False;Host=localhost;Port=2136;Database=/local;User=;Password=;ConnectTimeout=5;" + "KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" + @@ -54,7 +55,7 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection "Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;MinSessionPool=10;MaxSessionPool=50;" + "CreateSessionTimeout=30;SessionIdleTimeout=600;ConnectTimeout=30;KeepAlivePingDelay=30;" + "KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=true;MaxSendMessageSize=1000000;" + - "MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;" + "MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;EnableImplicitSession=true;" ); Assert.Equal(2135, ydbConnectionStringBuilder.Port); @@ -78,9 +79,11 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection "ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" + "EnableMultipleHttp2Connections=True;" + "MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" + - "DisableDiscovery=True;DisableServerBalancer=True", ydbConnectionStringBuilder.ConnectionString); + "DisableDiscovery=True;DisableServerBalancer=True;EnableImplicitSession=True", + ydbConnectionStringBuilder.ConnectionString); Assert.True(ydbConnectionStringBuilder.DisableDiscovery); Assert.True(ydbConnectionStringBuilder.DisableServerBalancer); + Assert.True(ydbConnectionStringBuilder.EnableImplicitSession); Assert.Equal("UseTls=True;Host=server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=30;" + "KeepAlivePingDelay=30;KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=True;" + "MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;DisableDiscovery=True", diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs index 3cf2d07a..0779f652 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs @@ -473,4 +473,51 @@ public async Task BulkUpsertImporter_ThrowsOnNonexistentTable() await Assert.ThrowsAsync(async () => { await importer.FlushAsync(); }); } + + [Fact] + public async Task ClearPool_FireAndForget_DoesNotBlock_And_PoolsRecreate() + { + var csPooled = ConnectionString + + ";UseTls=false;DisableDiscovery=true" + + ";CreateSessionTimeout=3;ConnectTimeout=3" + + ";KeepAlivePingDelay=0;KeepAlivePingTimeout=0"; + var csImplicit = csPooled + ";EnableImplicitSession=true"; + + await using (var warmPooled = new YdbConnection(csPooled)) + { + await warmPooled.OpenAsync(); + await using var cmd = warmPooled.CreateCommand(); + cmd.CommandText = "SELECT 1"; + Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync())); + } + + await using (var warmImplicit = new YdbConnection(csImplicit)) + { + await warmImplicit.OpenAsync(); + await using var cmd = warmImplicit.CreateCommand(); + cmd.CommandText = "SELECT 1"; + Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync())); + } + + var clearPooledTask = YdbConnection.ClearPool(new YdbConnection(csPooled)); + var clearImplicitTask = YdbConnection.ClearPool(new YdbConnection(csImplicit)); + + await Task.WhenAll(clearPooledTask, clearImplicitTask); + + await using (var checkPooled = new YdbConnection(csPooled)) + { + await checkPooled.OpenAsync(); + await using var cmd = checkPooled.CreateCommand(); + cmd.CommandText = "SELECT 1"; + Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync())); + } + + await using (var checkImplicit = new YdbConnection(csImplicit)) + { + await checkImplicit.OpenAsync(); + await using var cmd = checkImplicit.CreateCommand(); + cmd.CommandText = "SELECT 1"; + Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync())); + } + } } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbImplicitConnectionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbImplicitConnectionTests.cs new file mode 100644 index 00000000..04b790ce --- /dev/null +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbImplicitConnectionTests.cs @@ -0,0 +1,98 @@ +using Xunit; +using Ydb.Sdk.Ado.Session; + +namespace Ydb.Sdk.Ado.Tests; + +public class YdbImplicitConnectionTests : TestBase +{ + [Fact] + public async Task EnableImplicitSession_WithEnableDiscovery_Works() + { + await using var ydbConnection = new YdbConnection(ConnectionString + + ";EnableImplicitSession=true;DisableDiscovery=true"); + await ydbConnection.OpenAsync(); + Assert.Equal(1, await new YdbCommand(ydbConnection) { CommandText = "SELECT 1;" }.ExecuteScalarAsync()); + } + + [Fact] + public async Task EnableImplicitSession_WhenFalse_AlwaysUsesPooledSession() + { + await using var ydbConnection = new YdbConnection(ConnectionString + ";EnableImplicitSession=false"); + await ydbConnection.OpenAsync(); + Assert.Equal(1, await new YdbCommand(ydbConnection) { CommandText = "SELECT 1;" }.ExecuteScalarAsync()); + Assert.IsNotType(ydbConnection.Session); + } + + [Fact] + public async Task ImplicitSession_SimpleScalar_Works() + { + await using var ydbConnection = CreateConnection(); + ydbConnection.ConnectionString += ";EnableImplicitSession=true"; + await ydbConnection.OpenAsync(); + Assert.Equal(42, await new YdbCommand(ydbConnection) { CommandText = "SELECT 40 + 2;" }.ExecuteScalarAsync()); + } + + [Fact] + public async Task ImplicitSession_RepeatedScalars_WorksManyTimes() + { + await using var ydbConnection = CreateConnection(); + ydbConnection.ConnectionString += ";EnableImplicitSession=true"; + await ydbConnection.OpenAsync(); + + for (var i = 0; i < 30; i++) + { + Assert.Equal(i, await new YdbCommand(ydbConnection) { CommandText = $"SELECT {i}" }.ExecuteScalarAsync()); + } + } + + [Fact] + public void ImplicitSession_ConcurrentCommand_IsStillBlockedByBusyCheck() + { + using var connection = CreateConnection(); + connection.ConnectionString += ";EnableImplicitSession=true"; + connection.Open(); + + var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT 1; SELECT 1;"; + using var reader = cmd.ExecuteReader(); + + var ex = Assert.Throws(() => cmd.ExecuteReader()); + Assert.Equal("A command is already in progress: SELECT 1; SELECT 1;", ex.Message); + } + + [Fact] + public async Task ImplicitSession_Cancellation_AfterFirstResult_StillReturnsFirst() + { + await using var connection = CreateConnection(); + connection.ConnectionString += ";EnableImplicitSession=true"; + await connection.OpenAsync(); + + var cmd = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1;" }; + using var cts = new CancellationTokenSource(); + + var reader = await cmd.ExecuteReaderAsync(cts.Token); + + await reader.ReadAsync(cts.Token); + Assert.Equal(1, reader.GetValue(0)); + Assert.True(await reader.NextResultAsync(cts.Token)); + + await cts.CancelAsync(); + + await reader.ReadAsync(cts.Token); + Assert.Equal(1, reader.GetValue(0)); + Assert.False(await reader.NextResultAsync()); + } + + [Fact] + public async Task ImplicitSession_WithExplicitTransaction_UsesExplicitSessionAndCommits() + { + await using var ydbConnection = CreateConnection(); + ydbConnection.ConnectionString += ";EnableImplicitSession=true"; + await ydbConnection.OpenAsync(); + await using var transaction = ydbConnection.BeginTransaction(); + + Assert.Equal("Transactions are not supported in implicit session", + (await Assert.ThrowsAsync(async () => await new YdbCommand(ydbConnection) + { CommandText = "SELECT 1" }.ExecuteScalarAsync())).Message); + } +}