Skip to content

Commit 1241e8f

Browse files
KirillKurdyukovLiamHamsters
authored andcommitted
dev: Ydb.Sdk.Ado.Stress.Loader (ydb-platform#480)
1 parent 1e23c76 commit 1241e8f

File tree

12 files changed

+617
-12
lines changed

12 files changed

+617
-12
lines changed

src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@
66
using System.Diagnostics.CodeAnalysis;
77
using System.Runtime.CompilerServices;
88
using System.Threading.Channels;
9-
using Microsoft.Extensions.Logging;
109

1110
namespace Ydb.Sdk.Ado.Session;
1211

1312
internal sealed class PoolingSessionSource : ISessionSource<IPoolingSession>
1413
{
15-
private readonly ILogger<PoolingSessionSource> _logger;
1614
private readonly IPoolingSessionFactory _sessionFactory;
1715

1816
private readonly int _minSessionSize;
@@ -35,12 +33,10 @@ internal sealed class PoolingSessionSource : ISessionSource<IPoolingSession>
3533
private volatile int _idleCount;
3634

3735
public PoolingSessionSource(
38-
IDriver driver,
3936
IPoolingSessionFactory sessionFactory,
4037
YdbConnectionStringBuilder settings
4138
)
4239
{
43-
_logger = driver.LoggerFactory.CreateLogger<PoolingSessionSource>();
4440
_sessionFactory = sessionFactory;
4541

4642
_minSessionSize = settings.MinSessionPool;
@@ -71,7 +67,7 @@ YdbConnectionStringBuilder settings
7167
_pruningTimerEnabled = false;
7268
}
7369

74-
public ValueTask<IPoolingSession> OpenSession(CancellationToken cancellationToken) =>
70+
public ValueTask<IPoolingSession> OpenSession(CancellationToken cancellationToken = default) =>
7571
TryGetIdleSession(out var session) ? new ValueTask<IPoolingSession>(session) : RentAsync(cancellationToken);
7672

7773
public void Return(IPoolingSession session)
@@ -153,7 +149,7 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio
153149

154150
// Only start pruning if we've incremented open count past _min.
155151
// Note that we don't do it only once, on equality, because the thread which incremented open count past _min might get exception
156-
// on NpgsqlSession.Open due to timeout, CancellationToken or other reasons.
152+
// on Session.Open due to timeout, CancellationToken or other reasons.
157153
if (numSessions >= _minSessionSize)
158154
{
159155
UpdatePruningTimer();
@@ -280,10 +276,6 @@ private static void PruneIdleSessions(object? state)
280276
pool._pruningTimer.Change(pool._pruningSamplingInterval, Timeout.InfiniteTimeSpan);
281277
}
282278

283-
if (pool._logger.IsEnabled(LogLevel.Debug))
284-
{
285-
}
286-
287279
while (toPrune > 0 &&
288280
pool._numSessions > pool._minSessionSize &&
289281
pool._idleSessionReader.TryRead(out var session) &&
Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,37 @@
1+
// * Legends *
2+
Mean : Arithmetic mean of all
3+
measurements
4+
Error : Half of 99.9% confidence
5+
interval
6+
StdDev : Standard deviation of all
7+
measurements
8+
Median : Value separating the higher half of all measurements (50th
9+
percentile)
10+
Completed Work Items : The number of work items that have been processed in ThreadPool (per single
11+
operation)
12+
Lock Contentions : The number of times there was contention upon trying to take a Monitor's lock (per single
13+
operation)
14+
Gen0 : GC Generation 0 collects per 1000
15+
operations
16+
Allocated : Allocated memory per single operation (managed only, inclusive, 1KB =
17+
1024B)
18+
1 ns : 1 Nanosecond (0.000000001 sec)
19+
120
# YDB .NET SDK Session Pool Benchmarks
221

322
| Method | Mean | Error | StdDev | Completed Work Items | Lock Contentions | Gen0 | Allocated |
423
|--------------------------|-------------:|------------:|------------:|---------------------:|-----------------:|-------:|----------:|
524
| SingleThreaded_OpenClose | 130.2 ns | 0.91 ns | 0.71 ns | 0.0000 | - | 0.0257 | 216 B |
625
| MultiThreaded_OpenClose | 41,667.8 ns | 1,065.07 ns | 3,140.37 ns | 20.0018 | 0.3466 | 1.0376 | 8851 B |
726
| HighContention_OpenClose | 130,331.1 ns | 2,569.39 ns | 6,106.44 ns | 100.0000 | 1.9094 | 5.1270 | 43421 B |
8-
| SessionReuse_Pattern | 204,351.2 ns | 4,038.25 ns | 7,485.16 ns | 20.0000 | 3.6716 | 5.6152 | 47762 B |
27+
| SessionReuse_Pattern | 204,351.2 ns | 4,038.25 ns | 7,485.16 ns | 20.0000 | 3.6716 | 5.6152 | 47762 B |
28+
29+
# YDB .NET SDK Session Source Benchmarks (Npgsql)
30+
31+
| Method | Mean | Error | StdDev | Median | Completed Work Items | Lock Contentions | Gen0 | Allocated |
32+
|--------------------------|--------------:|-------------:|-------------:|--------------:|---------------------:|-----------------:|-------:|----------:|
33+
| SingleThreaded_OpenClose | 25.82 ns | 0.141 ns | 0.125 ns | 25.78 ns | - | - | - | - |
34+
| MultiThreaded_OpenClose | 20,893.61 ns | 829.087 ns | 2,431.569 ns | 19,694.30 ns | 20.0033 | 0.0303 | 0.5188 | 4526 B |
35+
| HighContention_OpenClose | 108,688.27 ns | 2,160.177 ns | 3,298.819 ns | 108,755.99 ns | 100.0017 | 3.8002 | 2.5635 | 21839 B |
36+
| SessionReuse_Pattern | 130,849.34 ns | 2,616.397 ns | 4,977.967 ns | 129,920.39 ns | 20.0000 | 5.4443 | 0.4883 | 4588 B |
37+
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
using BenchmarkDotNet.Attributes;
2+
using Ydb.Query;
3+
using Ydb.Sdk.Ado.Session;
4+
using Ydb.Sdk.Value;
5+
6+
namespace Ydb.Sdk.Ado.Benchmarks;
7+
8+
[MemoryDiagnoser]
9+
[ThreadingDiagnoser]
10+
public class SessionSourceBenchmark
11+
{
12+
private PoolingSessionSource _poolingSessionSource = null!;
13+
private const int SessionPoolSize = 50;
14+
private const int ConcurrentTasks = 20;
15+
16+
[GlobalSetup]
17+
public void Setup()
18+
{
19+
var settings = new YdbConnectionStringBuilder { MaxSessionPool = SessionPoolSize };
20+
21+
_poolingSessionSource = new PoolingSessionSource(new MockSessionFactory(), settings);
22+
}
23+
24+
[Benchmark]
25+
public async Task SingleThreaded_OpenClose()
26+
{
27+
var session = await _poolingSessionSource.OpenSession();
28+
session.Close();
29+
}
30+
31+
[Benchmark]
32+
public async Task MultiThreaded_OpenClose()
33+
{
34+
var tasks = new Task[ConcurrentTasks];
35+
36+
for (var i = 0; i < ConcurrentTasks; i++)
37+
{
38+
tasks[i] = Task.Run(async () =>
39+
{
40+
var session = await _poolingSessionSource.OpenSession();
41+
session.Close();
42+
});
43+
}
44+
45+
await Task.WhenAll(tasks);
46+
}
47+
48+
[Benchmark]
49+
public async Task HighContention_OpenClose()
50+
{
51+
const int highContentionTasks = 100;
52+
var tasks = new Task[highContentionTasks];
53+
54+
for (var i = 0; i < highContentionTasks; i++)
55+
{
56+
tasks[i] = Task.Run(async () =>
57+
{
58+
var session = await _poolingSessionSource.OpenSession();
59+
session.Close();
60+
});
61+
}
62+
63+
await Task.WhenAll(tasks);
64+
}
65+
66+
[Benchmark]
67+
public async Task SessionReuse_Pattern()
68+
{
69+
const int iterations = 10;
70+
var tasks = new Task[ConcurrentTasks];
71+
72+
for (var i = 0; i < ConcurrentTasks; i++)
73+
{
74+
tasks[i] = Task.Run(async () =>
75+
{
76+
for (var j = 0; j < iterations; j++)
77+
{
78+
var session = await _poolingSessionSource.OpenSession();
79+
session.Close();
80+
}
81+
});
82+
}
83+
84+
await Task.WhenAll(tasks);
85+
}
86+
}
87+
88+
internal class MockSessionFactory : IPoolingSessionFactory
89+
{
90+
public IPoolingSession NewSession(PoolingSessionSource source) => new PoolingMockSession(source);
91+
}
92+
93+
internal class PoolingMockSession(PoolingSessionSource source) : IPoolingSession
94+
{
95+
public bool IsBroken => false;
96+
97+
public void Close() => source.Return(this);
98+
99+
public Task Open(CancellationToken cancellationToken) => Task.CompletedTask;
100+
101+
public Task DeleteSession() => Task.CompletedTask;
102+
103+
public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(string query,
104+
Dictionary<string, YdbValue> parameters, GrpcRequestSettings settings,
105+
TransactionControl? txControl) => throw new NotImplementedException();
106+
107+
public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>
108+
throw new NotImplementedException();
109+
110+
public Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) =>
111+
throw new NotImplementedException();
112+
113+
public void OnNotSuccessStatusCode(StatusCode code)
114+
{
115+
}
116+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
using System.CommandLine;
2+
using System.CommandLine.Binding;
3+
4+
namespace Ydb.Sdk.Ado.Stress.Loader;
5+
6+
public static class Cli
7+
{
8+
private static readonly Argument<string> ConnectionString = new(
9+
"connectionString",
10+
"YDB connection string ADO NET format"
11+
);
12+
13+
private static readonly Option<int> PeakRps = new("--peak-rps", () => 1000,
14+
"Peak RPS load (top of the step)");
15+
16+
private static readonly Option<int> MediumRps = new("--medium-rps", () => 100,
17+
"Medium RPS load (middle of the step)");
18+
19+
private static readonly Option<int> MinRps = new("--min-rps", () => 1,
20+
"Minimum RPS load (bottom of the step, 1-2 RPS)");
21+
22+
private static readonly Option<int> PeakDurationSeconds = new("--peak-duration", () => 600,
23+
"Duration of peak load in seconds");
24+
25+
private static readonly Option<int> MediumDurationSeconds = new("--medium-duration", () => 1800,
26+
"Duration of medium load in seconds");
27+
28+
private static readonly Option<int> MinDurationSeconds = new("--min-duration", () => 1800,
29+
"Duration of minimum load in seconds");
30+
31+
private static readonly Option<int> TotalTestTimeSeconds = new("--total-time", () => 14400,
32+
"Total test duration in seconds");
33+
34+
private static readonly Option<string?> SaFilePath = new("--sa-file-path",
35+
"Path to Service Account file for authentication");
36+
37+
private static readonly Option<string> TestQuery = new("--test-query",
38+
() => "SELECT 1 as test_column",
39+
"SQL query to execute during stress test"
40+
);
41+
42+
public static readonly RootCommand RootCommand = new("YDB ADO.NET Stress Test Tank - Variable Load Generator")
43+
{
44+
ConnectionString,
45+
PeakRps,
46+
MediumRps,
47+
MinRps,
48+
PeakDurationSeconds,
49+
MediumDurationSeconds,
50+
MinDurationSeconds,
51+
TotalTestTimeSeconds,
52+
SaFilePath,
53+
TestQuery
54+
};
55+
56+
static Cli()
57+
{
58+
RootCommand.SetHandler(async config =>
59+
{
60+
var stressLoader = new StressTestTank(config);
61+
await stressLoader.RunAsync();
62+
},
63+
new ConfigBinder(
64+
ConnectionString,
65+
PeakRps,
66+
MediumRps,
67+
MinRps,
68+
PeakDurationSeconds,
69+
MediumDurationSeconds,
70+
MinDurationSeconds,
71+
TotalTestTimeSeconds,
72+
SaFilePath,
73+
TestQuery
74+
)
75+
);
76+
}
77+
}
78+
79+
public class ConfigBinder(
80+
Argument<string> connectionString,
81+
Option<int> peakRps,
82+
Option<int> mediumRps,
83+
Option<int> minRps,
84+
Option<int> peakDurationSeconds,
85+
Option<int> mediumDurationSeconds,
86+
Option<int> minDurationSeconds,
87+
Option<int> totalTestTimeSeconds,
88+
Option<string?> saFilePath,
89+
Option<string> testQuery
90+
) : BinderBase<StressTestConfig>
91+
{
92+
protected override StressTestConfig GetBoundValue(BindingContext bindingContext) => new(
93+
ConnectionString: bindingContext.ParseResult.GetValueForArgument(connectionString),
94+
PeakRps: bindingContext.ParseResult.GetValueForOption(peakRps),
95+
MediumRps: bindingContext.ParseResult.GetValueForOption(mediumRps),
96+
MinRps: bindingContext.ParseResult.GetValueForOption(minRps),
97+
PeakDurationSeconds: bindingContext.ParseResult.GetValueForOption(peakDurationSeconds),
98+
MediumDurationSeconds: bindingContext.ParseResult.GetValueForOption(mediumDurationSeconds),
99+
MinDurationSeconds: bindingContext.ParseResult.GetValueForOption(minDurationSeconds),
100+
TotalTestTimeSeconds: bindingContext.ParseResult.GetValueForOption(totalTestTimeSeconds),
101+
SaFilePath: bindingContext.ParseResult.GetValueForOption(saFilePath),
102+
TestQuery: bindingContext.ParseResult.GetValueForOption(testQuery)!
103+
);
104+
}
105+
106+
public record StressTestConfig(
107+
string ConnectionString,
108+
int PeakRps,
109+
int MediumRps,
110+
int MinRps,
111+
int PeakDurationSeconds,
112+
int MediumDurationSeconds,
113+
int MinDurationSeconds,
114+
int TotalTestTimeSeconds,
115+
string? SaFilePath,
116+
string TestQuery
117+
);
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
using System.Runtime.CompilerServices;
2+
using Microsoft.Extensions.Logging;
3+
4+
namespace Ydb.Sdk.Ado.Stress.Loader;
5+
6+
public class LoadPattern(StressTestConfig config, ILogger logger)
7+
{
8+
public async IAsyncEnumerable<int> GetLoadStepsAsync(
9+
[EnumeratorCancellation] CancellationToken cancellationToken = default
10+
)
11+
{
12+
var totalDurationMs = config.TotalTestTimeSeconds * 1000;
13+
var elapsed = 0;
14+
15+
while (elapsed < totalDurationMs && !cancellationToken.IsCancellationRequested)
16+
{
17+
logger.LogInformation("[{Now}]: Peak load phase! Expected RPS: {PickRps}", DateTime.Now, config.PeakRps);
18+
yield return config.PeakRps;
19+
var peakDurationMs = Math.Min(config.PeakDurationSeconds * 1000, totalDurationMs - elapsed);
20+
await Task.Delay(peakDurationMs, cancellationToken);
21+
elapsed += peakDurationMs;
22+
23+
if (elapsed >= totalDurationMs)
24+
{
25+
break;
26+
}
27+
28+
logger.LogInformation("[{Now}]: Medium load phase (after peak)! Expected RPS: {MediumRps}", DateTime.Now,
29+
config.MediumRps);
30+
yield return config.MediumRps;
31+
var mediumDurationMs = Math.Min(config.MediumDurationSeconds * 1000, totalDurationMs - elapsed);
32+
await Task.Delay(mediumDurationMs, cancellationToken);
33+
elapsed += mediumDurationMs;
34+
35+
if (elapsed >= totalDurationMs)
36+
{
37+
break;
38+
}
39+
40+
logger.LogInformation("[{Now}]: Minimum load phase! Expected RPS: {MediumRps}", DateTime.Now,
41+
config.MinRps);
42+
yield return config.MinRps;
43+
var minDurationMs = Math.Min(config.MinDurationSeconds * 1000, totalDurationMs - elapsed);
44+
await Task.Delay(minDurationMs, cancellationToken);
45+
elapsed += minDurationMs;
46+
47+
if (elapsed >= totalDurationMs)
48+
{
49+
break;
50+
}
51+
52+
logger.LogInformation("[{Now}]: Medium load phase (before next peak)! Expected RPS: {MediumRps}",
53+
DateTime.Now, config.MediumRps);
54+
yield return config.MediumRps;
55+
var finalMediumDurationMs = Math.Min(config.MediumDurationSeconds * 1000, totalDurationMs - elapsed);
56+
await Task.Delay(finalMediumDurationMs, cancellationToken);
57+
elapsed += finalMediumDurationMs;
58+
}
59+
}
60+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
using System.CommandLine;
2+
using Ydb.Sdk.Ado.Stress.Loader;
3+
4+
return await Cli.RootCommand.InvokeAsync(args);

0 commit comments

Comments
 (0)