diff --git a/src/Ydb.Sdk/src/Ado/PoolManager.cs b/src/Ydb.Sdk/src/Ado/PoolManager.cs index a8f787b4..dfb299fe 100644 --- a/src/Ydb.Sdk/src/Ado/PoolManager.cs +++ b/src/Ydb.Sdk/src/Ado/PoolManager.cs @@ -11,11 +11,11 @@ internal static class PoolManager private static readonly ConcurrentDictionary Pools = new(); internal static async Task GetSession( - YdbConnectionStringBuilder connectionString, + YdbConnectionStringBuilder settings, CancellationToken cancellationToken ) { - if (Pools.TryGetValue(connectionString.ConnectionString, out var sessionPool)) + if (Pools.TryGetValue(settings.ConnectionString, out var sessionPool)) { return await sessionPool.GetSession(cancellationToken); } @@ -24,21 +24,21 @@ CancellationToken cancellationToken { await SemaphoreSlim.WaitAsync(cancellationToken); - if (Pools.TryGetValue(connectionString.ConnectionString, out var pool)) + if (Pools.TryGetValue(settings.ConnectionString, out var pool)) { return await pool.GetSession(cancellationToken); } var newSessionPool = new SessionPool( - await connectionString.BuildDriver(), + await settings.BuildDriver(), new SessionPoolConfig( - MaxSessionPool: connectionString.MaxSessionPool, - CreateSessionTimeout: connectionString.CreateSessionTimeout, + MaxSessionPool: settings.MaxSessionPool, + CreateSessionTimeout: settings.CreateSessionTimeout, DisposeDriver: true ) ); - Pools[connectionString.ConnectionString] = newSessionPool; + Pools[settings.ConnectionString] = newSessionPool; return await newSessionPool.GetSession(cancellationToken); } diff --git a/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs index 86bdbcf7..dbdbe861 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ISessionSource.cs @@ -1,8 +1,6 @@ namespace Ydb.Sdk.Ado.Session; -internal interface ISessionSource where TSession : ISession +internal interface ISessionSource : IAsyncDisposable { - ValueTask OpenSession(CancellationToken cancellationToken); - - void Return(TSession session); + ValueTask OpenSession(CancellationToken cancellationToken); } diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs index 9fc99258..07ab0b44 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs @@ -8,42 +8,40 @@ namespace Ydb.Sdk.Ado.Session; -internal class PoolingSession : IPoolingSession +internal class PoolingSession : PoolingSessionBase { private const string SessionBalancer = "session-balancer"; private static readonly TimeSpan DeleteSessionTimeout = TimeSpan.FromSeconds(5); private static readonly CreateSessionRequest CreateSessionRequest = new(); - private readonly PoolingSessionSource _poolingSessionSource; private readonly ILogger _logger; + private readonly bool _disableServerBalancer; + private readonly CancellationTokenSource _attachStreamLifecycleCts = new(); private volatile bool _isBroken = true; private volatile bool _isBadSession; - private readonly bool _disableServerBalancer; - private string SessionId { get; set; } = string.Empty; private long NodeId { get; set; } - public IDriver Driver { get; } - public bool IsBroken => _isBroken; + public override IDriver Driver { get; } + public override bool IsBroken => _isBroken; internal PoolingSession( IDriver driver, - PoolingSessionSource poolingSessionSource, + PoolingSessionSource poolingSessionSource, bool disableServerBalancer, ILogger logger - ) + ) : base(poolingSessionSource) { - _poolingSessionSource = poolingSessionSource; _disableServerBalancer = disableServerBalancer; _logger = logger; Driver = driver; } - public ValueTask> ExecuteQuery( + public override ValueTask> ExecuteQuery( string query, Dictionary parameters, GrpcRequestSettings settings, @@ -65,10 +63,7 @@ public ValueTask> ExecuteQuery( return Driver.ServerStreamCall(QueryService.ExecuteQueryMethod, request, settings); } - public async Task CommitTransaction( - string txId, - CancellationToken cancellationToken = default - ) + public override async Task CommitTransaction(string txId, CancellationToken cancellationToken = default) { var response = await Driver.UnaryCall( QueryService.CommitTransactionMethod, @@ -82,10 +77,7 @@ public async Task CommitTransaction( } } - public async Task RollbackTransaction( - string txId, - CancellationToken cancellationToken = default - ) + public override async Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) { var response = await Driver.UnaryCall( QueryService.RollbackTransactionMethod, @@ -99,7 +91,7 @@ public async Task RollbackTransaction( } } - public void OnNotSuccessStatusCode(StatusCode statusCode) + public override void OnNotSuccessStatusCode(StatusCode statusCode) { _isBadSession = _isBadSession || statusCode is StatusCode.BadSession; @@ -116,7 +108,7 @@ StatusCode.ClientTransportTimeout or } } - public async Task Open(CancellationToken cancellationToken) + internal override async Task Open(CancellationToken cancellationToken) { var requestSettings = new GrpcRequestSettings { CancellationToken = cancellationToken }; @@ -217,7 +209,7 @@ public async Task Open(CancellationToken cancellationToken) await completeTask.Task; } - public async Task DeleteSession() + internal override async Task DeleteSession() { try { @@ -248,6 +240,4 @@ public async Task DeleteSession() SessionId, NodeId); } } - - public void Close() => _poolingSessionSource.Return(this); } diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs index 2d6fb6c1..9019c658 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs @@ -1,20 +1,26 @@ using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; namespace Ydb.Sdk.Ado.Session; -internal class PoolingSessionFactory : IPoolingSessionFactory +internal class PoolingSessionFactory : IPoolingSessionFactory { private readonly IDriver _driver; private readonly bool _disableServerBalancer; private readonly ILogger _logger; - public PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory) + internal PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory) { _driver = driver; _disableServerBalancer = settings.DisableServerBalancer; _logger = loggerFactory.CreateLogger(); } - public IPoolingSession NewSession(PoolingSessionSource source) => - new PoolingSession(_driver, source, _disableServerBalancer, _logger); + public static async Task Create(YdbConnectionStringBuilder settings) => + new(await settings.BuildDriver(), settings, settings.LoggerFactory ?? NullLoggerFactory.Instance); + + public PoolingSession NewSession(PoolingSessionSource source) => + new(_driver, source, _disableServerBalancer, _logger); + + public ValueTask DisposeAsync() => _driver.DisposeAsync(); } diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs index 05c487ff..d3a815e5 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs @@ -1,89 +1,97 @@ +using System.Collections.Concurrent; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; -using System.Threading.Channels; +using Ydb.Query; +using Ydb.Sdk.Value; namespace Ydb.Sdk.Ado.Session; -internal sealed class PoolingSessionSource : ISessionSource +internal sealed class PoolingSessionSource : ISessionSource where T : PoolingSessionBase { - private readonly IPoolingSessionFactory _sessionFactory; + private readonly ConcurrentStack _idleSessions = new(); + private readonly ConcurrentQueue> _waiters = new(); + private readonly CancellationTokenSource _disposeCts = new(); + private readonly IPoolingSessionFactory _sessionFactory; private readonly int _minSessionSize; private readonly int _maxSessionSize; - - private readonly ChannelReader _idleSessionReader; - private readonly ChannelWriter _idleSessionWriter; - + private readonly T?[] _sessions; private readonly int _createSessionTimeout; - - private readonly Timer _pruningTimer; - private readonly int _pruningSampleSize; - private readonly int _pruningMedianIndex; - private readonly TimeSpan _pruningSamplingInterval; - private readonly int[] _pruningSamples; - private volatile bool _pruningTimerEnabled; - private int _pruningSampleIndex; + private readonly TimeSpan _sessionIdleTimeout; + private readonly Timer _cleanerTimer; private volatile int _numSessions; - private volatile int _idleCount; + private volatile int _disposed; + + private bool IsDisposed => _disposed == 1; public PoolingSessionSource( - IPoolingSessionFactory sessionFactory, + IPoolingSessionFactory sessionFactory, YdbConnectionStringBuilder settings ) { _sessionFactory = sessionFactory; - _minSessionSize = settings.MinSessionPool; _maxSessionSize = settings.MaxSessionPool; + if (_minSessionSize > _maxSessionSize) { throw new ArgumentException( $"Connection can't have 'Max Session Pool' {_maxSessionSize} under 'Min Session Pool' {_minSessionSize}"); } - var channel = Channel.CreateUnbounded(); - _idleSessionReader = channel.Reader; - _idleSessionWriter = channel.Writer; - + _sessions = new T?[_maxSessionSize]; _createSessionTimeout = settings.CreateSessionTimeout; + _sessionIdleTimeout = TimeSpan.FromSeconds(settings.SessionIdleTimeout); + _cleanerTimer = new Timer(CleanIdleSessions, this, _sessionIdleTimeout, _sessionIdleTimeout); + } + + public ValueTask OpenSession(CancellationToken cancellationToken = default) + { + cancellationToken.ThrowIfCancellationRequested(); - if (settings.SessionPruningInterval > settings.SessionIdleTimeout) + return TryGetIdleSession(out var session) + ? new ValueTask(session) + : RentAsync(cancellationToken); + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool TryGetIdleSession([NotNullWhen(true)] out T? session) + { + while (_idleSessions.TryPop(out session)) { - throw new ArgumentException( - $"YdbConnection can't have {nameof(settings.SessionIdleTimeout)} {settings.SessionIdleTimeout} (in seconds) under {nameof(settings.SessionPruningInterval)} {settings.SessionPruningInterval} (in seconds)"); + if (CheckIdleSession(session)) + { + return true; + } } - _pruningTimer = new Timer(PruneIdleSessions, this, Timeout.Infinite, Timeout.Infinite); - _pruningSampleSize = DivideRoundingUp(settings.SessionIdleTimeout, settings.SessionPruningInterval); - _pruningMedianIndex = DivideRoundingUp(_pruningSampleSize, 2) - 1; // - 1 to go from length to index - _pruningSamplingInterval = TimeSpan.FromSeconds(settings.SessionPruningInterval); - _pruningSamples = new int[_pruningSampleSize]; - _pruningTimerEnabled = false; + return false; } - public ValueTask OpenSession(CancellationToken cancellationToken = default) => - TryGetIdleSession(out var session) ? new ValueTask(session) : RentAsync(cancellationToken); - - public void Return(IPoolingSession session) + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private bool CheckIdleSession([NotNullWhen(true)] T? session) { + if (session == null) + { + return false; + } + if (session.IsBroken) { CloseSession(session); - return; + + return false; } - Interlocked.Increment(ref _idleCount); - _idleSessionWriter.TryWrite(session); + return session.CompareAndSet(PoolingSessionState.In, PoolingSessionState.Out); } - private async ValueTask RentAsync(CancellationToken cancellationToken) + private async ValueTask RentAsync(CancellationToken cancellationToken) { using var ctsGetSession = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); if (_createSessionTimeout > 0) - { ctsGetSession.CancelAfter(TimeSpan.FromSeconds(_createSessionTimeout)); - } var finalToken = ctsGetSession.Token; @@ -93,66 +101,79 @@ private async ValueTask RentAsync(CancellationToken cancellatio while (true) { - session = await _idleSessionReader.ReadAsync(finalToken).ConfigureAwait(false); - - if (CheckIdleSession(session)) + // Statement order is important + var waiterTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + _waiters.Enqueue(waiterTcs); + if (_idleSessions.TryPop(out session)) { - return session; + if (!waiterTcs.TrySetResult(null)) + { + if (waiterTcs.Task is { IsCompleted: true, Result: not null } t) + { + _idleSessions.Push(t.Result); + } + + WakeUpWaiter(); + } + + if (CheckIdleSession(session)) + return session; + + session = await OpenNewSession(finalToken).ConfigureAwait(false); + if (session != null) + return session; + + continue; } - // If we're here, our waiting attempt on the idle session channel was released with a null - // (or bad session), or we're in sync mode. Check again if a new idle session has appeared since we last checked. - if (TryGetIdleSession(out session)) - { + await using var _ = finalToken.Register( + () => waiterTcs.TrySetCanceled(), + useSynchronizationContext: false + ); + await using var disposeRegistration = _disposeCts.Token.Register( + () => waiterTcs.TrySetException(new YdbException("The session source has been shut down.")), + useSynchronizationContext: false + ); + session = await waiterTcs.Task.ConfigureAwait(false); + + if (CheckIdleSession(session) || TryGetIdleSession(out session)) return session; - } - // We might have closed a session in the meantime and no longer be at max capacity - // so try to open a new session and if that fails, loop again. session = await OpenNewSession(finalToken).ConfigureAwait(false); if (session != null) - { return session; - } } } - private async ValueTask OpenNewSession(CancellationToken cancellationToken) + private async ValueTask OpenNewSession(CancellationToken cancellationToken) { for (var numSessions = _numSessions; numSessions < _maxSessionSize; numSessions = _numSessions) { if (Interlocked.CompareExchange(ref _numSessions, numSessions + 1, numSessions) != numSessions) - { continue; - } try { + if (IsDisposed) + throw new YdbException("The session source has been shut down."); + var session = _sessionFactory.NewSession(this); await session.Open(cancellationToken); - // Only start pruning if we've incremented open count past _min. - // Note that we don't do it only once, on equality, because the thread which incremented open count past _min might get exception - // on Session.Open due to timeout, CancellationToken or other reasons. - if (numSessions >= _minSessionSize) + for (var i = 0; i < _maxSessionSize; i++) { - UpdatePruningTimer(); + if (Interlocked.CompareExchange(ref _sessions[i], session, null) == null) + return session; } - return session; + throw new YdbException( + $"Could not find free slot in {_sessions} when opening. Please report a bug."); } catch { - // Physical open failed, decrement the open and busy counter back down. Interlocked.Decrement(ref _numSessions); - // In case there's a waiting attempt on the channel, we write a null to the idle session channel - // to wake it up, so it will try opening (and probably throw immediately) - // Statement order is important since we have synchronous completions on the channel. - _idleSessionWriter.TryWrite(null); - - // Just in case we always call UpdatePruningTimer for failed physical open - UpdatePruningTimer(); + WakeUpWaiter(); throw; } @@ -161,130 +182,156 @@ private async ValueTask RentAsync(CancellationToken cancellatio return null; } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool TryGetIdleSession([NotNullWhen(true)] out IPoolingSession? session) + private void WakeUpWaiter() { - while (_idleSessionReader.TryRead(out session)) + while (_waiters.TryDequeue(out var waiter) && waiter.TrySetResult(null)) { - if (CheckIdleSession(session)) - { - return true; - } - } - - return false; + } // wake up waiter! } - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private bool CheckIdleSession([NotNullWhen(true)] IPoolingSession? session) + public void Return(T session) { - if (session == null) + if (session.IsBroken || IsDisposed) { - return false; + CloseSession(session); + + return; } - // Only decrement when the session has a value. - Interlocked.Decrement(ref _idleCount); + // Statement order is important + session.IdleStartTime = DateTime.Now; + session.Set(PoolingSessionState.In); - if (session.IsBroken) + while (_waiters.TryDequeue(out var waiter)) { - CloseSession(session); - - return false; + if (waiter.TrySetResult(session)) + { + return; + } } - return true; + _idleSessions.Push(session); + + WakeUpWaiter(); } - private void CloseSession(IPoolingSession session) + private void CloseSession(T session) { - session.DeleteSession(); + var i = 0; + for (; i < _maxSessionSize; i++) + if (Interlocked.CompareExchange(ref _sessions[i], null, session) == session) + break; - var numSessions = Interlocked.Decrement(ref _numSessions); + if (i == _maxSessionSize) + return; - // If a session has been closed for any reason, we write a null to the idle session channel to wake up - // a waiter, who will open a new physical connection - // Statement order is important since we have synchronous completions on the channel. - _idleSessionWriter.TryWrite(null); + _ = session.DeleteSession(); - // Only turn off the timer one time, when it was this Close that brought Open back to _min. - if (numSessions == _minSessionSize) - { - UpdatePruningTimer(); - } + Interlocked.Decrement(ref _numSessions); + + // If a session has been closed for any reason, we write a null to the idle sessions to wake up + // a waiter, who will open a new session. + WakeUpWaiter(); } - private void UpdatePruningTimer() + private static void CleanIdleSessions(object? state) { - lock (_pruningTimer) + var pool = (PoolingSessionSource)state!; + var now = DateTime.Now; + + for (var i = 0; i < pool._maxSessionSize; i++) { - var numSessions = _numSessions; - if (numSessions > _minSessionSize && !_pruningTimerEnabled) - { - _pruningTimerEnabled = true; - _pruningTimer.Change(_pruningSamplingInterval, Timeout.InfiniteTimeSpan); - } - else if (numSessions <= _minSessionSize && _pruningTimerEnabled) + var session = Volatile.Read(ref pool._sessions[i]); + + if ( + session != null && + pool._numSessions > pool._minSessionSize && + session.IdleStartTime + pool._sessionIdleTimeout <= now && + session.CompareAndSet(PoolingSessionState.In, PoolingSessionState.Clean) + ) { - _pruningTimer.Change(Timeout.Infinite, Timeout.Infinite); - _pruningSampleIndex = 0; - _pruningTimerEnabled = false; + pool.CloseSession(session); } } } - private static void PruneIdleSessions(object? state) + public async ValueTask DisposeAsync() { - var pool = (PoolingSessionSource)state!; - var samples = pool._pruningSamples; - int toPrune; - lock (pool._pruningTimer) + if (Interlocked.CompareExchange(ref _disposed, 1, 0) != 0) { - // Check if we might have been contending with DisablePruning. - if (!pool._pruningTimerEnabled) - return; - - var sampleIndex = pool._pruningSampleIndex; - samples[sampleIndex] = pool._idleCount; - if (sampleIndex != pool._pruningSampleSize - 1) - { - pool._pruningSampleIndex = sampleIndex + 1; - pool._pruningTimer.Change(pool._pruningSamplingInterval, Timeout.InfiniteTimeSpan); - return; - } - - // Calculate median value for pruning, reset index and timer, and release the lock. - Array.Sort(samples); - toPrune = samples[pool._pruningMedianIndex]; - pool._pruningSampleIndex = 0; - pool._pruningTimer.Change(pool._pruningSamplingInterval, Timeout.InfiniteTimeSpan); + return; } - while (toPrune > 0 && - pool._numSessions > pool._minSessionSize && - pool._idleSessionReader.TryRead(out var session) && - session != null) + await _cleanerTimer.DisposeAsync(); + _disposeCts.Cancel(); + + var spinWait = new SpinWait(); + do { - if (pool.CheckIdleSession(session)) + for (var i = 0; i < _maxSessionSize; i++) { - pool.CloseSession(session); + var session = Volatile.Read(ref _sessions[i]); + + if (session != null && session.CompareAndSet(PoolingSessionState.In, PoolingSessionState.Clean)) + { + CloseSession(session); + } } - toPrune--; - } + spinWait.SpinOnce(); + } while (_numSessions > 0); + + await _sessionFactory.DisposeAsync(); } +} - private static int DivideRoundingUp(int dividend, int divisor) => (dividend + divisor - 1) / divisor; +internal interface IPoolingSessionFactory : IAsyncDisposable where T : PoolingSessionBase +{ + T NewSession(PoolingSessionSource source); } -internal interface IPoolingSessionFactory +internal enum PoolingSessionState { - IPoolingSession NewSession(PoolingSessionSource source); + In, + Out, + Clean } -internal interface IPoolingSession : ISession +internal abstract class PoolingSessionBase : ISession where T : PoolingSessionBase { - Task Open(CancellationToken cancellationToken); + private readonly PoolingSessionSource _source; + + private int _state = (int)PoolingSessionState.Out; + + protected PoolingSessionBase(PoolingSessionSource source) + { + _source = source; + } + + internal bool CompareAndSet(PoolingSessionState expected, PoolingSessionState actual) => + Interlocked.CompareExchange(ref _state, (int)actual, (int)expected) == (int)expected; + + internal void Set(PoolingSessionState state) => Interlocked.Exchange(ref _state, (int)state); + + internal DateTime IdleStartTime { get; set; } + + public abstract IDriver Driver { get; } + + public abstract bool IsBroken { get; } + + internal abstract Task Open(CancellationToken cancellationToken); + + internal abstract Task DeleteSession(); + + public abstract ValueTask> ExecuteQuery(string query, + Dictionary parameters, GrpcRequestSettings settings, + TransactionControl? txControl); + + public abstract Task CommitTransaction(string txId, CancellationToken cancellationToken = default); + + public abstract Task RollbackTransaction(string txId, CancellationToken cancellationToken = default); + + public abstract void OnNotSuccessStatusCode(StatusCode code); - Task DeleteSession(); + public void Close() => _source.Return((T)this); } diff --git a/src/Ydb.Sdk/src/Ado/YdbConnection.cs b/src/Ydb.Sdk/src/Ado/YdbConnection.cs index 7e0ae746..de1e3c4d 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnection.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnection.cs @@ -116,7 +116,7 @@ public override async Task OpenAsync(CancellationToken cancellationToken) } catch (OperationCanceledException e) { - throw new YdbException(StatusCode.Cancelled, + throw new YdbException(StatusCode.ClientTransportTimeout, $"The connection pool has been exhausted, either raise 'MaxSessionPool' " + $"(currently {ConnectionStringBuilder.MaxSessionPool}) or 'CreateSessionTimeout' " + $"(currently {ConnectionStringBuilder.CreateSessionTimeout} seconds) in your connection string.", e diff --git a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs index 4f5dab33..d135d1ea 100644 --- a/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs +++ b/src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs @@ -32,7 +32,6 @@ private void InitDefaultValues() _maxSessionPool = SessionPoolDefaultSettings.MaxSessionPool; _createSessionTimeout = SessionPoolDefaultSettings.CreateSessionTimeoutSeconds; _sessionIdleTimeout = 300; - _sessionPruningInterval = 10; _useTls = false; _connectTimeout = GrpcDefaultSettings.ConnectTimeoutSeconds; _keepAlivePingDelay = GrpcDefaultSettings.KeepAlivePingSeconds; @@ -160,24 +159,6 @@ public int SessionIdleTimeout private int _sessionIdleTimeout; - public int SessionPruningInterval - { - get => _sessionPruningInterval; - set - { - if (value <= 0) - { - throw new ArgumentOutOfRangeException(nameof(value), value, - "Invalid session pruning interval: " + value); - } - - _sessionPruningInterval = value; - SaveValue(nameof(SessionPruningInterval), value); - } - } - - private int _sessionPruningInterval; - public bool UseTls { get => _useTls; @@ -473,10 +454,10 @@ static YdbConnectionOption() (builder, password) => builder.Password = password), "Password", "PWD", "PSW"); AddOption(new YdbConnectionOption(IntExtractor, (builder, maxSessionPool) => builder.MaxSessionPool = maxSessionPool), "MaxSessionPool", "Max Session Pool", "Maximum Pool Size", - "MaximumPoolSize", "Max Pool Size", "MaxPoolSize"); + "MaximumPoolSize", "Max Pool Size", "MaxPoolSize", "MaxSessionSize", "Max Session Size"); AddOption(new YdbConnectionOption(IntExtractor, (builder, minSessionSize) => builder.MinSessionPool = minSessionSize), "MinSessionPool", "Min Session Pool", "Minimum Pool Size", - "MinimumPoolSize", "Min Pool Size", "MinPoolSize"); + "MinimumPoolSize", "Min Pool Size", "MinPoolSize", "MinSessionSize", "Min Session Size"); AddOption(new YdbConnectionOption(BoolExtractor, (builder, useTls) => builder.UseTls = useTls), "UseTls", "Use Tls"); AddOption(new YdbConnectionOption(StringExtractor, @@ -507,9 +488,6 @@ static YdbConnectionOption() AddOption(new YdbConnectionOption(IntExtractor, (builder, sessionIdleTimeout) => builder.SessionIdleTimeout = sessionIdleTimeout), "SessionIdleTimeout", "Session Idle Timeout"); - AddOption(new YdbConnectionOption(IntExtractor, - (builder, sessionPruningInterval) => builder.SessionPruningInterval = sessionPruningInterval), - "SessionPruningInterval", "Session Pruning Interval"); AddOption(new YdbConnectionOption(BoolExtractor, (builder, disableServerBalancer) => builder.DisableServerBalancer = disableServerBalancer), "DisableServerBalancer", "Disable Server Balancer"); diff --git a/src/Ydb.Sdk/src/Ado/YdbSchema.cs b/src/Ydb.Sdk/src/Ado/YdbSchema.cs index eb2a2823..36bb77df 100644 --- a/src/Ydb.Sdk/src/Ado/YdbSchema.cs +++ b/src/Ydb.Sdk/src/Ado/YdbSchema.cs @@ -423,9 +423,7 @@ CancellationToken cancellationToken ); var operation = response.Operation; - var status = Status.FromProto(operation.Status, operation.Issues); - - if (status.IsNotSuccess) + if (operation.Status.IsNotSuccess()) { throw YdbException.FromServer(operation.Status, operation.Issues); } diff --git a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs index 915f69c3..fe2e06da 100644 --- a/src/Ydb.Sdk/src/Services/Query/SessionPool.cs +++ b/src/Ydb.Sdk/src/Services/Query/SessionPool.cs @@ -180,7 +180,7 @@ public ValueTask> ExecuteQuery( public new void OnNotSuccessStatusCode(StatusCode code) => base.OnNotSuccessStatusCode(code); - public void Close() => Release(); + public void Close() => Release().AsTask().GetAwaiter().GetResult(); public async Task CommitTransaction(string txId, CancellationToken cancellationToken = default) { diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md index 774fe92e..393bb083 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md @@ -17,6 +17,12 @@ Allocated : Allocated memory per single operation (managed only, incl 1024B) 1 ns : 1 Nanosecond (0.000000001 sec) +BenchmarkDotNet v0.15.1, macOS Sequoia 15.5 (24F74) [Darwin 24.5.0] +Apple M2 Pro, 1 CPU, 12 logical and 12 physical cores +.NET SDK 9.0.201 +[Host] : .NET 8.0.2 (8.0.224.6711), Arm64 RyuJIT AdvSIMD +DefaultJob : .NET 8.0.2 (8.0.224.6711), Arm64 RyuJIT AdvSIMD + # YDB .NET SDK Session Pool V1 On Semaphore-Based | Method | Mean | Error | StdDev | Completed Work Items | Lock Contentions | Gen0 | Gen1 | Allocated | @@ -38,3 +44,14 @@ Allocated : Allocated memory per single operation (managed only, incl | SessionReuse_Pattern | 159,954.05 ns | 2,820.324 ns | 4,044.825 ns | 220.0000 | 6.0381 | 0.7324 | - | 7307 B | | SessionReuse_HighContention_Pattern | 8,914,529.81 ns | 46,900.448 ns | 41,576.026 ns | 19756.6563 | 149.0469 | 625.0000 | 93.7500 | 5289794 B | | SessionReuse_HighIterations_Pattern | 81,211,792.96 ns | 749,115.160 ns | 664,071.077 ns | 200020.0000 | 614.8571 | - | - | 7458 B | + +# YDB .NET SDK Session Pool Benchmarks (FIFO lock-free) + +| Method | Mean | Error | StdDev | Completed Work Items | Lock Contentions | Gen0 | Gen1 | Allocated | +|-------------------------------------|-----------------:|-----------------:|-----------------:|---------------------:|-----------------:|---------:|---------:|----------:| +| SingleThreaded_OpenClose | 60.71 ns | 0.441 ns | 0.368 ns | - | - | 0.0038 | - | 32 B | +| MultiThreaded_OpenClose | 23,368.69 ns | 464.175 ns | 1,129.867 ns | 40.0049 | - | 0.9460 | - | 7887 B | +| HighContention_OpenClose | 91,700.72 ns | 1,803.206 ns | 3,842.780 ns | 204.6633 | 0.0007 | 5.0049 | - | 41951 B | +| SessionReuse_Pattern | 117,545.11 ns | 2,226.365 ns | 4,014.595 ns | 220.0000 | 0.0001 | 1.5869 | - | 13656 B | +| SessionReuse_HighContention_Pattern | 7,463,819.00 ns | 148,409.083 ns | 364,050.038 ns | 19044.6172 | 1.1719 | 765.6250 | 125.0000 | 6367528 B | +| SessionReuse_HighIterations_Pattern | 70,844,972.06 ns | 1,400,128.942 ns | 3,589,066.009 ns | 200020.0000 | - | 750.0000 | - | 6407440 B | \ No newline at end of file diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs index f7683ade..56a4916d 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs @@ -9,7 +9,7 @@ namespace Ydb.Sdk.Ado.Benchmarks; [ThreadingDiagnoser] public class SessionSourceBenchmark { - private PoolingSessionSource _poolingSessionSource = null!; + private PoolingSessionSource _poolingSessionSource = null!; private const int SessionPoolSize = 50; private const int ConcurrentTasks = 20; @@ -18,7 +18,7 @@ public void Setup() { var settings = new YdbConnectionStringBuilder { MaxSessionPool = SessionPoolSize }; - _poolingSessionSource = new PoolingSessionSource(new MockSessionFactory(), settings); + _poolingSessionSource = new PoolingSessionSource(new MockSessionFactory(), settings); } [Benchmark] @@ -133,34 +133,35 @@ public async Task SessionReuse_HighIterations_Pattern() } } -internal class MockSessionFactory : IPoolingSessionFactory +internal class MockSessionFactory : IPoolingSessionFactory { - public IPoolingSession NewSession(PoolingSessionSource source) => new PoolingMockSession(source); + public MockPoolingSession NewSession(PoolingSessionSource source) => new(source); + + public ValueTask DisposeAsync() => ValueTask.CompletedTask; } -internal class PoolingMockSession(PoolingSessionSource source) : IPoolingSession +internal class MockPoolingSession(PoolingSessionSource source) + : PoolingSessionBase(source) { - public IDriver Driver => throw new NotImplementedException(); - - public bool IsBroken => false; - - public void Close() => source.Return(this); - - public async Task Open(CancellationToken cancellationToken) => await Task.Yield(); + public override IDriver Driver => null!; + public override bool IsBroken => false; - public Task DeleteSession() => Task.CompletedTask; + internal override async Task Open(CancellationToken cancellationToken) => await Task.Yield(); + internal override Task DeleteSession() => Task.CompletedTask; - public ValueTask> ExecuteQuery(string query, + public override ValueTask> ExecuteQuery( + string query, Dictionary parameters, GrpcRequestSettings settings, - TransactionControl? txControl) => throw new NotImplementedException(); + TransactionControl? txControl + ) => throw new NotImplementedException(); - public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) => + public override Task CommitTransaction(string txId, CancellationToken cancellationToken = default) => throw new NotImplementedException(); - public Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) => + public override Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) => throw new NotImplementedException(); - public void OnNotSuccessStatusCode(StatusCode code) + public override void OnNotSuccessStatusCode(StatusCode code) { } } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs index 6c67e223..9ad917c6 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using Xunit; using Ydb.Query; using Ydb.Sdk.Ado.Session; @@ -9,59 +10,450 @@ public class PoolingSessionSourceMockTests { [Fact] public void MinSessionPool_bigger_than_MaxSessionPool_throws() => Assert.Throws(() => - new PoolingSessionSource(new MockPoolingSessionFactory(), + new PoolingSessionSource(new MockPoolingSessionFactory(1), new YdbConnectionStringBuilder { MaxSessionPool = 1, MinSessionPool = 2 }) ); [Fact] - public void SessionPruningInterval_bigger_than_SessionIdleTimeout_throws() => Assert.Throws(() => - new PoolingSessionSource(new MockPoolingSessionFactory(), new YdbConnectionStringBuilder - { SessionPruningInterval = 5, SessionIdleTimeout = 1 }) - ); + public async Task Reuse_Session_Before_Creating_new() + { + var sessionSource = new PoolingSessionSource(new MockPoolingSessionFactory(1), + new YdbConnectionStringBuilder()); + var session = await sessionSource.OpenSession(); + var sessionId = session.SessionId(); + session.Close(); + session = await sessionSource.OpenSession(); + Assert.Equal(sessionId, session.SessionId()); + } [Fact] - public async Task Reuse_Session_Before_Creating_new() + public async Task Creating_Session_Throw_Exception() + { + for (var it = 0; it < 10_000; it++) + { + const string errorMessage = "Error on open session"; + const int maxSessionSize = 200; + + var mockPoolingSessionFactory = new MockPoolingSessionFactory(maxSessionSize) + { + Open = sessionNum => + sessionNum <= maxSessionSize * 2 + ? Task.FromException(new YdbException(errorMessage)) + : Task.CompletedTask + }; + + var sessionSource = new PoolingSessionSource( + mockPoolingSessionFactory, new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize } + ); + + var tasks = new List(); + var countSuccess = 0; + + for (var i = 0; i < maxSessionSize * 4; i++) + { + tasks.Add(Task.Run(async () => + { + try + { + var session = await sessionSource.OpenSession(); + // ReSharper disable once AccessToModifiedClosure + Interlocked.Increment(ref countSuccess); + Assert.True(session.SessionId() > maxSessionSize * 2); + session.Close(); + } + catch (YdbException e) + { + Assert.Equal(errorMessage, e.Message); + } + })); + } + + await Task.WhenAll(tasks); + Assert.Equal(maxSessionSize * 2, Volatile.Read(ref countSuccess)); + Assert.True(maxSessionSize * 3 >= mockPoolingSessionFactory.SessionOpenedCount); + Assert.True(maxSessionSize * 2 < mockPoolingSessionFactory.SessionOpenedCount); + } + } + + [Fact] + public async Task HighContention_OpenClose_NotCanceledException() + { + const int highContentionTasks = 100; + const int maxSessionSize = highContentionTasks / 2; + var mockPoolingSessionFactory = new MockPoolingSessionFactory(maxSessionSize); + var sessionSource = new PoolingSessionSource( + mockPoolingSessionFactory, new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize } + ); + + for (var it = 0; it < 100_000; it++) + { + var tasks = new Task[highContentionTasks]; + + for (var i = 0; i < highContentionTasks; i++) + { + tasks[i] = Task.Run(async () => + { + var session = await sessionSource.OpenSession(); + Assert.True(session.SessionId() <= maxSessionSize); + await Task.Yield(); + session.Close(); + }); + } + + await Task.WhenAll(tasks); + } + } + + [Fact] + public async Task DisposeAsync_Cancel_WaitersSession() + { + const int maxSessionSize = 10; + var mockFactory = new MockPoolingSessionFactory(maxSessionSize); + var sessionSource = new PoolingSessionSource( + mockFactory, new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize } + ); + + var openSessions = new List(); + var waitingSessionTasks = new List(); + for (var i = 0; i < maxSessionSize; i++) + { + openSessions.Add(await sessionSource.OpenSession()); + } + + for (var i = 0; i < maxSessionSize; i++) + { + waitingSessionTasks.Add(Task.Run(async () => + { + var session = await sessionSource.OpenSession(); + session.Close(); + })); + } + + var disposeTask = Task.Run(async () => await sessionSource.DisposeAsync()); + Assert.Equal(maxSessionSize, mockFactory.NumSession); + await Task.Delay(5_000); + for (var i = 0; i < maxSessionSize; i++) + { + openSessions[i].Close(); + } + + await disposeTask; + Assert.Equal(0, mockFactory.NumSession); + for (var i = 0; i < maxSessionSize; i++) + { + Assert.Equal("The session source has been shut down.", + (await Assert.ThrowsAsync(() => waitingSessionTasks[i])).Message); + } + + Assert.Equal("The session source has been shut down.", + (await Assert.ThrowsAsync(async () => await sessionSource.OpenSession())).Message); + } + + [Fact] + public async Task StressTest_DisposeAsync_Close_Driver() + { + const int contentionTasks = 200; + const int maxSessionSize = 100; + for (var it = 0; it < 100_000; it++) + { + var disposeCalled = false; + var mockFactory = new MockPoolingSessionFactory(maxSessionSize) + { + Dispose = () => + { + Volatile.Write(ref disposeCalled, true); + return ValueTask.CompletedTask; + } + }; + var settings = new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize }; + var sessionSource = new PoolingSessionSource(mockFactory, settings); + var openSessionTasks = new List(); + for (var i = 0; i < contentionTasks; i++) + { + var itCopy = it; + var iCopy = i; + + openSessionTasks.Add(Task.Run(async () => + { + if (itCopy % contentionTasks == iCopy) + { + await sessionSource.DisposeAsync(); + return; + } + + try + { + var session = await sessionSource.OpenSession(); + await Task.Yield(); + session.Close(); + } + catch (YdbException e) + { + Assert.Equal("The session source has been shut down.", e.Message); + } + catch (OperationCanceledException) + { + } + })); + } + + await Task.WhenAll(openSessionTasks); + Assert.Equal(0, mockFactory.NumSession); + Assert.True(disposeCalled); + } + } + + [Fact] + public async Task IdleTimeout_MinSessionSize_CloseNumSessionsMinusMinSessionCount() + { + const int maxSessionSize = 50; + const int minSessionSize = 10; + const int idleTimeoutSeconds = 1; + + var mockFactory = new MockPoolingSessionFactory(maxSessionSize); + var settings = new YdbConnectionStringBuilder + { + SessionIdleTimeout = idleTimeoutSeconds, + MaxSessionPool = maxSessionSize, + MinSessionPool = minSessionSize + }; + var sessionSource = new PoolingSessionSource(mockFactory, settings); + + var openSessions = new List(); + for (var it = 0; it < maxSessionSize; it++) + { + openSessions.Add(await sessionSource.OpenSession()); + } + + foreach (var it in openSessions) + { + it.Close(); + } + + await Task.Delay(TimeSpan.FromSeconds(idleTimeoutSeconds * 5)); // cleaning idle sessions + Assert.Equal(minSessionSize, mockFactory.NumSession); + + var openSessionTasks = new List>(); + for (var it = 0; it < minSessionSize; it++) + { + openSessionTasks.Add(Task.Run(async () => await sessionSource.OpenSession())); + } + + foreach (var it in openSessionTasks) + { + (await it).Close(); + } + + Assert.Equal(minSessionSize, mockFactory.NumSession); + Assert.Equal(maxSessionSize, mockFactory.SessionOpenedCount); + } + + [Fact] + public async Task StressTest_HighContention_OpenClose() + { + var cts = new CancellationTokenSource(); + cts.CancelAfter(TimeSpan.FromMinutes(1)); + + const int maxSessionSize = 50; + const int minSessionSize = 10; + const int highContentionTasks = maxSessionSize * 5; + var sessionIdIsBroken = new ConcurrentDictionary(); + + var mockFactory = new MockPoolingSessionFactory(maxSessionSize) + { + IsBroken = sessionNum => + { + var isBroken = Random.Shared.NextDouble() < 0.2; + sessionIdIsBroken[sessionNum] = isBroken; + return isBroken; + }, + Open = sessionNum => + { + sessionIdIsBroken[sessionNum] = false; + return Task.CompletedTask; + } + }; + var settings = new YdbConnectionStringBuilder + { MaxSessionPool = maxSessionSize, MinSessionPool = minSessionSize }; + var sessionSource = new PoolingSessionSource(mockFactory, settings); + + var workers = new List(); + for (var it = 0; it < highContentionTasks; it++) + { + workers.Add(Task.Run(async () => + { + try + { + while (!cts.IsCancellationRequested) + { + var session = await sessionSource.OpenSession(cts.Token); + Assert.False(sessionIdIsBroken[session.SessionId()]); + session.Close(); + await Task.Delay(Random.Shared.Next(maxSessionSize), cts.Token); + } + } + catch (OperationCanceledException) + { + } + }, cts.Token)); + } + + await Task.WhenAll(workers); + } + + [Fact] + public async Task Get_Session_From_Exhausted_Pool() { - var sessionSource = new PoolingSessionSource(new MockPoolingSessionFactory(), new YdbConnectionStringBuilder()); - var session = (MockPoolingSession)await sessionSource.OpenSession(); - var sessionId = session.SessionId; + var mockFactory = new MockPoolingSessionFactory(1); + var settings = new YdbConnectionStringBuilder + { + MaxSessionPool = 1, + MinSessionPool = 0 + }; + + var sessionSource = new PoolingSessionSource(mockFactory, settings); + var session = await sessionSource.OpenSession(); + var cts = new CancellationTokenSource(); + cts.CancelAfter(500); + + await Assert.ThrowsAsync(async () => await sessionSource.OpenSession(cts.Token)); session.Close(); - session = (MockPoolingSession)await sessionSource.OpenSession(); - Assert.Equal(sessionId, session.SessionId); + + Assert.Equal(1, mockFactory.NumSession); + Assert.Equal(1, mockFactory.SessionOpenedCount); + } + + [Fact] + public async Task Return_IsBroken_Session() + { + const int maxSessionSize = 10; + var mockFactory = new MockPoolingSessionFactory(maxSessionSize) { IsBroken = _ => true }; + var settings = new YdbConnectionStringBuilder + { + MaxSessionPool = maxSessionSize, + MinSessionPool = 0 + }; + var sessionSource = new PoolingSessionSource(mockFactory, settings); + + for (var it = 0; it < maxSessionSize * 2; it++) + { + var session = await sessionSource.OpenSession(); + session.Close(); + } + + Assert.Equal(0, mockFactory.NumSession); + Assert.Equal(maxSessionSize * 2, mockFactory.SessionOpenedCount); + } + + [Fact] + public async Task CheckIdleSession_WhenIsBrokenInStack_CreateNewSession() + { + var isBroken = false; + const int maxSessionSize = 10; + // ReSharper disable once AccessToModifiedClosure + var mockFactory = new MockPoolingSessionFactory(maxSessionSize) { IsBroken = _ => isBroken }; + var settings = new YdbConnectionStringBuilder + { + MaxSessionPool = maxSessionSize, + MinSessionPool = 0 + }; + var sessionSource = new PoolingSessionSource(mockFactory, settings); + + var openSessions = new List(); + for (var it = 0; it < maxSessionSize; it++) + { + openSessions.Add(await sessionSource.OpenSession()); + } + + foreach (var session in openSessions) + { + session.Close(); + } + + Assert.Equal(maxSessionSize, mockFactory.NumSession); + + isBroken = true; + for (var it = 0; it < maxSessionSize; it++) + { + var session = await sessionSource.OpenSession(); + isBroken = false; + session.Close(); + } + + Assert.Equal(1, mockFactory.NumSession); + Assert.Equal(maxSessionSize + 1, mockFactory.SessionOpenedCount); } } -internal class MockPoolingSessionFactory : IPoolingSessionFactory +internal static class ISessionExtension { - private int _sessionNum; - - public IPoolingSession NewSession(PoolingSessionSource source) => - new MockPoolingSession(source, Interlocked.Increment(ref _sessionNum)); + internal static int SessionId(this ISession session) => ((MockPoolingSession)session).SessionId; } -internal class MockPoolingSession(PoolingSessionSource source, int sessionNum) : IPoolingSession +internal class MockPoolingSessionFactory(int maxSessionSize) : IPoolingSessionFactory { - internal string SessionId { get; } = $"session_{sessionNum}"; + private int _sessionOpened; + private int _numSession; - public IDriver Driver => throw new NotImplementedException(); - public bool IsBroken { get; set; } + internal int SessionOpenedCount => Volatile.Read(ref _sessionOpened); + internal int NumSession => Volatile.Read(ref _numSession); - public void Close() => source.Return(this); + internal Func Open { private get; init; } = _ => Task.CompletedTask; + internal Func IsBroken { private get; init; } = _ => false; + internal Func Dispose { private get; init; } = () => ValueTask.CompletedTask; - public Task Open(CancellationToken cancellationToken) => Task.CompletedTask; + public MockPoolingSession NewSession(PoolingSessionSource source) => + new(source, + async sessionCountOpened => + { + await Open(sessionCountOpened); - public Task DeleteSession() => Task.CompletedTask; + Assert.True(Interlocked.Increment(ref _numSession) <= maxSessionSize); - public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) => - throw new NotImplementedException(); + await Task.Yield(); + }, + () => + { + Assert.True(Interlocked.Decrement(ref _numSession) >= 0); + + return Task.CompletedTask; + }, + sessionNum => IsBroken(sessionNum), + Interlocked.Increment(ref _sessionOpened) + ); + + public ValueTask DisposeAsync() => Dispose(); +} + +internal class MockPoolingSession( + PoolingSessionSource source, + Func mockOpen, + Func mockDeleteSession, + Func mockIsBroken, + int sessionNum +) : PoolingSessionBase(source) +{ + public int SessionId => sessionNum; + public override IDriver Driver => null!; + public override bool IsBroken => mockIsBroken(sessionNum); + + internal override Task Open(CancellationToken cancellationToken) => mockOpen(sessionNum); + internal override Task DeleteSession() => mockDeleteSession(); + + public override ValueTask> ExecuteQuery( + string query, + Dictionary parameters, + GrpcRequestSettings settings, + TransactionControl? txControl + ) => throw new NotImplementedException(); - public Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) => + public override Task CommitTransaction(string txId, CancellationToken cancellationToken = default) => throw new NotImplementedException(); - public ValueTask> ExecuteQuery(string query, - Dictionary parameters, GrpcRequestSettings settings, - TransactionControl? txControl) => + public override Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) => throw new NotImplementedException(); - public void OnNotSuccessStatusCode(StatusCode code) => throw new NotImplementedException(); + public override void OnNotSuccessStatusCode(StatusCode code) + { + } } diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs index c59fc739..f50d8a88 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionTests.cs @@ -17,7 +17,7 @@ public class PoolingSessionTests private readonly Mock _mockIDriver; private readonly Mock> _mockAttachStream = new(MockBehavior.Strict); private readonly PoolingSessionFactory _poolingSessionFactory; - private readonly PoolingSessionSource _poolingSessionSource; + private readonly PoolingSessionSource _poolingSessionSource; public PoolingSessionTests() { @@ -32,7 +32,7 @@ public PoolingSessionTests() ).ReturnsAsync(_mockAttachStream.Object); _mockAttachStream.Setup(stream => stream.Dispose()); _poolingSessionFactory = new PoolingSessionFactory(_mockIDriver.Object, settings, TestUtils.LoggerFactory); - _poolingSessionSource = new PoolingSessionSource(_poolingSessionFactory, settings); + _poolingSessionSource = new PoolingSessionSource(_poolingSessionFactory, settings); } [Theory] @@ -186,8 +186,8 @@ public async Task Delete_WhenSuccessOpenSession_IsBroken() _mockIDriver.Setup(driver => driver.UnaryCall( QueryService.DeleteSessionMethod, It.Is(request => request.SessionId.Equals(SessionId)), - It.Is(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId)) - ); + It.Is(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId) + )).ReturnsAsync(new DeleteSessionResponse { Status = StatusIds.Types.StatusCode.Success }); Assert.False(session.IsBroken); await session.DeleteSession(); Assert.True(session.IsBroken); @@ -206,8 +206,8 @@ public async Task Delete_WhenSuccessOpenSessionBadSessionInAttachStream_IsBroken _mockIDriver.Setup(driver => driver.UnaryCall( QueryService.DeleteSessionMethod, It.Is(request => request.SessionId.Equals(SessionId)), - It.Is(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId)) - ); + It.Is(grpcRequestSettings => grpcRequestSettings.NodeId == NodeId) + )).ReturnsAsync(new DeleteSessionResponse { Status = StatusIds.Types.StatusCode.Success }); Assert.False(session.IsBroken); session.OnNotSuccessStatusCode(StatusCode.BadSession); await session.DeleteSession(); diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs index 62372c4d..736b1e96 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionStringBuilderTests.cs @@ -16,7 +16,6 @@ public void InitDefaultValues_WhenEmptyConstructorInvoke_ReturnDefaultConnection Assert.Equal(100, ydbConnectionStringBuilder.MaxSessionPool); Assert.Equal(5, ydbConnectionStringBuilder.CreateSessionTimeout); Assert.Equal(300, ydbConnectionStringBuilder.SessionIdleTimeout); - Assert.Equal(10, ydbConnectionStringBuilder.SessionPruningInterval); Assert.Null(ydbConnectionStringBuilder.User); Assert.Null(ydbConnectionStringBuilder.Password); Assert.Equal(5, ydbConnectionStringBuilder.ConnectTimeout); @@ -47,7 +46,7 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection var connectionString = new YdbConnectionStringBuilder( "Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=true;" + "MinSessionPool=10;MaxSessionPool=50;CreateSessionTimeout=30;" + - "SessionIdleTimeout=600;SessionPruningInterval=20;" + + "SessionIdleTimeout=600;" + "ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" + "EnableMultipleHttp2Connections=true;" + "MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" + @@ -61,7 +60,6 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection Assert.Equal(50, connectionString.MaxSessionPool); Assert.Equal(30, connectionString.CreateSessionTimeout); Assert.Equal(600, connectionString.SessionIdleTimeout); - Assert.Equal(20, connectionString.SessionPruningInterval); Assert.Equal("Kirill", connectionString.User); Assert.Equal(30, connectionString.ConnectTimeout); Assert.Equal(30, connectionString.KeepAlivePingDelay); @@ -72,7 +70,7 @@ public void InitConnectionStringBuilder_WhenExpectedKeys_ReturnUpdatedConnection Assert.Equal(1000000, connectionString.MaxReceiveMessageSize); Assert.Equal("Host=server;Port=2135;Database=/my/path;User=Kirill;UseTls=True;" + "MinSessionPool=10;MaxSessionPool=50;CreateSessionTimeout=30;" + - "SessionIdleTimeout=600;SessionPruningInterval=20;" + + "SessionIdleTimeout=600;" + "ConnectTimeout=30;KeepAlivePingDelay=30;KeepAlivePingTimeout=60;" + "EnableMultipleHttp2Connections=True;" + "MaxSendMessageSize=1000000;MaxReceiveMessageSize=1000000;" + diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs index 80dea452..06d004b4 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs @@ -19,31 +19,19 @@ public sealed class YdbConnectionTests : TestBase [Fact] public async Task ClearPool_WhenHasActiveConnection_CloseActiveConnectionOnClose() { - var tasks = GenerateTasks(); - tasks.Add(YdbConnection.ClearPool(CreateConnection())); - tasks.AddRange(GenerateTasks()); - await Task.WhenAll(tasks); - Assert.Equal(9900, _counter); + var connectionString = ConnectionString + ";MaxSessionPool=100"; - tasks = GenerateTasks(); - tasks.Add(YdbConnection.ClearPool(CreateConnection())); + var tasks = GenerateTasks(connectionString); + tasks.Add(YdbConnection.ClearPool(new YdbConnection(connectionString))); + tasks.AddRange(GenerateTasks(connectionString)); await Task.WhenAll(tasks); - Assert.Equal(14850, _counter); - } + Assert.Equal(999000, _counter); - [Fact] - public async Task ClearPoolAllPools_WhenHasActiveConnection_CloseActiveConnectionOnClose() - { - var tasks = GenerateTasks(); - tasks.Add(YdbConnection.ClearAllPools()); - tasks.AddRange(GenerateTasks()); + tasks = GenerateTasks(connectionString); + tasks.Add(YdbConnection.ClearPool(new YdbConnection(connectionString))); await Task.WhenAll(tasks); - Assert.Equal(9900, _counter); - - tasks = GenerateTasks(); - tasks.Add(YdbConnection.ClearAllPools()); - await Task.WhenAll(tasks); - Assert.Equal(14850, _counter); + Assert.Equal(1498500, _counter); + await YdbConnection.ClearPool(new YdbConnection(connectionString)); } // docker cp ydb-local:/ydb_certs/ca.pem ~/ @@ -211,7 +199,7 @@ public async Task DisableDiscovery_WhenPropertyIsTrue_SimpleWorking() public async Task OpenAsync_WhenCancelTokenIsCanceled_ThrowYdbException() { await using var connection = CreateConnection(); - connection.ConnectionString = ConnectionString; + connection.ConnectionString = ConnectionString + ";MinSessionPool=1"; using var cts = new CancellationTokenSource(); cts.Cancel(); Assert.Equal("The connection pool has been exhausted, either raise 'MaxSessionPool' (currently 10) " + @@ -296,9 +284,22 @@ public async Task ExecuteReaderAsync_WhenExecutedYdbDataReaderThenCancelTokenIsC Assert.False(await ydbDataReader.NextResultAsync()); } - private List GenerateTasks() => Enumerable.Range(0, 100).Select(async i => + private List GenerateTasks(string connectionString) => Enumerable.Range(0, 1000).Select(async i => { - await using var connection = await CreateOpenConnectionAsync(); + YdbConnection ydbConnection; + try + { + ydbConnection = CreateConnection(); + ydbConnection.ConnectionString = connectionString; + await ydbConnection.OpenAsync(); + } + catch (YdbException) + { + Interlocked.Add(ref _counter, i); + return; + } + + await using var connection = ydbConnection; var command = connection.CreateCommand(); command.CommandText = "SELECT " + i; var scalar = (int)(await command.ExecuteScalarAsync())!; diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs index c3a54f89..f4c468dc 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbDataSourceTests.cs @@ -28,18 +28,21 @@ public async Task OpenConnectionAsync_WhenMaxSessionPool10_ReturnOpenConnection( [Fact] public void CreateCommand_FromDataSource_ReturnDbCommand() { + var dataSource = new YdbDataSource(ConnectionString + ";MaxSessionPool=5"); for (var i = 0; i < SelectedCount; i++) { - using var command = _dataSource.CreateCommand("SELECT 1;"); + using var command = dataSource.CreateCommand("SELECT 1;"); Assert.Equal(1, command.ExecuteScalar()); } - _dataSource.Dispose(); + dataSource.Dispose(); for (var i = 0; i < SelectedCount; i++) { - using var command = _dataSource.CreateCommand("SELECT 1;"); + using var command = dataSource.CreateCommand("SELECT 1;"); Assert.Equal(1, command.ExecuteScalar()); } + + dataSource.Dispose(); } [Fact]