Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/Ydb.Sdk.AdoNet.QuickStart/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,8 @@ ORDER BY -- Sorting of the results.

while (await ydbDataReader.ReadAsync())
{
_logger.LogInformation("season_title: {}, series_title: {}, series_id: {}, season_id: {}",
_logger.LogInformation("season_title: {SeasonTitle}, series_title: {SeriesTitle}, " +
"series_id: {SeriesId}, season_id: {SeasonId}",
ydbDataReader.GetString("season_title"), ydbDataReader.GetString("series_title"),
ydbDataReader.GetUint64(2), ydbDataReader.GetUint64(3));
}
Expand Down
2 changes: 1 addition & 1 deletion src/EFCore.Ydb/src/Storage/Internal/YdbDatabaseCreator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public override async Task DeleteAsync(CancellationToken cancellationToken = def
.GetSchemaAsync("Tables", [null, "TABLE"], cancellationToken);

var dropTableOperations = (from DataRow row in dataTable.Rows
select new DropTableOperation { Name = row["table_name"].ToString() }).ToList();
select new DropTableOperation { Name = row["table_name"].ToString()! }).ToList();

await Dependencies.MigrationCommandExecutor.ExecuteNonQueryAsync(Dependencies.MigrationsSqlGenerator
.Generate(dropTableOperations), connection, cancellationToken);
Expand Down
1 change: 1 addition & 0 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Feat: Implement `YdbRetryPolicy` with AWS-inspired Exponential Backoff and Jitter.
- Dev: LogLevel `Warning` -> `Debug` on DeleteSession has been `RpcException`.

## v0.22.0
Expand Down
13 changes: 13 additions & 0 deletions src/Ydb.Sdk/src/Ado/Internal/Random.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Ydb.Sdk.Ado.Internal;

public interface IRandom
{
public int Next(int maxValue);
}

internal class ThreadLocalRandom : IRandom
{
internal static readonly ThreadLocalRandom Instance = new();

public int Next(int maxValue) => Random.Shared.Next(maxValue);
}
18 changes: 16 additions & 2 deletions src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,25 @@ public static class StatusCodeUtils
internal static StatusCode Code(this StatusIds.Types.StatusCode statusCode) =>
Enum.IsDefined(typeof(StatusCode), (int)statusCode) ? (StatusCode)statusCode : StatusCode.Unavailable;

internal static bool IsNotSuccess(this StatusIds.Types.StatusCode code) =>
code != StatusIds.Types.StatusCode.Success;
internal static bool IsNotSuccess(this StatusIds.Types.StatusCode statusCode) =>
statusCode != StatusIds.Types.StatusCode.Success;

internal static string ToMessage(this StatusCode statusCode, IReadOnlyList<IssueMessage> issueMessages) =>
issueMessages.Count == 0
? $"Status: {statusCode}"
: $"Status: {statusCode}, Issues:{Environment.NewLine}{issueMessages.IssuesToString()}";

internal static bool IsTransient(this StatusCode statusCode) => statusCode is
StatusCode.BadSession or
StatusCode.SessionBusy or
StatusCode.Aborted or
StatusCode.Unavailable or
StatusCode.Overloaded or
StatusCode.SessionExpired or
StatusCode.ClientTransportResourceExhausted;

internal static bool IsTransientWhenIdempotent(this StatusCode statusCode) => statusCode.IsTransient() ||
statusCode is StatusCode.Undetermined or
StatusCode.ClientTransportUnknown or
StatusCode.ClientTransportUnavailable;
}
6 changes: 6 additions & 0 deletions src/Ydb.Sdk/src/Ado/RetryPolicy/IRetryPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Ydb.Sdk.Ado.RetryPolicy;

public interface IRetryPolicy
{
public TimeSpan? GetNextDelay(YdbException ydbException, int attempt);
}
71 changes: 71 additions & 0 deletions src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using Ydb.Sdk.Ado.Internal;

namespace Ydb.Sdk.Ado.RetryPolicy;

