Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 35 additions & 16 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/Cli.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@ public static class Cli
private static readonly Option<int> TotalTestTimeSeconds = new("--total-time", () => 14400,
"Total test duration in seconds");

private static readonly Option<string?> SaFilePath = new("--sa-file-path",
"Path to Service Account file for authentication");
private static readonly Option<string?> SaFilePath =
new("--sa-file-path", "Path to Service Account file for authentication");

private static readonly Option<string> TestQuery = new("--test-query",
() => "SELECT 1 as test_column",
"SQL query to execute during stress test"
);
private static readonly Option<string> TestQuery =
new("--test-query", () => "SELECT 1 as test_column", "SQL query to execute during stress test");

private static readonly Option<int> ThrottlingInterval =
new("--throttling-interval", () => 100, "Throttling interval in seconds");

private static readonly Option<int> WorkerCount =
new("--worker-count", () => 100, "Worker count");

private static readonly Command StressLoadCommand = new(
"cycle",
Expand All @@ -53,7 +57,9 @@ public static class Cli
MinDurationSeconds,
TotalTestTimeSeconds,
SaFilePath,
TestQuery
TestQuery,
ThrottlingInterval,
WorkerCount
};

private static readonly Command LoadCommand = new(
Expand All @@ -63,7 +69,8 @@ public static class Cli
TotalTestTimeSeconds,
ConnectionString,
SaFilePath,
TestQuery
TestQuery,
WorkerCount
};

public static readonly RootCommand RootCommand = new("YDB ADO.NET Stress Test Tank - Variable Load Generator")
Expand All @@ -89,7 +96,9 @@ static Cli()
MinDurationSeconds,
TotalTestTimeSeconds,
SaFilePath,
TestQuery
TestQuery,
ThrottlingInterval,
WorkerCount
)
);

Expand All @@ -102,7 +111,8 @@ static Cli()
ConnectionString,
TotalTestTimeSeconds,
SaFilePath,
TestQuery
TestQuery,
WorkerCount
)
);
}
Expand All @@ -118,7 +128,9 @@ public class StressConfigBinder(
Option<int> minDurationSeconds,
Option<int> totalTestTimeSeconds,
Option<string?> saFilePath,
Option<string> testQuery
Option<string> testQuery,
Option<int> throttlingInterval,
Option<int> workerCount
) : BinderBase<StressConfig>
{
protected override StressConfig GetBoundValue(BindingContext bindingContext) => new(
Expand All @@ -131,22 +143,26 @@ Option<string> testQuery
MinDurationSeconds: bindingContext.ParseResult.GetValueForOption(minDurationSeconds),
TotalTestTimeSeconds: bindingContext.ParseResult.GetValueForOption(totalTestTimeSeconds),
SaFilePath: bindingContext.ParseResult.GetValueForOption(saFilePath),
TestQuery: bindingContext.ParseResult.GetValueForOption(testQuery)!
TestQuery: bindingContext.ParseResult.GetValueForOption(testQuery)!,
ThrottlingInterval: bindingContext.ParseResult.GetValueForOption(throttlingInterval),
WorkerCount: bindingContext.ParseResult.GetValueForOption(workerCount)
);
}

public class LoadConfigBinder(
Argument<string> connectionString,
Option<int> totalTestTimeSeconds,
Option<string?> saFilePath,
Option<string> testQuery
Option<string> testQuery,
Option<int> workerCount
) : BinderBase<LoadConfig>
{
protected override LoadConfig GetBoundValue(BindingContext bindingContext) => new(
bindingContext.ParseResult.GetValueForArgument(connectionString),
bindingContext.ParseResult.GetValueForOption(totalTestTimeSeconds),
bindingContext.ParseResult.GetValueForOption(saFilePath),
bindingContext.ParseResult.GetValueForOption(testQuery)!
bindingContext.ParseResult.GetValueForOption(testQuery)!,
bindingContext.ParseResult.GetValueForOption(workerCount)
);
}

Expand All @@ -160,12 +176,15 @@ public record StressConfig(
int MinDurationSeconds,
int TotalTestTimeSeconds,
string? SaFilePath,
string TestQuery
string TestQuery,
int ThrottlingInterval,
int WorkerCount
);

public record LoadConfig(
string ConnectionString,
int TotalTestTimeSeconds,
string? SaFilePath,
string TestQuery
string TestQuery,
int WorkerCount
);
51 changes: 30 additions & 21 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/StressLoadTank.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,35 +110,44 @@ private async Task StartWorkersForRpsAsync(int targetRps, CancellationToken canc

_logger.LogInformation("Starting shooting for {TargetRps} RPS", targetRps);

while (!cancellationToken.IsCancellationRequested)
var workers = new List<Task>();
for (var i = 0; i < _config.WorkerCount; i++)
{
using var lease = await rateLimiter.AcquireAsync(1, cancellationToken);

if (!lease.IsAcquired)
{
continue;
}

_ = Task.Run(async () =>
workers.Add(Task.Run(async () =>
{
try
while (!cancellationToken.IsCancellationRequested)
{
await using var ydbConnection = new YdbConnection(_settings);
await ydbConnection.OpenAsync(cancellationToken);
await new YdbCommand(ydbConnection) { CommandText = _config.TestQuery }
.ExecuteNonQueryAsync(cancellationToken);
}
catch (YdbException e)
{
if (e.Code == StatusCode.ClientTransportTimeout)
// ReSharper disable once AccessToDisposedClosure
using var lease = await rateLimiter.AcquireAsync(1, cancellationToken);
await Task.Delay(Random.Shared.Next(_config.ThrottlingInterval), cancellationToken);

if (!lease.IsAcquired)
{
return;
await Task.Delay(10, cancellationToken);
continue;
}

_logger.LogError(e, "Fail operation");
try
{
await using var ydbConnection = new YdbConnection(_settings);
await ydbConnection.OpenAsync(cancellationToken);
await new YdbCommand(ydbConnection) { CommandText = _config.TestQuery }
.ExecuteNonQueryAsync(cancellationToken);
}
catch (YdbException e)
{
if (e.Code == StatusCode.ClientTransportTimeout)
{
return;
}

_logger.LogError(e, "Fail operation");
}
}
}, cancellationToken);
}, cancellationToken));
}

await Task.WhenAll(workers);
}

private void ValidateConfig()
Expand Down
Loading