Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/slo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ jobs:
include:
- workload: AdoNet
read_rps: 1000
write_rps: 1000
write_rps: 100
- workload: Dapper
read_rps: 1000
write_rps: 1000
write_rps: 100
- workload: EF
read_rps: 500
write_rps: 500
read_rps: 1000
write_rps: 100

concurrency:
group: slo-${{ github.ref }}-${{ matrix.workload }}
Expand Down Expand Up @@ -64,8 +64,8 @@ jobs:
--prom-pgw http://localhost:9091 \
--report-period 250 \
--time 600 \
--read-rps ${{matrix.read_rps || 1000 }} \
--write-rps ${{matrix.write_rps || 1000 }} \
--read-rps ${{ matrix.read_rps }} \
--write-rps ${{ matrix.write_rps }} \
--read-timeout 1000 \
--write-timeout 1000

Expand Down
7 changes: 3 additions & 4 deletions slo/src/AdoNet/SloTableContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System.Data;
using Internal;
using Microsoft.Extensions.Logging;
using Polly;
using Ydb.Sdk;
using Ydb.Sdk.Ado;
Expand All @@ -9,9 +8,9 @@ namespace AdoNet;

public class SloTableContext : SloTableContext<YdbDataSource>
{
private static readonly AsyncPolicy Policy = Polly.Policy.Handle<YdbException>(exception => exception.IsTransient)
.WaitAndRetryAsync(10, attempt => TimeSpan.FromMilliseconds(attempt * 10),
(e, _, _, _) => { Logger.LogWarning(e, "Failed read / write operation"); });
private static readonly AsyncPolicy Policy = Polly.Policy
.Handle<YdbException>(exception => exception.IsTransient)
.RetryAsync(10);

protected override string Job => "AdoNet";

Expand Down
7 changes: 3 additions & 4 deletions slo/src/Dapper/SloTableContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using Dapper;
using Internal;
using Microsoft.Extensions.Logging;
using Polly;
using Ydb.Sdk;
using Ydb.Sdk.Ado;
Expand All @@ -9,9 +8,9 @@ namespace AdoNet.Dapper;

public class SloTableContext : SloTableContext<YdbDataSource>
{
private static readonly AsyncPolicy Policy = Polly.Policy.Handle<YdbException>(exception => exception.IsTransient)
.WaitAndRetryAsync(10, attempt => TimeSpan.FromMilliseconds(attempt * 10),
(e, _, _, _) => { Logger.LogWarning(e, "Failed read / write operation"); });
private static readonly AsyncPolicy Policy = Polly.Policy
.Handle<YdbException>(exception => exception.IsTransient)
.RetryAsync(10);

protected override string Job => "Dapper";

Expand Down
44 changes: 26 additions & 18 deletions slo/src/Internal/SloTableContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public interface ISloContext

public abstract class SloTableContext<T> : ISloContext
{
private const int IntervalMs = 100;

protected static readonly ILogger Logger = ISloContext.Factory.CreateLogger<SloTableContext<T>>();

private volatile int _maxId;
Expand Down Expand Up @@ -95,11 +97,13 @@ public async Task Run(RunConfig runConfig)

var writeLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
{
Window = TimeSpan.FromMilliseconds(100), PermitLimit = runConfig.WriteRps / 10, QueueLimit = int.MaxValue
Window = TimeSpan.FromMilliseconds(IntervalMs), PermitLimit = runConfig.WriteRps / 10,
QueueLimit = int.MaxValue
});
var readLimiter = new FixedWindowRateLimiter(new FixedWindowRateLimiterOptions
{
Window = TimeSpan.FromMilliseconds(100), PermitLimit = runConfig.ReadRps / 10, QueueLimit = int.MaxValue
Window = TimeSpan.FromMilliseconds(IntervalMs), PermitLimit = runConfig.ReadRps / 10,
QueueLimit = int.MaxValue
});

var cancellationTokenSource = new CancellationTokenSource();
Expand All @@ -124,7 +128,7 @@ public async Task Run(RunConfig runConfig)
Logger.LogInformation("Run task is finished");
return;

Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
async Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
Func<T, RunConfig, Task<(int, StatusCode)>> action)
{
var metricFactory = Metrics.WithLabels(new Dictionary<string, string>
Expand Down Expand Up @@ -193,21 +197,22 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
["error_type"]
);

// ReSharper disable once MethodSupportsCancellation
return Task.Run(async () =>
var workJobs = new List<Task>();

for (var i = 0; i < 10; i++)
{
while (!cancellationTokenSource.Token.IsCancellationRequested)
workJobs.Add(Task.Run(async () =>
{
using var lease = await rateLimitPolicy
.AcquireAsync(cancellationToken: cancellationTokenSource.Token);

if (!lease.IsAcquired)
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
continue;
}
using var lease = await rateLimitPolicy
.AcquireAsync(cancellationToken: cancellationTokenSource.Token);

if (!lease.IsAcquired)
{
await Task.Delay(Random.Shared.Next(IntervalMs / 2), cancellationTokenSource.Token);
}

_ = Task.Run(async () =>
{
try
{
pendingOperations.Inc();
Expand Down Expand Up @@ -235,11 +240,14 @@ Task ShootingTask(RateLimiter rateLimitPolicy, string operationType,
{
Logger.LogError(e, "Fail operation!");
}
}, cancellationTokenSource.Token);
}
}
}, cancellationTokenSource.Token));
}

// ReSharper disable once MethodSupportsCancellation
await Task.WhenAll(workJobs);

Logger.LogInformation("{ShootingName} shooting is stopped", operationType);
});
Logger.LogInformation("{ShootingName} shooting is stopped", operationType);
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
- Dev: LogLevel `Warning` -> `Debug` on DeleteSession has been `RpcException`.

