diff --git a/src/Ydb.Sdk/CHANGELOG.md b/src/Ydb.Sdk/CHANGELOG.md index c3908d52..f59e6b6b 100644 --- a/src/Ydb.Sdk/CHANGELOG.md +++ b/src/Ydb.Sdk/CHANGELOG.md @@ -1,3 +1,4 @@ +- Feat ADO.NET: Added dispose timeout (10 seconds) to `PoolingSessionSource`. - Feat ADO.NET: Added `EnableImplicitSession` to support implicit sessions. ## v0.23.1 diff --git a/src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs index 66d9c870..38887b9d 100644 --- a/src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs @@ -25,7 +25,8 @@ public ValueTask OpenSession(CancellationToken cancellationToken) return TryAcquireLease() ? new ValueTask(new ImplicitSession(_driver, this)) - : throw new ObjectDisposedException(nameof(ImplicitSessionSource)); + : throw new ObjectDisposedException(nameof(ImplicitSessionSource), + "The implicit session source has been closed."); } private bool TryAcquireLease() @@ -41,7 +42,9 @@ private bool TryAcquireLease() internal void ReleaseLease() { - if (Interlocked.Decrement(ref _activeLeaseCount) == 0 && Volatile.Read(ref _isDisposed) == 1) + Interlocked.Decrement(ref _activeLeaseCount); + + if (Volatile.Read(ref _isDisposed) == 1 && _activeLeaseCount == 0) _drainedTcs.TrySetResult(); } diff --git a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs index a0a70817..7cc57d1d 100644 --- a/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs +++ b/src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs @@ -1,12 +1,16 @@ using System.Collections.Concurrent; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Runtime.CompilerServices; +using Microsoft.Extensions.Logging; using Ydb.Query; namespace Ydb.Sdk.Ado.Session; internal sealed class PoolingSessionSource : ISessionSource where T : PoolingSessionBase { + private const int DisposeTimeoutSeconds = 10; + private readonly ConcurrentStack _idleSessions = new(); private readonly ConcurrentQueue> _waiters = new(); private readonly CancellationTokenSource _disposeCts = new(); @@ -18,16 +22,14 @@ internal sealed class PoolingSessionSource : ISessionSource where T : Pooling private readonly int _createSessionTimeout; private readonly TimeSpan _sessionIdleTimeout; private readonly Timer _cleanerTimer; + private readonly ILogger _logger; private volatile int _numSessions; private volatile int _disposed; private bool IsDisposed => _disposed == 1; - public PoolingSessionSource( - IPoolingSessionFactory sessionFactory, - YdbConnectionStringBuilder settings - ) + public PoolingSessionSource(IPoolingSessionFactory sessionFactory, YdbConnectionStringBuilder settings) { _sessionFactory = sessionFactory; _minSessionSize = settings.MinSessionPool; @@ -43,6 +45,7 @@ YdbConnectionStringBuilder settings _createSessionTimeout = settings.CreateSessionTimeout; _sessionIdleTimeout = TimeSpan.FromSeconds(settings.SessionIdleTimeout); _cleanerTimer = new Timer(CleanIdleSessions, this, _sessionIdleTimeout, _sessionIdleTimeout); + _logger = settings.LoggerFactory.CreateLogger>(); } public ValueTask OpenSession(CancellationToken cancellationToken = default) @@ -132,7 +135,7 @@ private async ValueTask RentAsync(CancellationToken cancellationToken) ), useSynchronizationContext: false ); await using var disposeRegistration = _disposeCts.Token.Register( - () => waiterTcs.TrySetException(new YdbException("The session source has been shut down.")), + () => waiterTcs.TrySetException(ObjectDisposedException), useSynchronizationContext: false ); session = await waiterTcs.Task.ConfigureAwait(false); @@ -156,7 +159,7 @@ private async ValueTask RentAsync(CancellationToken cancellationToken) try { if (IsDisposed) - throw new YdbException("The session source has been shut down."); + throw ObjectDisposedException; var session = _sessionFactory.NewSession(this); await session.Open(cancellationToken); @@ -167,8 +170,7 @@ private async ValueTask RentAsync(CancellationToken cancellationToken) return session; } - throw new YdbException( - $"Could not find free slot in {_sessions} when opening. Please report a bug."); + throw new YdbException($"Could not find free slot in {_sessions} when opening. Please report a bug."); } catch { @@ -266,6 +268,7 @@ public async ValueTask DisposeAsync() await _cleanerTimer.DisposeAsync(); _disposeCts.Cancel(); + var sw = Stopwatch.StartNew(); var spinWait = new SpinWait(); do { @@ -280,10 +283,32 @@ public async ValueTask DisposeAsync() } spinWait.SpinOnce(); - } while (_numSessions > 0); + } while (_numSessions > 0 && sw.Elapsed < TimeSpan.FromSeconds(DisposeTimeoutSeconds)); + + try + { + await _sessionFactory.DisposeAsync(); + } + catch (Exception e) + { + _logger.LogError(e, "Failed to dispose the transport driver"); + } - await _sessionFactory.DisposeAsync(); + for (var i = 0; i < _maxSessionSize; i++) + { + var session = Volatile.Read(ref _sessions[i]); + + if (session == null || session.CompareAndSet(PoolingSessionState.Clean, PoolingSessionState.Clean)) + continue; + _logger.LogCritical("Disposal timed out: Some sessions are still active"); + + throw new YdbException("Timeout while disposing of the pool: some sessions are still active. " + + "This may indicate a connection leak or suspended operations."); + } } + + private Exception ObjectDisposedException => + new ObjectDisposedException(nameof(PoolingSessionSource), "The session source has been closed."); } internal interface IPoolingSessionFactory : IAsyncDisposable where T : PoolingSessionBase diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs index f1a096af..b3ccd63a 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs @@ -141,12 +141,12 @@ public async Task DisposeAsync_Cancel_WaitersSession() Assert.Equal(0, mockFactory.NumSession); for (var i = 0; i < maxSessionSize; i++) { - Assert.Equal("The session source has been shut down.", - (await Assert.ThrowsAsync(() => waitingSessionTasks[i])).Message); + Assert.StartsWith("The session source has been closed.", + (await Assert.ThrowsAsync(() => waitingSessionTasks[i])).Message); } - Assert.Equal("The session source has been shut down.", - (await Assert.ThrowsAsync(async () => await sessionSource.OpenSession())).Message); + Assert.StartsWith("The session source has been closed.", + (await Assert.ThrowsAsync(async () => await sessionSource.OpenSession())).Message); } [Fact] @@ -185,10 +185,11 @@ public async Task StressTest_DisposeAsync_Close_Driver() { using var session = await sessionSource.OpenSession(); await Task.Yield(); + Assert.False(disposeCalled); } - catch (YdbException e) + catch (ObjectDisposedException e) { - Assert.Equal("The session source has been shut down.", e.Message); + Assert.StartsWith("The session source has been closed.", e.Message); } catch (OperationCanceledException) { @@ -202,6 +203,34 @@ public async Task StressTest_DisposeAsync_Close_Driver() } } + [Fact] + public async Task DisposeAsync_WhenSessionIsLeaked_ThrowsYdbExceptionWithTimeoutMessage() + { + var disposeCalled = false; + const int maxSessionSize = 10; + var mockFactory = new MockPoolingSessionFactory(maxSessionSize) + { + Dispose = () => + { + Volatile.Write(ref disposeCalled, true); + return ValueTask.CompletedTask; + } + }; + var settings = new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize }; + var sessionSource = new PoolingSessionSource(mockFactory, settings); + +#pragma warning disable CA2012 + _ = sessionSource.OpenSession(CancellationToken.None); +#pragma warning restore CA2012 + + Assert.Equal("Timeout while disposing of the pool: some sessions are still active. " + + "This may indicate a connection leak or suspended operations.", + (await Assert.ThrowsAsync(async () => await sessionSource.DisposeAsync())).Message); + Assert.True(disposeCalled); + await Assert.ThrowsAsync(() => + sessionSource.OpenSession(CancellationToken.None).AsTask()); + } + [Fact] public async Task IdleTimeout_MinSessionSize_CloseNumSessionsMinusMinSessionCount() { diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs index aac23983..bdea18c6 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs @@ -32,6 +32,7 @@ public async Task StressTest_OpenSession_RaceWithDispose_SuccessfulOpensAreNotDi try { using var s = await source.OpenSession(CancellationToken.None); + await Task.Yield(); Assert.False(_isDisposed); } catch (ObjectDisposedException) diff --git a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs index 0779f652..adf266d6 100644 --- a/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs +++ b/src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs @@ -288,7 +288,7 @@ private List GenerateTasks(string connectionString) => Enumerable.Range(0, ydbConnection.ConnectionString = connectionString; await ydbConnection.OpenAsync(); } - catch (YdbException) + catch (ObjectDisposedException) { Interlocked.Add(ref _counter, i); return;