Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Ydb.Sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Feat: Implement `YdbRetryPolicy` with AWS-inspired Exponential Backoff and Jitter.
- Dev: LogLevel `Warning` -> `Debug` on DeleteSession has been `RpcException`.

## v0.22.0
Expand Down
13 changes: 13 additions & 0 deletions src/Ydb.Sdk/src/Ado/Internal/Random.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Ydb.Sdk.Ado.Internal;

public interface IRandom
{
public int Next(int maxValue);
}

internal class ThreadLocalRandom : IRandom
{
internal static readonly ThreadLocalRandom Instance = new();

public int Next(int maxValue) => Random.Shared.Next(maxValue);
}
18 changes: 16 additions & 2 deletions src/Ydb.Sdk/src/Ado/Internal/StatusCodeUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,25 @@ public static class StatusCodeUtils
internal static StatusCode Code(this StatusIds.Types.StatusCode statusCode) =>
Enum.IsDefined(typeof(StatusCode), (int)statusCode) ? (StatusCode)statusCode : StatusCode.Unavailable;

internal static bool IsNotSuccess(this StatusIds.Types.StatusCode code) =>
code != StatusIds.Types.StatusCode.Success;
internal static bool IsNotSuccess(this StatusIds.Types.StatusCode statusCode) =>
statusCode != StatusIds.Types.StatusCode.Success;

internal static string ToMessage(this StatusCode statusCode, IReadOnlyList<IssueMessage> issueMessages) =>
issueMessages.Count == 0
? $"Status: {statusCode}"
: $"Status: {statusCode}, Issues:{Environment.NewLine}{issueMessages.IssuesToString()}";

internal static bool IsTransient(this StatusCode statusCode) => statusCode is
StatusCode.BadSession or
StatusCode.SessionBusy or
StatusCode.Aborted or
StatusCode.Unavailable or
StatusCode.Overloaded or
StatusCode.SessionExpired or
StatusCode.ClientTransportResourceExhausted;

internal static bool IsTransientWhenIdempotent(this StatusCode statusCode) => statusCode.IsTransient() ||
statusCode is StatusCode.Undetermined or
StatusCode.ClientTransportUnknown or
StatusCode.ClientTransportUnavailable;
}
6 changes: 6 additions & 0 deletions src/Ydb.Sdk/src/Ado/RetryPolicy/IRetryPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Ydb.Sdk.Ado.RetryPolicy;

public interface IRetryPolicy
{
public TimeSpan? GetNextDelay(YdbException ydbException, int attempt);
}
71 changes: 71 additions & 0 deletions src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicy.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using Ydb.Sdk.Ado.Internal;

namespace Ydb.Sdk.Ado.RetryPolicy;

