Skip to content

Commit 56c2201

Browse files
Feat ADO.NET: Added dispose timeout (10 seconds) to PoolingSessionSource (#535)
* Feat ADO.NET: Added dispose timeout (10 seconds) to `PoolingSessionSource` * Fix race condition in ImplicitSessionSource: | OpenSession | ReleaseLease | DisposeAsync | |--------------|----------------------|------------------| | - | Decrement lease <- 0 | - | | AcquireLease | - | - | | - | - | _isDisposed == 1 | | | FAIL TrySetResult()! | | Fixed: now check _activeLeaseCount after volatile read _isDisposed
1 parent 56d54c7 commit 56c2201

File tree

6 files changed

+78
-19
lines changed

6 files changed

+78
-19
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
- Feat ADO.NET: Added dispose timeout (10 seconds) to `PoolingSessionSource`.
12
- Feat ADO.NET: Added `EnableImplicitSession` to support implicit sessions.
23

34
## v0.23.1

src/Ydb.Sdk/src/Ado/Session/ImplicitSessionSource.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)
2525

2626
return TryAcquireLease()
2727
? new ValueTask<ISession>(new ImplicitSession(_driver, this))
28-
: throw new ObjectDisposedException(nameof(ImplicitSessionSource));
28+
: throw new ObjectDisposedException(nameof(ImplicitSessionSource),
29+
"The implicit session source has been closed.");
2930
}
3031

3132
private bool TryAcquireLease()
@@ -41,7 +42,9 @@ private bool TryAcquireLease()
4142

4243
internal void ReleaseLease()
4344
{
44-
if (Interlocked.Decrement(ref _activeLeaseCount) == 0 && Volatile.Read(ref _isDisposed) == 1)
45+
Interlocked.Decrement(ref _activeLeaseCount);
46+
47+
if (Volatile.Read(ref _isDisposed) == 1 && _activeLeaseCount == 0)
4548
_drainedTcs.TrySetResult();
4649
}
4750

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
using System.Collections.Concurrent;
2+
using System.Diagnostics;
23
using System.Diagnostics.CodeAnalysis;
34
using System.Runtime.CompilerServices;
5+
using Microsoft.Extensions.Logging;
46
using Ydb.Query;
57

68
namespace Ydb.Sdk.Ado.Session;
79

810
internal sealed class PoolingSessionSource<T> : ISessionSource where T : PoolingSessionBase<T>
911
{
12+
private const int DisposeTimeoutSeconds = 10;
13+
1014
private readonly ConcurrentStack<T> _idleSessions = new();
1115
private readonly ConcurrentQueue<TaskCompletionSource<T?>> _waiters = new();
1216
private readonly CancellationTokenSource _disposeCts = new();
@@ -18,16 +22,14 @@ internal sealed class PoolingSessionSource<T> : ISessionSource where T : Pooling
1822
private readonly int _createSessionTimeout;
1923
private readonly TimeSpan _sessionIdleTimeout;
2024
private readonly Timer _cleanerTimer;
25+
private readonly ILogger _logger;
2126

2227
private volatile int _numSessions;
2328
private volatile int _disposed;
2429

2530
private bool IsDisposed => _disposed == 1;
2631

27-
public PoolingSessionSource(
28-
IPoolingSessionFactory<T> sessionFactory,
29-
YdbConnectionStringBuilder settings
30-
)
32+
public PoolingSessionSource(IPoolingSessionFactory<T> sessionFactory, YdbConnectionStringBuilder settings)
3133
{
3234
_sessionFactory = sessionFactory;
3335
_minSessionSize = settings.MinSessionPool;
@@ -43,6 +45,7 @@ YdbConnectionStringBuilder settings
4345
_createSessionTimeout = settings.CreateSessionTimeout;
4446
_sessionIdleTimeout = TimeSpan.FromSeconds(settings.SessionIdleTimeout);
4547
_cleanerTimer = new Timer(CleanIdleSessions, this, _sessionIdleTimeout, _sessionIdleTimeout);
48+
_logger = settings.LoggerFactory.CreateLogger<PoolingSessionSource<T>>();
4649
}
4750

4851
public ValueTask<ISession> OpenSession(CancellationToken cancellationToken = default)
@@ -132,7 +135,7 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
132135
), useSynchronizationContext: false
133136
);
134137
await using var disposeRegistration = _disposeCts.Token.Register(
135-
() => waiterTcs.TrySetException(new YdbException("The session source has been shut down.")),
138+
() => waiterTcs.TrySetException(ObjectDisposedException),
136139
useSynchronizationContext: false
137140
);
138141
session = await waiterTcs.Task.ConfigureAwait(false);
@@ -156,7 +159,7 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
156159
try
157160
{
158161
if (IsDisposed)
159-
throw new YdbException("The session source has been shut down.");
162+
throw ObjectDisposedException;
160163

161164
var session = _sessionFactory.NewSession(this);
162165
await session.Open(cancellationToken);
@@ -167,8 +170,7 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
167170
return session;
168171
}
169172

