Skip to content

Commit 1819ec9

Browse files
fix
1 parent e32b7da commit 1819ec9

File tree

3 files changed

+94
-41
lines changed

3 files changed

+94
-41
lines changed

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ internal sealed class PoolingSessionSource<T> : ISessionSource where T : Pooling
2020
private readonly Timer _cleanerTimer;
2121

2222
private volatile int _numSessions;
23+
private volatile int _waiterSize;
2324

2425
public PoolingSessionSource(
2526
IPoolingSessionFactory<T> sessionFactory,
@@ -93,8 +94,24 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
9394

9495
while (true)
9596
{
97+
// Statement order is important
9698
var waiterTcs = new TaskCompletionSource<T?>(TaskCreationOptions.RunContinuationsAsynchronously);
9799
_waiters.Enqueue(waiterTcs);
100+
if (TryGetIdleSession(out session))
101+
{
102+
if (!waiterTcs.TrySetResult(null))
103+
{
104+
var raceSession = await waiterTcs.Task;
105+
106+
if (CheckIdleSession(raceSession))
107+
{
108+
_idleSessions.Push(raceSession);
109+
}
110+
}
111+
112+
return session;
113+
}
114+
98115
await using var _ = finalToken.Register(() => waiterTcs.TrySetCanceled(), useSynchronizationContext: false);
99116
session = await waiterTcs.Task.ConfigureAwait(false);
100117

@@ -143,8 +160,9 @@ private async ValueTask<ISession> RentAsync(CancellationToken cancellationToken)
143160

144161
private void WakeUpWaiter()
145162
{
146-
if (_waiters.TryDequeue(out var waiter))
147-
waiter.TrySetResult(null); // wake up waiter!
163+
if (_waiters.TryDequeue(out var waiter) && waiter.TrySetResult(null))
164+
{
165+
} // wake up waiter!
148166
}
149167

150168
public void Return(T session)
@@ -160,11 +178,12 @@ public void Return(T session)
160178
session.IdleStartTime = DateTime.Now;
161179
session.Set(PoolingSessionState.In);
162180

163-
if (_waiters.TryDequeue(out var waiter))
181+
while (_waiters.TryDequeue(out var waiter))
164182
{
165-
waiter.TrySetResult(session);
166-
167-
return;
183+
if (waiter.TrySetResult(session))
184+
{
185+
return;
186+
}
168187
}
169188

170189
_idleSessions.Push(session);

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ Allocated : Allocated memory per single operation (managed only, incl
3939

4040
# YDB .NET SDK Session Source Benchmarks (Npgsql)
4141

42-
| Method | Mean | Error | StdDev | Median | Completed Work Items | Lock Contentions | Gen0 | Allocated |
43-
|--------------------------|--------------:|-------------:|-------------:|--------------:|---------------------:|-----------------:|-------:|----------:|
44-
| SingleThreaded_OpenClose | 25.82 ns | 0.141 ns | 0.125 ns | 25.78 ns | - | - | - | - |
45-
| MultiThreaded_OpenClose | 20,893.61 ns | 829.087 ns | 2,431.569 ns | 19,694.30 ns | 20.0033 | 0.0303 | 0.5188 | 4526 B |
46-
| HighContention_OpenClose | 108,688.27 ns | 2,160.177 ns | 3,298.819 ns | 108,755.99 ns | 100.0017 | 3.8002 | 2.5635 | 21839 B |
47-
| SessionReuse_Pattern | 130,849.34 ns | 2,616.397 ns | 4,977.967 ns | 129,920.39 ns | 20.0000 | 5.4443 | 0.4883 | 4588 B |
48-
42+
| Method | Mean | Error | StdDev | Completed Work Items | Lock Contentions | Gen0 | Allocated |
43+
|-------------------------------------|-----------------:|-----------------:|-----------------:|---------------------:|-----------------:|---------:|----------:|
44+
| SingleThreaded_OpenClose | 59.98 ns | 0.241 ns | 0.226 ns | - | - | 0.0038 | 32 B |
45+
| MultiThreaded_OpenClose | 24,978.42 ns | 479.471 ns | 1,082.246 ns | 40.0022 | 0.0001 | 0.9460 | 7888 B |
46+
| HighContention_OpenClose | 95,835.23 ns | 2,333.340 ns | 6,769.444 ns | 207.0631 | 0.0012 | 5.2490 | 43468 B |
47+
| SessionReuse_Pattern | 125,216.56 ns | 2,480.604 ns | 6,621.235 ns | 220.0000 | - | 1.5869 | 13680 B |
48+
| SessionReuse_HighIterations_Pattern | 69,903,786.07 ns | 1,700,283.319 ns | 5,013,324.093 ns | 200020.0000 | - | 714.2857 | 6407595 B |

src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Session/PoolingSessionSourceMockTests.cs

Lines changed: 62 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -28,50 +28,84 @@ public async Task Reuse_Session_Before_Creating_new()
2828
[Fact]
2929
public async Task Creating_Session_Throw_Exception()
3030
{
31-
const string errorMessage = "Error on open session";
32-
const int maxSessionSize = 200;
31+
for (var it = 0; it < 100_000; it++)
32+
{
33+
const string errorMessage = "Error on open session";
34+
const int maxSessionSize = 200;
35+
36+
var mockPoolingSessionFactory = new MockPoolingSessionFactory
37+
{
38+
Open = sessionNum =>
39+
sessionNum <= maxSessionSize * 2
40+
? Task.FromException(new YdbException(errorMessage))
41+
: Task.CompletedTask
42+
};
43+
44+
var sessionSource = new PoolingSessionSource<MockPoolingSession>(
45+
mockPoolingSessionFactory, new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize }
46+
);
47+
48+
var tasks = new List<Task>();
49+
var countSuccess = 0;
3350

51+
for (var i = 0; i < maxSessionSize * 4; i++)
52+
{
53+
tasks.Add(Task.Run(async () =>
54+
{
55+
try
56+
{
57+
var session = await sessionSource.OpenSession();
58+
// ReSharper disable once AccessToModifiedClosure
59+
Interlocked.Increment(ref countSuccess);
60+
Assert.True(session.SessionId() > maxSessionSize * 2);
61+
session.Close();
62+
}
63+
catch (YdbException e)
64+
{
65+
Assert.Equal(errorMessage, e.Message);
66+
}
67+
}));
68+
}
69+
70+
await Task.WhenAll(tasks);
71+
Assert.Equal(maxSessionSize * 2, Volatile.Read(ref countSuccess));
72+
Assert.True(maxSessionSize * 3 >= mockPoolingSessionFactory.SessionNum);
73+
Assert.True(maxSessionSize * 2 < mockPoolingSessionFactory.SessionNum);
74+
}
75+
}
76+
77+
[Fact]
78+
public async Task HighContention_OpenClose_NotCanceledException()
79+
{
3480
var mockPoolingSessionFactory = new MockPoolingSessionFactory
3581
{
36-
Open = sessionNum =>
37-
sessionNum <= maxSessionSize * 2
38-
? Task.FromException(new YdbException(errorMessage))
39-
: Task.CompletedTask
82+
Open = async _ => await Task.Yield()
4083
};
84+
const int highContentionTasks = 100;
85+
const int maxSessionSize = highContentionTasks / 2;
4186

4287
var sessionSource = new PoolingSessionSource<MockPoolingSession>(
4388
mockPoolingSessionFactory, new YdbConnectionStringBuilder { MaxSessionPool = maxSessionSize }
4489
);
4590

46-
var tasks = new List<Task>();
47-
var countSuccess = 0;
48-
49-
for (var i = 0; i < maxSessionSize * 4; i++)
91+
for (var it = 0; it < 100_000; it++)
5092
{
51-
tasks.Add(Task.Run(async () =>
93+
var tasks = new Task[highContentionTasks];
94+
95+
for (var i = 0; i < highContentionTasks; i++)
5296
{
53-
try
97+
tasks[i] = Task.Run(async () =>
5498
{
5599
var session = await sessionSource.OpenSession();
56-
// ReSharper disable once AccessToModifiedClosure
57-
Interlocked.Increment(ref countSuccess);
58-
Assert.True(session.SessionId() > maxSessionSize * 2);
100+
Assert.True(session.SessionId() <= maxSessionSize);
101+
await Task.Yield();
59102
session.Close();
60-
}
61-
catch (YdbException e)
62-
{
63-
Assert.Equal(errorMessage, e.Message);
64-
}
65-
}));
66-
}
103+
});
104+
}
67105

68-
await Task.WhenAll(tasks);
69-
Assert.Equal(maxSessionSize * 2, Volatile.Read(ref countSuccess));
70-
Assert.True(maxSessionSize * 3 >= mockPoolingSessionFactory.SessionNum);
71-
Assert.True(maxSessionSize * 2 < mockPoolingSessionFactory.SessionNum);
106+
await Task.WhenAll(tasks);
107+
}
72108
}
73-
74-
75109
}
76110

77111
internal static class ISessionExtension

0 commit comments

Comments
 (0)