diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index 78c5929b..3cc77da9 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,3 +1,5 @@ +- Added provider support for implicit sessions. + ## v0.23.1 - Fixed bug Topic Reader: NullReferenceException when handling StopPartitionSessionRequest ([#528](https://github.com/ydb-platform/ydb-dotnet-sdk/issues/528)). diff --git a/src/Ydb.Sdk/src/Ado/PoolManager.cs b/src/Ydb.Sdk/src/Ado/PoolManager.cs index 3f14c285..789af937 100644 --- a/src/Ydb.Sdk/src/Ado/PoolManager.cs +++ b/src/Ydb.Sdk/src/Ado/PoolManager.cs @@ -33,10 +33,20 @@ CancellationToken cancellationToken !cacheDriver.IsDisposed ? cacheDriver : Drivers[settings.GrpcConnectionString] = await settings.BuildDriver(); + driver.RegisterOwner(); - var factory = new PoolingSessionFactory(driver, settings); - var newSessionPool = new PoolingSessionSource(factory, settings); + ISessionSource newSessionPool; + + if (settings.MaxSessionPool > 0) + { + var factory = new PoolingSessionFactory(driver, settings); + newSessionPool = new PoolingSessionSource(factory, settings); + } + else + { + newSessionPool = new ImplicitSessionSource(driver); + } Pools[settings.ConnectionString] = newSessionPool; diff --git a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs index 2a3c1e0e..34fab45d 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs @@ -5,9 +5,12 @@ namespace Ydb.Sdk.Ado.Session; internal class ImplicitSession : ISession { - public ImplicitSession(IDriver driver) + private readonly ImplicitSessionSource _source; + + public ImplicitSession(IDriver driver, ImplicitSessionSource source) { Driver = driver; + _source = source; } public IDriver Driver { get; } @@ -47,9 +50,7 @@ public void OnNotSuccessStatusCode(StatusCode code) { } - public void Dispose() - { - } + public void Dispose() => _source.ReleaseLease(); private static YdbException NotSupportedTransaction => new("Transactions are not supported in implicit sessions"); } diff --git a/src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs new file mode 100644 index 00000000..b40b93e2 --- /dev/null +++ b/src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs @@ -0,0 +1,71 @@ +namespace Ydb.Sdk.Ado.Session; + +internal sealed class ImplicitSessionSource : ISessionSource +{ + private readonly IDriver _driver; + private readonly ManualResetEventSlim _allReleased = new(false); + + private int _state; + private int _activeLeaseCount; + + internal ImplicitSessionSource(IDriver driver) + { + _driver = driver; + } + + public ValueTask OpenSession(CancellationToken cancellationToken) + { + cancellationToken.ThrowIfCancellationRequested(); + + if (!TryAcquireLease()) + throw new ObjectDisposedException(nameof(ImplicitSessionSource)); + + return new ValueTask(new ImplicitSession(_driver, this)); + } + + private bool TryAcquireLease() + { + if (Volatile.Read(ref _state) == 2) + return false; + + var newCount = Interlocked.Increment(ref _activeLeaseCount); + + var state = Volatile.Read(ref _state); + + if (state == 2 || (state == 1 && newCount == 1)) + { + Interlocked.Decrement(ref _activeLeaseCount); + return false; + } + + return true; + } + + internal void ReleaseLease() + { + if (Interlocked.Decrement(ref _activeLeaseCount) == 0 && + Volatile.Read(ref _state) != 0) + { + _allReleased.Set(); + } + } + + public async ValueTask DisposeAsync() + { + if (Interlocked.CompareExchange(ref _state, 1, 0) != 0) + return; + + if (Volatile.Read(ref _activeLeaseCount) != 0) + _allReleased.Wait(); + + try + { + Volatile.Write(ref _state, 2); + await _driver.DisposeAsync(); + } + finally + { + _allReleased.Dispose(); + } + } +} diff --git a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs index d5ce7f51..367764ac 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs @@ -40,6 +40,7 @@ private void InitDefaultValues() _maxReceiveMessageSize = GrpcDefaultSettings.MaxReceiveMessageSize; _disableDiscovery = GrpcDefaultSettings.DisableDiscovery; _disableServerBalancer = false; + _enableImplicitSession = false; } public string Host @@ -314,6 +315,18 @@ public int CreateSessionTimeout private int _createSessionTimeout; + public bool EnableImplicitSession + { + get => _enableImplicitSession; + set + { + _enableImplicitSession = value; + SaveValue(nameof(EnableImplicitSession), value); + } + } + + private bool _enableImplicitSession; + public ILoggerFactory LoggerFactory { get; init; } = NullLoggerFactory.Instance; public ICredentialsProvider? CredentialsProvider { get; init; } @@ -495,6 +508,9 @@ static YdbConnectionOption() AddOption(new YdbConnectionOption(BoolExtractor, (builder, disableServerBalancer) => builder.DisableServerBalancer = disableServerBalancer), "DisableServerBalancer", "Disable Server Balancer"); + AddOption(new YdbConnectionOption(BoolExtractor, + (builder, enableImplicitSession) => builder.EnableImplicitSession = enableImplicitSession), + "EnableImplicitSession", "ImplicitSession"); } private static void AddOption(YdbConnectionOption option, params string[] keys) diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs new file mode 100644 index 00000000..f837a229 --- /dev/null +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs @@ -0,0 +1,154 @@ +using Moq; +using Xunit; +using Ydb.Sdk.Ado.Session; + +namespace Ydb.Sdk.Ado.Tests.Session; + +public class YdbImplicitStressTests : TestBase +{ + private static IDriver DummyDriver() + { + var m = new Mock(MockBehavior.Loose); + m.Setup(d => d.DisposeAsync()).Returns(ValueTask.CompletedTask); + return m.Object; + } + + private sealed class Counter + { + public int Value; + public void Inc() => Interlocked.Increment(ref Value); + } + + [Fact(Timeout = 30_000)] + public async Task Dispose_WaitsForAllLeases_AndSignalsOnEmptyExactlyOnce() + { + var driver = DummyDriver(); + + var opened = new Counter(); + var closed = new Counter(); + + var source = new ImplicitSessionSource(driver); + + var workers = Enumerable.Range(0, 200).Select(async _ => + { + var rnd = Random.Shared; + for (var j = 0; j < 10; j++) + { + ISession s; + try + { + s = await source.OpenSession(CancellationToken.None); + opened.Inc(); + + await Task.Delay(rnd.Next(0, 5)); + } + catch (ObjectDisposedException) + { + return; + } + + var s2 = await source.OpenSession(CancellationToken.None); + s2.Dispose(); + + s.Dispose(); + closed.Inc(); + } + }).ToArray(); + + var disposer = Task.Run(async () => + { + await Task.Delay(10); + await source.DisposeAsync(); + }); + + await Task.WhenAll(workers.Append(disposer)); + + Assert.True(opened.Value > 0); + Assert.Equal(opened.Value, closed.Value); + + await Assert.ThrowsAsync(() => source.OpenSession(CancellationToken.None).AsTask()); + } + + [Fact(Timeout = 30_000)] + public async Task Stress_Counts_AreBalanced() + { + var driver = DummyDriver(); + + var opened = new Counter(); + var closed = new Counter(); + + var source = new ImplicitSessionSource(driver); + + var workers = Enumerable.Range(0, 200).Select(async _ => + { + var rnd = Random.Shared; + for (var j = 0; j < 10; j++) + { + ISession s; + try + { + s = await source.OpenSession(CancellationToken.None); + opened.Inc(); + + await Task.Delay(rnd.Next(0, 3)); + } + catch (ObjectDisposedException) + { + return; + } + + var s2 = await source.OpenSession(CancellationToken.None); + s2.Dispose(); + + s.Dispose(); + closed.Inc(); + } + }).ToArray(); + + var disposer = Task.Run(async () => await source.DisposeAsync()); + + await Task.WhenAll(workers.Append(disposer)); + + Assert.Equal(opened.Value, closed.Value); + Assert.True(opened.Value > 0); + + await Assert.ThrowsAsync(() => source.OpenSession(CancellationToken.None).AsTask()); + } + + [Fact(Timeout = 30_000)] + public async Task Open_RacingWithDispose_StateRemainsConsistent() + { + var driver = DummyDriver(); + + var source = new ImplicitSessionSource(driver); + + var opens = Enumerable.Range(0, 1000).Select(async _ => + { + ISession s; + try + { + s = await source.OpenSession(CancellationToken.None); + } + catch (ObjectDisposedException) + { + return 0; + } + + var s2 = await source.OpenSession(CancellationToken.None); + s2.Dispose(); + + s.Dispose(); + return 1; + }).ToArray(); + + var disposeTask = Task.Run(async () => + { + await Task.Yield(); + await source.DisposeAsync(); + }); + + await Task.WhenAll(opens.Append(disposeTask)); + + await Assert.ThrowsAsync(() => source.OpenSession(CancellationToken.None).AsTask()); + } +} diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs index e322da34..bc491a72 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbCommandTests.cs @@ -204,6 +204,122 @@ public async Task ExecuteScalar_WhenSelectNoRows_ReturnNull() .ExecuteScalarAsync()); } + [Fact] + public async Task ImplicitSession_SimpleScalar_Works() + { + await using var connection = CreateConnection(); + connection.ConnectionString += ";EnableImplicitSession=true"; + await connection.OpenAsync(); + + var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT 40 + 2;"; + var scalar = await cmd.ExecuteScalarAsync(); + Assert.Equal(42, Convert.ToInt32(scalar)); + } + + [Fact] + public async Task ImplicitSession_RepeatedScalars_WorksManyTimes() + { + await using var connection = CreateConnection(); + connection.ConnectionString += ";EnableImplicitSession=true"; + await connection.OpenAsync(); + + for (var i = 0; i < 30; i++) + { + var cmd = connection.CreateCommand(); + cmd.CommandText = $"SELECT {i};"; + var scalar = await cmd.ExecuteScalarAsync(); + Assert.Equal(i, Convert.ToInt32(scalar)); + } + } + + [Fact] + public void ImplicitSession_ConcurrentCommand_IsStillBlockedByBusyCheck() + { + using var connection = CreateConnection(); + connection.ConnectionString += ";EnableImplicitSession=true"; + connection.Open(); + + var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT 1; SELECT 1;"; + using var reader = cmd.ExecuteReader(); + + var ex = Assert.Throws(() => cmd.ExecuteReader()); + Assert.Equal("A command is already in progress: SELECT 1; SELECT 1;", ex.Message); + } + + [Fact] + public async Task ImplicitSession_WithExplicitTransaction_UsesExplicitSessionAndCommits() + { + var table = $"Implicit_{Guid.NewGuid():N}"; + + await using var connection = CreateConnection(); + connection.ConnectionString += ";EnableImplicitSession=true"; + await connection.OpenAsync(); + + try + { + await using (var create = connection.CreateCommand()) + { + create.CommandText = $""" + CREATE TABLE {table} ( + Id Int32, + Name Text, + PRIMARY KEY (Id) + ) + """; + await create.ExecuteNonQueryAsync(); + } + + var tx = connection.BeginTransaction(); + await using (var insert = connection.CreateCommand()) + { + insert.Transaction = tx; + insert.CommandText = $"INSERT INTO {table} (Id, Name) VALUES (1, 'A');"; + await insert.ExecuteNonQueryAsync(); + insert.CommandText = $"INSERT INTO {table} (Id, Name) VALUES (2, 'B');"; + await insert.ExecuteNonQueryAsync(); + } + + await tx.CommitAsync(); + + await using (var check = connection.CreateCommand()) + { + check.CommandText = $"SELECT COUNT(*) FROM {table};"; + var count = Convert.ToInt32(await check.ExecuteScalarAsync()); + Assert.Equal(2, count); + } + } + finally + { + await using var drop = connection.CreateCommand(); + drop.CommandText = $"DROP TABLE {table}"; + await drop.ExecuteNonQueryAsync(); + } + } + + [Fact] + public async Task ImplicitSession_Cancellation_AfterFirstResult_StillReturnsFirst() + { + await using var connection = CreateConnection(); + connection.ConnectionString += ";EnableImplicitSession=true"; + await connection.OpenAsync(); + + var cmd = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1;" }; + using var cts = new CancellationTokenSource(); + + var reader = await cmd.ExecuteReaderAsync(cts.Token); + + await reader.ReadAsync(cts.Token); + Assert.Equal(1, reader.GetValue(0)); + Assert.True(await reader.NextResultAsync(cts.Token)); + + await cts.CancelAsync(); + + await reader.ReadAsync(cts.Token); + Assert.Equal(1, reader.GetValue(0)); + Assert.False(await reader.NextResultAsync()); + } public class Data(DbType dbType, T expected, bool isNullable = false) { diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs index 87f02ea3..8a7aa70a 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs @@ -30,6 +30,7 @@ public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnection Assert.False(ydbConnectionStringBuilder.DisableDiscovery); Assert.False(ydbConnectionStringBuilder.DisableServerBalancer); Assert.False(ydbConnectionStringBuilder.UseTls); + Assert.False(ydbConnectionStringBuilder.EnableImplicitSession); Assert.Equal("UseTls=False;Host=localhost;Port=2136;Database=/local;User=;Password=;ConnectTimeout=5;" + "KeepAlivePingDelay=10;KeepAlivePingTimeout=10;EnableMultipleHttp2Connections=False;" + @@ -54,7 +55,7 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection "Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;MinSessionPool=10;MaxSessionPool=50;" + "CreateSessionTimeout=30;SessionIdleTimeout=600;ConnectTimeout=30;KeepAlivePingDelay=30;" + "KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=true;MaxSendMessageSize=1000000;" + - "MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;" + "MaxReceiveMessageSize=1000000;DisableDiscovery=true;DisableServerBalancer=true;EnableImplicitSession=true;" ); Assert.Equal(2135, ydbConnectionStringBuilder.Port); @@ -78,9 +79,11 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection "ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" + "EnableMultipleHttp2Connections=True;" + "MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" + - "DisableDiscovery=True;DisableServerBalancer=True", ydbConnectionStringBuilder.ConnectionString); + "DisableDiscovery=True;DisableServerBalancer=True;EnableImplicitSession=True", + ydbConnectionStringBuilder.ConnectionString); Assert.True(ydbConnectionStringBuilder.DisableDiscovery); Assert.True(ydbConnectionStringBuilder.DisableServerBalancer); + Assert.True(ydbConnectionStringBuilder.EnableImplicitSession); Assert.Equal("UseTls=True;Host=server;Port=2135;Database=/my/path;User=Kirill;Password=;ConnectTimeout=30;" + "KeepAlivePingDelay=30;KeepAlivePingTimeout=60;EnableMultipleHttp2Connections=True;" + "MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;DisableDiscovery=True", diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs index 3cf2d07a..0e985d35 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs @@ -1,5 +1,6 @@ using System.Data; using Xunit; +using Ydb.Sdk.Ado.Session; using Ydb.Sdk.Ado.Tests.Utils; using Ydb.Sdk.Ado.YdbType; @@ -473,4 +474,142 @@ public async Task BulkUpsertImporter_ThrowsOnNonexistentTable() await Assert.ThrowsAsync(async () => { await importer.FlushAsync(); }); } + + [Fact] + public async Task EnableImplicitSession_WhenTrue_ButInsideTransaction_UsesPooledSession() + { + var cs = ConnectionString + ";EnableImplicitSession=true"; + + await using var conn = new YdbConnection(cs); + await conn.OpenAsync(); + + using var tx = conn.BeginTransaction(); + var cmd = conn.CreateCommand(); + cmd.Transaction = tx; + cmd.CommandText = "SELECT 1"; + var result = Convert.ToInt64(await cmd.ExecuteScalarAsync()); + Assert.Equal(1L, result); + + Assert.IsNotType(conn.Session); + } + + [Fact] + public async Task EnableImplicitSession_WhenFalse_AlwaysUsesPooledSession() + { + var cs = ConnectionString + ";EnableImplicitSession=false"; + + await using var conn = new YdbConnection(cs); + await conn.OpenAsync(); + + var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT CAST(1 AS Int64)"; + var result = (long)(await cmd.ExecuteScalarAsync())!; + Assert.Equal(1L, result); + + Assert.IsNotType(conn.Session); + } + + [Fact] + public async Task EnableImplicitSession_DifferentConnectionStrings_HaveDifferentImplicitPools() + { + var cs1 = ConnectionString + ";EnableImplicitSession=true;MinSessionPool=0;DisableDiscovery=false"; + var cs2 = ConnectionString + ";EnableImplicitSession=true;MinSessionPool=1;DisableDiscovery=false"; + + await using var conn1 = new YdbConnection(cs1); + await conn1.OpenAsync(); + var s1 = conn1.Session; + + await using var conn2 = new YdbConnection(cs2); + await conn2.OpenAsync(); + var s2 = conn2.Session; + + Assert.NotEqual(s1, s2); + } + + [Fact] + public async Task EnableImplicitSession_TwoSequentialCommands_ReusesSameSession() + { + var cs = ConnectionString + ";EnableImplicitSession=true"; + await using var conn = new YdbConnection(cs); + await conn.OpenAsync(); + + var cmd1 = conn.CreateCommand(); + cmd1.CommandText = "SELECT 1;"; + await cmd1.ExecuteScalarAsync(); + + var s1 = conn.Session; + + var cmd2 = conn.CreateCommand(); + cmd2.CommandText = "SELECT 2;"; + await cmd2.ExecuteScalarAsync(); + + var s2 = conn.Session; + + Assert.Equal(s1, s2); + } + + [Fact] + public async Task ClearPool_FireAndForget_DoesNotBlock_And_PoolsRecreate() + { + var csBase = + ConnectionString + + ";UseTls=false" + + ";DisableDiscovery=true" + + ";CreateSessionTimeout=3" + + ";ConnectTimeout=3" + + ";KeepAlivePingDelay=0;KeepAlivePingTimeout=0"; + + var csPooled = csBase; + var csImplicit = csBase + ";EnableImplicitSession=true"; + + await using (var warmPooled = new YdbConnection(csPooled)) + { + await warmPooled.OpenAsync(); + using var cmd = warmPooled.CreateCommand(); + cmd.CommandText = "SELECT 1"; + Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync())); + } + + await using (var warmImplicit = new YdbConnection(csImplicit)) + { + await warmImplicit.OpenAsync(); + using var cmd = warmImplicit.CreateCommand(); + cmd.CommandText = "SELECT 1"; + Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync())); + } + + var clearPooledTask = YdbConnection.ClearPool(new YdbConnection(csPooled)); + var clearImplicitTask = YdbConnection.ClearPool(new YdbConnection(csImplicit)); + + var done = await Task.WhenAny(Task.WhenAll(clearPooledTask, clearImplicitTask), + Task.Delay(TimeSpan.FromSeconds(2))); + Assert.True(done != Task.Delay(TimeSpan.FromSeconds(2)), "ClearPool() must not block."); + + await using (var checkPooled = new YdbConnection(csPooled)) + { + await checkPooled.OpenAsync(); + await using var cmd = checkPooled.CreateCommand(); + cmd.CommandText = "SELECT 1"; + Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync())); + } + + await using (var checkImplicit = new YdbConnection(csImplicit)) + { + await checkImplicit.OpenAsync(); + await using var cmd = checkImplicit.CreateCommand(); + cmd.CommandText = "SELECT 1"; + Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync())); + } + } + + [Fact] + public async Task EnableImplicitSession_WithDisableDiscovery_Works() + { + var cs = ConnectionString + ";EnableImplicitSession=true;DisableDiscovery=true"; + await using var conn = new YdbConnection(cs); + await conn.OpenAsync(); + using var cmd = conn.CreateCommand(); + cmd.CommandText = "SELECT 1"; + Assert.Equal(1L, Convert.ToInt64(await cmd.ExecuteScalarAsync())); + } } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbImplictConnectionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbImplictConnectionTests.cs new file mode 100644 index 00000000..c64f6a28 --- /dev/null +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbImplictConnectionTests.cs @@ -0,0 +1,73 @@ +using Xunit; + +namespace Ydb.Sdk.Ado.Tests; + +public class YdbImplictConnectionTests : TestBase +{ + [Fact] + public async Task ImplicitSession_SimpleScalar_Works() + { + await using var connection = CreateConnection(); + connection.ConnectionString += ";EnableImplicitSession=true"; + await connection.OpenAsync(); + + var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT 40 + 2;"; + var scalar = await cmd.ExecuteScalarAsync(); + Assert.Equal(42, Convert.ToInt32(scalar)); + } + + [Fact] + public async Task ImplicitSession_RepeatedScalars_WorksManyTimes() + { + await using var connection = CreateConnection(); + connection.ConnectionString += ";EnableImplicitSession=true"; + await connection.OpenAsync(); + + for (var i = 0; i < 30; i++) + { + var cmd = connection.CreateCommand(); + cmd.CommandText = $"SELECT {i};"; + var scalar = await cmd.ExecuteScalarAsync(); + Assert.Equal(i, Convert.ToInt32(scalar)); + } + } + + [Fact] + public void ImplicitSession_ConcurrentCommand_IsStillBlockedByBusyCheck() + { + using var connection = CreateConnection(); + connection.ConnectionString += ";EnableImplicitSession=true"; + connection.Open(); + + var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT 1; SELECT 1;"; + using var reader = cmd.ExecuteReader(); + + var ex = Assert.Throws(() => cmd.ExecuteReader()); + Assert.Equal("A command is already in progress: SELECT 1; SELECT 1;", ex.Message); + } + + [Fact] + public async Task ImplicitSession_Cancellation_AfterFirstResult_StillReturnsFirst() + { + await using var connection = CreateConnection(); + connection.ConnectionString += ";EnableImplicitSession=true"; + await connection.OpenAsync(); + + var cmd = new YdbCommand(connection) { CommandText = "SELECT 1; SELECT 1;" }; + using var cts = new CancellationTokenSource(); + + var reader = await cmd.ExecuteReaderAsync(cts.Token); + + await reader.ReadAsync(cts.Token); + Assert.Equal(1, reader.GetValue(0)); + Assert.True(await reader.NextResultAsync(cts.Token)); + + await cts.CancelAsync(); + + await reader.ReadAsync(cts.Token); + Assert.Equal(1, reader.GetValue(0)); + Assert.False(await reader.NextResultAsync()); + } +}