Skip to content

Commit 2e3c87f

Browse files
update
1 parent 624d9d2 commit 2e3c87f

File tree

4 files changed

+76
-137
lines changed

4 files changed

+76
-137
lines changed

src/Ydb.Sdk/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
- Added provider support for implicit sessions.
1+
- Feat ADO.NET: Added `EnableImplicitSession` to support implicit sessions.
22

33
## v0.23.1
44

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ CancellationToken cancellationToken
3636
driver.RegisterOwner();
3737

3838
return Pools[settings.ConnectionString] = settings.EnableImplicitSession
39-
? new ImplicitSessionSource(driver)
39+
? new ImplicitSessionSource(driver, settings.LoggerFactory)
4040
: new PoolingSessionSource<PoolingSession>(new PoolingSessionFactory(driver, settings), settings);
4141
}
4242
finally
Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
1+
using Microsoft.Extensions.Logging;
2+
13
namespace Ydb.Sdk.Ado.Session;
24

35
internal sealed class ImplicitSessionSource : ISessionSource
46
{
7+
private const int DisposeTimeoutSeconds = 10;
8+
59
private readonly IDriver _driver;
6-
private readonly ManualResetEventSlim _allReleased = new(false);
10+
private readonly TaskCompletionSource _drainedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
11+
712
private int _isDisposed;
813
private int _activeLeaseCount;
914

10-
internal ImplicitSessionSource(IDriver driver)
15+
internal ImplicitSessionSource(IDriver driver, ILoggerFactory loggerFactory)
1116
{
1217
_driver = driver;
1318
}
@@ -16,10 +21,9 @@ public ValueTask<ISession> OpenSession(CancellationToken cancellationToken)
1621
{
1722
cancellationToken.ThrowIfCancellationRequested();
1823

19-
if (!TryAcquireLease())
20-
throw new ObjectDisposedException(nameof(ImplicitSessionSource));
21-
22-
return new ValueTask<ISession>(new ImplicitSession(_driver, this));
24+
return TryAcquireLease()
25+
? new ValueTask<ISession>(new ImplicitSession(_driver, this))
26+
: throw new ObjectDisposedException(nameof(ImplicitSessionSource));
2327
}
2428

2529
private bool TryAcquireLease()
@@ -38,18 +42,37 @@ private bool TryAcquireLease()
3842

3943
internal void ReleaseLease()
4044
{
41-
if (Interlocked.Decrement(ref _activeLeaseCount) == 0 && Volatile.Read(ref _isDisposed) != 0)
42-
_allReleased.Set();
45+
if (Interlocked.Decrement(ref _activeLeaseCount) == 0 && Volatile.Read(ref _isDisposed) == 1)
46+
_drainedTcs.SetResult();
4347
}
4448

4549
public async ValueTask DisposeAsync()
4650
{
4751
if (Interlocked.CompareExchange(ref _isDisposed, 1, 0) != 0)
4852
return;
4953

50-
if (Volatile.Read(ref _activeLeaseCount) != 0)
51-
_allReleased.Wait();
52-
53-
await _driver.DisposeAsync();
54+
try
55+
{
56+
if (Volatile.Read(ref _activeLeaseCount) != 0)
57+
{
58+
await _drainedTcs.Task.WaitAsync(TimeSpan.FromSeconds(DisposeTimeoutSeconds));
59+
}
60+
}
61+
catch (TimeoutException)
62+
{
63+
throw new YdbException("Timeout while disposing of the pool: some implicit sessions are still active. " +
64+
"This may indicate a connection leak or suspended operations.");
65+
}
66+
finally
67+
{
68+
try
69+
{
70+
await _driver.DisposeAsync();
71+
}
72+
catch (Exception)
73+
{
74+
// ignored
75+
}
76+
}
5477
}
5578
}
Lines changed: 39 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -1,154 +1,70 @@
11
using Moq;
22
using Xunit;
33
using Ydb.Sdk.Ado.Session;
4+
using Ydb.Sdk.Ado.Tests.Utils;
45

56
namespace Ydb.Sdk.Ado.Tests.Session;
67