/// <summary>
/// See <a href="https://aws.amazon.com/ru/blogs/architecture/exponential-backoff-and-jitter/">AWS paper</a>
/// </summary>
public class YdbRetryPolicy : IRetryPolicy
{
public static readonly YdbRetryPolicy Default = new(YdbRetryPolicyConfig.Default);

private readonly int _maxAttempt;
private readonly int _fastBackoffBaseMs;
private readonly int _slowBackoffBaseMs;
private readonly int _fastCeiling;
private readonly int _slowCeiling;
private readonly int _fastCapBackoffMs;
private readonly int _slowCapBackoffMs;
private readonly bool _enableRetryIdempotence;
private readonly IRandom _random;

public YdbRetryPolicy(YdbRetryPolicyConfig config)
{
_maxAttempt = config.MaxAttempt;
_fastBackoffBaseMs = config.FastBackoffBaseMs;
_slowBackoffBaseMs = config.SlowBackoffBaseMs;
_fastCeiling = (int)Math.Ceiling(Math.Log(config.FastCapBackoffMs + 1, 2));
_slowCeiling = (int)Math.Ceiling(Math.Log(config.SlowCapBackoffMs + 1, 2));
_fastCapBackoffMs = config.FastCapBackoffMs;
_slowCapBackoffMs = config.SlowCapBackoffMs;
_enableRetryIdempotence = config.EnableRetryIdempotence;
_random = ThreadLocalRandom.Instance;
}

internal YdbRetryPolicy(YdbRetryPolicyConfig config, IRandom random) : this(config)
{
_random = random;
}

public TimeSpan? GetNextDelay(YdbException ydbException, int attempt)
{
if (attempt >= _maxAttempt || !_enableRetryIdempotence && !ydbException.IsTransient)
return null;

return ydbException.Code switch
{
StatusCode.BadSession or StatusCode.SessionBusy => TimeSpan.Zero,
StatusCode.Aborted or StatusCode.Undetermined =>
FullJitter(_fastBackoffBaseMs, _fastCapBackoffMs, _fastCeiling, attempt, _random),
StatusCode.Unavailable or StatusCode.ClientTransportUnknown or StatusCode.ClientTransportUnavailable =>
EqualJitter(_fastBackoffBaseMs, _fastCapBackoffMs, _fastCeiling, attempt, _random),
StatusCode.Overloaded or StatusCode.ClientTransportResourceExhausted =>
EqualJitter(_slowBackoffBaseMs, _slowCapBackoffMs, _slowCeiling, attempt, _random),
_ => null
};
}

private static TimeSpan FullJitter(int backoffBaseMs, int capMs, int ceiling, int attempt, IRandom random) =>
TimeSpan.FromMilliseconds(random.Next(CalculateBackoff(backoffBaseMs, capMs, ceiling, attempt) + 1));

private static TimeSpan EqualJitter(int backoffBaseMs, int capMs, int ceiling, int attempt, IRandom random)
{
var calculatedBackoff = CalculateBackoff(backoffBaseMs, capMs, ceiling, attempt);
var temp = calculatedBackoff / 2;

return TimeSpan.FromMilliseconds(temp + calculatedBackoff % 2 + random.Next(temp + 1));
}

private static int CalculateBackoff(int backoffBaseMs, int capMs, int ceiling, int attempt) =>
Math.Min(backoffBaseMs * (1 << Math.Min(ceiling, attempt)), capMs);
}
18 changes: 18 additions & 0 deletions src/Ydb.Sdk/src/Ado/RetryPolicy/YdbRetryPolicyConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace Ydb.Sdk.Ado.RetryPolicy;

public class YdbRetryPolicyConfig
{
public static readonly YdbRetryPolicyConfig Default = new();

public int MaxAttempt { get; init; } = 10;

public int FastBackoffBaseMs { get; init; } = 5;

public int SlowBackoffBaseMs { get; init; } = 50;

public int FastCapBackoffMs { get; init; } = 500;

public int SlowCapBackoffMs { get; init; } = 5_000;

public bool EnableRetryIdempotence { get; init; } = false;
}
35 changes: 18 additions & 17 deletions src/Ydb.Sdk/src/Ado/YdbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,6 @@ public YdbConnection(YdbConnectionStringBuilder connectionStringBuilder)
ConnectionStringBuilder = connectionStringBuilder;
}

public IBulkUpsertImporter BeginBulkUpsertImport(
string name,
IReadOnlyList<string> columns,
CancellationToken cancellationToken = default)
{
ThrowIfConnectionClosed();
if (CurrentTransaction is { Completed: false })
throw new InvalidOperationException("BulkUpsert cannot be used inside an active transaction.");

var database = ConnectionStringBuilder.Database.TrimEnd('/');
var tablePath = name.StartsWith(database) ? name : $"{database}/{name}";

var maxBytes = ConnectionStringBuilder.MaxSendMessageSize;

return new BulkUpsertImporter(Session.Driver, tablePath, columns, maxBytes, cancellationToken);
}