/// <summary>
/// See <a href="https://aws.amazon.com/ru/blogs/architecture/exponential-backoff-and-jitter/">AWS paper</a>
/// </summary>
public class YdbRetryPolicy : IRetryPolicy
{
public static readonly YdbRetryPolicy Default = new(YdbRetryPolicyConfig.Default);

private readonly int _maxAttempt;
private readonly int _fastBackoffBaseMs;
private readonly int _slowBackoffBaseMs;
private readonly int _fastCeiling;
private readonly int _slowCeiling;
private readonly int _fastCapBackoffMs;
private readonly int _slowCapBackoffMs;
private readonly bool _enableRetryIdempotence;
private readonly IRandom _random;

public YdbRetryPolicy(YdbRetryPolicyConfig config)
{
_maxAttempt = config.MaxAttempt;
_fastBackoffBaseMs = config.FastBackoffBaseMs;
_slowBackoffBaseMs = config.SlowBackoffBaseMs;
_fastCeiling = (int)Math.Ceiling(Math.Log(config.FastCapBackoffMs + 1, 2));
_slowCeiling = (int)Math.Ceiling(Math.Log(config.SlowCapBackoffMs + 1, 2));
_fastCapBackoffMs = config.FastCapBackoffMs;
_slowCapBackoffMs = config.SlowCapBackoffMs;
_enableRetryIdempotence = config.EnableRetryIdempotence;
_random = ThreadLocalRandom.Instance;
}

internal YdbRetryPolicy(YdbRetryPolicyConfig config, IRandom random) : this(config)
{
_random = random;
}

public TimeSpan? GetNextDelay(YdbException ydbException, int attempt)
{
if (attempt >= _maxAttempt || (!_enableRetryIdempotence && !ydbException.IsTransient))
return null;

return ydbException.Code switch
{
StatusCode.BadSession or StatusCode.SessionBusy => TimeSpan.Zero,
StatusCode.Aborted or StatusCode.Undetermined =>
FullJitter(_fastBackoffBaseMs, _fastCapBackoffMs, _fastCeiling, attempt, _random),
StatusCode.Unavailable or StatusCode.ClientTransportUnknown or StatusCode.ClientTransportUnavailable =>
EqualJitter(_fastBackoffBaseMs, _fastCapBackoffMs, _fastCeiling, attempt, _random),
StatusCode.Overloaded or StatusCode.ClientTransportResourceExhausted =>
EqualJitter(_slowBackoffBaseMs, _slowCapBackoffMs, _slowCeiling, attempt, _random),
_ => null
};
}

private static TimeSpan FullJitter(int backoffBaseMs, int capMs, int ceiling, int attempt, IRandom random) =>
TimeSpan.FromMilliseconds(random.Next(CalculateBackoff(backoffBaseMs, capMs, ceiling, attempt) + 1));

private static TimeSpan EqualJitter(int backoffBaseMs, int capMs, int ceiling, int attempt, IRandom random)
{
var calculatedBackoff = CalculateBackoff(backoffBaseMs, capMs, ceiling, attempt);
var temp = calculatedBackoff / 2;

return TimeSpan.FromMilliseconds(temp + calculatedBackoff % 2 + random.Next(temp + 1));
}

private static int CalculateBackoff(int backoffBaseMs, int capMs, int ceiling, int attempt) =>
Math.Min(backoffBaseMs * (1 << Math.Min(ceiling, attempt)), capMs);
}
18 changes: 18 additions & 0 deletions src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicyConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace Ydb.Sdk.Ado.RetryPolicy;

public class YdbRetryPolicyConfig
{
public static readonly YdbRetryPolicyConfig Default = new();

public int MaxAttempt { get; init; } = 10;

public int FastBackoffBaseMs { get; init; } = 5;

public int SlowBackoffBaseMs { get; init; } = 50;

public int FastCapBackoffMs { get; init; } = 500;

public int SlowCapBackoffMs { get; init; } = 5_000;

public bool EnableRetryIdempotence { get; init; } = false;
}
35 changes: 18 additions & 17 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,6 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
ConnectionStringBuilder = connectionStringBuilder;
}

public IBulkUpsertImporter BeginBulkUpsertImport(
string name,
IReadOnlyList<string> columns,
CancellationToken cancellationToken = default)
{
ThrowIfConnectionClosed();
if (CurrentTransaction is { Completed: false })
throw new InvalidOperationException("BulkUpsert cannot be used inside an active transaction.");

var database = ConnectionStringBuilder.Database.TrimEnd('/');
var tablePath = name.StartsWith(database) ? name : $"{database}/{name}";

var maxBytes = ConnectionStringBuilder.MaxSendMessageSize;

return new BulkUpsertImporter(Session.Driver, tablePath, columns, maxBytes, cancellationToken);
}

protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
{
ThrowIfConnectionClosed();
Expand Down Expand Up @@ -279,4 +262,22 @@ public override async ValueTask DisposeAsync()
/// to their pool.
/// </summary>
public static Task ClearAllPools() => PoolManager.ClearAllPools();

public IBulkUpsertImporter BeginBulkUpsertImport(
string name,
IReadOnlyList<string> columns,
CancellationToken cancellationToken = default)
{
ThrowIfConnectionClosed();

if (CurrentTransaction is { Completed: false })
throw new InvalidOperationException("BulkUpsert cannot be used inside an active transaction.");

var database = ConnectionStringBuilder.Database.TrimEnd('/');
var tablePath = name.StartsWith(database) ? name : $"{database}/{name}";

var maxBytes = ConnectionStringBuilder.MaxSendMessageSize;

return new BulkUpsertImporter(Session.Driver, tablePath, columns, maxBytes, cancellationToken);
}
}
7 changes: 2 additions & 5 deletions src/Ydb.Sdk/src/Ado/YdbException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ internal YdbException(RpcException e) : this(e.Status.Code(), "Transport RPC cal
internal static YdbException FromServer(StatusIds.Types.StatusCode statusCode, IReadOnlyList<IssueMessage> issues)
{
var code = statusCode.Code();

var message = code.ToMessage(issues);

return new YdbException(code, message);
Expand All @@ -28,10 +27,8 @@ internal YdbException(StatusCode statusCode, string message, Exception? innerExc
: base(message, innerException)
{
Code = statusCode;
var policy = RetrySettings.DefaultInstance.GetRetryRule(statusCode).Policy;

IsTransient = policy == RetryPolicy.Unconditional;
IsTransientWhenIdempotent = policy != RetryPolicy.None;
IsTransient = statusCode.IsTransient();
IsTransientWhenIdempotent = statusCode.IsTransientWhenIdempotent();
// TODO: Add SQLSTATE message with order with https://en.wikipedia.org/wiki/SQLSTATE
}

Expand Down
21 changes: 1 addition & 20 deletions src/Ydb.Sdk/src/Pool/EndpointPool.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Immutable;
using Microsoft.Extensions.Logging;
using Ydb.Sdk.Ado.Internal;

namespace Ydb.Sdk.Pool;

Expand Down Expand Up @@ -152,23 +153,3 @@ private record PriorityEndpoint(string Endpoint)
}

public record EndpointSettings(long NodeId, string Endpoint, string LocationDc);

public interface IRandom
{
public int Next(int maxValue);
}

internal class ThreadLocalRandom : IRandom
{
internal static readonly ThreadLocalRandom Instance = new();

[ThreadStatic] private static Random? _random;

private static Random ThreadStaticRandom => _random ??= new Random();

private ThreadLocalRandom()
{
}

public int Next(int maxValue) => ThreadStaticRandom.Next(maxValue);
}
5 changes: 2 additions & 3 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Stress.Loader/StressLoadTank.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,12 @@ Starting YDB ADO.NET Stress Test Tank
Peak RPS: {PeakRps}
Medium RPS: {MediumRps}
Min RPS: {MinRps}
Load Pattern: Peak({PeakDuration}s) -> Medium({MediumDuration}s) -> Min({MinDuration}s) -> Medium({MediumDuration}s)
Load Pattern: Peak({PeakDuration}s) -> Medium({MediumDuration}s) -> Min({MinDuration}s)
Total Test Time: {TotalTime}s
Test Query: {TestQuery}
""",
_config.PeakRps, _config.MediumRps, _config.MinRps, _config.PeakDurationSeconds,
_config.MediumDurationSeconds, _config.MinDurationSeconds, _config.MediumDurationSeconds,
_config.TotalTestTimeSeconds, _config.TestQuery
_config.MediumDurationSeconds, _config.MinDurationSeconds, _config.TotalTestTimeSeconds, _config.TestQuery
);

var ctsRunJob = new CancellationTokenSource();
Expand Down
5 changes: 3 additions & 2 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Pool/EndpointPoolTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Immutable;
using Moq;
using Xunit;
using Ydb.Sdk.Ado.Internal;
using Ydb.Sdk.Ado.Tests.Utils;
using Ydb.Sdk.Pool;

Expand Down Expand Up @@ -112,7 +113,7 @@ public void PessimizeEndpoint_Reset_WhenPessimizedMajorityNodesThenResetAndAddNe
listNewEndpointSettings.Add(new EndpointSettings(6, "n6.ydb.tech", "VLA"));
listNewEndpointSettings.Add(new EndpointSettings(7, "n7.ydb.tech", "MAN"));

_endpointPool.Reset(listNewEndpointSettings.ToImmutableArray());
_endpointPool.Reset([..listNewEndpointSettings]);

for (var it = 0; it < listNewEndpointSettings.Count; it++)
{
Expand Down Expand Up @@ -141,7 +142,7 @@ public void PessimizeEndpoint_Reset_WhenResetNewNodes_ReturnRemovedNodes()
listNewEndpointSettings.Add(new EndpointSettings(6, "n6.ydb.tech", "VLA"));
listNewEndpointSettings.Add(new EndpointSettings(7, "n7.ydb.tech", "MAN"));

var removed = _endpointPool.Reset(listNewEndpointSettings.ToImmutableArray());
var removed = _endpointPool.Reset([..listNewEndpointSettings]);

Assert.Equal(2, removed.Length);
Assert.Equal("n1.ydb.tech", removed[0]);
Expand Down
Loading
Loading