Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions src/Ydb.Sdk/src/Ado/Session/PoolingSessionSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Microsoft.Extensions.Logging;

namespace Ydb.Sdk.Ado.Session;

internal sealed class PoolingSessionSource : ISessionSource<IPoolingSession>
{
private readonly ILogger<PoolingSessionSource> _logger;
private readonly IPoolingSessionFactory _sessionFactory;

private readonly int _minSessionSize;
Expand All @@ -35,12 +33,10 @@ internal sealed class PoolingSessionSource : ISessionSource<IPoolingSession>
private volatile int _idleCount;

public PoolingSessionSource(
IDriver driver,
IPoolingSessionFactory sessionFactory,
YdbConnectionStringBuilder settings
)
{
_logger = driver.LoggerFactory.CreateLogger<PoolingSessionSource>();
_sessionFactory = sessionFactory;

_minSessionSize = settings.MinSessionPool;
Expand Down Expand Up @@ -71,7 +67,7 @@ YdbConnectionStringBuilder settings
_pruningTimerEnabled = false;
}

public ValueTask<IPoolingSession> OpenSession(CancellationToken cancellationToken) =>
public ValueTask<IPoolingSession> OpenSession(CancellationToken cancellationToken = default) =>
TryGetIdleSession(out var session) ? new ValueTask<IPoolingSession>(session) : RentAsync(cancellationToken);

public void Return(IPoolingSession session)
Expand Down Expand Up @@ -153,7 +149,7 @@ private async ValueTask<IPoolingSession> RentAsync(CancellationToken cancellatio

// Only start pruning if we've incremented open count past _min.
// Note that we don't do it only once, on equality, because the thread which incremented open count past _min might get exception
// on NpgsqlSession.Open due to timeout, CancellationToken or other reasons.
// on Session.Open due to timeout, CancellationToken or other reasons.
if (numSessions >= _minSessionSize)
{
UpdatePruningTimer();
Expand Down Expand Up @@ -280,10 +276,6 @@ private static void PruneIdleSessions(object? state)
pool._pruningTimer.Change(pool._pruningSamplingInterval, Timeout.InfiniteTimeSpan);
}

if (pool._logger.IsEnabled(LogLevel.Debug))
{
}

while (toPrune > 0 &&
pool._numSessions > pool._minSessionSize &&
pool._idleSessionReader.TryRead(out var session) &&
Expand Down
31 changes: 30 additions & 1 deletion src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,37 @@
// * Legends *
Mean : Arithmetic mean of all
measurements
Error : Half of 99.9% confidence
interval
StdDev : Standard deviation of all
measurements
Median : Value separating the higher half of all measurements (50th
percentile)
Completed Work Items : The number of work items that have been processed in ThreadPool (per single
operation)
Lock Contentions : The number of times there was contention upon trying to take a Monitor's lock (per single
operation)
Gen0 : GC Generation 0 collects per 1000
operations
Allocated : Allocated memory per single operation (managed only, inclusive, 1KB =
1024B)
1 ns : 1 Nanosecond (0.000000001 sec)

# YDB .NET SDK Session Pool Benchmarks

| Method | Mean | Error | StdDev | Completed Work Items | Lock Contentions | Gen0 | Allocated |
|--------------------------|-------------:|------------:|------------:|---------------------:|-----------------:|-------:|----------:|
| SingleThreaded_OpenClose | 130.2 ns | 0.91 ns | 0.71 ns | 0.0000 | - | 0.0257 | 216 B |
| MultiThreaded_OpenClose | 41,667.8 ns | 1,065.07 ns | 3,140.37 ns | 20.0018 | 0.3466 | 1.0376 | 8851 B |
| HighContention_OpenClose | 130,331.1 ns | 2,569.39 ns | 6,106.44 ns | 100.0000 | 1.9094 | 5.1270 | 43421 B |
| SessionReuse_Pattern | 204,351.2 ns | 4,038.25 ns | 7,485.16 ns | 20.0000 | 3.6716 | 5.6152 | 47762 B |
| SessionReuse_Pattern | 204,351.2 ns | 4,038.25 ns | 7,485.16 ns | 20.0000 | 3.6716 | 5.6152 | 47762 B |

# YDB .NET SDK Session Source Benchmarks (Npgsql)

| Method | Mean | Error | StdDev | Median | Completed Work Items | Lock Contentions | Gen0 | Allocated |
|--------------------------|--------------:|-------------:|-------------:|--------------:|---------------------:|-----------------:|-------:|----------:|
| SingleThreaded_OpenClose | 25.82 ns | 0.141 ns | 0.125 ns | 25.78 ns | - | - | - | - |
| MultiThreaded_OpenClose | 20,893.61 ns | 829.087 ns | 2,431.569 ns | 19,694.30 ns | 20.0033 | 0.0303 | 0.5188 | 4526 B |
| HighContention_OpenClose | 108,688.27 ns | 2,160.177 ns | 3,298.819 ns | 108,755.99 ns | 100.0017 | 3.8002 | 2.5635 | 21839 B |
| SessionReuse_Pattern | 130,849.34 ns | 2,616.397 ns | 4,977.967 ns | 129,920.39 ns | 20.0000 | 5.4443 | 0.4883 | 4588 B |

116 changes: 116 additions & 0 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Benchmarks/SessionSourceBenchmark.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
using BenchmarkDotNet.Attributes;
using Ydb.Query;
using Ydb.Sdk.Ado.Session;
using Ydb.Sdk.Value;

namespace Ydb.Sdk.Ado.Benchmarks;

[MemoryDiagnoser]
[ThreadingDiagnoser]
public class SessionSourceBenchmark
{
private PoolingSessionSource _poolingSessionSource = null!;
private const int SessionPoolSize = 50;
private const int ConcurrentTasks = 20;

[GlobalSetup]
public void Setup()
{
var settings = new YdbConnectionStringBuilder { MaxSessionPool = SessionPoolSize };

_poolingSessionSource = new PoolingSessionSource(new MockSessionFactory(), settings);
}

[Benchmark]
public async Task SingleThreaded_OpenClose()
{
var session = await _poolingSessionSource.OpenSession();
session.Close();
}

[Benchmark]
public async Task MultiThreaded_OpenClose()
{
var tasks = new Task[ConcurrentTasks];

for (var i = 0; i < ConcurrentTasks; i++)
{
tasks[i] = Task.Run(async () =>
{
var session = await _poolingSessionSource.OpenSession();
session.Close();
});
}

await Task.WhenAll(tasks);
}

[Benchmark]
public async Task HighContention_OpenClose()
{
const int highContentionTasks = 100;
var tasks = new Task[highContentionTasks];

for (var i = 0; i < highContentionTasks; i++)
{
tasks[i] = Task.Run(async () =>
{
var session = await _poolingSessionSource.OpenSession();
session.Close();
});
}

await Task.WhenAll(tasks);
}

[Benchmark]
public async Task SessionReuse_Pattern()
{
const int iterations = 10;
var tasks = new Task[ConcurrentTasks];

for (var i = 0; i < ConcurrentTasks; i++)
{
tasks[i] = Task.Run(async () =>
{
for (var j = 0; j < iterations; j++)
{
var session = await _poolingSessionSource.OpenSession();
session.Close();
}
});
}

await Task.WhenAll(tasks);
}
}

internal class MockSessionFactory : IPoolingSessionFactory
{
public IPoolingSession NewSession(PoolingSessionSource source) => new PoolingMockSession(source);
}

internal class PoolingMockSession(PoolingSessionSource source) : IPoolingSession
{
public bool IsBroken => false;

public void Close() => source.Return(this);

public Task Open(CancellationToken cancellationToken) => Task.CompletedTask;

public Task DeleteSession() => Task.CompletedTask;

public ValueTask<IServerStream<ExecuteQueryResponsePart>> ExecuteQuery(string query,
Dictionary<string, YdbValue> parameters, GrpcRequestSettings settings,
TransactionControl? txControl) => throw new NotImplementedException();

public Task CommitTransaction(string txId, CancellationToken cancellationToken = default) =>
throw new NotImplementedException();

public Task RollbackTransaction(string txId, CancellationToken cancellationToken = default) =>
throw new NotImplementedException();

public void OnNotSuccessStatusCode(StatusCode code)
{
}
}
117 changes: 117 additions & 0 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/Cli.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
using System.CommandLine;
using System.CommandLine.Binding;

namespace Ydb.Sdk.Ado.Stress.Loader;

public static class Cli
{
private static readonly Argument<string> ConnectionString = new(
"connectionString",
"YDB connection string ADO NET format"
);

private static readonly Option<int> PeakRps = new("--peak-rps", () => 1000,
"Peak RPS load (top of the step)");

private static readonly Option<int> MediumRps = new("--medium-rps", () => 100,
"Medium RPS load (middle of the step)");

private static readonly Option<int> MinRps = new("--min-rps", () => 1,
"Minimum RPS load (bottom of the step, 1-2 RPS)");

private static readonly Option<int> PeakDurationSeconds = new("--peak-duration", () => 600,
"Duration of peak load in seconds");

private static readonly Option<int> MediumDurationSeconds = new("--medium-duration", () => 1800,
"Duration of medium load in seconds");

private static readonly Option<int> MinDurationSeconds = new("--min-duration", () => 1800,
"Duration of minimum load in seconds");

private static readonly Option<int> TotalTestTimeSeconds = new("--total-time", () => 14400,
"Total test duration in seconds");

private static readonly Option<string?> SaFilePath = new("--sa-file-path",
"Path to Service Account file for authentication");

private static readonly Option<string> TestQuery = new("--test-query",
() => "SELECT 1 as test_column",
"SQL query to execute during stress test"
);

public static readonly RootCommand RootCommand = new("YDB ADO.NET Stress Test Tank - Variable Load Generator")
{
ConnectionString,
PeakRps,
MediumRps,
MinRps,
PeakDurationSeconds,
MediumDurationSeconds,
MinDurationSeconds,
TotalTestTimeSeconds,
SaFilePath,
TestQuery
};

static Cli()
{
RootCommand.SetHandler(async config =>
{
var stressLoader = new StressTestTank(config);
await stressLoader.RunAsync();
},
new ConfigBinder(
ConnectionString,
PeakRps,
MediumRps,
MinRps,
PeakDurationSeconds,
MediumDurationSeconds,
MinDurationSeconds,
TotalTestTimeSeconds,
SaFilePath,
TestQuery
)
);
}
}

public class ConfigBinder(
Argument<string> connectionString,
Option<int> peakRps,
Option<int> mediumRps,
Option<int> minRps,
Option<int> peakDurationSeconds,
Option<int> mediumDurationSeconds,
Option<int> minDurationSeconds,
Option<int> totalTestTimeSeconds,
Option<string?> saFilePath,
Option<string> testQuery
) : BinderBase<StressTestConfig>
{
protected override StressTestConfig GetBoundValue(BindingContext bindingContext) => new(
ConnectionString: bindingContext.ParseResult.GetValueForArgument(connectionString),
PeakRps: bindingContext.ParseResult.GetValueForOption(peakRps),
MediumRps: bindingContext.ParseResult.GetValueForOption(mediumRps),
MinRps: bindingContext.ParseResult.GetValueForOption(minRps),
PeakDurationSeconds: bindingContext.ParseResult.GetValueForOption(peakDurationSeconds),
MediumDurationSeconds: bindingContext.ParseResult.GetValueForOption(mediumDurationSeconds),
MinDurationSeconds: bindingContext.ParseResult.GetValueForOption(minDurationSeconds),
TotalTestTimeSeconds: bindingContext.ParseResult.GetValueForOption(totalTestTimeSeconds),
SaFilePath: bindingContext.ParseResult.GetValueForOption(saFilePath),
TestQuery: bindingContext.ParseResult.GetValueForOption(testQuery)!
);
}

public record StressTestConfig(
string ConnectionString,
int PeakRps,
int MediumRps,
int MinRps,
int PeakDurationSeconds,
int MediumDurationSeconds,
int MinDurationSeconds,
int TotalTestTimeSeconds,
string? SaFilePath,
string TestQuery
);
60 changes: 60 additions & 0 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/LoadPattern.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
using System.Runtime.CompilerServices;
using Microsoft.Extensions.Logging;

namespace Ydb.Sdk.Ado.Stress.Loader;

public class LoadPattern(StressTestConfig config, ILogger logger)
{
public async IAsyncEnumerable<int> GetLoadStepsAsync(
[EnumeratorCancellation] CancellationToken cancellationToken = default
)
{
var totalDurationMs = config.TotalTestTimeSeconds * 1000;
var elapsed = 0;

while (elapsed < totalDurationMs && !cancellationToken.IsCancellationRequested)
{
logger.LogInformation("[{Now}]: Peak load phase! Expected RPS: {PickRps}", DateTime.Now, config.PeakRps);
yield return config.PeakRps;
var peakDurationMs = Math.Min(config.PeakDurationSeconds * 1000, totalDurationMs - elapsed);
await Task.Delay(peakDurationMs, cancellationToken);
elapsed += peakDurationMs;

if (elapsed >= totalDurationMs)
{
break;
}

logger.LogInformation("[{Now}]: Medium load phase (after peak)! Expected RPS: {MediumRps}", DateTime.Now,
config.MediumRps);
yield return config.MediumRps;
var mediumDurationMs = Math.Min(config.MediumDurationSeconds * 1000, totalDurationMs - elapsed);
await Task.Delay(mediumDurationMs, cancellationToken);
elapsed += mediumDurationMs;

if (elapsed >= totalDurationMs)
{
break;
}

logger.LogInformation("[{Now}]: Minimum load phase! Expected RPS: {MediumRps}", DateTime.Now,
config.MinRps);
yield return config.MinRps;
var minDurationMs = Math.Min(config.MinDurationSeconds * 1000, totalDurationMs - elapsed);
await Task.Delay(minDurationMs, cancellationToken);
elapsed += minDurationMs;

if (elapsed >= totalDurationMs)
{
break;
}

logger.LogInformation("[{Now}]: Medium load phase (before next peak)! Expected RPS: {MediumRps}",
DateTime.Now, config.MediumRps);
yield return config.MediumRps;
var finalMediumDurationMs = Math.Min(config.MediumDurationSeconds * 1000, totalDurationMs - elapsed);
await Task.Delay(finalMediumDurationMs, cancellationToken);
elapsed += finalMediumDurationMs;
}
}
}
4 changes: 4 additions & 0 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
using System.CommandLine;
using Ydb.Sdk.Ado.Stress.Loader;

return await Cli.RootCommand.InvokeAsync(args);
Loading
Loading