Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Feat ADO.NET: Added dispose timeout (10 seconds) to `PoolingSessionSource`.
- Feat ADO.NET: Added `EnableImplicitSession` to support implicit sessions.

## v0.23.1
Expand Down
7 changes: 5 additions & 2 deletions src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)

return TryAcquireLease()
? new ValueTask<ISession>(new ImplicitSession(_driver, this))
: throw new ObjectDisposedException(nameof(ImplicitSessionSource));
: throw new ObjectDisposedException(nameof(ImplicitSessionSource),
"The implicit session source has been closed.");
}

private bool TryAcquireLease()
Expand All @@ -41,7 +42,9 @@ private bool TryAcquireLease()

internal void ReleaseLease()
{
if (Interlocked.Decrement(ref _activeLeaseCount) == 0 && Volatile.Read(ref _isDisposed) == 1)
Interlocked.Decrement(ref _activeLeaseCount);

if (Volatile.Read(ref _isDisposed) == 1 && _activeLeaseCount == 0)
_drainedTcs.TrySetResult();
}

Expand Down
45 changes: 35 additions & 10 deletions src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;
using Ydb.Query;

namespace Ydb.Sdk.Ado.Session;

internal sealed class PoolingSessionSource<T> : ISessionSource where T : PoolingSessionBase<T>
{
private const int DisposeTimeoutSeconds = 10;

private readonly ConcurrentStack<T> _idleSessions = new();
private readonly ConcurrentQueue<TaskCompletionSource<T?>> _waiters = new();
private readonly CancellationTokenSource _disposeCts = new();
Expand All @@ -18,16 +22,14 @@ internal sealed class PoolingSessionSource<T> : ISessionSource where T : Pooling
private readonly int _createSessionTimeout;
private readonly TimeSpan _sessionIdleTimeout;
private readonly Timer _cleanerTimer;
private readonly ILogger _logger;

private volatile int _numSessions;
private volatile int _disposed;

private bool IsDisposed => _disposed == 1;

public PoolingSessionSource(
IPoolingSessionFactory<T> sessionFactory,
YdbConnectionStringBuilder settings
)
public PoolingSessionSource(IPoolingSessionFactory<T> sessionFactory, YdbConnectionStringBuilder settings)
{
_sessionFactory = sessionFactory;
_minSessionSize = settings.MinSessionPool;
Expand All @@ -43,6 +45,7 @@ YdbConnectionStringBuilder settings
_createSessionTimeout = settings.CreateSessionTimeout;
_sessionIdleTimeout = TimeSpan.FromSeconds(settings.SessionIdleTimeout);
_cleanerTimer = new Timer(CleanIdleSessions, this, _sessionIdleTimeout, _sessionIdleTimeout);
_logger = settings.LoggerFactory.CreateLogger<PoolingSessionSource<T>>();
}

public ValueTask<ISession> OpenSession(CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -132,7 +135,7 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
), useSynchronizationContext: false
);
await using var disposeRegistration = _disposeCts.Token.Register(
() => waiterTcs.TrySetException(new YdbException("The session source has been shut down.")),
() => waiterTcs.TrySetException(ObjectDisposedException),
useSynchronizationContext: false
);
session = await waiterTcs.Task.ConfigureAwait(false);
Expand All @@ -156,7 +159,7 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
try
{
if (IsDisposed)
throw new YdbException("The session source has been shut down.");
throw ObjectDisposedException;

var session = _sessionFactory.NewSession(this);
await session.Open(cancellationToken);
Expand All @@ -167,8 +170,7 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
return session;
}

throw new YdbException(
$"Could not find free slot in {_sessions} when opening. Please report a bug.");
throw new YdbException($"Could not find free slot in {_sessions} when opening. Please report a bug.");
}
catch
{
Expand Down Expand Up @@ -266,6 +268,7 @@ public async ValueTask DisposeAsync()
await _cleanerTimer.DisposeAsync();
_disposeCts.Cancel();

var sw = Stopwatch.StartNew();
var spinWait = new SpinWait();
do
{
Expand All @@ -280,10 +283,32 @@ public async ValueTask DisposeAsync()
}

spinWait.SpinOnce();
} while (_numSessions > 0);
} while (_numSessions > 0 && sw.Elapsed < TimeSpan.FromSeconds(DisposeTimeoutSeconds));