## v0.22.0

- Added `YdbDbType` property to `YdbParameter`, allowing to explicitly specify YDB-specific data types for parameter mapping.
Expand All @@ -21,7 +23,6 @@
- Added new ADO.NET options:
- `MinSessionPool`: The minimum connection pool size.
- `SessionIdleTimeout`: The time (in seconds) to wait before closing idle session in the pool if the count of all sessions exceeds `MinSessionPool`.
- `SessionPruningInterval`: How many seconds the pool waits before attempting to prune idle sessions (see `SessionIdleTimeout`).
- Fixed bug `Reader`: unhandled exception in `TryReadRequestBytes(long bytes)`.
- Handle `YdbException` on `DeleteSession`.
- Do not invoke `DeleteSession` if the session is not active.
Expand Down
2 changes: 1 addition & 1 deletion src/Ydb.Sdk/src/Ado/Session/PoolingSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ internal override async Task DeleteSession()
}
catch (Exception e)
{
_logger.LogWarning(e, "Error occurred while deleting session[{SessionId}] (NodeId = {NodeId})",
_logger.LogDebug(e, "Error occurred while deleting session[{SessionId}] (NodeId = {NodeId})",
SessionId, NodeId);
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/Ydb.Sdk/src/Ado/YdbConnectionStringBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Ydb.Sdk.Auth;
using Ydb.Sdk.Pool;
using Ydb.Sdk.Transport;

namespace Ydb.Sdk.Ado;
Expand All @@ -29,8 +28,8 @@ private void InitDefaultValues()
_port = 2136;
_database = "/local";
_minSessionPool = 0;
_maxSessionPool = SessionPoolDefaultSettings.MaxSessionPool;
_createSessionTimeout = SessionPoolDefaultSettings.CreateSessionTimeoutSeconds;
_maxSessionPool = 100;
_createSessionTimeout = 5;
_sessionIdleTimeout = 300;
_useTls = false;
_connectTimeout = GrpcDefaultSettings.ConnectTimeoutSeconds;
Expand Down
Loading