170-
throw new YdbException(
171-
$"Could not find free slot in {_sessions} when opening. Please report a bug.");
173+
throw new YdbException($"Could not find free slot in {_sessions} when opening. Please report a bug.");
172174
}
173175
catch
174176
{
@@ -266,6 +268,7 @@ public async ValueTask DisposeAsync()
266268
await _cleanerTimer.DisposeAsync();
267269
_disposeCts.Cancel();
268270

271+
var sw = Stopwatch.StartNew();
269272
var spinWait = new SpinWait();
270273
do
271274
{
@@ -280,10 +283,32 @@ public async ValueTask DisposeAsync()
280283
}
281284

282285
spinWait.SpinOnce();
283-
} while (_numSessions > 0);
286+
} while (_numSessions > 0 && sw.Elapsed < TimeSpan.FromSeconds(DisposeTimeoutSeconds));
287+
288+
try
289+
{
290+
await _sessionFactory.DisposeAsync();
291+
}
292+
catch (Exception e)
293+
{
294+
_logger.LogError(e, "Failed to dispose the transport driver");
295+
}
284296

285-
await _sessionFactory.DisposeAsync();
297+
for (var i = 0; i < _maxSessionSize; i++)
298+
{
299+
var session = Volatile.Read(ref _sessions[i]);
300+
301+
if (session == null || session.CompareAndSet(PoolingSessionState.Clean, PoolingSessionState.Clean))
302+
continue;
303+
_logger.LogCritical("Disposal timed out: Some sessions are still active");
304+
305+
throw new YdbException("Timeout while disposing of the pool: some sessions are still active. " +
306+
"This may indicate a connection leak or suspended operations.");
307+
}
286308
}
309+
310+
private Exception ObjectDisposedException =>
311+
new ObjectDisposedException(nameof(PoolingSessionSource<T>), "The session source has been closed.");
287312
}
288313

289314
internal interface IPoolingSessionFactory<T> : IAsyncDisposable where T : PoolingSessionBase<T>

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,12 @@ public async Task DisposeAsync_Cancel_WaitersSession()
141141
Assert.Equal(0, mockFactory.NumSession);
142142
for (var i = 0; i < maxSessionSize; i++)
143143
{
144-
Assert.Equal("The session source has been shut down.",
145-
(await Assert.ThrowsAsync<YdbException>(() => waitingSessionTasks[i])).Message);
144+
Assert.StartsWith("The session source has been closed.",
145+
(await Assert.ThrowsAsync<ObjectDisposedException>(() => waitingSessionTasks[i])).Message);
146146
}
147147

148-
Assert.Equal("The session source has been shut down.",
149-
(await Assert.ThrowsAsync<YdbException>(async () => await sessionSource.OpenSession())).Message);
148+
Assert.StartsWith("The session source has been closed.",
149+
(await Assert.ThrowsAsync<ObjectDisposedException>(async () => await sessionSource.OpenSession())).Message);
150150
}
151151

152152
[Fact]
@@ -185,10 +185,11 @@ public async Task StressTest_DisposeAsync_Close_Driver()
185185
{
186186
using var session = await sessionSource.OpenSession();
187187
await Task.Yield();
188+
Assert.False(disposeCalled);
188189
}
189-
catch (YdbException e)
190+
catch (ObjectDisposedException e)
190191
{
191-
Assert.Equal("The session source has been shut down.", e.Message);
192+
Assert.StartsWith("The session source has been closed.", e.Message);
192193
}
193194
catch (OperationCanceledException)
194195
{
@@ -202,6 +203,34 @@ public async Task StressTest_DisposeAsync_Close_Driver()
202203
}
203204
}
204205

206+
[Fact]
207+
public async Task DisposeAsync_WhenSessionIsLeaked_ThrowsYdbExceptionWithTimeoutMessage()
208+
{
209+
var disposeCalled = false;
210+
const int maxSessionSize = 10;
211+
var mockFactory = new MockPoolingSessionFactory(maxSessionSize)
212+
{
213+
Dispose = () =>
214+
{
215+
Volatile.Write(ref disposeCalled, true);
216+
return ValueTask.CompletedTask;
217+
}
218+
};
219+
var settings = new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize };
220+
var sessionSource = new PoolingSessionSource<MockPoolingSession>(mockFactory, settings);
221+
222+
#pragma warning disable CA2012
223+
_ = sessionSource.OpenSession(CancellationToken.None);
224+
#pragma warning restore CA2012
225+
226+
Assert.Equal("Timeout while disposing of the pool: some sessions are still active. " +
227+
"This may indicate a connection leak or suspended operations.",
228+
(await Assert.ThrowsAsync<YdbException>(async () => await sessionSource.DisposeAsync())).Message);
229+
Assert.True(disposeCalled);
230+
await Assert.ThrowsAsync<ObjectDisposedException>(() =>
231+
sessionSource.OpenSession(CancellationToken.None).AsTask());
232+
}
233+
205234
[Fact]
206235
public async Task IdleTimeout_MinSessionSize_CloseNumSessionsMinusMinSessionCount()
207236
{

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/YdbImplicitStressTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public async Task StressTest_OpenSession_RaceWithDispose_SuccessfulOpensAreNotDi
3232
try
3333
{
3434
using var s = await source.OpenSession(CancellationToken.None);
35+
await Task.Yield();
3536
Assert.False(_isDisposed);
3637
}
3738
catch (ObjectDisposedException)

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/YdbConnectionTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ private List<Task> GenerateTasks(string connectionString) => Enumerable.Range(0,
288288
ydbConnection.ConnectionString = connectionString;
289289
await ydbConnection.OpenAsync();
290290
}
291-
catch (YdbException)
291+
catch (ObjectDisposedException)
292292
{
293293
Interlocked.Add(ref _counter, i);
294294
return;

0 commit comments

Comments
 (0)