From c9ed8503ff07a07227c5bc640010ca1bdf27817e Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Thu, 17 Jul 2025 13:20:21 +0300 Subject: [PATCH 1/6] feat: PoolingSessionSource 2.0 based on Npgsql pooling algorithm --- src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs | 4 ++-- src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs b/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs index 9669afb1..bbe3ce0d 100644 --- a/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs +++ b/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs @@ -232,7 +232,7 @@ StringBuilder yql return startToken; // rollback parse IN LIST } - yql.Append(listStartToken > startToken ? sql[startToken .. listStartToken] : ' '); + yql.Append(listStartToken > startToken ? sql[startToken..listStartToken] : ' '); var paramListName = sqlParamsBuilder.AddListPrimitiveParams(findNameParams); yql.Append(paramListName); @@ -273,7 +273,7 @@ private static (string Name, int NextToken) ParseNameParam(string sql, int curTo throw new YdbException($"Have empty name parameter, invalid SQL [position: {prevToken}]"); } - return ($"${sql[prevToken .. curToken]}", curToken); + return ($"${sql[prevToken..curToken]}", curToken); } private static bool IsSqlIdentifierChar(this char c) => char.IsLetterOrDigit(c) || c == '_'; diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs index 2002ec69..babf7780 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs @@ -1,3 +1,8 @@ +// This file contains session pooling algorithms adapted from Npgsql +// Original source: https://github.com/npgsql/npgsql +// Copyright (c) 2002-2025, Npgsql +// Licence https://github.com/npgsql/npgsql?tab=PostgreSQL-1-ov-file + using Microsoft.Extensions.Logging; using Ydb.Query; using Ydb.Query.V1; From 7dd5c88007c3b66d46e498916e42307c1e6cadc0 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Tue, 22 Jul 2025 17:04:02 +0300 Subject: [PATCH 2/6] dev: Prepare ISession for switching implementation. --- src/Ydb.Sdk/src/Ado/PoolManager.cs | 3 +- src/Ydb.Sdk/src/Ado/Session/ISession.cs | 2 + .../src/Ado/Session/ImplicitSession.cs | 7 +- src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs | 27 ++++--- src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs | 12 ---- src/Ydb.Sdk/src/Ado/YdbCommand.cs | 4 +- src/Ydb.Sdk/src/Ado/YdbConnection.cs | 9 +-- .../src/Ado/YdbConnectionStringBuilder.cs | 8 +-- src/Ydb.Sdk/src/Ado/YdbSchema.cs | 60 +++++++++------- src/Ydb.Sdk/src/Pool/SessionPool.cs | 11 +-- src/Ydb.Sdk/src/Services/Query/SessionPool.cs | 70 +++++++------------ .../SessionSourceBenchmark.cs | 2 + .../StressTestTank.cs | 7 +- .../Session/PoolingSessionSourceMockTests.cs | 15 ++-- 14 files changed, 109 insertions(+), 128 deletions(-) delete mode 100644 src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs diff --git a/src/Ydb.Sdk/src/Ado/PoolManager.cs b/src/Ydb.Sdk/src/Ado/PoolManager.cs index ed2712d6..a8f787b4 100644 --- a/src/Ydb.Sdk/src/Ado/PoolManager.cs +++ b/src/Ydb.Sdk/src/Ado/PoolManager.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using Ydb.Sdk.Ado.Session; using Ydb.Sdk.Pool; using Ydb.Sdk.Services.Query; @@ -9,7 +10,7 @@ internal static class PoolManager private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex private static readonly ConcurrentDictionary Pools = new(); - internal static async Task GetSession( + internal static async Task GetSession( YdbConnectionStringBuilder connectionString, CancellationToken cancellationToken ) diff --git a/src/Ydb.Sdk/src/Ado/Session/ISession.cs b/src/Ydb.Sdk/src/Ado/Session/ISession.cs index fc8c70ba..1aec418b 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ISession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ISession.cs @@ -6,6 +6,8 @@ namespace Ydb.Sdk.Ado.Session; internal interface ISession { + IDriver Driver { get; } + bool IsBroken { get; } ValueTask> ExecuteQuery( diff --git a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs index 182dbf44..45a251f7 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs @@ -6,13 +6,12 @@ namespace Ydb.Sdk.Ado.Session; internal class ImplicitSession : ISession { - private readonly IDriver _driver; - public ImplicitSession(IDriver driver) { - _driver = driver; + Driver = driver; } + public IDriver Driver { get; } public bool IsBroken => false; public ValueTask> ExecuteQuery( @@ -36,7 +35,7 @@ public ValueTask> ExecuteQuery( }; request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto())); - return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); + return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); } public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) => diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs index babf7780..ed85d411 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs @@ -20,9 +20,9 @@ internal class PoolingSession : IPoolingSession private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5); private static readonly CreateSessionRequest CreateSessionRequest = new(); - private readonly IDriver _driver; private readonly PoolingSessionSource _poolingSessionSource; private readonly ILogger _logger; + private readonly CancellationTokenSource _attachStreamLifecycleCts = new(); private volatile bool _isBroken = true; @@ -31,6 +31,7 @@ internal class PoolingSession : IPoolingSession private string SessionId { get; set; } = string.Empty; private long NodeId { get; set; } + public IDriver Driver { get; } public bool IsBroken => _isBroken; internal PoolingSession( @@ -40,10 +41,10 @@ internal PoolingSession( ILogger logger ) { - _driver = driver; _poolingSessionSource = poolingSessionSource; _disableServerBalancer = disableServerBalancer; _logger = logger; + Driver = driver; } public ValueTask> ExecuteQuery( @@ -65,7 +66,7 @@ public ValueTask> ExecuteQuery( }; request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto())); - return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); + return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); } public async Task CommitTransaction( @@ -73,7 +74,7 @@ public async Task CommitTransaction( CancellationToken cancellationToken = default ) { - var response = await _driver.UnaryCall( + var response = await Driver.UnaryCall( QueryService.CommitTransactionMethod, new CommitTransactionRequest { SessionId = SessionId, TxId = txId }, new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId } @@ -90,7 +91,7 @@ public async Task RollbackTransaction( CancellationToken cancellationToken = default ) { - var response = await _driver.UnaryCall( + var response = await Driver.UnaryCall( QueryService.RollbackTransactionMethod, new RollbackTransactionRequest { SessionId = SessionId, TxId = txId }, new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId } @@ -126,7 +127,7 @@ public async Task Open(CancellationToken cancellationToken) requestSettings.ClientCapabilities.Add(SessionBalancer); } - var response = await _driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings); + var response = await Driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings); if (response.Status.IsNotSuccess()) { @@ -143,7 +144,7 @@ public async Task Open(CancellationToken cancellationToken) { try { - using var stream = await _driver.ServerStreamCall( + using var stream = await Driver.ServerStreamCall( QueryService.AttachSessionMethod, new AttachSessionRequest { SessionId = SessionId }, new GrpcRequestSettings { NodeId = NodeId } @@ -166,10 +167,12 @@ public async Task Open(CancellationToken cancellationToken) completeTask.SetResult(); + var lifecycleAttachToken = _attachStreamLifecycleCts.Token; + try { // ReSharper disable once MethodSupportsCancellation - while (await stream.MoveNextAsync()) + while (await stream.MoveNextAsync(lifecycleAttachToken)) { var sessionState = stream.Current; @@ -220,14 +223,10 @@ public async Task DeleteSession() { try { - if (_isBroken) - { - return; - } - _isBroken = true; + _attachStreamLifecycleCts.CancelAfter(DeleteSessionTimeout); - var deleteSessionResponse = await _driver.UnaryCall( + var deleteSessionResponse = await Driver.UnaryCall( QueryService.DeleteSessionMethod, new DeleteSessionRequest { SessionId = SessionId }, new GrpcRequestSettings { TransportTimeout = DeleteSessionTimeout, NodeId = NodeId } diff --git a/src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs b/src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs deleted file mode 100644 index 6c20df0b..00000000 --- a/src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs +++ /dev/null @@ -1,12 +0,0 @@ -namespace Ydb.Sdk.Ado; - -internal static class YdbAdoDefaultSettings -{ - internal const string Host = "localhost"; - - internal const int Port = 2136; - - internal const string Database = "/local"; - - internal const bool UseTls = false; -} diff --git a/src/Ydb.Sdk/src/Ado/YdbCommand.cs b/src/Ydb.Sdk/src/Ado/YdbCommand.cs index 844343b5..ec49c0fa 100644 --- a/src/Ydb.Sdk/src/Ado/YdbCommand.cs +++ b/src/Ydb.Sdk/src/Ado/YdbCommand.cs @@ -206,8 +206,8 @@ protected override async Task ExecuteDbDataReaderAsync(CommandBeha preparedSql.Append(sql); var execSettings = CommandTimeout > 0 - ? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) } - : new ExecuteQuerySettings(); + ? new GrpcRequestSettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) } + : new GrpcRequestSettings(); var transaction = YdbConnection.CurrentTransaction; diff --git a/src/Ydb.Sdk/src/Ado/YdbConnection.cs b/src/Ydb.Sdk/src/Ado/YdbConnection.cs index 8cac45c2..ce2e2c43 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnection.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnection.cs @@ -1,6 +1,7 @@ using System.Data; using System.Data.Common; using System.Diagnostics.CodeAnalysis; +using Ydb.Sdk.Ado.Session; using Ydb.Sdk.Services.Query; using static System.Data.IsolationLevel; @@ -24,7 +25,7 @@ private YdbConnectionStringBuilder ConnectionStringBuilder [param: AllowNull] init => _connectionStringBuilder = value; } - internal Services.Query.Session Session + internal ISession Session { get { @@ -35,7 +36,7 @@ internal Services.Query.Session Session private set => _session = value; } - private Services.Query.Session _session = null!; + private ISession _session = null!; public YdbConnection() { @@ -124,7 +125,7 @@ public override async Task CloseAsync() } finally { - await _session.Release(); + _session.Close(); } } @@ -152,7 +153,7 @@ internal void OnNotSuccessStatusCode(StatusCode code) { _session.OnNotSuccessStatusCode(code); - if (!_session.IsActive) + if (_session.IsBroken) { ConnectionState = ConnectionState.Broken; } diff --git a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs index 68c5849f..4f5dab33 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs @@ -25,15 +25,15 @@ public YdbConnectionStringBuilder(string connectionString) // Init default connection string private void InitDefaultValues() { - _host = YdbAdoDefaultSettings.Host; - _port = YdbAdoDefaultSettings.Port; - _database = YdbAdoDefaultSettings.Database; + _host = "localhost"; + _port = 2136; + _database = "/local"; _minSessionPool = 0; _maxSessionPool = SessionPoolDefaultSettings.MaxSessionPool; _createSessionTimeout = SessionPoolDefaultSettings.CreateSessionTimeoutSeconds; _sessionIdleTimeout = 300; _sessionPruningInterval = 10; - _useTls = YdbAdoDefaultSettings.UseTls; + _useTls = false; _connectTimeout = GrpcDefaultSettings.ConnectTimeoutSeconds; _keepAlivePingDelay = GrpcDefaultSettings.KeepAlivePingSeconds; _keepAlivePingTimeout = GrpcDefaultSettings.KeepAlivePingTimeoutSeconds; diff --git a/src/Ydb.Sdk/src/Ado/YdbSchema.cs b/src/Ydb.Sdk/src/Ado/YdbSchema.cs index 4fb1d901..eb2a2823 100644 --- a/src/Ydb.Sdk/src/Ado/YdbSchema.cs +++ b/src/Ydb.Sdk/src/Ado/YdbSchema.cs @@ -3,8 +3,11 @@ using System.Globalization; using Ydb.Scheme; using Ydb.Scheme.V1; +using Ydb.Sdk.Ado.Internal; using Ydb.Sdk.Ado.Schema; using Ydb.Sdk.Services.Table; +using Ydb.Table; +using Ydb.Table.V1; namespace Ydb.Sdk.Ado; @@ -59,10 +62,28 @@ public static async Task DescribeTable( { try { - var describeResponse = await ydbConnection.Session - .DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings); + describeTableSettings ??= new DescribeTableSettings(); - return new YdbTable(tableName, describeResponse); + var describeResponse = await ydbConnection.Session.Driver.UnaryCall( + TableService.DescribeTableMethod, + new DescribeTableRequest + { + Path = WithSuffix(ydbConnection.Database) + tableName, + IncludeTableStats = describeTableSettings.IncludeTableStats, + IncludePartitionStats = describeTableSettings.IncludePartitionStats, + IncludeShardKeyBounds = describeTableSettings.IncludeShardKeyBounds + }, + describeTableSettings + ); + + if (describeResponse.Operation.Status.IsNotSuccess()) + { + throw YdbException.FromServer(describeResponse.Operation.Status, describeResponse.Operation.Issues); + } + + var describeResult = describeResponse.Operation.Result.Unpack(); + + return new YdbTable(tableName, describeResult); } catch (YdbException e) { @@ -241,31 +262,20 @@ private static async Task AppendDescribeTable( string? tableType, Action appendInTable) { - try - { - var describeResponse = await ydbConnection.Session - .DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings); - var ydbTable = new YdbTable(tableName, describeResponse); + var ydbTable = await DescribeTable(ydbConnection, tableName, describeTableSettings); - var type = ydbTable.IsSystem - ? "SYSTEM_TABLE" - : ydbTable.Type switch - { - YdbTable.TableType.Table => "TABLE", - YdbTable.TableType.ColumnTable => "COLUMN_TABLE", - YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE", - _ => throw new ArgumentOutOfRangeException(nameof(tableType)) - }; - if (type.IsPattern(tableType)) + var type = ydbTable.IsSystem + ? "SYSTEM_TABLE" + : ydbTable.Type switch { - appendInTable(ydbTable, type); - } - } - catch (YdbException e) + YdbTable.TableType.Table => "TABLE", + YdbTable.TableType.ColumnTable => "COLUMN_TABLE", + YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE", + _ => throw new ArgumentOutOfRangeException(nameof(tableType)) + }; + if (type.IsPattern(tableType)) { - ydbConnection.OnNotSuccessStatusCode(e.Code); - - throw; + appendInTable(ydbTable, type); } } diff --git a/src/Ydb.Sdk/src/Pool/SessionPool.cs b/src/Ydb.Sdk/src/Pool/SessionPool.cs index 28feb64e..b7e82dd6 100644 --- a/src/Ydb.Sdk/src/Pool/SessionPool.cs +++ b/src/Ydb.Sdk/src/Pool/SessionPool.cs @@ -57,6 +57,8 @@ internal async Task GetSession(CancellationToken cancellationToken = d if (session != null) // not active { Logger.LogDebug("Session[{Id}] isn't active, creating new session", session.SessionId); + + _ = DeleteSession(session); } try @@ -149,6 +151,10 @@ internal async ValueTask ReleaseSession(TSession session) { _idleSessions.Enqueue(session); } + else + { + _ = DeleteSession(session); + } } finally { @@ -162,10 +168,7 @@ private async Task DeleteSession(TSession session) { try { - if (session.IsActive) - { - await session.DeleteSession(); - } + await session.DeleteSession(); } catch (YdbException e) { diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs index 2c496b06..d843fae0 100644 --- a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -3,11 +3,9 @@ using Ydb.Query.V1; using Ydb.Sdk.Ado; using Ydb.Sdk.Ado.Internal; +using Ydb.Sdk.Ado.Session; using Ydb.Sdk.Pool; -using Ydb.Sdk.Services.Table; using Ydb.Sdk.Value; -using Ydb.Table; -using Ydb.Table.V1; using CommitTransactionRequest = Ydb.Query.CommitTransactionRequest; using CreateSessionRequest = Ydb.Query.CreateSessionRequest; using DeleteSessionRequest = Ydb.Query.DeleteSessionRequest; @@ -20,14 +18,15 @@ internal sealed class SessionPool : SessionPool, IAsyncDisposable { private static readonly CreateSessionRequest CreateSessionRequest = new(); - private readonly IDriver _driver; private readonly bool _disposingDriver; private readonly ILogger _loggerSession; + internal readonly IDriver Driver; + internal SessionPool(IDriver driver, SessionPoolConfig sessionPoolConfig) : base(driver.LoggerFactory.CreateLogger(), sessionPoolConfig) { - _driver = driver; + Driver = driver; _disposingDriver = sessionPoolConfig.DisposeDriver; _loggerSession = driver.LoggerFactory.CreateLogger(); } @@ -46,7 +45,7 @@ protected override async Task CreateSession( requestSettings.ClientCapabilities.Add("session-balancer"); } - var response = await _driver.UnaryCall( + var response = await Driver.UnaryCall( QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings @@ -59,13 +58,13 @@ protected override async Task CreateSession( var sessionId = response.SessionId; var nodeId = response.NodeId; - var session = new Session(_driver, this, sessionId, nodeId, _loggerSession); + var session = new Session(Driver, this, sessionId, nodeId, _loggerSession); _ = Task.Run(async () => { try { - using var stream = await _driver.ServerStreamCall( + using var stream = await Driver.ServerStreamCall( QueryService.AttachSessionMethod, new AttachSessionRequest { SessionId = sessionId }, new GrpcRequestSettings { NodeId = nodeId } @@ -138,13 +137,11 @@ protected override async Task CreateSession( return session; } - protected override ValueTask DisposeDriver() => _disposingDriver ? _driver.DisposeAsync() : default; + protected override ValueTask DisposeDriver() => _disposingDriver ? Driver.DisposeAsync() : default; } -internal class Session : SessionBase +internal class Session : SessionBase, ISession { - internal IDriver Driver { get; } - internal Session( IDriver driver, SessionPool sessionPool, @@ -156,20 +153,21 @@ ILogger logger Driver = driver; } - internal ValueTask> ExecuteQuery( - string query, - Dictionary? parameters, - ExecuteQuerySettings? settings, + public IDriver Driver { get; } + + public ValueTask> ExecuteQuery( + string query, + Dictionary parameters, + GrpcRequestSettings settings, TransactionControl? txControl) { - parameters ??= new Dictionary(); - settings = MakeGrpcRequestSettings(settings ?? new ExecuteQuerySettings()); + settings = MakeGrpcRequestSettings(settings); var request = new ExecuteQueryRequest { SessionId = SessionId, ExecMode = ExecMode.Execute, - QueryContent = new QueryContent { Text = query, Syntax = (Ydb.Query.Syntax)settings.Syntax }, + QueryContent = new QueryContent { Text = query, Syntax = Ydb.Query.Syntax.YqlV1 }, StatsMode = StatsMode.None, TxControl = txControl }; @@ -179,7 +177,13 @@ internal ValueTask> ExecuteQuery( return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); } - internal async Task CommitTransaction(string txId, CancellationToken cancellationToken = default) + public bool IsBroken => !IsActive; + + public new void OnNotSuccessStatusCode(StatusCode code) => base.OnNotSuccessStatusCode(code); + + public void Close() => Release(); + + public async Task CommitTransaction(string txId, CancellationToken cancellationToken = default) { var settings = MakeGrpcRequestSettings(new GrpcRequestSettings { CancellationToken = cancellationToken }); @@ -192,7 +196,7 @@ internal async Task CommitTransaction(string txId, CancellationToken cancellatio } } - internal async Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) + public async Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) { var settings = MakeGrpcRequestSettings(new GrpcRequestSettings { CancellationToken = cancellationToken }); @@ -205,30 +209,6 @@ internal async Task RollbackTransaction(string txId, CancellationToken cancellat } } - internal async Task DescribeTable(string path, DescribeTableSettings? settings = null) - { - settings = MakeGrpcRequestSettings(settings ?? new DescribeTableSettings()); - - var response = await Driver.UnaryCall( - TableService.DescribeTableMethod, - new DescribeTableRequest - { - Path = path, - IncludeTableStats = settings.IncludeTableStats, - IncludePartitionStats = settings.IncludePartitionStats, - IncludeShardKeyBounds = settings.IncludeShardKeyBounds - }, - settings - ); - - if (response.Operation.Status.IsNotSuccess()) - { - throw YdbException.FromServer(response.Operation.Status, response.Operation.Issues); - } - - return response.Operation.Result.Unpack(); - } - internal override async Task DeleteSession() { IsActive = false; diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs index e5f971b5..a8cec541 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs @@ -92,6 +92,8 @@ internal class MockSessionFactory : IPoolingSessionFactory internal class PoolingMockSession(PoolingSessionSource source) : IPoolingSession { + public IDriver Driver => throw new NotImplementedException(); + public bool IsBroken => false; public void Close() => source.Return(this); diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/StressTestTank.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/StressTestTank.cs index 109442c7..916a13ad 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/StressTestTank.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/StressTestTank.cs @@ -18,8 +18,11 @@ public StressTestTank(StressTestConfig config) _logger = loggerFactory.CreateLogger(); _settings = new YdbConnectionStringBuilder(config.ConnectionString) { - CredentialsProvider = - config.SaFilePath != null ? new ServiceAccountProvider(config.SaFilePath, loggerFactory) : null + LoggerFactory = loggerFactory, + CredentialsProvider = config.SaFilePath != null + ? new ServiceAccountProvider(config.SaFilePath, loggerFactory) + : new MetadataProvider(loggerFactory), + ServerCertificates = YcCerts.GetYcServerCertificates() }; ValidateConfig(); diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs index 1f22175d..6c67e223 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs @@ -39,21 +39,14 @@ public IPoolingSession NewSession(PoolingSessionSource source) => new MockPoolingSession(source, Interlocked.Increment(ref _sessionNum)); } -internal class MockPoolingSession : IPoolingSession +internal class MockPoolingSession(PoolingSessionSource source, int sessionNum) : IPoolingSession { - private readonly PoolingSessionSource _source; - - internal string SessionId { get; } - - public MockPoolingSession(PoolingSessionSource source, int sessionNum) - { - _source = source; - SessionId = $"session_{sessionNum}"; - } + internal string SessionId { get; } = $"session_{sessionNum}"; + public IDriver Driver => throw new NotImplementedException(); public bool IsBroken { get; set; } - public void Close() => _source.Return(this); + public void Close() => source.Return(this); public Task Open(CancellationToken cancellationToken) => Task.CompletedTask; From 73b6a8d56b95c56587462c698e71a84cc98338d3 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Tue, 22 Jul 2025 17:58:26 +0300 Subject: [PATCH 3/6] updating --- src/Ydb.Sdk/CHANGELOG.md | 3 +++ src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs | 4 ++-- src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs | 14 +++++++++----- .../src/Ado/Session/PoolingSessionSource.cs | 5 ----- src/Ydb.Sdk/src/Ado/YdbCommand.cs | 1 - src/Ydb.Sdk/src/Services/Query/QueryClient.cs | 3 ++- src/Ydb.Sdk/src/Services/Query/QueryTx.cs | 11 +++++++---- src/Ydb.Sdk/src/Services/Query/SessionPool.cs | 17 ++++++++--------- .../SessionSourceBenchmark.cs | 2 +- .../Session/PoolingSessionTests.cs | 2 +- 10 files changed, 33 insertions(+), 29 deletions(-) diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index 9448d1d9..3eff72eb 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,3 +1,6 @@ +- Optimization: On BadSession, do not invoke the `DeleteSession()` method. +- Canceling AttachStream after calling the `DeleteSession` method. +- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`). - Fixed bug: Grpc.Core.StatusCode.Cancelled was mapped to server's Canceled status. - ADO.NET: PoolingSessionSource 2.0 based on Npgsql pooling algorithm. - Added new ADO.NET options: diff --git a/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs b/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs index bbe3ce0d..9669afb1 100644 --- a/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs +++ b/src/Ydb.Sdk/src/Ado/Internal/SqlParser.cs @@ -232,7 +232,7 @@ StringBuilder yql return startToken; // rollback parse IN LIST } - yql.Append(listStartToken > startToken ? sql[startToken..listStartToken] : ' '); + yql.Append(listStartToken > startToken ? sql[startToken .. listStartToken] : ' '); var paramListName = sqlParamsBuilder.AddListPrimitiveParams(findNameParams); yql.Append(paramListName); @@ -273,7 +273,7 @@ private static (string Name, int NextToken) ParseNameParam(string sql, int curTo throw new YdbException($"Have empty name parameter, invalid SQL [position: {prevToken}]"); } - return ($"${sql[prevToken..curToken]}", curToken); + return ($"${sql[prevToken .. curToken]}", curToken); } private static bool IsSqlIdentifierChar(this char c) => char.IsLetterOrDigit(c) || c == '_'; diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs index ed85d411..7d25cd4a 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs @@ -1,8 +1,3 @@ -// This file contains session pooling algorithms adapted from Npgsql -// Original source: https://github.com/npgsql/npgsql -// Copyright (c) 2002-2025, Npgsql -// Licence https://github.com/npgsql/npgsql?tab=PostgreSQL-1-ov-file - using Microsoft.Extensions.Logging; using Ydb.Query; using Ydb.Query.V1; @@ -25,6 +20,7 @@ internal class PoolingSession : IPoolingSession private readonly CancellationTokenSource _attachStreamLifecycleCts = new(); private volatile bool _isBroken = true; + private volatile bool _isBadSession = false; private readonly bool _disableServerBalancer; @@ -105,6 +101,8 @@ public async Task RollbackTransaction( public void OnNotSuccessStatusCode(StatusCode statusCode) { + _isBadSession = _isBadSession || statusCode is StatusCode.BadSession; + if (statusCode is StatusCode.BadSession or StatusCode.SessionBusy or @@ -226,6 +224,12 @@ public async Task DeleteSession() _isBroken = true; _attachStreamLifecycleCts.CancelAfter(DeleteSessionTimeout); + if (_isBadSession) + { + return; + } + + _isBadSession = true; var deleteSessionResponse = await Driver.UnaryCall( QueryService.DeleteSessionMethod, new DeleteSessionRequest { SessionId = SessionId }, diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs index b8880ca8..fc80f3a8 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs @@ -1,8 +1,3 @@ -// This file contains session pooling algorithms adapted from Npgsql -// Original source: https://github.com/npgsql/npgsql -// Copyright (c) 2002-2025, Npgsql -// Licence https://github.com/npgsql/npgsql?tab=PostgreSQL-1-ov-file - using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Threading.Channels; diff --git a/src/Ydb.Sdk/src/Ado/YdbCommand.cs b/src/Ydb.Sdk/src/Ado/YdbCommand.cs index ec49c0fa..2025991e 100644 --- a/src/Ydb.Sdk/src/Ado/YdbCommand.cs +++ b/src/Ydb.Sdk/src/Ado/YdbCommand.cs @@ -3,7 +3,6 @@ using System.Diagnostics.CodeAnalysis; using System.Text; using Ydb.Sdk.Ado.Internal; -using Ydb.Sdk.Services.Query; using Ydb.Sdk.Value; namespace Ydb.Sdk.Ado; diff --git a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs index 443d3b02..6820ba74 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs @@ -61,7 +61,8 @@ public Task Stream(string query, Func> onStrea Dictionary? parameters = null, TxMode txMode = TxMode.NoTx, ExecuteQuerySettings? settings = null) => _sessionPool.ExecOnSession(async session => await onStream(new ExecuteQueryStream( - await session.ExecuteQuery(query, parameters, settings, txMode.TransactionControl()))) + await session.ExecuteQuery(query, parameters ?? new Dictionary(), + settings ?? new GrpcRequestSettings(), txMode.TransactionControl()))) ); public Task Stream(string query, Func onStream, diff --git a/src/Ydb.Sdk/src/Services/Query/QueryTx.cs b/src/Ydb.Sdk/src/Services/Query/QueryTx.cs index c9aadf3e..15c72d37 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryTx.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryTx.cs @@ -28,10 +28,13 @@ internal QueryTx(Session session, TxMode txMode) } public async ValueTask Stream(string query, Dictionary? parameters = null, - bool commit = false, ExecuteQuerySettings? settings = null) => - new( - await _session.ExecuteQuery(query, parameters, settings, TxControl(commit)), txId => TxId = txId - ); + bool commit = false, ExecuteQuerySettings? settings = null) => new(await _session.ExecuteQuery( + query, + parameters ?? new Dictionary(), + settings ?? new GrpcRequestSettings(), + TxControl(commit) + ), txId => TxId = txId + ); public async Task> ReadAllRows(string query, Dictionary? parameters = null, bool commit = false, ExecuteQuerySettings? settings = null) diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs index d843fae0..915f69c3 100644 --- a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -18,15 +18,14 @@ internal sealed class SessionPool : SessionPool, IAsyncDisposable { private static readonly CreateSessionRequest CreateSessionRequest = new(); + private readonly IDriver _driver; private readonly bool _disposingDriver; private readonly ILogger _loggerSession; - internal readonly IDriver Driver; - internal SessionPool(IDriver driver, SessionPoolConfig sessionPoolConfig) : base(driver.LoggerFactory.CreateLogger(), sessionPoolConfig) { - Driver = driver; + _driver = driver; _disposingDriver = sessionPoolConfig.DisposeDriver; _loggerSession = driver.LoggerFactory.CreateLogger(); } @@ -45,7 +44,7 @@ protected override async Task CreateSession( requestSettings.ClientCapabilities.Add("session-balancer"); } - var response = await Driver.UnaryCall( + var response = await _driver.UnaryCall( QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings @@ -58,13 +57,13 @@ protected override async Task CreateSession( var sessionId = response.SessionId; var nodeId = response.NodeId; - var session = new Session(Driver, this, sessionId, nodeId, _loggerSession); + var session = new Session(_driver, this, sessionId, nodeId, _loggerSession); _ = Task.Run(async () => { try { - using var stream = await Driver.ServerStreamCall( + using var stream = await _driver.ServerStreamCall( QueryService.AttachSessionMethod, new AttachSessionRequest { SessionId = sessionId }, new GrpcRequestSettings { NodeId = nodeId } @@ -137,7 +136,7 @@ protected override async Task CreateSession( return session; } - protected override ValueTask DisposeDriver() => _disposingDriver ? Driver.DisposeAsync() : default; + protected override ValueTask DisposeDriver() => _disposingDriver ? _driver.DisposeAsync() : default; } internal class Session : SessionBase, ISession @@ -156,8 +155,8 @@ ILogger logger public IDriver Driver { get; } public ValueTask> ExecuteQuery( - string query, - Dictionary parameters, + string query, + Dictionary parameters, GrpcRequestSettings settings, TransactionControl? txControl) { diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs index a8cec541..20ce8cb7 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs @@ -93,7 +93,7 @@ internal class MockSessionFactory : IPoolingSessionFactory internal class PoolingMockSession(PoolingSessionSource source) : IPoolingSession { public IDriver Driver => throw new NotImplementedException(); - + public bool IsBroken => false; public void Close() => source.Return(this); diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs index 66b936f7..52b38ead 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs @@ -269,7 +269,7 @@ private TaskCompletionSource SetupAttachStream() { var tcsSecondMoveAttachStream = new TaskCompletionSource(); - _mockAttachStream.SetupSequence(attachStream => attachStream.MoveNextAsync(CancellationToken.None)) + _mockAttachStream.SetupSequence(attachStream => attachStream.MoveNextAsync(It.IsAny())) .ReturnsAsync(true) .Returns(new ValueTask(tcsSecondMoveAttachStream.Task)); _mockAttachStream.SetupSequence(attachStream => attachStream.Current) From 80a1655b7613c92e95681406e518df7498c88b31 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Tue, 22 Jul 2025 18:14:10 +0300 Subject: [PATCH 4/6] fix test --- .../test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs index 52b38ead..c59fc739 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs @@ -47,13 +47,7 @@ public async Task OnNotSuccessStatusCode_WhenStatusCodeIsNotSuccess_UpdateIsBrok bool isError) { SetupSuccessCreateSession(); - var tcsSecondMoveAttachStream = new TaskCompletionSource(); - - _mockAttachStream.SetupSequence(attachStream => attachStream.MoveNextAsync(CancellationToken.None)) - .ReturnsAsync(true) - .Returns(new ValueTask(tcsSecondMoveAttachStream.Task)); - _mockAttachStream.SetupSequence(attachStream => attachStream.Current) - .Returns(new SessionState { Status = StatusIds.Types.StatusCode.Success }); + var tcsSecondMoveAttachStream = SetupAttachStream(); var session = _poolingSessionFactory.NewSession(_poolingSessionSource); Assert.True(session.IsBroken); await session.Open(CancellationToken.None); From b3aab22e712776dc24f042909e0f8f136329243a Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Tue, 22 Jul 2025 18:19:48 +0300 Subject: [PATCH 5/6] fix --- src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs index 7d25cd4a..9fc99258 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs @@ -20,7 +20,7 @@ internal class PoolingSession : IPoolingSession private readonly CancellationTokenSource _attachStreamLifecycleCts = new(); private volatile bool _isBroken = true; - private volatile bool _isBadSession = false; + private volatile bool _isBadSession; private readonly bool _disableServerBalancer; From 55ba0af20da96e417d9c466292b5511e98322f54 Mon Sep 17 00:00:00 2001 From: KirillKurdyukov Date: Tue, 22 Jul 2025 18:35:24 +0300 Subject: [PATCH 6/6] fix --- .../src/Ado/Session/PoolingSessionSource.cs | 53 ++++++++----------- src/Ydb.Sdk/src/Ado/YdbConnection.cs | 14 ++++- 2 files changed, 33 insertions(+), 34 deletions(-) diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs index fc80f3a8..05c487ff 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs @@ -87,45 +87,34 @@ private async ValueTask RentAsync(CancellationToken cancellatio var finalToken = ctsGetSession.Token; - try + var session = await OpenNewSession(finalToken).ConfigureAwait(false); + if (session != null) + return session; + + while (true) { - var session = await OpenNewSession(finalToken).ConfigureAwait(false); - if (session != null) - return session; + session = await _idleSessionReader.ReadAsync(finalToken).ConfigureAwait(false); - while (true) + if (CheckIdleSession(session)) { - session = await _idleSessionReader.ReadAsync(finalToken).ConfigureAwait(false); - - if (CheckIdleSession(session)) - { - return session; - } + return session; + } - // If we're here, our waiting attempt on the idle session channel was released with a null - // (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked. - if (TryGetIdleSession(out session)) - { - return session; - } + // If we're here, our waiting attempt on the idle session channel was released with a null + // (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked. + if (TryGetIdleSession(out session)) + { + return session; + } - // We might have closed a session in the meantime and no longer be at max capacity - // so try to open a new session and if that fails, loop again. - session = await OpenNewSession(finalToken).ConfigureAwait(false); - if (session != null) - { - return session; - } + // We might have closed a session in the meantime and no longer be at max capacity + // so try to open a new session and if that fails, loop again. + session = await OpenNewSession(finalToken).ConfigureAwait(false); + if (session != null) + { + return session; } } - catch (OperationCanceledException e) - { - throw new YdbException(StatusCode.Cancelled, - $"The connection pool has been exhausted, either raise 'MaxSessionPool' " + - $"(currently {_maxSessionSize}) or 'CreateSessionTimeout' " + - $"(currently {_createSessionTimeout} seconds) in your connection string.", e - ); - } } private async ValueTask OpenNewSession(CancellationToken cancellationToken) diff --git a/src/Ydb.Sdk/src/Ado/YdbConnection.cs b/src/Ydb.Sdk/src/Ado/YdbConnection.cs index ce2e2c43..a240e4bc 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnection.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnection.cs @@ -92,8 +92,18 @@ public override void ChangeDatabase(string databaseName) public override async Task OpenAsync(CancellationToken cancellationToken) { ThrowIfConnectionOpen(); - - Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken); + try + { + Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken); + } + catch (OperationCanceledException e) + { + throw new YdbException(StatusCode.Cancelled, + $"The connection pool has been exhausted, either raise 'MaxSessionPool' " + + $"(currently {ConnectionStringBuilder.MaxSessionPool}) or 'CreateSessionTimeout' " + + $"(currently {ConnectionStringBuilder.CreateSessionTimeout} seconds) in your connection string.", e + ); + } OnStateChange(ClosedToOpenEventArgs);