Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Feat ADO.NET: Added dispose timeout (10 seconds) to `PoolingSessionSource`.
- Feat ADO.NET: Added `EnableImplicitSession` to support implicit sessions.

## v0.23.1
Expand Down
7 changes: 5 additions & 2 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)

return TryAcquireLease()
? new ValueTask<ISession>(new ImplicitSession(_driver, this))
: throw new ObjectDisposedException(nameof(ImplicitSessionSource));
: throw new ObjectDisposedException(nameof(ImplicitSessionSource),
"The implicit session source has been closed.");
}

private bool TryAcquireLease()
Expand All @@ -41,7 +42,9 @@ private bool TryAcquireLease()

internal void ReleaseLease()
{
if (Interlocked.Decrement(ref _activeLeaseCount) == 0 && Volatile.Read(ref _isDisposed) == 1)
Interlocked.Decrement(ref _activeLeaseCount);

if (Volatile.Read(ref _isDisposed) == 1 && _activeLeaseCount == 0)
_drainedTcs.TrySetResult();
}

Expand Down
45 changes: 35 additions & 10 deletions src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using Ydb.Query;

namespace Ydb.Sdk.Ado.Session;

internal sealed class PoolingSessionSource<T> : ISessionSource where T : PoolingSessionBase<T>
{
private const int DisposeTimeoutSeconds = 10;

private readonly ConcurrentStack<T> _idleSessions = new();
private readonly ConcurrentQueue<TaskCompletionSource<T?>> _waiters = new();
private readonly CancellationTokenSource _disposeCts = new();
Expand All @@ -18,16 +22,14 @@ internal sealed class PoolingSessionSource<T> : ISessionSource where T : Pooling
private readonly int _createSessionTimeout;
private readonly TimeSpan _sessionIdleTimeout;
private readonly Timer _cleanerTimer;
private readonly ILogger _logger;

private volatile int _numSessions;
private volatile int _disposed;

private bool IsDisposed => _disposed == 1;

public PoolingSessionSource(
IPoolingSessionFactory<T> sessionFactory,
YdbConnectionStringBuilder settings
)
public PoolingSessionSource(IPoolingSessionFactory<T> sessionFactory, YdbConnectionStringBuilder settings)
{
_sessionFactory = sessionFactory;
_minSessionSize = settings.MinSessionPool;
Expand All @@ -43,6 +45,7 @@ YdbConnectionStringBuilder settings
_createSessionTimeout = settings.CreateSessionTimeout;
_sessionIdleTimeout = TimeSpan.FromSeconds(settings.SessionIdleTimeout);
_cleanerTimer = new Timer(CleanIdleSessions, this, _sessionIdleTimeout, _sessionIdleTimeout);
_logger = settings.LoggerFactory.CreateLogger<PoolingSessionSource<T>>();
}

public ValueTask<ISession> OpenSession(CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -132,7 +135,7 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
), useSynchronizationContext: false
);
await using var disposeRegistration = _disposeCts.Token.Register(
() => waiterTcs.TrySetException(new YdbException("The session source has been shut down.")),
() => waiterTcs.TrySetException(ObjectDisposedException),
useSynchronizationContext: false
);
session = await waiterTcs.Task.ConfigureAwait(false);
Expand All @@ -156,7 +159,7 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
try
{
if (IsDisposed)
throw new YdbException("The session source has been shut down.");
throw ObjectDisposedException;

var session = _sessionFactory.NewSession(this);
await session.Open(cancellationToken);
Expand All @@ -167,8 +170,7 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
return session;
}

throw new YdbException(
$"Could not find free slot in {_sessions} when opening. Please report a bug.");
throw new YdbException($"Could not find free slot in {_sessions} when opening. Please report a bug.");
}
catch
{
Expand Down Expand Up @@ -266,6 +268,7 @@ public async ValueTask DisposeAsync()
await _cleanerTimer.DisposeAsync();
_disposeCts.Cancel();

var sw = Stopwatch.StartNew();
var spinWait = new SpinWait();
do
{
Expand All @@ -280,10 +283,32 @@ public async ValueTask DisposeAsync()
}

spinWait.SpinOnce();
} while (_numSessions > 0);
} while (_numSessions > 0 && sw.Elapsed < TimeSpan.FromSeconds(DisposeTimeoutSeconds));

