Skip to content

Commit cac50eb

Browse files
Feat ADO.NET: Added dispose timeout (10 seconds) to PoolingSessionSource
1 parent 56d54c7 commit cac50eb

File tree

5 files changed

+65
-11
lines changed

5 files changed

+65
-11
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
- Feat ADO.NET: Added dispose timeout (10 seconds) to `PoolingSessionSource`.
12
- Feat ADO.NET: Added `EnableImplicitSession` to support implicit sessions.
23

34
## v0.23.1

src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)
2525

2626
return TryAcquireLease()
2727
? new ValueTask<ISession>(new ImplicitSession(_driver, this))
28-
: throw new ObjectDisposedException(nameof(ImplicitSessionSource));
28+
: throw new YdbException("The implicit session source has been shut down.");
2929
}
3030

3131
private bool TryAcquireLease()

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
using System.Collections.Concurrent;
2+
using System.Diagnostics;
23
using System.Diagnostics.CodeAnalysis;
34
using System.Runtime.CompilerServices;
5+
using Microsoft.Extensions.Logging;
46
using Ydb.Query;
57

68
namespace Ydb.Sdk.Ado.Session;
79

810
internal sealed class PoolingSessionSource<T> : ISessionSource where T : PoolingSessionBase<T>
911
{
12+
private const int DisposeTimeoutSeconds = 10;
13+
1014
private readonly ConcurrentStack<T> _idleSessions = new();
1115
private readonly ConcurrentQueue<TaskCompletionSource<T?>> _waiters = new();
1216
private readonly CancellationTokenSource _disposeCts = new();
@@ -18,16 +22,14 @@ internal sealed class PoolingSessionSource<T> : ISessionSource where T : Pooling
1822
private readonly int _createSessionTimeout;
1923
private readonly TimeSpan _sessionIdleTimeout;
2024
private readonly Timer _cleanerTimer;
25+
private readonly ILogger _logger;
2126

2227
private volatile int _numSessions;
2328
private volatile int _disposed;
2429

2530
private bool IsDisposed => _disposed == 1;
2631

27-
public PoolingSessionSource(
28-
IPoolingSessionFactory<T> sessionFactory,
29-
YdbConnectionStringBuilder settings
30-
)
32+
public PoolingSessionSource(IPoolingSessionFactory<T> sessionFactory, YdbConnectionStringBuilder settings)
3133
{
3234
_sessionFactory = sessionFactory;
3335
_minSessionSize = settings.MinSessionPool;
@@ -43,6 +45,7 @@ YdbConnectionStringBuilder settings
4345
_createSessionTimeout = settings.CreateSessionTimeout;
4446
_sessionIdleTimeout = TimeSpan.FromSeconds(settings.SessionIdleTimeout);
4547
_cleanerTimer = new Timer(CleanIdleSessions, this, _sessionIdleTimeout, _sessionIdleTimeout);
48+
_logger = settings.LoggerFactory.CreateLogger<PoolingSessionSource<T>>();
4649
}
4750

4851
public ValueTask<ISession> OpenSession(CancellationToken cancellationToken = default)
@@ -156,7 +159,7 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
156159
try
157160
{
158161
if (IsDisposed)
159-
throw new YdbException("The session source has been shut down.");
162+
throw new YdbException("The session source has been shutdown.");
160163

161164
var session = _sessionFactory.NewSession(this);
162165
await session.Open(cancellationToken);
@@ -167,8 +170,7 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
167170
return session;
168171
}
169172

170-
throw new YdbException(
171-
$"Could not find free slot in {_sessions} when opening. Please report a bug.");
173+
throw new YdbException($"Could not find free slot in {_sessions} when opening. Please report a bug.");
172174
}
173175
catch
174176
{
@@ -266,6 +268,7 @@ public async ValueTask DisposeAsync()
266268
await _cleanerTimer.DisposeAsync();
267269
_disposeCts.Cancel();
268270

271+
var sw = Stopwatch.StartNew();
269272
var spinWait = new SpinWait();
270273
do
271274
{
@@ -280,9 +283,28 @@ public async ValueTask DisposeAsync()
280283
}
281284

282285
spinWait.SpinOnce();
283-
} while (_numSessions > 0);
286+
} while (_numSessions > 0 && sw.Elapsed < TimeSpan.FromSeconds(DisposeTimeoutSeconds));
287+
288+
try
289+
{
290+
await _sessionFactory.DisposeAsync();
291+
}
292+
catch (Exception e)
293+
{
294+
_logger.LogError(e, "Failed to dispose the transport driver");
295+
}
284296

