Skip to content

Commit 0dae8d2

Browse files
FIFO LOCK FREE POOL
1 parent 381c821 commit 0dae8d2

File tree

8 files changed

+75
-12
lines changed

8 files changed

+75
-12
lines changed

src/Ydb.Sdk/src/Ado/PoolManager.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Collections.Concurrent;
2-
using Microsoft.Extensions.Logging.Abstractions;
32
using Ydb.Sdk.Ado.Session;
43

54
namespace Ydb.Sdk.Ado;
@@ -49,8 +48,8 @@ internal static async Task ClearPool(string connectionString)
4948
try
5049
{
5150
await SemaphoreSlim.WaitAsync();
52-
//
53-
// await sessionPool.DisposeAsync();
51+
52+
await sessionPool.DisposeAsync();
5453
}
5554
finally
5655
{
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
namespace Ydb.Sdk.Ado.Session;
22

3-
internal interface ISessionSource
3+
internal interface ISessionSource : IAsyncDisposable
44
{
55
ValueTask<ISession> OpenSession(CancellationToken cancellationToken);
66
}

src/Ydb.Sdk/src/Ado/Session/PoolingSessionFactory.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ internal class PoolingSessionFactory : IPoolingSessionFactory<PoolingSession>
99
private readonly bool _disableServerBalancer;
1010
private readonly ILogger<PoolingSession> _logger;
1111

12-
private PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory)
12+
internal PoolingSessionFactory(IDriver driver, YdbConnectionStringBuilder settings, ILoggerFactory loggerFactory)
1313
{
1414
_driver = driver;
1515
_disableServerBalancer = settings.DisableServerBalancer;
@@ -21,4 +21,6 @@ public static async Task<PoolingSessionFactory> Create(YdbConnectionStringBuilde
2121

2222
public PoolingSession NewSession(PoolingSessionSource<PoolingSession> source) =>
2323
new(_driver, source, _disableServerBalancer, _logger);
24+
25+
public ValueTask DisposeAsync() => _driver.DisposeAsync();
2426
}

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ internal sealed class PoolingSessionSource<T> : ISessionSource where T : Pooling
1010
{
1111
private readonly ConcurrentStack<T> _idleSessions = new();
1212
private readonly ConcurrentQueue<TaskCompletionSource<T?>> _waiters = new();
13+
private readonly CancellationTokenSource _disposeCts = new();
1314

1415
private readonly IPoolingSessionFactory<T> _sessionFactory;
1516
private readonly int _minSessionSize;
@@ -20,7 +21,9 @@ internal sealed class PoolingSessionSource<T> : ISessionSource where T : Pooling
2021
private readonly Timer _cleanerTimer;
2122

2223
private volatile int _numSessions;
23-
private volatile int _waiterSize;
24+
private volatile int _disposed;
25+
26+
private bool IsDisposed => _disposed == 1;
2427

2528
public PoolingSessionSource(
2629
IPoolingSessionFactory<T> sessionFactory,
@@ -43,10 +46,15 @@ YdbConnectionStringBuilder settings
4346
_cleanerTimer = new Timer(CleanIdleSessions, this, _sessionIdleTimeout, _sessionIdleTimeout);
4447
}
4548

46-
public ValueTask<ISession> OpenSession(CancellationToken cancellationToken = default) =>
47-
TryGetIdleSession(out var session)
49+
public ValueTask<ISession> OpenSession(CancellationToken cancellationToken = default)
50+
{
51+
if (IsDisposed)
52+
throw new YdbException("Session Source is disposed.");
53+
54+
return TryGetIdleSession(out var session)
4855
? new ValueTask<ISession>(session)
4956
: RentAsync(cancellationToken);
57+
}
5058

5159
[MethodImpl(MethodImplOptions.AggressiveInlining)]
5260
private bool TryGetIdleSession([NotNullWhen(true)] out T? session)
@@ -119,7 +127,14 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
119127
continue;
120128
}
121129

122-
await using var _ = finalToken.Register(() => waiterTcs.TrySetCanceled(), useSynchronizationContext: false);
130+
await using var _ = finalToken.Register(
131+
() => waiterTcs.TrySetCanceled(),
132+
useSynchronizationContext: false
133+
);
134+
await using var disposedCancellationTokenRegistration = _disposeCts.Token.Register(
135+
() => waiterTcs.TrySetException(new YdbException("Session Source is disposed.")),
136+
useSynchronizationContext: false
137+
);
123138
session = await waiterTcs.Task.ConfigureAwait(false);
124139

125140
if (CheckIdleSession(session) || TryGetIdleSession(out session))
@@ -174,10 +189,15 @@ private void WakeUpWaiter()
174189

175190
public void Return(T session)
176191
{
177-
if (session.IsBroken)
192+
if (session.IsBroken || IsDisposed)
178193
{
179194
CloseSession(session);
180195

196+
if (IsDisposed)
197+
{
198+
_ = TryDisposeCore();
199+
}
200+
181201
return;
182202
}
183203

@@ -237,9 +257,27 @@ private static void CleanIdleSessions(object? state)
237257
}
238258
}
239259
}
260+
261+
public async ValueTask DisposeAsync()
262+
{
263+
if (Interlocked.CompareExchange(ref _disposed, 1, 0) != 0)
264+
{
265+
return;
266+
}
267+
268+
await _cleanerTimer.DisposeAsync();
269+
_disposeCts.Cancel();
270+
271+
CleanIdleSessions(_idleSessions);
272+
273+
await TryDisposeCore();
274+
}
275+
276+
private ValueTask TryDisposeCore() =>
277+
_numSessions == 0 ? _sessionFactory.DisposeAsync() : ValueTask.CompletedTask;
240278
}
241279