7-
public class YdbImplicitStressTests : TestBase
8+
public class YdbImplicitStressTests
89
{
9-
private static IDriver DummyDriver()
10+
private volatile bool _isDisposed;
11+
12+
private IDriver DummyDriver()
1013
{
1114
var m = new Mock<IDriver>(MockBehavior.Loose);
12-
m.Setup(d => d.DisposeAsync()).Returns(ValueTask.CompletedTask);
15+
m.Setup(d => d.DisposeAsync())
16+
.Callback(() => _isDisposed = true)
17+
.Returns(ValueTask.CompletedTask);
1318
return m.Object;
1419
}
1520

16-
private sealed class Counter
17-
{
18-
public int Value;
19-
public void Inc() => Interlocked.Increment(ref Value);
20-
}
21-
22-
[Fact(Timeout = 30_000)]
21+
[Fact]
2322
public async Task Dispose_WaitsForAllLeases_AndSignalsOnEmptyExactlyOnce()
2423
{
25-
var driver = DummyDriver();
26-
27-
var opened = new Counter();
28-
var closed = new Counter();
29-
30-
var source = new ImplicitSessionSource(driver);
31-
32-
var workers = Enumerable.Range(0, 200).Select(async _ =>
24+
for (var it = 0; it < 1000; it++)
3325
{
34-
var rnd = Random.Shared;
35-
for (var j = 0; j < 10; j++)
26+
var driver = DummyDriver();
27+
var source = new ImplicitSessionSource(driver, TestUtils.LoggerFactory);
28+
29+
var workers = Enumerable.Range(0, 1000).Select(async _ =>
3630
{
37-
ISession s;
31+
await Task.Delay(Random.Shared.Next(0, 5));
3832
try
3933
{
40-
s = await source.OpenSession(CancellationToken.None);
41-
opened.Inc();
42-
43-
await Task.Delay(rnd.Next(0, 5));
34+
using var s = await source.OpenSession(CancellationToken.None);
35+
Assert.False(_isDisposed);
4436
}
4537
catch (ObjectDisposedException)
4638
{
47-
return;
4839
}
40+
}).ToArray();
4941

50-
var s2 = await source.OpenSession(CancellationToken.None);
51-
s2.Dispose();
52-
53-
s.Dispose();
54-
closed.Inc();
55-
}
56-
}).ToArray();
57-
58-
var disposer = Task.Run(async () =>
59-
{
60-
await Task.Delay(10);
61-
await source.DisposeAsync();
62-
});
63-
64-
await Task.WhenAll(workers.Append(disposer));
65-
66-
Assert.True(opened.Value > 0);
67-
Assert.Equal(opened.Value, closed.Value);
68-
69-
await Assert.ThrowsAsync<ObjectDisposedException>(() => source.OpenSession(CancellationToken.None).AsTask());
70-
}
71-
72-
[Fact(Timeout = 30_000)]
73-
public async Task Stress_Counts_AreBalanced()
74-
{
75-
var driver = DummyDriver();
76-
77-
var opened = new Counter();
78-
var closed = new Counter();
79-
80-
var source = new ImplicitSessionSource(driver);
81-
82-
var workers = Enumerable.Range(0, 200).Select(async _ =>
83-
{
84-
var rnd = Random.Shared;
85-
for (var j = 0; j < 10; j++)
42+
await Task.WhenAll(workers.Append(Task.Run(async () =>
8643
{
87-
ISession s;
88-
try
89-
{
90-
s = await source.OpenSession(CancellationToken.None);
91-
opened.Inc();
92-
93-
await Task.Delay(rnd.Next(0, 3));
94-
}
95-
catch (ObjectDisposedException)
96-
{
97-
return;
98-
}
99-
100-
var s2 = await source.OpenSession(CancellationToken.None);
101-
s2.Dispose();
102-
103-
s.Dispose();
104-
closed.Inc();
105-
}
106-
}).ToArray();
107-
108-
var disposer = Task.Run(async () => await source.DisposeAsync());
109-
110-
await Task.WhenAll(workers.Append(disposer));
111-
112-
Assert.Equal(opened.Value, closed.Value);
113-
Assert.True(opened.Value > 0);
114-
115-
await Assert.ThrowsAsync<ObjectDisposedException>(() => source.OpenSession(CancellationToken.None).AsTask());
44+
await Task.Delay(Random.Shared.Next(0, 3));
45+
await source.DisposeAsync();
46+
})));
47+
48+
Assert.True(_isDisposed);
49+
await Assert.ThrowsAsync<ObjectDisposedException>(() =>
50+
source.OpenSession(CancellationToken.None).AsTask());
51+
_isDisposed = false;
52+
}
11653
}
11754

118-
[Fact(Timeout = 30_000)]
119-
public async Task Open_RacingWithDispose_StateRemainsConsistent()
55+
[Fact]
56+
public async Task DisposeAsync_WhenSessionIsLeaked_ThrowsYdbExceptionWithTimeoutMessage()
12057
{
12158
var driver = DummyDriver();
122-
123-
var source = new ImplicitSessionSource(driver);
124-
125-
var opens = Enumerable.Range(0, 1000).Select(async _ =>
126-
{
127-
ISession s;
128-
try
129-
{
130-
s = await source.OpenSession(CancellationToken.None);
131-
}
132-
catch (ObjectDisposedException)
133-
{
134-
return 0;
135-
}
136-
137-
var s2 = await source.OpenSession(CancellationToken.None);
138-
s2.Dispose();
139-
140-
s.Dispose();
141-
return 1;
142-
}).ToArray();
143-
144-
var disposeTask = Task.Run(async () =>
145-
{
146-
await Task.Yield();
147-
await source.DisposeAsync();
148-
});
149-
150-
await Task.WhenAll(opens.Append(disposeTask));
151-
59+
var source = new ImplicitSessionSource(driver, TestUtils.LoggerFactory);
60+
#pragma warning disable CA2012
61+
_ = source.OpenSession(CancellationToken.None);
62+
#pragma warning restore CA2012
63+
64+
Assert.Equal("Timeout while disposing of the pool: some implicit sessions are still active. " +
65+
"This may indicate a connection leak or suspended operations.",
66+
(await Assert.ThrowsAsync<YdbException>(async () => await source.DisposeAsync())).Message);
67+
Assert.True(_isDisposed);
15268
await Assert.ThrowsAsync<ObjectDisposedException>(() => source.OpenSession(CancellationToken.None).AsTask());
15369
}
15470
}

0 commit comments

Comments
 (0)