try
{
await _sessionFactory.DisposeAsync();
}
catch (Exception e)
{
_logger.LogError(e, "Failed to dispose the transport driver");
}

await _sessionFactory.DisposeAsync();
for (var i = 0; i < _maxSessionSize; i++)
{
var session = Volatile.Read(ref _sessions[i]);

if (session == null || session.CompareAndSet(PoolingSessionState.Clean, PoolingSessionState.Clean))
continue;
_logger.LogCritical("Disposal timed out: Some sessions are still active");

throw new YdbException("Timeout while disposing of the pool: some sessions are still active. " +
"This may indicate a connection leak or suspended operations.");
}
}

private Exception ObjectDisposedException =>
new ObjectDisposedException(nameof(PoolingSessionSource<T>), "The session source has been closed.");
}

internal interface IPoolingSessionFactory<T> : IAsyncDisposable where T : PoolingSessionBase<T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ public async Task DisposeAsync_Cancel_WaitersSession()
Assert.Equal(0, mockFactory.NumSession);
for (var i = 0; i < maxSessionSize; i++)
{
Assert.Equal("The session source has been shut down.",
(await Assert.ThrowsAsync<YdbException>(() => waitingSessionTasks[i])).Message);
Assert.StartsWith("The session source has been closed.",
(await Assert.ThrowsAsync<ObjectDisposedException>(() => waitingSessionTasks[i])).Message);
}

Assert.Equal("The session source has been shut down.",
(await Assert.ThrowsAsync<YdbException>(async () => await sessionSource.OpenSession())).Message);
Assert.StartsWith("The session source has been closed.",
(await Assert.ThrowsAsync<ObjectDisposedException>(async () => await sessionSource.OpenSession())).Message);
}

[Fact]
Expand Down Expand Up @@ -185,10 +185,11 @@ public async Task StressTest_DisposeAsync_Close_Driver()
{
using var session = await sessionSource.OpenSession();
await Task.Yield();
Assert.False(disposeCalled);
}
catch (YdbException e)
catch (ObjectDisposedException e)
{
Assert.Equal("The session source has been shut down.", e.Message);
Assert.StartsWith("The session source has been closed.", e.Message);
}
catch (OperationCanceledException)
{
Expand All @@ -202,6 +203,34 @@ public async Task StressTest_DisposeAsync_Close_Driver()
}
}

[Fact]
public async Task DisposeAsync_WhenSessionIsLeaked_ThrowsYdbExceptionWithTimeoutMessage()
{
var disposeCalled = false;
const int maxSessionSize = 10;
var mockFactory = new MockPoolingSessionFactory(maxSessionSize)
{
Dispose = () =>
{
Volatile.Write(ref disposeCalled, true);
return ValueTask.CompletedTask;
}
};
var settings = new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize };
var sessionSource = new PoolingSessionSource<MockPoolingSession>(mockFactory, settings);

#pragma warning disable CA2012
_ = sessionSource.OpenSession(CancellationToken.None);
#pragma warning restore CA2012

Assert.Equal("Timeout while disposing of the pool: some sessions are still active. " +
"This may indicate a connection leak or suspended operations.",
(await Assert.ThrowsAsync<YdbException>(async () => await sessionSource.DisposeAsync())).Message);
Assert.True(disposeCalled);
await Assert.ThrowsAsync<ObjectDisposedException>(() =>
sessionSource.OpenSession(CancellationToken.None).AsTask());
}

[Fact]
public async Task IdleTimeout_MinSessionSize_CloseNumSessionsMinusMinSessionCount()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public async Task StressTest_OpenSession_RaceWithDispose_SuccessfulOpensAreNotDi
try
{
using var s = await source.OpenSession(CancellationToken.None);
await Task.Yield();
Assert.False(_isDisposed);
}
catch (ObjectDisposedException)
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ private List<Task> GenerateTasks(string connectionString) => Enumerable.Range(0,
ydbConnection.ConnectionString = connectionString;
await ydbConnection.OpenAsync();
}
catch (YdbException)
catch (ObjectDisposedException)
{
Interlocked.Add(ref _counter, i);
return;
Expand Down
Loading