protected override YdbTransaction BeginDbTransaction(IsolationLevel isolationLevel)
{
ThrowIfConnectionClosed();
Expand Down Expand Up @@ -279,4 +262,22 @@ public override async ValueTask DisposeAsync()
/// to their pool.
/// </summary>
public static Task ClearAllPools() => PoolManager.ClearAllPools();

public IBulkUpsertImporter BeginBulkUpsertImport(
string name,
IReadOnlyList<string> columns,
CancellationToken cancellationToken = default)
{
ThrowIfConnectionClosed();

if (CurrentTransaction is { Completed: false })
throw new InvalidOperationException("BulkUpsert cannot be used inside an active transaction.");

var database = ConnectionStringBuilder.Database.TrimEnd('/');
var tablePath = name.StartsWith(database) ? name : $"{database}/{name}";

var maxBytes = ConnectionStringBuilder.MaxSendMessageSize;

return new BulkUpsertImporter(Session.Driver, tablePath, columns, maxBytes, cancellationToken);
}
}
7 changes: 2 additions & 5 deletions src/Ydb.Sdk/src/Ado/YdbException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ internal YdbException(RpcException e) : this(e.Status.Code(), "Transport RPC cal
internal static YdbException FromServer(StatusIds.Types.StatusCode statusCode, IReadOnlyList<IssueMessage> issues)
{
var code = statusCode.Code();

var message = code.ToMessage(issues);

return new YdbException(code, message);
Expand All @@ -28,10 +27,8 @@ internal YdbException(StatusCode statusCode, string message, Exception? innerExc
: base(message, innerException)
{
Code = statusCode;
var policy = RetrySettings.DefaultInstance.GetRetryRule(statusCode).Policy;

IsTransient = policy == RetryPolicy.Unconditional;
IsTransientWhenIdempotent = policy != RetryPolicy.None;
IsTransient = statusCode.IsTransient();
IsTransientWhenIdempotent = statusCode.IsTransientWhenIdempotent();
// TODO: Add SQLSTATE message with order with https://en.wikipedia.org/wiki/SQLSTATE
}

Expand Down
21 changes: 1 addition & 20 deletions src/Ydb.Sdk/src/Pool/EndpointPool.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Immutable;
using Microsoft.Extensions.Logging;
using Ydb.Sdk.Ado.Internal;

namespace Ydb.Sdk.Pool;

Expand Down Expand Up @@ -152,23 +153,3 @@ private record PriorityEndpoint(string Endpoint)
}

public record EndpointSettings(long NodeId, string Endpoint, string LocationDc);

public interface IRandom
{
public int Next(int maxValue);
}

internal class ThreadLocalRandom : IRandom
{
internal static readonly ThreadLocalRandom Instance = new();

[ThreadStatic] private static Random? _random;

private static Random ThreadStaticRandom => _random ??= new Random();

private ThreadLocalRandom()
{
}

public int Next(int maxValue) => ThreadStaticRandom.Next(maxValue);
}
5 changes: 3 additions & 2 deletions src/Ydb.Sdk/test/Ydb.Sdk.Ado.Tests/Pool/EndpointPoolTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Collections.Immutable;
using Moq;
using Xunit;
using Ydb.Sdk.Ado.Internal;
using Ydb.Sdk.Ado.Tests.Utils;
using Ydb.Sdk.Pool;

Expand Down Expand Up @@ -112,7 +113,7 @@ public void PessimizeEndpoint_Reset_WhenPessimizedMajorityNodesThenResetAndAddNe
listNewEndpointSettings.Add(new EndpointSettings(6, "n6.ydb.tech", "VLA"));
listNewEndpointSettings.Add(new EndpointSettings(7, "n7.ydb.tech", "MAN"));

_endpointPool.Reset(listNewEndpointSettings.ToImmutableArray());
_endpointPool.Reset([..listNewEndpointSettings]);

for (var it = 0; it < listNewEndpointSettings.Count; it++)
{
Expand Down Expand Up @@ -141,7 +142,7 @@ public void PessimizeEndpoint_Reset_WhenResetNewNodes_ReturnRemovedNodes()
listNewEndpointSettings.Add(new EndpointSettings(6, "n6.ydb.tech", "VLA"));
listNewEndpointSettings.Add(new EndpointSettings(7, "n7.ydb.tech", "MAN"));

var removed = _endpointPool.Reset(listNewEndpointSettings.ToImmutableArray());
var removed = _endpointPool.Reset([..listNewEndpointSettings]);

Assert.Equal(2, removed.Length);
Assert.Equal("n1.ydb.tech", removed[0]);
Expand Down
Loading
Loading