242-
internal interface IPoolingSessionFactory<T> where T : PoolingSessionBase<T>
280+
internal interface IPoolingSessionFactory<T> : IAsyncDisposable where T : PoolingSessionBase<T>
243281
{
244282
T NewSession(PoolingSessionSource<T> source);
245283
}

src/Ydb.Sdk/src/Ado/YdbConnection.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,15 @@ public override async Task OpenAsync(CancellationToken cancellationToken)
9898
}
9999
catch (OperationCanceledException e)
100100
{
101-
throw new YdbException(StatusCode.Cancelled,
101+
throw new YdbException(StatusCode.ClientTransportTimeout,
102+
$"The connection pool has been exhausted, either raise 'MaxSessionPool' " +
103+
$"(currently {ConnectionStringBuilder.MaxSessionPool}) or 'CreateSessionTimeout' " +
104+
$"(currently {ConnectionStringBuilder.CreateSessionTimeout} seconds) in your connection string.", e
105+
);
106+
}
107+
catch (YdbException e) when (e.Code == StatusCode.ClientTransportTimeout)
108+
{
109+
throw new YdbException(e.Code,
102110
$"The connection pool has been exhausted, either raise 'MaxSessionPool' " +
103111
$"(currently {ConnectionStringBuilder.MaxSessionPool}) or 'CreateSessionTimeout' " +
104112
$"(currently {ConnectionStringBuilder.CreateSessionTimeout} seconds) in your connection string.", e

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,15 @@ Allocated : Allocated memory per single operation (managed only, incl
3838
| SessionReuse_Pattern | 159,954.05 ns | 2,820.324 ns | 4,044.825 ns | 220.0000 | 6.0381 | 0.7324 | - | 7307 B |
3939
| SessionReuse_HighContention_Pattern | 8,914,529.81 ns | 46,900.448 ns | 41,576.026 ns | 19756.6563 | 149.0469 | 625.0000 | 93.7500 | 5289794 B |
4040
| SessionReuse_HighIterations_Pattern | 81,211,792.96 ns | 749,115.160 ns | 664,071.077 ns | 200020.0000 | 614.8571 | - | - | 7458 B |
41+
42+
# YDB .NET SDK Session Pool Benchmarks (FIFO lock-free)
43+
44+
| Method | Mean | Error | StdDev | Median | Completed Work Items | Lock Contentions | Gen0 | Gen1 | Allocated |
45+
|-------------------------------------|-----------------:|-----------------:|-----------------:|-----------------:|---------------------:|-----------------:|---------:|---------:|----------:|
46+
| SingleThreaded_OpenClose | 64.75 ns | 1.004 ns | 0.986 ns | 64.31 ns | - | - | 0.0038 | - | 32 B |
47+
| MultiThreaded_OpenClose | 22,547.55 ns | 443.931 ns | 511.232 ns | 22,613.77 ns | 40.0020 | - | 0.9460 | - | 7888 B |
48+
| HighContention_OpenClose | 83,230.11 ns | 1,642.445 ns | 1,536.344 ns | 83,232.22 ns | 205.5402 | 0.0016 | 5.0049 | - | 42337 B |
49+
| SessionReuse_Pattern | 102,998.59 ns | 1,990.337 ns | 2,854.481 ns | 103,245.75 ns | 220.0000 | 0.0002 | 1.5869 | - | 13678 B |
50+
| SessionReuse_HighContention_Pattern | 3,735,379.14 ns | 73,499.927 ns | 116,578.682 ns | 3,694,595.05 ns | 19845.2422 | 5.7344 | 812.5000 | 132.8125 | 6750890 B |
51+
| SessionReuse_HighIterations_Pattern | 69,402,122.32 ns | 1,382,232.565 ns | 4,075,544.202 ns | 70,535,363.07 ns | 200020.0000 | - | 714.2857 | - | 6407458 B |
52+

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ public async Task SessionReuse_HighIterations_Pattern()
136136
internal class MockSessionFactory : IPoolingSessionFactory<MockPoolingSession>
137137
{
138138
public MockPoolingSession NewSession(PoolingSessionSource<MockPoolingSession> source) => new(source);
139+
140+
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
139141
}
140142

141143
internal class MockPoolingSession(PoolingSessionSource<MockPoolingSession> source)

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ internal class MockPoolingSessionFactory : IPoolingSessionFactory<MockPoolingSes
127127

128128
public MockPoolingSession NewSession(PoolingSessionSource<MockPoolingSession> source) =>
129129
new(source, Open, DeleteSession, IsBroken, Interlocked.Increment(ref _sessionNum));
130+
131+
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
130132
}
131133

132134
internal class MockPoolingSession(

0 commit comments

Comments
 (0)