285-
await _sessionFactory.DisposeAsync();
297+
for (var i = 0; i < _maxSessionSize; i++)
298+
{
299+
var session = Volatile.Read(ref _sessions[i]);
300+
301+
if (session == null || session.CompareAndSet(PoolingSessionState.Clean, PoolingSessionState.Clean))
302+
continue;
303+
_logger.LogCritical("Disposal timed out: Some sessions are still active");
304+
305+
throw new YdbException("Timeout while disposing of the pool: some sessions are still active. " +
306+
"This may indicate a connection leak or suspended operations.");
307+
}
286308
}
287309
}
288310

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ public async Task StressTest_DisposeAsync_Close_Driver()
185185
{
186186
using var session = await sessionSource.OpenSession();
187187
await Task.Yield();
188+
Assert.False(disposeCalled);
188189
}
189190
catch (YdbException e)
190191
{
@@ -201,6 +202,34 @@ public async Task StressTest_DisposeAsync_Close_Driver()
201202
Assert.True(disposeCalled);
202203
}
203204
}
205+
206+
[Fact]
207+
public async Task DisposeAsync_WhenSessionIsLeaked_ThrowsYdbExceptionWithTimeoutMessage()
208+
{
209+
var disposeCalled = false;
210+
const int maxSessionSize = 10;
211+
var mockFactory = new MockPoolingSessionFactory(maxSessionSize)
212+
{
213+
Dispose = () =>
214+
{
215+
Volatile.Write(ref disposeCalled, true);
216+
return ValueTask.CompletedTask;
217+
}
218+
};
219+
var settings = new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize };
220+
var sessionSource = new PoolingSessionSource<MockPoolingSession>(mockFactory, settings);
221+
222+
#pragma warning disable CA2012
223+
_ = sessionSource.OpenSession(CancellationToken.None);
224+
#pragma warning restore CA2012
225+
226+
Assert.Equal("Timeout while disposing of the pool: some sessions are still active. " +
227+
"This may indicate a connection leak or suspended operations.",
228+
(await Assert.ThrowsAsync<YdbException>(async () => await sessionSource.DisposeAsync())).Message);
229+
Assert.True(disposeCalled);
230+
Assert.Equal("The session source has been shut down.", (await Assert.ThrowsAsync<YdbException>(
231+
() => sessionSource.OpenSession(CancellationToken.None).AsTask())).Message);
232+
}
204233

205234
[Fact]
206235
public async Task IdleTimeout_MinSessionSize_CloseNumSessionsMinusMinSessionCount()

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public async Task StressTest_OpenSession_RaceWithDispose_SuccessfulOpensAreNotDi
3232
try
3333
{
3434
using var s = await source.OpenSession(CancellationToken.None);
35+
await Task.Yield();
3536
Assert.False(_isDisposed);
3637
}
3738
catch (ObjectDisposedException)
@@ -65,6 +66,7 @@ public async Task DisposeAsync_WhenSessionIsLeaked_ThrowsYdbExceptionWithTimeout
6566
"This may indicate a connection leak or suspended operations.",
6667
(await Assert.ThrowsAsync<YdbException>(async () => await source.DisposeAsync())).Message);
6768
Assert.True(_isDisposed);
68-
await Assert.ThrowsAsync<ObjectDisposedException>(() => source.OpenSession(CancellationToken.None).AsTask());
69+
Assert.Equal("The implicit session source has been shut down.", (await Assert.ThrowsAsync<YdbException>(
70+
() => source.OpenSession(CancellationToken.None).AsTask())).Message);
6971
}
7072
}

0 commit comments

Comments
 (0)