diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index 9448d1d9..3eff72eb 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,3 +1,6 @@ +- Optimization: On BadSession, do not invoke the `DeleteSession()` method. +- Canceling AttachStream after calling the `DeleteSession` method. +- Fixed bug: fixed issue where session was not deleted (`ClientTransportTimeout`). - Fixed bug: Grpc.Core.StatusCode.Cancelled was mapped to server's Canceled status. - ADO.NET: PoolingSessionSource 2.0 based on Npgsql pooling algorithm. - Added new ADO.NET options: diff --git a/src/Ydb.Sdk/src/Ado/PoolManager.cs b/src/Ydb.Sdk/src/Ado/PoolManager.cs index ed2712d6..a8f787b4 100644 --- a/src/Ydb.Sdk/src/Ado/PoolManager.cs +++ b/src/Ydb.Sdk/src/Ado/PoolManager.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using Ydb.Sdk.Ado.Session; using Ydb.Sdk.Pool; using Ydb.Sdk.Services.Query; @@ -9,7 +10,7 @@ internal static class PoolManager private static readonly SemaphoreSlim SemaphoreSlim = new(1); // async mutex private static readonly ConcurrentDictionary Pools = new(); - internal static async Task GetSession( + internal static async Task GetSession( YdbConnectionStringBuilder connectionString, CancellationToken cancellationToken ) diff --git a/src/Ydb.Sdk/src/Ado/Session/ISession.cs b/src/Ydb.Sdk/src/Ado/Session/ISession.cs index fc8c70ba..1aec418b 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ISession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ISession.cs @@ -6,6 +6,8 @@ namespace Ydb.Sdk.Ado.Session; internal interface ISession { + IDriver Driver { get; } + bool IsBroken { get; } ValueTask> ExecuteQuery( diff --git a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs index 182dbf44..45a251f7 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ImplicitSession.cs @@ -6,13 +6,12 @@ namespace Ydb.Sdk.Ado.Session; internal class ImplicitSession : ISession { - private readonly IDriver _driver; - public ImplicitSession(IDriver driver) { - _driver = driver; + Driver = driver; } + public IDriver Driver { get; } public bool IsBroken => false; public ValueTask> ExecuteQuery( @@ -36,7 +35,7 @@ public ValueTask> ExecuteQuery( }; request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto())); - return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); + return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); } public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) => diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs index 2002ec69..9fc99258 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs @@ -15,17 +15,19 @@ internal class PoolingSession : IPoolingSession private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5); private static readonly CreateSessionRequest CreateSessionRequest = new(); - private readonly IDriver _driver; private readonly PoolingSessionSource _poolingSessionSource; private readonly ILogger _logger; + private readonly CancellationTokenSource _attachStreamLifecycleCts = new(); private volatile bool _isBroken = true; + private volatile bool _isBadSession; private readonly bool _disableServerBalancer; private string SessionId { get; set; } = string.Empty; private long NodeId { get; set; } + public IDriver Driver { get; } public bool IsBroken => _isBroken; internal PoolingSession( @@ -35,10 +37,10 @@ internal PoolingSession( ILogger logger ) { - _driver = driver; _poolingSessionSource = poolingSessionSource; _disableServerBalancer = disableServerBalancer; _logger = logger; + Driver = driver; } public ValueTask> ExecuteQuery( @@ -60,7 +62,7 @@ public ValueTask> ExecuteQuery( }; request.Parameters.Add(parameters.ToDictionary(p => p.Key, p => p.Value.GetProto())); - return _driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); + return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); } public async Task CommitTransaction( @@ -68,7 +70,7 @@ public async Task CommitTransaction( CancellationToken cancellationToken = default ) { - var response = await _driver.UnaryCall( + var response = await Driver.UnaryCall( QueryService.CommitTransactionMethod, new CommitTransactionRequest { SessionId = SessionId, TxId = txId }, new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId } @@ -85,7 +87,7 @@ public async Task RollbackTransaction( CancellationToken cancellationToken = default ) { - var response = await _driver.UnaryCall( + var response = await Driver.UnaryCall( QueryService.RollbackTransactionMethod, new RollbackTransactionRequest { SessionId = SessionId, TxId = txId }, new GrpcRequestSettings { CancellationToken = cancellationToken, NodeId = NodeId } @@ -99,6 +101,8 @@ public async Task RollbackTransaction( public void OnNotSuccessStatusCode(StatusCode statusCode) { + _isBadSession = _isBadSession || statusCode is StatusCode.BadSession; + if (statusCode is StatusCode.BadSession or StatusCode.SessionBusy or @@ -121,7 +125,7 @@ public async Task Open(CancellationToken cancellationToken) requestSettings.ClientCapabilities.Add(SessionBalancer); } - var response = await _driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings); + var response = await Driver.UnaryCall(QueryService.CreateSessionMethod, CreateSessionRequest, requestSettings); if (response.Status.IsNotSuccess()) { @@ -138,7 +142,7 @@ public async Task Open(CancellationToken cancellationToken) { try { - using var stream = await _driver.ServerStreamCall( + using var stream = await Driver.ServerStreamCall( QueryService.AttachSessionMethod, new AttachSessionRequest { SessionId = SessionId }, new GrpcRequestSettings { NodeId = NodeId } @@ -161,10 +165,12 @@ public async Task Open(CancellationToken cancellationToken) completeTask.SetResult(); + var lifecycleAttachToken = _attachStreamLifecycleCts.Token; + try { // ReSharper disable once MethodSupportsCancellation - while (await stream.MoveNextAsync()) + while (await stream.MoveNextAsync(lifecycleAttachToken)) { var sessionState = stream.Current; @@ -215,14 +221,16 @@ public async Task DeleteSession() { try { - if (_isBroken) + _isBroken = true; + _attachStreamLifecycleCts.CancelAfter(DeleteSessionTimeout); + + if (_isBadSession) { return; } - _isBroken = true; - - var deleteSessionResponse = await _driver.UnaryCall( + _isBadSession = true; + var deleteSessionResponse = await Driver.UnaryCall( QueryService.DeleteSessionMethod, new DeleteSessionRequest { SessionId = SessionId }, new GrpcRequestSettings { TransportTimeout = DeleteSessionTimeout, NodeId = NodeId } diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs index b8880ca8..05c487ff 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs @@ -1,8 +1,3 @@ -// This file contains session pooling algorithms adapted from Npgsql -// Original source: https://github.com/npgsql/npgsql -// Copyright (c) 2002-2025, Npgsql -// Licence https://github.com/npgsql/npgsql?tab=PostgreSQL-1-ov-file - using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; using System.Threading.Channels; @@ -92,45 +87,34 @@ private async ValueTask RentAsync(CancellationToken cancellatio var finalToken = ctsGetSession.Token; - try + var session = await OpenNewSession(finalToken).ConfigureAwait(false); + if (session != null) + return session; + + while (true) { - var session = await OpenNewSession(finalToken).ConfigureAwait(false); - if (session != null) - return session; + session = await _idleSessionReader.ReadAsync(finalToken).ConfigureAwait(false); - while (true) + if (CheckIdleSession(session)) { - session = await _idleSessionReader.ReadAsync(finalToken).ConfigureAwait(false); - - if (CheckIdleSession(session)) - { - return session; - } + return session; + } - // If we're here, our waiting attempt on the idle session channel was released with a null - // (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked. - if (TryGetIdleSession(out session)) - { - return session; - } + // If we're here, our waiting attempt on the idle session channel was released with a null + // (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked. + if (TryGetIdleSession(out session)) + { + return session; + } - // We might have closed a session in the meantime and no longer be at max capacity - // so try to open a new session and if that fails, loop again. - session = await OpenNewSession(finalToken).ConfigureAwait(false); - if (session != null) - { - return session; - } + // We might have closed a session in the meantime and no longer be at max capacity + // so try to open a new session and if that fails, loop again. + session = await OpenNewSession(finalToken).ConfigureAwait(false); + if (session != null) + { + return session; } } - catch (OperationCanceledException e) - { - throw new YdbException(StatusCode.Cancelled, - $"The connection pool has been exhausted, either raise 'MaxSessionPool' " + - $"(currently {_maxSessionSize}) or 'CreateSessionTimeout' " + - $"(currently {_createSessionTimeout} seconds) in your connection string.", e - ); - } } private async ValueTask OpenNewSession(CancellationToken cancellationToken) diff --git a/src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs b/src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs deleted file mode 100644 index 6c20df0b..00000000 --- a/src/Ydb.Sdk/src/Ado/YdbAdoDefaultSettings.cs +++ /dev/null @@ -1,12 +0,0 @@ -namespace Ydb.Sdk.Ado; - -internal static class YdbAdoDefaultSettings -{ - internal const string Host = "localhost"; - - internal const int Port = 2136; - - internal const string Database = "/local"; - - internal const bool UseTls = false; -} diff --git a/src/Ydb.Sdk/src/Ado/YdbCommand.cs b/src/Ydb.Sdk/src/Ado/YdbCommand.cs index 844343b5..2025991e 100644 --- a/src/Ydb.Sdk/src/Ado/YdbCommand.cs +++ b/src/Ydb.Sdk/src/Ado/YdbCommand.cs @@ -3,7 +3,6 @@ using System.Diagnostics.CodeAnalysis; using System.Text; using Ydb.Sdk.Ado.Internal; -using Ydb.Sdk.Services.Query; using Ydb.Sdk.Value; namespace Ydb.Sdk.Ado; @@ -206,8 +205,8 @@ protected override async Task ExecuteDbDataReaderAsync(CommandBeha preparedSql.Append(sql); var execSettings = CommandTimeout > 0 - ? new ExecuteQuerySettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) } - : new ExecuteQuerySettings(); + ? new GrpcRequestSettings { TransportTimeout = TimeSpan.FromSeconds(CommandTimeout) } + : new GrpcRequestSettings(); var transaction = YdbConnection.CurrentTransaction; diff --git a/src/Ydb.Sdk/src/Ado/YdbConnection.cs b/src/Ydb.Sdk/src/Ado/YdbConnection.cs index 8cac45c2..a240e4bc 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnection.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnection.cs @@ -1,6 +1,7 @@ using System.Data; using System.Data.Common; using System.Diagnostics.CodeAnalysis; +using Ydb.Sdk.Ado.Session; using Ydb.Sdk.Services.Query; using static System.Data.IsolationLevel; @@ -24,7 +25,7 @@ private YdbConnectionStringBuilder ConnectionStringBuilder [param: AllowNull] init => _connectionStringBuilder = value; } - internal Services.Query.Session Session + internal ISession Session { get { @@ -35,7 +36,7 @@ internal Services.Query.Session Session private set => _session = value; } - private Services.Query.Session _session = null!; + private ISession _session = null!; public YdbConnection() { @@ -91,8 +92,18 @@ public override void ChangeDatabase(string databaseName) public override async Task OpenAsync(CancellationToken cancellationToken) { ThrowIfConnectionOpen(); - - Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken); + try + { + Session = await PoolManager.GetSession(ConnectionStringBuilder, cancellationToken); + } + catch (OperationCanceledException e) + { + throw new YdbException(StatusCode.Cancelled, + $"The connection pool has been exhausted, either raise 'MaxSessionPool' " + + $"(currently {ConnectionStringBuilder.MaxSessionPool}) or 'CreateSessionTimeout' " + + $"(currently {ConnectionStringBuilder.CreateSessionTimeout} seconds) in your connection string.", e + ); + } OnStateChange(ClosedToOpenEventArgs); @@ -124,7 +135,7 @@ public override async Task CloseAsync() } finally { - await _session.Release(); + _session.Close(); } } @@ -152,7 +163,7 @@ internal void OnNotSuccessStatusCode(StatusCode code) { _session.OnNotSuccessStatusCode(code); - if (!_session.IsActive) + if (_session.IsBroken) { ConnectionState = ConnectionState.Broken; } diff --git a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs index 68c5849f..4f5dab33 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs @@ -25,15 +25,15 @@ public YdbConnectionStringBuilder(string connectionString) // Init default connection string private void InitDefaultValues() { - _host = YdbAdoDefaultSettings.Host; - _port = YdbAdoDefaultSettings.Port; - _database = YdbAdoDefaultSettings.Database; + _host = "localhost"; + _port = 2136; + _database = "/local"; _minSessionPool = 0; _maxSessionPool = SessionPoolDefaultSettings.MaxSessionPool; _createSessionTimeout = SessionPoolDefaultSettings.CreateSessionTimeoutSeconds; _sessionIdleTimeout = 300; _sessionPruningInterval = 10; - _useTls = YdbAdoDefaultSettings.UseTls; + _useTls = false; _connectTimeout = GrpcDefaultSettings.ConnectTimeoutSeconds; _keepAlivePingDelay = GrpcDefaultSettings.KeepAlivePingSeconds; _keepAlivePingTimeout = GrpcDefaultSettings.KeepAlivePingTimeoutSeconds; diff --git a/src/Ydb.Sdk/src/Ado/YdbSchema.cs b/src/Ydb.Sdk/src/Ado/YdbSchema.cs index 4fb1d901..eb2a2823 100644 --- a/src/Ydb.Sdk/src/Ado/YdbSchema.cs +++ b/src/Ydb.Sdk/src/Ado/YdbSchema.cs @@ -3,8 +3,11 @@ using System.Globalization; using Ydb.Scheme; using Ydb.Scheme.V1; +using Ydb.Sdk.Ado.Internal; using Ydb.Sdk.Ado.Schema; using Ydb.Sdk.Services.Table; +using Ydb.Table; +using Ydb.Table.V1; namespace Ydb.Sdk.Ado; @@ -59,10 +62,28 @@ public static async Task DescribeTable( { try { - var describeResponse = await ydbConnection.Session - .DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings); + describeTableSettings ??= new DescribeTableSettings(); - return new YdbTable(tableName, describeResponse); + var describeResponse = await ydbConnection.Session.Driver.UnaryCall( + TableService.DescribeTableMethod, + new DescribeTableRequest + { + Path = WithSuffix(ydbConnection.Database) + tableName, + IncludeTableStats = describeTableSettings.IncludeTableStats, + IncludePartitionStats = describeTableSettings.IncludePartitionStats, + IncludeShardKeyBounds = describeTableSettings.IncludeShardKeyBounds + }, + describeTableSettings + ); + + if (describeResponse.Operation.Status.IsNotSuccess()) + { + throw YdbException.FromServer(describeResponse.Operation.Status, describeResponse.Operation.Issues); + } + + var describeResult = describeResponse.Operation.Result.Unpack(); + + return new YdbTable(tableName, describeResult); } catch (YdbException e) { @@ -241,31 +262,20 @@ private static async Task AppendDescribeTable( string? tableType, Action appendInTable) { - try - { - var describeResponse = await ydbConnection.Session - .DescribeTable(WithSuffix(ydbConnection.Database) + tableName, describeTableSettings); - var ydbTable = new YdbTable(tableName, describeResponse); + var ydbTable = await DescribeTable(ydbConnection, tableName, describeTableSettings); - var type = ydbTable.IsSystem - ? "SYSTEM_TABLE" - : ydbTable.Type switch - { - YdbTable.TableType.Table => "TABLE", - YdbTable.TableType.ColumnTable => "COLUMN_TABLE", - YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE", - _ => throw new ArgumentOutOfRangeException(nameof(tableType)) - }; - if (type.IsPattern(tableType)) + var type = ydbTable.IsSystem + ? "SYSTEM_TABLE" + : ydbTable.Type switch { - appendInTable(ydbTable, type); - } - } - catch (YdbException e) + YdbTable.TableType.Table => "TABLE", + YdbTable.TableType.ColumnTable => "COLUMN_TABLE", + YdbTable.TableType.ExternalTable => "EXTERNAL_TABLE", + _ => throw new ArgumentOutOfRangeException(nameof(tableType)) + }; + if (type.IsPattern(tableType)) { - ydbConnection.OnNotSuccessStatusCode(e.Code); - - throw; + appendInTable(ydbTable, type); } } diff --git a/src/Ydb.Sdk/src/Pool/SessionPool.cs b/src/Ydb.Sdk/src/Pool/SessionPool.cs index 28feb64e..b7e82dd6 100644 --- a/src/Ydb.Sdk/src/Pool/SessionPool.cs +++ b/src/Ydb.Sdk/src/Pool/SessionPool.cs @@ -57,6 +57,8 @@ internal async Task GetSession(CancellationToken cancellationToken = d if (session != null) // not active { Logger.LogDebug("Session[{Id}] isn't active, creating new session", session.SessionId); + + _ = DeleteSession(session); } try @@ -149,6 +151,10 @@ internal async ValueTask ReleaseSession(TSession session) { _idleSessions.Enqueue(session); } + else + { + _ = DeleteSession(session); + } } finally { @@ -162,10 +168,7 @@ private async Task DeleteSession(TSession session) { try { - if (session.IsActive) - { - await session.DeleteSession(); - } + await session.DeleteSession(); } catch (YdbException e) { diff --git a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs index 443d3b02..6820ba74 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryClient.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryClient.cs @@ -61,7 +61,8 @@ public Task Stream(string query, Func> onStrea Dictionary? parameters = null, TxMode txMode = TxMode.NoTx, ExecuteQuerySettings? settings = null) => _sessionPool.ExecOnSession(async session => await onStream(new ExecuteQueryStream( - await session.ExecuteQuery(query, parameters, settings, txMode.TransactionControl()))) + await session.ExecuteQuery(query, parameters ?? new Dictionary(), + settings ?? new GrpcRequestSettings(), txMode.TransactionControl()))) ); public Task Stream(string query, Func onStream, diff --git a/src/Ydb.Sdk/src/Services/Query/QueryTx.cs b/src/Ydb.Sdk/src/Services/Query/QueryTx.cs index c9aadf3e..15c72d37 100644 --- a/src/Ydb.Sdk/src/Services/Query/QueryTx.cs +++ b/src/Ydb.Sdk/src/Services/Query/QueryTx.cs @@ -28,10 +28,13 @@ internal QueryTx(Session session, TxMode txMode) } public async ValueTask Stream(string query, Dictionary? parameters = null, - bool commit = false, ExecuteQuerySettings? settings = null) => - new( - await _session.ExecuteQuery(query, parameters, settings, TxControl(commit)), txId => TxId = txId - ); + bool commit = false, ExecuteQuerySettings? settings = null) => new(await _session.ExecuteQuery( + query, + parameters ?? new Dictionary(), + settings ?? new GrpcRequestSettings(), + TxControl(commit) + ), txId => TxId = txId + ); public async Task> ReadAllRows(string query, Dictionary? parameters = null, bool commit = false, ExecuteQuerySettings? settings = null) diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs index 2c496b06..915f69c3 100644 --- a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -3,11 +3,9 @@ using Ydb.Query.V1; using Ydb.Sdk.Ado; using Ydb.Sdk.Ado.Internal; +using Ydb.Sdk.Ado.Session; using Ydb.Sdk.Pool; -using Ydb.Sdk.Services.Table; using Ydb.Sdk.Value; -using Ydb.Table; -using Ydb.Table.V1; using CommitTransactionRequest = Ydb.Query.CommitTransactionRequest; using CreateSessionRequest = Ydb.Query.CreateSessionRequest; using DeleteSessionRequest = Ydb.Query.DeleteSessionRequest; @@ -141,10 +139,8 @@ protected override async Task CreateSession( protected override ValueTask DisposeDriver() => _disposingDriver ? _driver.DisposeAsync() : default; } -internal class Session : SessionBase +internal class Session : SessionBase, ISession { - internal IDriver Driver { get; } - internal Session( IDriver driver, SessionPool sessionPool, @@ -156,20 +152,21 @@ ILogger logger Driver = driver; } - internal ValueTask> ExecuteQuery( + public IDriver Driver { get; } + + public ValueTask> ExecuteQuery( string query, - Dictionary? parameters, - ExecuteQuerySettings? settings, + Dictionary parameters, + GrpcRequestSettings settings, TransactionControl? txControl) { - parameters ??= new Dictionary(); - settings = MakeGrpcRequestSettings(settings ?? new ExecuteQuerySettings()); + settings = MakeGrpcRequestSettings(settings); var request = new ExecuteQueryRequest { SessionId = SessionId, ExecMode = ExecMode.Execute, - QueryContent = new QueryContent { Text = query, Syntax = (Ydb.Query.Syntax)settings.Syntax }, + QueryContent = new QueryContent { Text = query, Syntax = Ydb.Query.Syntax.YqlV1 }, StatsMode = StatsMode.None, TxControl = txControl }; @@ -179,7 +176,13 @@ internal ValueTask> ExecuteQuery( return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); } - internal async Task CommitTransaction(string txId, CancellationToken cancellationToken = default) + public bool IsBroken => !IsActive; + + public new void OnNotSuccessStatusCode(StatusCode code) => base.OnNotSuccessStatusCode(code); + + public void Close() => Release(); + + public async Task CommitTransaction(string txId, CancellationToken cancellationToken = default) { var settings = MakeGrpcRequestSettings(new GrpcRequestSettings { CancellationToken = cancellationToken }); @@ -192,7 +195,7 @@ internal async Task CommitTransaction(string txId, CancellationToken cancellatio } } - internal async Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) + public async Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) { var settings = MakeGrpcRequestSettings(new GrpcRequestSettings { CancellationToken = cancellationToken }); @@ -205,30 +208,6 @@ internal async Task RollbackTransaction(string txId, CancellationToken cancellat } } - internal async Task DescribeTable(string path, DescribeTableSettings? settings = null) - { - settings = MakeGrpcRequestSettings(settings ?? new DescribeTableSettings()); - - var response = await Driver.UnaryCall( - TableService.DescribeTableMethod, - new DescribeTableRequest - { - Path = path, - IncludeTableStats = settings.IncludeTableStats, - IncludePartitionStats = settings.IncludePartitionStats, - IncludeShardKeyBounds = settings.IncludeShardKeyBounds - }, - settings - ); - - if (response.Operation.Status.IsNotSuccess()) - { - throw YdbException.FromServer(response.Operation.Status, response.Operation.Issues); - } - - return response.Operation.Result.Unpack(); - } - internal override async Task DeleteSession() { IsActive = false; diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs index e5f971b5..20ce8cb7 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs @@ -92,6 +92,8 @@ internal class MockSessionFactory : IPoolingSessionFactory internal class PoolingMockSession(PoolingSessionSource source) : IPoolingSession { + public IDriver Driver => throw new NotImplementedException(); + public bool IsBroken => false; public void Close() => source.Return(this); diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/StressTestTank.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/StressTestTank.cs index 109442c7..916a13ad 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/StressTestTank.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/StressTestTank.cs @@ -18,8 +18,11 @@ public StressTestTank(StressTestConfig config) _logger = loggerFactory.CreateLogger(); _settings = new YdbConnectionStringBuilder(config.ConnectionString) { - CredentialsProvider = - config.SaFilePath != null ? new ServiceAccountProvider(config.SaFilePath, loggerFactory) : null + LoggerFactory = loggerFactory, + CredentialsProvider = config.SaFilePath != null + ? new ServiceAccountProvider(config.SaFilePath, loggerFactory) + : new MetadataProvider(loggerFactory), + ServerCertificates = YcCerts.GetYcServerCertificates() }; ValidateConfig(); diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs index 1f22175d..6c67e223 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs @@ -39,21 +39,14 @@ public IPoolingSession NewSession(PoolingSessionSource source) => new MockPoolingSession(source, Interlocked.Increment(ref _sessionNum)); } -internal class MockPoolingSession : IPoolingSession +internal class MockPoolingSession(PoolingSessionSource source, int sessionNum) : IPoolingSession { - private readonly PoolingSessionSource _source; - - internal string SessionId { get; } - - public MockPoolingSession(PoolingSessionSource source, int sessionNum) - { - _source = source; - SessionId = $"session_{sessionNum}"; - } + internal string SessionId { get; } = $"session_{sessionNum}"; + public IDriver Driver => throw new NotImplementedException(); public bool IsBroken { get; set; } - public void Close() => _source.Return(this); + public void Close() => source.Return(this); public Task Open(CancellationToken cancellationToken) => Task.CompletedTask; diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs index 66b936f7..c59fc739 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs @@ -47,13 +47,7 @@ public async Task OnNotSuccessStatusCode_WhenStatusCodeIsNotSuccess_UpdateIsBrok bool isError) { SetupSuccessCreateSession(); - var tcsSecondMoveAttachStream = new TaskCompletionSource(); - - _mockAttachStream.SetupSequence(attachStream => attachStream.MoveNextAsync(CancellationToken.None)) - .ReturnsAsync(true) - .Returns(new ValueTask(tcsSecondMoveAttachStream.Task)); - _mockAttachStream.SetupSequence(attachStream => attachStream.Current) - .Returns(new SessionState { Status = StatusIds.Types.StatusCode.Success }); + var tcsSecondMoveAttachStream = SetupAttachStream(); var session = _poolingSessionFactory.NewSession(_poolingSessionSource); Assert.True(session.IsBroken); await session.Open(CancellationToken.None); @@ -269,7 +263,7 @@ private TaskCompletionSource SetupAttachStream() { var tcsSecondMoveAttachStream = new TaskCompletionSource(); - _mockAttachStream.SetupSequence(attachStream => attachStream.MoveNextAsync(CancellationToken.None)) + _mockAttachStream.SetupSequence(attachStream => attachStream.MoveNextAsync(It.IsAny())) .ReturnsAsync(true) .Returns(new ValueTask(tcsSecondMoveAttachStream.Task)); _mockAttachStream.SetupSequence(attachStream => attachStream.Current)