try
{
await _sessionFactory.DisposeAsync();
}
catch (Exception e)
{
_logger.LogError(e, "Failed to dispose the transport driver");
}

await _sessionFactory.DisposeAsync();
for (var i = 0; i < _maxSessionSize; i++)
{
var session = Volatile.Read(ref _sessions[i]);

if (session == null || session.CompareAndSet(PoolingSessionState.Clean, PoolingSessionState.Clean))
continue;
_logger.LogCritical("Disposal timed out: Some sessions are still active");

throw new YdbException("Timeout while disposing of the pool: some sessions are still active. " +
"This may indicate a connection leak or suspended operations.");
}
}

private Exception ObjectDisposedException =>
new ObjectDisposedException(nameof(PoolingSessionSource<T>), "The session source has been closed.");
}

internal interface IPoolingSessionFactory<T> : IAsyncDisposable where T : PoolingSessionBase<T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ public async Task DisposeAsync_Cancel_WaitersSession()
Assert.Equal(0, mockFactory.NumSession);
for (var i = 0; i < maxSessionSize; i++)
{
Assert.Equal("The session source has been shut down.",
(await Assert.ThrowsAsync<YdbException>(() => waitingSessionTasks[i])).Message);
Assert.StartsWith("The session source has been closed.",
(await Assert.ThrowsAsync<ObjectDisposedException>(() => waitingSessionTasks[i])).Message);
}

Assert.Equal("The session source has been shut down.",
(await Assert.ThrowsAsync<YdbException>(async () => await sessionSource.OpenSession())).Message);
Assert.StartsWith("The session source has been closed.",
(await Assert.ThrowsAsync<ObjectDisposedException>(async () => await sessionSource.OpenSession())).Message);
}

[Fact]
Expand Down Expand Up @@ -185,10 +185,11 @@ public async Task StressTest_DisposeAsync_Close_Driver()
{
using var session = await sessionSource.OpenSession();
await Task.Yield();
Assert.False(disposeCalled);
}
catch (YdbException e)
catch (ObjectDisposedException e)
{
Assert.Equal("The session source has been shut down.", e.Message);
Assert.StartsWith("The session source has been closed.", e.Message);
}
catch (OperationCanceledException)
{
Expand All @@ -202,6 +203,34 @@ public async Task StressTest_DisposeAsync_Close_Driver()
}
}

[Fact]
public async Task DisposeAsync_WhenSessionIsLeaked_ThrowsYdbExceptionWithTimeoutMessage()
{
var disposeCalled = false;
const int maxSessionSize = 10;
var mockFactory = new MockPoolingSessionFactory(maxSessionSize)
{
Dispose = () =>
{
Volatile.Write(ref disposeCalled, true);
return ValueTask.CompletedTask;
}
};
var settings = new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize };
var sessionSource = new PoolingSessionSource<MockPoolingSession>(mockFactory, settings);

#pragma warning disable CA2012
_ = sessionSource.OpenSession(CancellationToken.None);
#pragma warning restore CA2012

Assert.Equal("Timeout while disposing of the pool: some sessions are still active. " +
"This may indicate a connection leak or suspended operations.",
(await Assert.ThrowsAsync<YdbException>(async () => await sessionSource.DisposeAsync())).Message);
Assert.True(disposeCalled);
await Assert.ThrowsAsync<ObjectDisposedException>(() =>
sessionSource.OpenSession(CancellationToken.None).AsTask());
}

[Fact]
public async Task IdleTimeout_MinSessionSize_CloseNumSessionsMinusMinSessionCount()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public async Task StressTest_OpenSession_RaceWithDispose_SuccessfulOpensAreNotDi
try
{
using var s = await source.OpenSession(CancellationToken.None);
await Task.Yield();
Assert.False(_isDisposed);
}
catch (ObjectDisposedException)
Expand Down
Loading